Commit fc2ba5b6cc53045ca3fa22a963864d4649b6fbf8

Authored by Wiebe Cazemier
1 parent 1654508d

Publish retained message count on $SYS

mainapp.cpp
@@ -355,6 +355,8 @@ void MainApp::publishStatsOnDollarTopic() @@ -355,6 +355,8 @@ void MainApp::publishStatsOnDollarTopic()
355 355
356 publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount); 356 publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount);
357 publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond); 357 publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond);
  358 +
  359 + publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount());
358 } 360 }
359 361
360 void MainApp::publishStat(const std::string &topic, uint64_t n) 362 void MainApp::publishStat(const std::string &topic, uint64_t n)
subscriptionstore.cpp
@@ -402,7 +402,7 @@ void SubscriptionStore::setRetainedMessage(const std::string &topic, const std:: @@ -402,7 +402,7 @@ void SubscriptionStore::setRetainedMessage(const std::string &topic, const std::
402 402
403 if (deepestNode) 403 if (deepestNode)
404 { 404 {
405 - deepestNode->addPayload(topic, payload, qos); 405 + deepestNode->addPayload(topic, payload, qos, retainedMessageCount);
406 } 406 }
407 407
408 locker.unlock(); 408 locker.unlock();
@@ -489,6 +489,11 @@ void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeco @@ -489,6 +489,11 @@ void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeco
489 root.cleanSubscriptions(); 489 root.cleanSubscriptions();
490 } 490 }
491 491
  492 +int64_t SubscriptionStore::getRetainedMessageCount() const
  493 +{
  494 + return retainedMessageCount;
  495 +}
  496 +
492 // QoS is not used in the comparision. This means you upgrade your QoS by subscribing again. The 497 // QoS is not used in the comparision. This means you upgrade your QoS by subscribing again. The
493 // specs don't specify what to do there. 498 // specs don't specify what to do there.
494 bool Subscription::operator==(const Subscription &rhs) const 499 bool Subscription::operator==(const Subscription &rhs) const
@@ -515,8 +520,9 @@ bool Subscription::sessionGone() const @@ -515,8 +520,9 @@ bool Subscription::sessionGone() const
515 return session.expired(); 520 return session.expired();
516 } 521 }
517 522
518 -void RetainedMessageNode::addPayload(const std::string &topic, const std::string &payload, char qos) 523 +void RetainedMessageNode::addPayload(const std::string &topic, const std::string &payload, char qos, int64_t &totalCount)
519 { 524 {
  525 + const int64_t countBefore = retainedMessages.size();
520 RetainedMessage rm(topic, payload, qos); 526 RetainedMessage rm(topic, payload, qos);
521 527
522 auto retained_ptr = retainedMessages.find(rm); 528 auto retained_ptr = retainedMessages.find(rm);
@@ -528,6 +534,8 @@ void RetainedMessageNode::addPayload(const std::string &topic, const std::string @@ -528,6 +534,8 @@ void RetainedMessageNode::addPayload(const std::string &topic, const std::string
528 if (retained_found && payload.empty()) 534 if (retained_found && payload.empty())
529 { 535 {
530 retainedMessages.erase(rm); 536 retainedMessages.erase(rm);
  537 + const int64_t diffCount = (retainedMessages.size() - countBefore);
  538 + totalCount += diffCount;
531 return; 539 return;
532 } 540 }
533 541
@@ -535,6 +543,8 @@ void RetainedMessageNode::addPayload(const std::string &topic, const std::string @@ -535,6 +543,8 @@ void RetainedMessageNode::addPayload(const std::string &topic, const std::string
535 retainedMessages.erase(rm); 543 retainedMessages.erase(rm);
536 544
537 retainedMessages.insert(std::move(rm)); 545 retainedMessages.insert(std::move(rm));
  546 + const int64_t diffCount = (retainedMessages.size() - countBefore);
  547 + totalCount += diffCount;
538 } 548 }
539 549
540 /** 550 /**
subscriptionstore.h
@@ -71,7 +71,7 @@ class RetainedMessageNode @@ -71,7 +71,7 @@ class RetainedMessageNode
71 std::unordered_map<std::string, std::unique_ptr<RetainedMessageNode>> children; 71 std::unordered_map<std::string, std::unique_ptr<RetainedMessageNode>> children;
72 std::unordered_set<RetainedMessage> retainedMessages; 72 std::unordered_set<RetainedMessage> retainedMessages;
73 73
74 - void addPayload(const std::string &topic, const std::string &payload, char qos); 74 + void addPayload(const std::string &topic, const std::string &payload, char qos, int64_t &totalCount);
75 RetainedMessageNode *getChildren(const std::string &subtopic) const; 75 RetainedMessageNode *getChildren(const std::string &subtopic) const;
76 }; 76 };
77 77
@@ -86,6 +86,7 @@ class SubscriptionStore @@ -86,6 +86,7 @@ class SubscriptionStore
86 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 86 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
87 RetainedMessageNode retainedMessagesRoot; 87 RetainedMessageNode retainedMessagesRoot;
88 RetainedMessageNode retainedMessagesRootDollar; 88 RetainedMessageNode retainedMessagesRootDollar;
  89 + int64_t retainedMessageCount = 0;
89 90
90 Logger *logger = Logger::getInstance(); 91 Logger *logger = Logger::getInstance();
91 92
@@ -111,6 +112,8 @@ public: @@ -111,6 +112,8 @@ public:
111 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos); 112 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos);
112 113
113 void removeExpiredSessionsClients(int expireSessionsAfterSeconds); 114 void removeExpiredSessionsClients(int expireSessionsAfterSeconds);
  115 +
  116 + int64_t getRetainedMessageCount() const;
114 }; 117 };
115 118
116 #endif // SUBSCRIPTIONSTORE_H 119 #endif // SUBSCRIPTIONSTORE_H