diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 398df8e..31dd1ec 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -666,19 +666,23 @@ void SubscriptionStore::removeExpiredSessionsClients() auto it = queuedSessionRemovals.begin(); while (it != queuedSessionRemovals.end()) { - QueuedSessionRemoval &qsr = *it; - std::shared_ptr session = (*it).getSession(); - if (session) + const std::chrono::time_point &removeAt = it->first; + + if (removeAt > now) { - if (qsr.getExpiresAt() > now) - { - break; - } + break; + } + + std::vector> &sessionsFromSlot = it->second; + + for (std::weak_ptr ses : sessionsFromSlot) + { + std::shared_ptr lockedSession = ses.lock(); // A session could have been picked up again, so we have to verify its expiration status. - if (!session->hasActiveClient()) + if (lockedSession && !lockedSession->hasActiveClient()) { - removeSession(session); + removeSession(lockedSession); removedSessions++; } } @@ -690,7 +694,7 @@ void SubscriptionStore::removeExpiredSessionsClients() queuedRemovalsLeft = queuedSessionRemovals.size(); } - logger->logf(LOG_DEBUG, "Processed %d queued session removals, resuling in %d deleted expired sessions. %d queued removals in the future.", + logger->logf(LOG_DEBUG, "Processed %d queued session removals, resulting in %d deleted expired sessions. %d queued removals in the future.", processedRemovals, removedSessions, queuedRemovalsLeft); if (lastTreeCleanup + std::chrono::minutes(30) < now) @@ -705,7 +709,7 @@ void SubscriptionStore::removeExpiredSessionsClients() } /** - * @brief SubscriptionStore::queueSessionRemoval places session efficiently in a sorted list that is periodically dequeued. + * @brief SubscriptionStore::queueSessionRemoval places session efficiently in a sorted map that is periodically dequeued. * @param session */ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr &session) @@ -713,18 +717,11 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr &sess if (!session) return; - QueuedSessionRemoval qsr(session); - - auto comp = [](const QueuedSessionRemoval &a, const QueuedSessionRemoval &b) - { - return a.getExpiresAt() < b.getExpiresAt(); - }; - + std::chrono::time_point removeAt = std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval()); session->setQueuedRemovalAt(); std::lock_guard(this->queuedSessionRemovalsMutex); - auto pos = std::upper_bound(this->queuedSessionRemovals.begin(), this->queuedSessionRemovals.end(), qsr, comp); - this->queuedSessionRemovals.insert(pos, qsr); + queuedSessionRemovals[removeAt].push_back(session); } int64_t SubscriptionStore::getRetainedMessageCount() const @@ -1026,23 +1023,6 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &subtopi return nullptr; } -QueuedSessionRemoval::QueuedSessionRemoval(const std::shared_ptr &session) : - session(session), - expiresAt(std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval())) -{ - -} - -std::chrono::time_point QueuedSessionRemoval::getExpiresAt() const -{ - return this->expiresAt; -} - -std::shared_ptr QueuedSessionRemoval::getSession() const -{ - return session.lock(); -} - QueuedWill::QueuedWill(const std::shared_ptr &will, const std::shared_ptr &session) : will(will), session(session), diff --git a/subscriptionstore.h b/subscriptionstore.h index daf3e35..fb88ab6 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -22,6 +22,8 @@ License along with FlashMQ. If not, see . #include #include #include +#include +#include #include #include "forward_declarations.h" @@ -84,23 +86,6 @@ class RetainedMessageNode RetainedMessageNode *getChildren(const std::string &subtopic) const; }; -/** - * @brief A QueuedSessionRemoval is a sort of delayed request for removal. They are kept in a sorted list for fast insertion, - * and fast dequeueing of expired entries from the start. - * - * You can have multiple of these in the pending list. If a client has picked up the session again, the removal is not executed. - */ -class QueuedSessionRemoval -{ - std::weak_ptr session; - std::chrono::time_point expiresAt; - -public: - QueuedSessionRemoval(const std::shared_ptr &session); - std::chrono::time_point getExpiresAt() const; - std::shared_ptr getSession() const; -}; - class QueuedWill { std::weak_ptr will; @@ -128,7 +113,7 @@ class SubscriptionStore const std::unordered_map> &sessionsByIdConst; std::mutex queuedSessionRemovalsMutex; - std::list queuedSessionRemovals; + std::map, std::vector>> queuedSessionRemovals; pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; RetainedMessageNode retainedMessagesRoot;