From fc2ba5b6cc53045ca3fa22a963864d4649b6fbf8 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Tue, 15 Jun 2021 22:42:03 +0200 Subject: [PATCH] Publish retained message count on $SYS --- mainapp.cpp | 2 ++ subscriptionstore.cpp | 14 ++++++++++++-- subscriptionstore.h | 5 ++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/mainapp.cpp b/mainapp.cpp index 348b53e..de05ac7 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -355,6 +355,8 @@ void MainApp::publishStatsOnDollarTopic() publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount); publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond); + + publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); } void MainApp::publishStat(const std::string &topic, uint64_t n) diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index aa021f8..e216228 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -402,7 +402,7 @@ void SubscriptionStore::setRetainedMessage(const std::string &topic, const std:: if (deepestNode) { - deepestNode->addPayload(topic, payload, qos); + deepestNode->addPayload(topic, payload, qos, retainedMessageCount); } locker.unlock(); @@ -489,6 +489,11 @@ void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeco root.cleanSubscriptions(); } +int64_t SubscriptionStore::getRetainedMessageCount() const +{ + return retainedMessageCount; +} + // QoS is not used in the comparision. This means you upgrade your QoS by subscribing again. The // specs don't specify what to do there. bool Subscription::operator==(const Subscription &rhs) const @@ -515,8 +520,9 @@ bool Subscription::sessionGone() const return session.expired(); } -void RetainedMessageNode::addPayload(const std::string &topic, const std::string &payload, char qos) +void RetainedMessageNode::addPayload(const std::string &topic, const std::string &payload, char qos, int64_t &totalCount) { + const int64_t countBefore = retainedMessages.size(); RetainedMessage rm(topic, payload, qos); auto retained_ptr = retainedMessages.find(rm); @@ -528,6 +534,8 @@ void RetainedMessageNode::addPayload(const std::string &topic, const std::string if (retained_found && payload.empty()) { retainedMessages.erase(rm); + const int64_t diffCount = (retainedMessages.size() - countBefore); + totalCount += diffCount; return; } @@ -535,6 +543,8 @@ void RetainedMessageNode::addPayload(const std::string &topic, const std::string retainedMessages.erase(rm); retainedMessages.insert(std::move(rm)); + const int64_t diffCount = (retainedMessages.size() - countBefore); + totalCount += diffCount; } /** diff --git a/subscriptionstore.h b/subscriptionstore.h index 31e6b81..35416b4 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -71,7 +71,7 @@ class RetainedMessageNode std::unordered_map> children; std::unordered_set retainedMessages; - void addPayload(const std::string &topic, const std::string &payload, char qos); + void addPayload(const std::string &topic, const std::string &payload, char qos, int64_t &totalCount); RetainedMessageNode *getChildren(const std::string &subtopic) const; }; @@ -86,6 +86,7 @@ class SubscriptionStore pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; RetainedMessageNode retainedMessagesRoot; RetainedMessageNode retainedMessagesRootDollar; + int64_t retainedMessageCount = 0; Logger *logger = Logger::getInstance(); @@ -111,6 +112,8 @@ public: void setRetainedMessage(const std::string &topic, const std::vector &subtopics, const std::string &payload, char qos); void removeExpiredSessionsClients(int expireSessionsAfterSeconds); + + int64_t getRetainedMessageCount() const; }; #endif // SUBSCRIPTIONSTORE_H -- libgit2 0.21.4