Commit 81fcc49d60a85d8aa89c426629e39113ce63a4ba
1 parent
268b5874
Add subscription count in $SYS
Showing
3 changed files
with
46 additions
and
0 deletions
subscriptionstore.cpp
| @@ -569,6 +569,19 @@ uint64_t SubscriptionStore::getSessionCount() const | @@ -569,6 +569,19 @@ uint64_t SubscriptionStore::getSessionCount() const | ||
| 569 | return sessionsByIdConst.size(); | 569 | return sessionsByIdConst.size(); |
| 570 | } | 570 | } |
| 571 | 571 | ||
| 572 | +int64_t SubscriptionStore::getSubscriptionCount() | ||
| 573 | +{ | ||
| 574 | + int64_t count = 0; | ||
| 575 | + | ||
| 576 | + RWLockGuard lock_guard(&subscriptionsRwlock); | ||
| 577 | + lock_guard.rdlock(); | ||
| 578 | + | ||
| 579 | + countSubscriptions(&root, count); | ||
| 580 | + countSubscriptions(&rootDollar, count); | ||
| 581 | + | ||
| 582 | + return count; | ||
| 583 | +} | ||
| 584 | + | ||
| 572 | void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const | 585 | void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const |
| 573 | { | 586 | { |
| 574 | for(const RetainedMessage &rm : this_node->retainedMessages) | 587 | for(const RetainedMessage &rm : this_node->retainedMessages) |
| @@ -624,6 +637,35 @@ void SubscriptionStore::getSubscriptions(SubscriptionNode *this_node, const std: | @@ -624,6 +637,35 @@ void SubscriptionStore::getSubscriptions(SubscriptionNode *this_node, const std: | ||
| 624 | } | 637 | } |
| 625 | } | 638 | } |
| 626 | 639 | ||
| 640 | +void SubscriptionStore::countSubscriptions(SubscriptionNode *this_node, int64_t &count) const | ||
| 641 | +{ | ||
| 642 | + for (auto &pair : this_node->getSubscribers()) | ||
| 643 | + { | ||
| 644 | + const Subscription &node = pair.second; | ||
| 645 | + std::shared_ptr<Session> ses = node.session.lock(); | ||
| 646 | + if (ses) | ||
| 647 | + { | ||
| 648 | + count++; | ||
| 649 | + } | ||
| 650 | + } | ||
| 651 | + | ||
| 652 | + for (auto &pair : this_node->children) | ||
| 653 | + { | ||
| 654 | + SubscriptionNode *node = pair.second.get(); | ||
| 655 | + countSubscriptions(node, count); | ||
| 656 | + } | ||
| 657 | + | ||
| 658 | + if (this_node->childrenPlus) | ||
| 659 | + { | ||
| 660 | + countSubscriptions(this_node->childrenPlus.get(), count); | ||
| 661 | + } | ||
| 662 | + | ||
| 663 | + if (this_node->childrenPound) | ||
| 664 | + { | ||
| 665 | + countSubscriptions(this_node->childrenPound.get(), count); | ||
| 666 | + } | ||
| 667 | +} | ||
| 668 | + | ||
| 627 | void SubscriptionStore::saveRetainedMessages(const std::string &filePath) | 669 | void SubscriptionStore::saveRetainedMessages(const std::string &filePath) |
| 628 | { | 670 | { |
| 629 | logger->logf(LOG_INFO, "Saving retained messages to '%s'", filePath.c_str()); | 671 | logger->logf(LOG_INFO, "Saving retained messages to '%s'", filePath.c_str()); |
subscriptionstore.h
| @@ -110,6 +110,7 @@ class SubscriptionStore | @@ -110,6 +110,7 @@ class SubscriptionStore | ||
| 110 | void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const; | 110 | void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const; |
| 111 | void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root, | 111 | void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root, |
| 112 | std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const; | 112 | std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const; |
| 113 | + void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const; | ||
| 113 | 114 | ||
| 114 | SubscriptionNode *getDeepestNode(const std::string &topic, const std::vector<std::string> &subtopics); | 115 | SubscriptionNode *getDeepestNode(const std::string &topic, const std::vector<std::string> &subtopics); |
| 115 | public: | 116 | public: |
| @@ -132,6 +133,7 @@ public: | @@ -132,6 +133,7 @@ public: | ||
| 132 | 133 | ||
| 133 | int64_t getRetainedMessageCount() const; | 134 | int64_t getRetainedMessageCount() const; |
| 134 | uint64_t getSessionCount() const; | 135 | uint64_t getSessionCount() const; |
| 136 | + int64_t getSubscriptionCount(); | ||
| 135 | 137 | ||
| 136 | void saveRetainedMessages(const std::string &filePath); | 138 | void saveRetainedMessages(const std::string &filePath); |
| 137 | void loadRetainedMessages(const std::string &filePath); | 139 | void loadRetainedMessages(const std::string &filePath); |
threaddata.cpp
| @@ -119,6 +119,8 @@ void ThreadData::publishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadDat | @@ -119,6 +119,8 @@ void ThreadData::publishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadDat | ||
| 119 | publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); | 119 | publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); |
| 120 | 120 | ||
| 121 | publishStat("$SYS/broker/sessions/total", subscriptionStore->getSessionCount()); | 121 | publishStat("$SYS/broker/sessions/total", subscriptionStore->getSessionCount()); |
| 122 | + | ||
| 123 | + publishStat("$SYS/broker/subscriptions/count", subscriptionStore->getSubscriptionCount()); | ||
| 122 | } | 124 | } |
| 123 | 125 | ||
| 124 | void ThreadData::publishStat(const std::string &topic, uint64_t n) | 126 | void ThreadData::publishStat(const std::string &topic, uint64_t n) |