From 96c1dd91cf5580d83d80d6903067e4b24765ef7b Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Wed, 23 Mar 2022 17:25:25 +0100 Subject: [PATCH] Queue session removals --- client.cpp | 7 +++++-- subscriptionstore.cpp | 110 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------- subscriptionstore.h | 22 ++++++++++++++++++---- types.cpp | 6 ------ types.h | 1 - 5 files changed, 94 insertions(+), 52 deletions(-) diff --git a/client.cpp b/client.cpp index ec4ab55..088a9c1 100644 --- a/client.cpp +++ b/client.cpp @@ -69,10 +69,13 @@ Client::~Client() close(fd); } - // MQTT-3.1.2-6 if (session->getDestroyOnDisconnect()) { - store->removeSession(clientid); + store->removeSession(session); + } + else + { + store->queueSessionRemoval(session); } } diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 21fd2b5..61be9bd 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -269,24 +269,9 @@ bool SubscriptionStore::sessionPresent(const std::string &clientid) return result; } -/** - * @brief SubscriptionStore::purgeEmptyWills doesn't lock a mutex, because it's a helper for elsewhere. - */ -void SubscriptionStore::purgeEmptyWills() -{ - auto it = pendingWillMessages.begin(); - while (it != pendingWillMessages.end()) - { - std::shared_ptr p = (*it).lock(); - if (!p) - { - it = pendingWillMessages.erase(it); - } - } -} - void SubscriptionStore::sendQueuedWillMessages() { + const auto now = std::chrono::steady_clock::now(); std::lock_guard(this->pendingWillsMutex); auto it = pendingWillMessages.begin(); @@ -295,7 +280,7 @@ void SubscriptionStore::sendQueuedWillMessages() std::shared_ptr p = (*it).lock(); if (p) { - if (p->createdAt + std::chrono::seconds(p->will_delay) > std::chrono::steady_clock::now()) + if (p->createdAt + std::chrono::seconds(p->will_delay) > now) break; logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); @@ -306,14 +291,15 @@ void SubscriptionStore::sendQueuedWillMessages() } } -void SubscriptionStore::queueWillMessage(std::shared_ptr &willMessage) +void SubscriptionStore::queueWillMessage(std::shared_ptr &willMessage, bool forceNow) { if (!willMessage) return; - logger->logf(LOG_DEBUG, "Queueing will on topic '%s', with delay %d seconds.", willMessage->topic.c_str(), willMessage->will_delay ); + const int delay = forceNow ? 0 : willMessage->will_delay; + logger->logf(LOG_DEBUG, "Queueing will on topic '%s', with delay %d seconds.", willMessage->topic.c_str(), delay ); - if (willMessage->will_delay == 0) + if (delay == 0) { PublishCopyFactory factory(willMessage.get()); queuePacketAtSubscribers(factory); @@ -577,13 +563,20 @@ int SubscriptionNode::cleanSubscriptions() return subscribers.size() + subscribersLeftInChildren; } -void SubscriptionStore::removeSession(const std::string &clientid) +void SubscriptionStore::removeSession(const std::shared_ptr &session) { + const std::string &clientid = session->getClientId(); + logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str()); + + std::shared_ptr &will = session->getWill(); + if (will) + { + queueWillMessage(will, true); + } + RWLockGuard lock_guard(&subscriptionsRwlock); lock_guard.wrlock(); - logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str()); - auto session_it = sessionsById.find(clientid); if (session_it != sessionsById.end()) { @@ -600,37 +593,59 @@ void SubscriptionStore::removeExpiredSessionsClients() { logger->logf(LOG_DEBUG, "Cleaning out old sessions"); - RWLockGuard lock_guard(&subscriptionsRwlock); - lock_guard.wrlock(); + const std::chrono::time_point now = std::chrono::steady_clock::now(); - auto session_it = sessionsById.begin(); - while (session_it != sessionsById.end()) { - std::shared_ptr &session = session_it->second; + std::lock_guard(this->queuedSessionRemovalsMutex); - if (session->hasExpired()) + auto it = queuedSessionRemovals.begin(); + while (it != queuedSessionRemovals.end()) { - logger->logf(LOG_DEBUG, "Removing expired session from store %s", session->getClientId().c_str()); - std::shared_ptr &will = session->getWill(); - if (will) + QueuedSessionRemoval &qsr = *it; + std::shared_ptr session = (*it).getSession(); + if (session) { - will->will_delay = 0; - queueWillMessage(will); + if (qsr.getExpiresAt() > now) + { + logger->logf(LOG_DEBUG, "Breaking from sorted list of queued session removals. %d left in the future.", queuedSessionRemovals.size()); + break; + } + + // A session could have been picked up again, so we have to verify its expiration status. + if (session->hasExpired()) + { + removeSession(session); + } } - session_it = sessionsById.erase(session_it); + it = queuedSessionRemovals.erase(it); } - else - session_it++; } - if (lastTreeCleanup + std::chrono::minutes(30) < std::chrono::steady_clock::now()) + if (lastTreeCleanup + std::chrono::minutes(30) < now) { + RWLockGuard lock_guard(&subscriptionsRwlock); + lock_guard.wrlock(); + logger->logf(LOG_NOTICE, "Rebuilding subscription tree"); root.cleanSubscriptions(); - lastTreeCleanup = std::chrono::steady_clock::now(); + lastTreeCleanup = now; } } +void SubscriptionStore::queueSessionRemoval(const std::shared_ptr &session) +{ + QueuedSessionRemoval qsr(session); + + auto comp = [](const QueuedSessionRemoval &a, const QueuedSessionRemoval &b) + { + return a.getExpiresAt() < b.getExpiresAt(); + }; + + std::lock_guard(this->queuedSessionRemovalsMutex); + auto pos = std::upper_bound(this->queuedSessionRemovals.begin(), this->queuedSessionRemovals.end(), qsr, comp); + this->queuedSessionRemovals.insert(pos, qsr); +} + int64_t SubscriptionStore::getRetainedMessageCount() const { return retainedMessageCount; @@ -927,3 +942,20 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &subtopi return it->second.get(); return nullptr; } + +QueuedSessionRemoval::QueuedSessionRemoval(const std::shared_ptr &session) : + session(session), + expiresAt(std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval())) +{ + +} + +std::chrono::time_point QueuedSessionRemoval::getExpiresAt() const +{ + return this->expiresAt; +} + +std::shared_ptr QueuedSessionRemoval::getSession() const +{ + return session.lock(); +} diff --git a/subscriptionstore.h b/subscriptionstore.h index 187cd25..d626b91 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -84,6 +84,17 @@ class RetainedMessageNode RetainedMessageNode *getChildren(const std::string &subtopic) const; }; +class QueuedSessionRemoval +{ + std::weak_ptr session; + std::chrono::time_point expiresAt; + +public: + QueuedSessionRemoval(const std::shared_ptr &session); + std::chrono::time_point getExpiresAt() const; + std::shared_ptr getSession() const; +}; + class SubscriptionStore { #ifdef TESTING @@ -96,6 +107,9 @@ class SubscriptionStore std::unordered_map> sessionsById; const std::unordered_map> &sessionsByIdConst; + std::mutex queuedSessionRemovalsMutex; + std::list queuedSessionRemovals; + pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; RetainedMessageNode retainedMessagesRoot; RetainedMessageNode retainedMessagesRootDollar; @@ -121,8 +135,6 @@ class SubscriptionStore void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const; SubscriptionNode *getDeepestNode(const std::string &topic, const std::vector &subtopics); - - void purgeEmptyWills(); public: SubscriptionStore(); @@ -133,14 +145,14 @@ public: bool sessionPresent(const std::string &clientid); void sendQueuedWillMessages(); - void queueWillMessage(std::shared_ptr &willMessage); + void queueWillMessage(std::shared_ptr &willMessage, bool forceNow = false); void queuePacketAtSubscribers(PublishCopyFactory ©Factory, bool dollar = false); uint64_t giveClientRetainedMessages(const std::shared_ptr &client, const std::shared_ptr &ses, const std::vector &subscribeSubtopics, char max_qos); void setRetainedMessage(const std::string &topic, const std::vector &subtopics, const std::string &payload, char qos); - void removeSession(const std::string &clientid); + void removeSession(const std::shared_ptr &session); void removeExpiredSessionsClients(); int64_t getRetainedMessageCount() const; @@ -152,6 +164,8 @@ public: void saveSessionsAndSubscriptions(const std::string &filePath); void loadSessionsAndSubscriptions(const std::string &filePath); + + void queueSessionRemoval(const std::shared_ptr &session); }; #endif // SUBSCRIPTIONSTORE_H diff --git a/types.cpp b/types.cpp index 2fe1fc0..b4ea0a0 100644 --- a/types.cpp +++ b/types.cpp @@ -134,12 +134,6 @@ void PublishBase::setClientSpecificProperties() propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); } -bool PublishBase::hasExpired() const -{ - auto now = std::chrono::steady_clock::now(); - return (createdAt + expiresAfter) > now; -} - Publish::Publish(const Publish &other) : PublishBase(other) { diff --git a/types.h b/types.h index b690859..6875c55 100644 --- a/types.h +++ b/types.h @@ -209,7 +209,6 @@ public: PublishBase(const std::string &topic, const std::string &payload, char qos); size_t getLengthWithoutFixedHeader() const; void setClientSpecificProperties(); - bool hasExpired() const; }; class Publish : public PublishBase -- libgit2 0.21.4