Commit 99354574ce7fa38e42045a5f418fb5a85b9664d1

Authored by Wiebe Cazemier
1 parent c20ba7c5

Publish $SYS topics in a thread, because authentication is thread local

It was a mistake that it ran inside the timer thread, which came to
light by asserting the thread local authentication pointer (see a few
commits ago).
mainapp.cpp
@@ -195,9 +195,8 @@ MainApp::MainApp(const std::string &configFilePath) : @@ -195,9 +195,8 @@ MainApp::MainApp(const std::string &configFilePath) :
195 auto fPasswordFileReload = std::bind(&MainApp::queuePasswordFileReloadAllThreads, this); 195 auto fPasswordFileReload = std::bind(&MainApp::queuePasswordFileReloadAllThreads, this);
196 timer.addCallback(fPasswordFileReload, 2000, "Password file reload."); 196 timer.addCallback(fPasswordFileReload, 2000, "Password file reload.");
197 197
198 - auto fPublishStats = std::bind(&MainApp::publishStatsOnDollarTopic, this); 198 + auto fPublishStats = std::bind(&MainApp::queuePublishStatsOnDollarTopic, this);
199 timer.addCallback(fPublishStats, 10000, "Publish stats on $SYS"); 199 timer.addCallback(fPublishStats, 10000, "Publish stats on $SYS");
200 - publishStatsOnDollarTopic();  
201 200
202 if (settings->authPluginTimerPeriod > 0) 201 if (settings->authPluginTimerPeriod > 0)
203 { 202 {
@@ -340,44 +339,20 @@ void MainApp::setFuzzFile(const std::string &fuzzFilePath) @@ -340,44 +339,20 @@ void MainApp::setFuzzFile(const std::string &fuzzFilePath)
340 this->fuzzFilePath = fuzzFilePath; 339 this->fuzzFilePath = fuzzFilePath;
341 } 340 }
342 341
343 -void MainApp::publishStatsOnDollarTopic() 342 +/**
  343 + * @brief MainApp::queuePublishStatsOnDollarTopic publishes the dollar topics, on a thread that has thread local authentication.
  344 + */
  345 +void MainApp::queuePublishStatsOnDollarTopic()
344 { 346 {
345 - uint nrOfClients = 0;  
346 - uint64_t receivedMessageCountPerSecond = 0;  
347 - uint64_t receivedMessageCount = 0;  
348 - uint64_t sentMessageCountPerSecond = 0;  
349 - uint64_t sentMessageCount = 0; 347 + std::lock_guard<std::mutex> locker(eventMutex);
350 348
351 - for (std::shared_ptr<ThreadData> &thread : threads) 349 + if (!threads.empty())
352 { 350 {
353 - nrOfClients += thread->getNrOfClients(); 351 + auto f = std::bind(&ThreadData::queuePublishStatsOnDollarTopic, threads.front().get(), threads);
  352 + taskQueue.push_front(f);
354 353
355 - receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond();  
356 - receivedMessageCount += thread->getReceivedMessageCount();  
357 -  
358 - sentMessageCountPerSecond += thread->getSentMessagePerSecond();  
359 - sentMessageCount += thread->getSentMessageCount(); 354 + wakeUpThread();
360 } 355 }
361 -  
362 - publishStat("$SYS/broker/clients/total", nrOfClients);  
363 -  
364 - publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount);  
365 - publishStat("$SYS/broker/load/messages/received/persecond", receivedMessageCountPerSecond);  
366 -  
367 - publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount);  
368 - publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond);  
369 -  
370 - publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount());  
371 -}  
372 -  
373 -void MainApp::publishStat(const std::string &topic, uint64_t n)  
374 -{  
375 - std::vector<std::string> subtopics;  
376 - splitTopic(topic, subtopics);  
377 - const std::string payload = std::to_string(n);  
378 - Publish p(topic, payload, 0);  
379 - subscriptionStore->queuePacketAtSubscribers(subtopics, p, true);  
380 - subscriptionStore->setRetainedMessage(topic, subtopics, payload, 0);  
381 } 356 }
382 357
383 void MainApp::saveState() 358 void MainApp::saveState()
@@ -604,6 +579,9 @@ void MainApp::start() @@ -604,6 +579,9 @@ void MainApp::start()
604 threads.push_back(t); 579 threads.push_back(t);
605 } 580 }
606 581
  582 + // Populate the $SYS topics, otherwise you have to wait until the timer expires.
  583 + threads.front()->queuePublishStatsOnDollarTopic(threads);
  584 +
607 uint next_thread_index = 0; 585 uint next_thread_index = 0;
608 586
609 struct epoll_event events[MAX_EVENTS]; 587 struct epoll_event events[MAX_EVENTS];
mainapp.h
@@ -82,8 +82,7 @@ class MainApp @@ -82,8 +82,7 @@ class MainApp
82 void queuePasswordFileReloadAllThreads(); 82 void queuePasswordFileReloadAllThreads();
83 void queueAuthPluginPeriodicEventAllThreads(); 83 void queueAuthPluginPeriodicEventAllThreads();
84 void setFuzzFile(const std::string &fuzzFilePath); 84 void setFuzzFile(const std::string &fuzzFilePath);
85 - void publishStatsOnDollarTopic();  
86 - void publishStat(const std::string &topic, uint64_t n); 85 + void queuePublishStatsOnDollarTopic();
87 void saveState(); 86 void saveState();
88 87
89 MainApp(const std::string &configFilePath); 88 MainApp(const std::string &configFilePath);
threaddata.cpp
@@ -73,6 +73,62 @@ void ThreadData::quit() @@ -73,6 +73,62 @@ void ThreadData::quit()
73 running = false; 73 running = false;
74 } 74 }
75 75
  76 +/**
  77 + * @brief ThreadData::queuePublishStatsOnDollarTopic makes this thread publish the $SYS topics.
  78 + * @param threads
  79 + *
  80 + * We want to do that in a thread because all authentication state is thread local.
  81 + */
  82 +void ThreadData::queuePublishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadData>> &threads)
  83 +{
  84 + std::lock_guard<std::mutex> locker(taskQueueMutex);
  85 +
  86 + auto f = std::bind(&ThreadData::publishStatsOnDollarTopic, this, threads);
  87 + taskQueue.push_front(f);
  88 +
  89 + wakeUpThread();
  90 +}
  91 +
  92 +void ThreadData::publishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadData>> &threads)
  93 +{
  94 + uint nrOfClients = 0;
  95 + uint64_t receivedMessageCountPerSecond = 0;
  96 + uint64_t receivedMessageCount = 0;
  97 + uint64_t sentMessageCountPerSecond = 0;
  98 + uint64_t sentMessageCount = 0;
  99 +
  100 + for (const std::shared_ptr<ThreadData> &thread : threads)
  101 + {
  102 + nrOfClients += thread->getNrOfClients();
  103 +
  104 + receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond();
  105 + receivedMessageCount += thread->getReceivedMessageCount();
  106 +
  107 + sentMessageCountPerSecond += thread->getSentMessagePerSecond();
  108 + sentMessageCount += thread->getSentMessageCount();
  109 + }
  110 +
  111 + publishStat("$SYS/broker/clients/total", nrOfClients);
  112 +
  113 + publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount);
  114 + publishStat("$SYS/broker/load/messages/received/persecond", receivedMessageCountPerSecond);
  115 +
  116 + publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount);
  117 + publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond);
  118 +
  119 + publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount());
  120 +}
  121 +
  122 +void ThreadData::publishStat(const std::string &topic, uint64_t n)
  123 +{
  124 + std::vector<std::string> subtopics;
  125 + splitTopic(topic, subtopics);
  126 + const std::string payload = std::to_string(n);
  127 + Publish p(topic, payload, 0);
  128 + subscriptionStore->queuePacketAtSubscribers(subtopics, p, true);
  129 + subscriptionStore->setRetainedMessage(topic, subtopics, payload, 0);
  130 +}
  131 +
76 void ThreadData::giveClient(std::shared_ptr<Client> client) 132 void ThreadData::giveClient(std::shared_ptr<Client> client)
77 { 133 {
78 clients_by_fd_mutex.lock(); 134 clients_by_fd_mutex.lock();
threaddata.h
@@ -61,6 +61,8 @@ class ThreadData @@ -61,6 +61,8 @@ class ThreadData
61 void wakeUpThread(); 61 void wakeUpThread();
62 void doKeepAliveCheck(); 62 void doKeepAliveCheck();
63 void quit(); 63 void quit();
  64 + void publishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadData>> &threads);
  65 + void publishStat(const std::string &topic, uint64_t n);
64 66
65 public: 67 public:
66 Settings settingsLocalCopy; // Is updated on reload, within the thread loop. 68 Settings settingsLocalCopy; // Is updated on reload, within the thread loop.
@@ -91,6 +93,7 @@ public: @@ -91,6 +93,7 @@ public:
91 void queueQuit(); 93 void queueQuit();
92 void waitForQuit(); 94 void waitForQuit();
93 void queuePasswdFileReload(); 95 void queuePasswdFileReload();
  96 + void queuePublishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadData>> &threads);
94 97
95 int getNrOfClients() const; 98 int getNrOfClients() const;
96 99