diff --git a/mainapp.cpp b/mainapp.cpp index be14250..19c8b52 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -195,9 +195,8 @@ MainApp::MainApp(const std::string &configFilePath) : auto fPasswordFileReload = std::bind(&MainApp::queuePasswordFileReloadAllThreads, this); timer.addCallback(fPasswordFileReload, 2000, "Password file reload."); - auto fPublishStats = std::bind(&MainApp::publishStatsOnDollarTopic, this); + auto fPublishStats = std::bind(&MainApp::queuePublishStatsOnDollarTopic, this); timer.addCallback(fPublishStats, 10000, "Publish stats on $SYS"); - publishStatsOnDollarTopic(); if (settings->authPluginTimerPeriod > 0) { @@ -340,44 +339,20 @@ void MainApp::setFuzzFile(const std::string &fuzzFilePath) this->fuzzFilePath = fuzzFilePath; } -void MainApp::publishStatsOnDollarTopic() +/** + * @brief MainApp::queuePublishStatsOnDollarTopic publishes the dollar topics, on a thread that has thread local authentication. + */ +void MainApp::queuePublishStatsOnDollarTopic() { - uint nrOfClients = 0; - uint64_t receivedMessageCountPerSecond = 0; - uint64_t receivedMessageCount = 0; - uint64_t sentMessageCountPerSecond = 0; - uint64_t sentMessageCount = 0; + std::lock_guard locker(eventMutex); - for (std::shared_ptr &thread : threads) + if (!threads.empty()) { - nrOfClients += thread->getNrOfClients(); + auto f = std::bind(&ThreadData::queuePublishStatsOnDollarTopic, threads.front().get(), threads); + taskQueue.push_front(f); - receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond(); - receivedMessageCount += thread->getReceivedMessageCount(); - - sentMessageCountPerSecond += thread->getSentMessagePerSecond(); - sentMessageCount += thread->getSentMessageCount(); + wakeUpThread(); } - - publishStat("$SYS/broker/clients/total", nrOfClients); - - publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount); - publishStat("$SYS/broker/load/messages/received/persecond", receivedMessageCountPerSecond); - - publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount); - publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond); - - publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); -} - -void MainApp::publishStat(const std::string &topic, uint64_t n) -{ - std::vector subtopics; - splitTopic(topic, subtopics); - const std::string payload = std::to_string(n); - Publish p(topic, payload, 0); - subscriptionStore->queuePacketAtSubscribers(subtopics, p, true); - subscriptionStore->setRetainedMessage(topic, subtopics, payload, 0); } void MainApp::saveState() @@ -604,6 +579,9 @@ void MainApp::start() threads.push_back(t); } + // Populate the $SYS topics, otherwise you have to wait until the timer expires. + threads.front()->queuePublishStatsOnDollarTopic(threads); + uint next_thread_index = 0; struct epoll_event events[MAX_EVENTS]; diff --git a/mainapp.h b/mainapp.h index 7a31509..6f4ae62 100644 --- a/mainapp.h +++ b/mainapp.h @@ -82,8 +82,7 @@ class MainApp void queuePasswordFileReloadAllThreads(); void queueAuthPluginPeriodicEventAllThreads(); void setFuzzFile(const std::string &fuzzFilePath); - void publishStatsOnDollarTopic(); - void publishStat(const std::string &topic, uint64_t n); + void queuePublishStatsOnDollarTopic(); void saveState(); MainApp(const std::string &configFilePath); diff --git a/threaddata.cpp b/threaddata.cpp index 6bc1891..db615de 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -73,6 +73,62 @@ void ThreadData::quit() running = false; } +/** + * @brief ThreadData::queuePublishStatsOnDollarTopic makes this thread publish the $SYS topics. + * @param threads + * + * We want to do that in a thread because all authentication state is thread local. + */ +void ThreadData::queuePublishStatsOnDollarTopic(std::vector> &threads) +{ + std::lock_guard locker(taskQueueMutex); + + auto f = std::bind(&ThreadData::publishStatsOnDollarTopic, this, threads); + taskQueue.push_front(f); + + wakeUpThread(); +} + +void ThreadData::publishStatsOnDollarTopic(std::vector> &threads) +{ + uint nrOfClients = 0; + uint64_t receivedMessageCountPerSecond = 0; + uint64_t receivedMessageCount = 0; + uint64_t sentMessageCountPerSecond = 0; + uint64_t sentMessageCount = 0; + + for (const std::shared_ptr &thread : threads) + { + nrOfClients += thread->getNrOfClients(); + + receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond(); + receivedMessageCount += thread->getReceivedMessageCount(); + + sentMessageCountPerSecond += thread->getSentMessagePerSecond(); + sentMessageCount += thread->getSentMessageCount(); + } + + publishStat("$SYS/broker/clients/total", nrOfClients); + + publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount); + publishStat("$SYS/broker/load/messages/received/persecond", receivedMessageCountPerSecond); + + publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount); + publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond); + + publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); +} + +void ThreadData::publishStat(const std::string &topic, uint64_t n) +{ + std::vector subtopics; + splitTopic(topic, subtopics); + const std::string payload = std::to_string(n); + Publish p(topic, payload, 0); + subscriptionStore->queuePacketAtSubscribers(subtopics, p, true); + subscriptionStore->setRetainedMessage(topic, subtopics, payload, 0); +} + void ThreadData::giveClient(std::shared_ptr client) { clients_by_fd_mutex.lock(); diff --git a/threaddata.h b/threaddata.h index 0e09b2a..9e75ae5 100644 --- a/threaddata.h +++ b/threaddata.h @@ -61,6 +61,8 @@ class ThreadData void wakeUpThread(); void doKeepAliveCheck(); void quit(); + void publishStatsOnDollarTopic(std::vector> &threads); + void publishStat(const std::string &topic, uint64_t n); public: Settings settingsLocalCopy; // Is updated on reload, within the thread loop. @@ -91,6 +93,7 @@ public: void queueQuit(); void waitForQuit(); void queuePasswdFileReload(); + void queuePublishStatsOnDollarTopic(std::vector> &threads); int getNrOfClients() const;