Commit 20b1d0a002b6ffa1d113020dc66b9ab2309a52c8

Authored by Wiebe Cazemier
Committed by Wiebe Cazemier
1 parent c5f8e527

Use ordered map for session removals

This is way faster.
subscriptionstore.cpp
@@ -666,19 +666,23 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -666,19 +666,23 @@ void SubscriptionStore::removeExpiredSessionsClients()
666 auto it = queuedSessionRemovals.begin(); 666 auto it = queuedSessionRemovals.begin();
667 while (it != queuedSessionRemovals.end()) 667 while (it != queuedSessionRemovals.end())
668 { 668 {
669 - QueuedSessionRemoval &qsr = *it;  
670 - std::shared_ptr<Session> session = (*it).getSession();  
671 - if (session) 669 + const std::chrono::time_point<std::chrono::steady_clock> &removeAt = it->first;
  670 +
  671 + if (removeAt > now)
672 { 672 {
673 - if (qsr.getExpiresAt() > now)  
674 - {  
675 - break;  
676 - } 673 + break;
  674 + }
  675 +
  676 + std::vector<std::weak_ptr<Session>> &sessionsFromSlot = it->second;
  677 +
  678 + for (std::weak_ptr<Session> ses : sessionsFromSlot)
  679 + {
  680 + std::shared_ptr<Session> lockedSession = ses.lock();
677 681
678 // A session could have been picked up again, so we have to verify its expiration status. 682 // A session could have been picked up again, so we have to verify its expiration status.
679 - if (!session->hasActiveClient()) 683 + if (lockedSession && !lockedSession->hasActiveClient())
680 { 684 {
681 - removeSession(session); 685 + removeSession(lockedSession);
682 removedSessions++; 686 removedSessions++;
683 } 687 }
684 } 688 }
@@ -690,7 +694,7 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -690,7 +694,7 @@ void SubscriptionStore::removeExpiredSessionsClients()
690 queuedRemovalsLeft = queuedSessionRemovals.size(); 694 queuedRemovalsLeft = queuedSessionRemovals.size();
691 } 695 }
692 696
693 - logger->logf(LOG_DEBUG, "Processed %d queued session removals, resuling in %d deleted expired sessions. %d queued removals in the future.", 697 + logger->logf(LOG_DEBUG, "Processed %d queued session removals, resulting in %d deleted expired sessions. %d queued removals in the future.",
694 processedRemovals, removedSessions, queuedRemovalsLeft); 698 processedRemovals, removedSessions, queuedRemovalsLeft);
695 699
696 if (lastTreeCleanup + std::chrono::minutes(30) < now) 700 if (lastTreeCleanup + std::chrono::minutes(30) < now)
@@ -705,7 +709,7 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -705,7 +709,7 @@ void SubscriptionStore::removeExpiredSessionsClients()
705 } 709 }
706 710
707 /** 711 /**
708 - * @brief SubscriptionStore::queueSessionRemoval places session efficiently in a sorted list that is periodically dequeued. 712 + * @brief SubscriptionStore::queueSessionRemoval places session efficiently in a sorted map that is periodically dequeued.
709 * @param session 713 * @param session
710 */ 714 */
711 void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &session) 715 void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &session)
@@ -713,18 +717,11 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr&lt;Session&gt; &amp;sess @@ -713,18 +717,11 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr&lt;Session&gt; &amp;sess
713 if (!session) 717 if (!session)
714 return; 718 return;
715 719
716 - QueuedSessionRemoval qsr(session);  
717 -  
718 - auto comp = [](const QueuedSessionRemoval &a, const QueuedSessionRemoval &b)  
719 - {  
720 - return a.getExpiresAt() < b.getExpiresAt();  
721 - };  
722 - 720 + std::chrono::time_point<std::chrono::steady_clock> removeAt = std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval());
723 session->setQueuedRemovalAt(); 721 session->setQueuedRemovalAt();
724 722
725 std::lock_guard<std::mutex>(this->queuedSessionRemovalsMutex); 723 std::lock_guard<std::mutex>(this->queuedSessionRemovalsMutex);
726 - auto pos = std::upper_bound(this->queuedSessionRemovals.begin(), this->queuedSessionRemovals.end(), qsr, comp);  
727 - this->queuedSessionRemovals.insert(pos, qsr); 724 + queuedSessionRemovals[removeAt].push_back(session);
728 } 725 }
729 726
730 int64_t SubscriptionStore::getRetainedMessageCount() const 727 int64_t SubscriptionStore::getRetainedMessageCount() const
@@ -1026,23 +1023,6 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &amp;subtopi @@ -1026,23 +1023,6 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &amp;subtopi
1026 return nullptr; 1023 return nullptr;
1027 } 1024 }
1028 1025
1029 -QueuedSessionRemoval::QueuedSessionRemoval(const std::shared_ptr<Session> &session) :  
1030 - session(session),  
1031 - expiresAt(std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval()))  
1032 -{  
1033 -  
1034 -}  
1035 -  
1036 -std::chrono::time_point<std::chrono::steady_clock> QueuedSessionRemoval::getExpiresAt() const  
1037 -{  
1038 - return this->expiresAt;  
1039 -}  
1040 -  
1041 -std::shared_ptr<Session> QueuedSessionRemoval::getSession() const  
1042 -{  
1043 - return session.lock();  
1044 -}  
1045 -  
1046 QueuedWill::QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session) : 1026 QueuedWill::QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session) :
1047 will(will), 1027 will(will),
1048 session(session), 1028 session(session),
subscriptionstore.h
@@ -22,6 +22,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -22,6 +22,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
22 #include <forward_list> 22 #include <forward_list>
23 #include <list> 23 #include <list>
24 #include <mutex> 24 #include <mutex>
  25 +#include <map>
  26 +#include <vector>
25 #include <pthread.h> 27 #include <pthread.h>
26 28
27 #include "forward_declarations.h" 29 #include "forward_declarations.h"
@@ -84,23 +86,6 @@ class RetainedMessageNode @@ -84,23 +86,6 @@ class RetainedMessageNode
84 RetainedMessageNode *getChildren(const std::string &subtopic) const; 86 RetainedMessageNode *getChildren(const std::string &subtopic) const;
85 }; 87 };
86 88
87 -/**  
88 - * @brief A QueuedSessionRemoval is a sort of delayed request for removal. They are kept in a sorted list for fast insertion,  
89 - * and fast dequeueing of expired entries from the start.  
90 - *  
91 - * You can have multiple of these in the pending list. If a client has picked up the session again, the removal is not executed.  
92 - */  
93 -class QueuedSessionRemoval  
94 -{  
95 - std::weak_ptr<Session> session;  
96 - std::chrono::time_point<std::chrono::steady_clock> expiresAt;  
97 -  
98 -public:  
99 - QueuedSessionRemoval(const std::shared_ptr<Session> &session);  
100 - std::chrono::time_point<std::chrono::steady_clock> getExpiresAt() const;  
101 - std::shared_ptr<Session> getSession() const;  
102 -};  
103 -  
104 class QueuedWill 89 class QueuedWill
105 { 90 {
106 std::weak_ptr<WillPublish> will; 91 std::weak_ptr<WillPublish> will;
@@ -128,7 +113,7 @@ class SubscriptionStore @@ -128,7 +113,7 @@ class SubscriptionStore
128 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst; 113 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst;
129 114
130 std::mutex queuedSessionRemovalsMutex; 115 std::mutex queuedSessionRemovalsMutex;
131 - std::list<QueuedSessionRemoval> queuedSessionRemovals; 116 + std::map<std::chrono::time_point<std::chrono::steady_clock>, std::vector<std::weak_ptr<Session>>> queuedSessionRemovals;
132 117
133 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 118 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
134 RetainedMessageNode retainedMessagesRoot; 119 RetainedMessageNode retainedMessagesRoot;