Commit 8198690242e0173fbf1a8e20f9ecd302a50884b1

Authored by Wiebe Cazemier
1 parent fdef7ff3

Queue session removal with second granularity

There's no point in keeping a vector of removals per nanosecond.
subscriptionstore.cpp
@@ -661,6 +661,8 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -661,6 +661,8 @@ void SubscriptionStore::removeExpiredSessionsClients()
661 logger->logf(LOG_DEBUG, "Cleaning out old sessions"); 661 logger->logf(LOG_DEBUG, "Cleaning out old sessions");
662 662
663 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now(); 663 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
  664 + const std::chrono::seconds secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
  665 +
664 int removedSessions = 0; 666 int removedSessions = 0;
665 int processedRemovals = 0; 667 int processedRemovals = 0;
666 int queuedRemovalsLeft = -1; 668 int queuedRemovalsLeft = -1;
@@ -673,9 +675,9 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -673,9 +675,9 @@ void SubscriptionStore::removeExpiredSessionsClients()
673 auto it = queuedSessionRemovals.begin(); 675 auto it = queuedSessionRemovals.begin();
674 while (it != queuedSessionRemovals.end()) 676 while (it != queuedSessionRemovals.end())
675 { 677 {
676 - const std::chrono::time_point<std::chrono::steady_clock> &removeAt = it->first; 678 + const std::chrono::seconds &removeAt = it->first;
677 679
678 - if (removeAt > now) 680 + if (removeAt > secondsSinceEpoch)
679 { 681 {
680 break; 682 break;
681 } 683 }
@@ -725,10 +727,11 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr&lt;Session&gt; &amp;sess @@ -725,10 +727,11 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr&lt;Session&gt; &amp;sess
725 return; 727 return;
726 728
727 std::chrono::time_point<std::chrono::steady_clock> removeAt = std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval()); 729 std::chrono::time_point<std::chrono::steady_clock> removeAt = std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval());
  730 + std::chrono::seconds secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(removeAt.time_since_epoch());
728 session->setQueuedRemovalAt(); 731 session->setQueuedRemovalAt();
729 732
730 std::lock_guard<std::mutex> locker(this->queuedSessionRemovalsMutex); 733 std::lock_guard<std::mutex> locker(this->queuedSessionRemovalsMutex);
731 - queuedSessionRemovals[removeAt].push_back(session); 734 + queuedSessionRemovals[secondsSinceEpoch].push_back(session);
732 } 735 }
733 736
734 int64_t SubscriptionStore::getRetainedMessageCount() const 737 int64_t SubscriptionStore::getRetainedMessageCount() const
subscriptionstore.h
@@ -111,7 +111,7 @@ class SubscriptionStore @@ -111,7 +111,7 @@ class SubscriptionStore
111 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst; 111 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst;
112 112
113 std::mutex queuedSessionRemovalsMutex; 113 std::mutex queuedSessionRemovalsMutex;
114 - std::map<std::chrono::time_point<std::chrono::steady_clock>, std::vector<std::weak_ptr<Session>>> queuedSessionRemovals; 114 + std::map<std::chrono::seconds, std::vector<std::weak_ptr<Session>>> queuedSessionRemovals;
115 115
116 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 116 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
117 RetainedMessageNode retainedMessagesRoot; 117 RetainedMessageNode retainedMessagesRoot;