From 81fcc49d60a85d8aa89c426629e39113ce63a4ba Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Mon, 14 Feb 2022 20:19:51 +0100 Subject: [PATCH] Add subscription count in $SYS --- subscriptionstore.cpp | 42 ++++++++++++++++++++++++++++++++++++++++++ subscriptionstore.h | 2 ++ threaddata.cpp | 2 ++ 3 files changed, 46 insertions(+), 0 deletions(-) diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index ffb2769..71b48ad 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -569,6 +569,19 @@ uint64_t SubscriptionStore::getSessionCount() const return sessionsByIdConst.size(); } +int64_t SubscriptionStore::getSubscriptionCount() +{ + int64_t count = 0; + + RWLockGuard lock_guard(&subscriptionsRwlock); + lock_guard.rdlock(); + + countSubscriptions(&root, count); + countSubscriptions(&rootDollar, count); + + return count; +} + void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std::vector &outputList) const { for(const RetainedMessage &rm : this_node->retainedMessages) @@ -624,6 +637,35 @@ void SubscriptionStore::getSubscriptions(SubscriptionNode *this_node, const std: } } +void SubscriptionStore::countSubscriptions(SubscriptionNode *this_node, int64_t &count) const +{ + for (auto &pair : this_node->getSubscribers()) + { + const Subscription &node = pair.second; + std::shared_ptr ses = node.session.lock(); + if (ses) + { + count++; + } + } + + for (auto &pair : this_node->children) + { + SubscriptionNode *node = pair.second.get(); + countSubscriptions(node, count); + } + + if (this_node->childrenPlus) + { + countSubscriptions(this_node->childrenPlus.get(), count); + } + + if (this_node->childrenPound) + { + countSubscriptions(this_node->childrenPound.get(), count); + } +} + void SubscriptionStore::saveRetainedMessages(const std::string &filePath) { logger->logf(LOG_INFO, "Saving retained messages to '%s'", filePath.c_str()); diff --git a/subscriptionstore.h b/subscriptionstore.h index 7923c98..2c2ab2e 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -110,6 +110,7 @@ class SubscriptionStore void getRetainedMessages(RetainedMessageNode *this_node, std::vector &outputList) const; void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root, std::unordered_map> &outputList) const; + void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const; SubscriptionNode *getDeepestNode(const std::string &topic, const std::vector &subtopics); public: @@ -132,6 +133,7 @@ public: int64_t getRetainedMessageCount() const; uint64_t getSessionCount() const; + int64_t getSubscriptionCount(); void saveRetainedMessages(const std::string &filePath); void loadRetainedMessages(const std::string &filePath); diff --git a/threaddata.cpp b/threaddata.cpp index 5a366ca..ebe03aa 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -119,6 +119,8 @@ void ThreadData::publishStatsOnDollarTopic(std::vectorgetRetainedMessageCount()); publishStat("$SYS/broker/sessions/total", subscriptionStore->getSessionCount()); + + publishStat("$SYS/broker/subscriptions/count", subscriptionStore->getSubscriptionCount()); } void ThreadData::publishStat(const std::string &topic, uint64_t n) -- libgit2 0.21.4