From 515f796f527d81cfdab0e176cfb26a9857d2399a Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Thu, 14 Apr 2022 14:07:35 +0200 Subject: [PATCH] Send all non-delayed wills on exit --- client.cpp | 13 +++++++++++++ client.h | 2 ++ mainapp.cpp | 16 ++++++++++++++++ mainapp.h | 1 + threaddata.cpp | 22 ++++++++++++++++++++++ threaddata.h | 4 ++++ 6 files changed, 58 insertions(+), 0 deletions(-) diff --git a/client.cpp b/client.cpp index 53ad438..1d7c6a2 100644 --- a/client.cpp +++ b/client.cpp @@ -394,6 +394,19 @@ uint32_t Client::getMaxIncomingPacketSize() const return this->maxIncomingPacketSize; } +void Client::sendOrQueueWill() +{ + if (!this->threadData) + return; + + if (!this->willPublish) + return; + + std::shared_ptr &store = this->threadData->getSubscriptionStore(); + store->queueWillMessage(willPublish, session); + this->willPublish.reset(); +} + #ifndef NDEBUG /** * @brief IoWrapper::setFakeUpgraded(). diff --git a/client.h b/client.h index ae8ab32..d8da2bc 100644 --- a/client.h +++ b/client.h @@ -149,6 +149,8 @@ public: uint32_t getMaxIncomingPacketSize() const; + void sendOrQueueWill(); + #ifndef NDEBUG void setFakeUpgraded(); #endif diff --git a/mainapp.cpp b/mainapp.cpp index 36487f7..60ae030 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -285,6 +285,14 @@ void MainApp::queueRemoveExpiredSessions() } } +void MainApp::waitForAllThreadsQueuedWills() +{ + while(std::any_of(threads.begin(), threads.end(), [](std::shared_ptr t){ return !t->allWilssSentForExit; })) + { + usleep(1000); + } +} + void MainApp::saveState() { std::lock_guard lg(saveStateMutex); @@ -607,6 +615,14 @@ void MainApp::start() } } + logger->logf(LOG_DEBUG, "Having all client in all threads send or queue their will."); + for(std::shared_ptr &thread : threads) + { + thread->queueSendAllWills(); + } + + waitForAllThreadsQueuedWills(); + oneInstanceLock.unlock(); logger->logf(LOG_DEBUG, "Signaling threads to finish."); diff --git a/mainapp.h b/mainapp.h index 71809ba..0629c8e 100644 --- a/mainapp.h +++ b/mainapp.h @@ -93,6 +93,7 @@ class MainApp void saveStateInThread(); void queueSendQueuedWills(); void queueRemoveExpiredSessions(); + void waitForAllThreadsQueuedWills(); MainApp(const std::string &configFilePath); public: diff --git a/threaddata.cpp b/threaddata.cpp index d3ad1d8..b918296 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -162,6 +162,18 @@ void ThreadData::removeExpiredSessions() subscriptionStore->removeExpiredSessionsClients(); } +void ThreadData::sendAllWils() +{ + std::lock_guard lck(clients_by_fd_mutex); + + for(auto pairs : clients_by_fd) + { + pairs.second->sendOrQueueWill(); + } + + allWilssSentForExit = true; +} + void ThreadData::removeQueuedClients() { std::vector fds; @@ -389,6 +401,16 @@ void ThreadData::authPluginPeriodicEvent() authentication.periodicEvent(); } +void ThreadData::queueSendAllWills() +{ + std::lock_guard locker(taskQueueMutex); + + auto f = std::bind(&ThreadData::sendAllWils, this); + taskQueue.push_front(f); + + wakeUpThread(); +} + // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial? void ThreadData::doKeepAliveCheck() { diff --git a/threaddata.h b/threaddata.h index 35ecdb7..3e0727b 100644 --- a/threaddata.h +++ b/threaddata.h @@ -66,6 +66,7 @@ class ThreadData void publishStat(const std::string &topic, uint64_t n); void sendQueuedWills(); void removeExpiredSessions(); + void sendAllWils(); void removeQueuedClients(); @@ -74,6 +75,7 @@ public: Authentication authentication; bool running = true; bool finished = false; + bool allWilssSentForExit = false; std::thread thread; int threadnr = 0; int epollfd = 0; @@ -117,6 +119,8 @@ public: void queueAuthPluginPeriodicEvent(); void authPluginPeriodicEvent(); + + void queueSendAllWills(); }; #endif // THREADDATA_H -- libgit2 0.21.4