Commit 515f796f527d81cfdab0e176cfb26a9857d2399a

Authored by Wiebe Cazemier
1 parent 3e13b436

Send all non-delayed wills on exit

This includes some logic to have threads finish their work before
quitting.
client.cpp
@@ -394,6 +394,19 @@ uint32_t Client::getMaxIncomingPacketSize() const @@ -394,6 +394,19 @@ uint32_t Client::getMaxIncomingPacketSize() const
394 return this->maxIncomingPacketSize; 394 return this->maxIncomingPacketSize;
395 } 395 }
396 396
  397 +void Client::sendOrQueueWill()
  398 +{
  399 + if (!this->threadData)
  400 + return;
  401 +
  402 + if (!this->willPublish)
  403 + return;
  404 +
  405 + std::shared_ptr<SubscriptionStore> &store = this->threadData->getSubscriptionStore();
  406 + store->queueWillMessage(willPublish, session);
  407 + this->willPublish.reset();
  408 +}
  409 +
397 #ifndef NDEBUG 410 #ifndef NDEBUG
398 /** 411 /**
399 * @brief IoWrapper::setFakeUpgraded(). 412 * @brief IoWrapper::setFakeUpgraded().
client.h
@@ -149,6 +149,8 @@ public: @@ -149,6 +149,8 @@ public:
149 149
150 uint32_t getMaxIncomingPacketSize() const; 150 uint32_t getMaxIncomingPacketSize() const;
151 151
  152 + void sendOrQueueWill();
  153 +
152 #ifndef NDEBUG 154 #ifndef NDEBUG
153 void setFakeUpgraded(); 155 void setFakeUpgraded();
154 #endif 156 #endif
mainapp.cpp
@@ -285,6 +285,14 @@ void MainApp::queueRemoveExpiredSessions() @@ -285,6 +285,14 @@ void MainApp::queueRemoveExpiredSessions()
285 } 285 }
286 } 286 }
287 287
  288 +void MainApp::waitForAllThreadsQueuedWills()
  289 +{
  290 + while(std::any_of(threads.begin(), threads.end(), [](std::shared_ptr<ThreadData> t){ return !t->allWilssSentForExit; }))
  291 + {
  292 + usleep(1000);
  293 + }
  294 +}
  295 +
288 void MainApp::saveState() 296 void MainApp::saveState()
289 { 297 {
290 std::lock_guard<std::mutex> lg(saveStateMutex); 298 std::lock_guard<std::mutex> lg(saveStateMutex);
@@ -607,6 +615,14 @@ void MainApp::start() @@ -607,6 +615,14 @@ void MainApp::start()
607 } 615 }
608 } 616 }
609 617
  618 + logger->logf(LOG_DEBUG, "Having all client in all threads send or queue their will.");
  619 + for(std::shared_ptr<ThreadData> &thread : threads)
  620 + {
  621 + thread->queueSendAllWills();
  622 + }
  623 +
  624 + waitForAllThreadsQueuedWills();
  625 +
610 oneInstanceLock.unlock(); 626 oneInstanceLock.unlock();
611 627
612 logger->logf(LOG_DEBUG, "Signaling threads to finish."); 628 logger->logf(LOG_DEBUG, "Signaling threads to finish.");
mainapp.h
@@ -93,6 +93,7 @@ class MainApp @@ -93,6 +93,7 @@ class MainApp
93 void saveStateInThread(); 93 void saveStateInThread();
94 void queueSendQueuedWills(); 94 void queueSendQueuedWills();
95 void queueRemoveExpiredSessions(); 95 void queueRemoveExpiredSessions();
  96 + void waitForAllThreadsQueuedWills();
96 97
97 MainApp(const std::string &configFilePath); 98 MainApp(const std::string &configFilePath);
98 public: 99 public:
threaddata.cpp
@@ -162,6 +162,18 @@ void ThreadData::removeExpiredSessions() @@ -162,6 +162,18 @@ void ThreadData::removeExpiredSessions()
162 subscriptionStore->removeExpiredSessionsClients(); 162 subscriptionStore->removeExpiredSessionsClients();
163 } 163 }
164 164
  165 +void ThreadData::sendAllWils()
  166 +{
  167 + std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
  168 +
  169 + for(auto pairs : clients_by_fd)
  170 + {
  171 + pairs.second->sendOrQueueWill();
  172 + }
  173 +
  174 + allWilssSentForExit = true;
  175 +}
  176 +
165 void ThreadData::removeQueuedClients() 177 void ThreadData::removeQueuedClients()
166 { 178 {
167 std::vector<int> fds; 179 std::vector<int> fds;
@@ -389,6 +401,16 @@ void ThreadData::authPluginPeriodicEvent() @@ -389,6 +401,16 @@ void ThreadData::authPluginPeriodicEvent()
389 authentication.periodicEvent(); 401 authentication.periodicEvent();
390 } 402 }
391 403
  404 +void ThreadData::queueSendAllWills()
  405 +{
  406 + std::lock_guard<std::mutex> locker(taskQueueMutex);
  407 +
  408 + auto f = std::bind(&ThreadData::sendAllWils, this);
  409 + taskQueue.push_front(f);
  410 +
  411 + wakeUpThread();
  412 +}
  413 +
392 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial? 414 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial?
393 void ThreadData::doKeepAliveCheck() 415 void ThreadData::doKeepAliveCheck()
394 { 416 {
threaddata.h
@@ -66,6 +66,7 @@ class ThreadData @@ -66,6 +66,7 @@ class ThreadData
66 void publishStat(const std::string &topic, uint64_t n); 66 void publishStat(const std::string &topic, uint64_t n);
67 void sendQueuedWills(); 67 void sendQueuedWills();
68 void removeExpiredSessions(); 68 void removeExpiredSessions();
  69 + void sendAllWils();
69 70
70 void removeQueuedClients(); 71 void removeQueuedClients();
71 72
@@ -74,6 +75,7 @@ public: @@ -74,6 +75,7 @@ public:
74 Authentication authentication; 75 Authentication authentication;
75 bool running = true; 76 bool running = true;
76 bool finished = false; 77 bool finished = false;
  78 + bool allWilssSentForExit = false;
77 std::thread thread; 79 std::thread thread;
78 int threadnr = 0; 80 int threadnr = 0;
79 int epollfd = 0; 81 int epollfd = 0;
@@ -117,6 +119,8 @@ public: @@ -117,6 +119,8 @@ public:
117 119
118 void queueAuthPluginPeriodicEvent(); 120 void queueAuthPluginPeriodicEvent();
119 void authPluginPeriodicEvent(); 121 void authPluginPeriodicEvent();
  122 +
  123 + void queueSendAllWills();
120 }; 124 };
121 125
122 #endif // THREADDATA_H 126 #endif // THREADDATA_H