Commit 96c1dd91cf5580d83d80d6903067e4b24765ef7b

Authored by Wiebe Cazemier
1 parent 4cd9300c

Queue session removals

This replaces the old style checking of expired sessions once every 10
minutes or so.
client.cpp
@@ -69,10 +69,13 @@ Client::~Client() @@ -69,10 +69,13 @@ Client::~Client()
69 close(fd); 69 close(fd);
70 } 70 }
71 71
72 - // MQTT-3.1.2-6  
73 if (session->getDestroyOnDisconnect()) 72 if (session->getDestroyOnDisconnect())
74 { 73 {
75 - store->removeSession(clientid); 74 + store->removeSession(session);
  75 + }
  76 + else
  77 + {
  78 + store->queueSessionRemoval(session);
76 } 79 }
77 } 80 }
78 81
subscriptionstore.cpp
@@ -269,24 +269,9 @@ bool SubscriptionStore::sessionPresent(const std::string &clientid) @@ -269,24 +269,9 @@ bool SubscriptionStore::sessionPresent(const std::string &clientid)
269 return result; 269 return result;
270 } 270 }
271 271
272 -/**  
273 - * @brief SubscriptionStore::purgeEmptyWills doesn't lock a mutex, because it's a helper for elsewhere.  
274 - */  
275 -void SubscriptionStore::purgeEmptyWills()  
276 -{  
277 - auto it = pendingWillMessages.begin();  
278 - while (it != pendingWillMessages.end())  
279 - {  
280 - std::shared_ptr<Publish> p = (*it).lock();  
281 - if (!p)  
282 - {  
283 - it = pendingWillMessages.erase(it);  
284 - }  
285 - }  
286 -}  
287 -  
288 void SubscriptionStore::sendQueuedWillMessages() 272 void SubscriptionStore::sendQueuedWillMessages()
289 { 273 {
  274 + const auto now = std::chrono::steady_clock::now();
290 std::lock_guard<std::mutex>(this->pendingWillsMutex); 275 std::lock_guard<std::mutex>(this->pendingWillsMutex);
291 276
292 auto it = pendingWillMessages.begin(); 277 auto it = pendingWillMessages.begin();
@@ -295,7 +280,7 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -295,7 +280,7 @@ void SubscriptionStore::sendQueuedWillMessages()
295 std::shared_ptr<Publish> p = (*it).lock(); 280 std::shared_ptr<Publish> p = (*it).lock();
296 if (p) 281 if (p)
297 { 282 {
298 - if (p->createdAt + std::chrono::seconds(p->will_delay) > std::chrono::steady_clock::now()) 283 + if (p->createdAt + std::chrono::seconds(p->will_delay) > now)
299 break; 284 break;
300 285
301 logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); 286 logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() );
@@ -306,14 +291,15 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -306,14 +291,15 @@ void SubscriptionStore::sendQueuedWillMessages()
306 } 291 }
307 } 292 }
308 293
309 -void SubscriptionStore::queueWillMessage(std::shared_ptr<Publish> &willMessage) 294 +void SubscriptionStore::queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow)
310 { 295 {
311 if (!willMessage) 296 if (!willMessage)
312 return; 297 return;
313 298
314 - logger->logf(LOG_DEBUG, "Queueing will on topic '%s', with delay %d seconds.", willMessage->topic.c_str(), willMessage->will_delay ); 299 + const int delay = forceNow ? 0 : willMessage->will_delay;
  300 + logger->logf(LOG_DEBUG, "Queueing will on topic '%s', with delay %d seconds.", willMessage->topic.c_str(), delay );
315 301
316 - if (willMessage->will_delay == 0) 302 + if (delay == 0)
317 { 303 {
318 PublishCopyFactory factory(willMessage.get()); 304 PublishCopyFactory factory(willMessage.get());
319 queuePacketAtSubscribers(factory); 305 queuePacketAtSubscribers(factory);
@@ -577,13 +563,20 @@ int SubscriptionNode::cleanSubscriptions() @@ -577,13 +563,20 @@ int SubscriptionNode::cleanSubscriptions()
577 return subscribers.size() + subscribersLeftInChildren; 563 return subscribers.size() + subscribersLeftInChildren;
578 } 564 }
579 565
580 -void SubscriptionStore::removeSession(const std::string &clientid) 566 +void SubscriptionStore::removeSession(const std::shared_ptr<Session> &session)
581 { 567 {
  568 + const std::string &clientid = session->getClientId();
  569 + logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str());
  570 +
  571 + std::shared_ptr<Publish> &will = session->getWill();
  572 + if (will)
  573 + {
  574 + queueWillMessage(will, true);
  575 + }
  576 +
582 RWLockGuard lock_guard(&subscriptionsRwlock); 577 RWLockGuard lock_guard(&subscriptionsRwlock);
583 lock_guard.wrlock(); 578 lock_guard.wrlock();
584 579
585 - logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str());  
586 -  
587 auto session_it = sessionsById.find(clientid); 580 auto session_it = sessionsById.find(clientid);
588 if (session_it != sessionsById.end()) 581 if (session_it != sessionsById.end())
589 { 582 {
@@ -600,37 +593,59 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -600,37 +593,59 @@ void SubscriptionStore::removeExpiredSessionsClients()
600 { 593 {
601 logger->logf(LOG_DEBUG, "Cleaning out old sessions"); 594 logger->logf(LOG_DEBUG, "Cleaning out old sessions");
602 595
603 - RWLockGuard lock_guard(&subscriptionsRwlock);  
604 - lock_guard.wrlock(); 596 + const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
605 597
606 - auto session_it = sessionsById.begin();  
607 - while (session_it != sessionsById.end())  
608 { 598 {
609 - std::shared_ptr<Session> &session = session_it->second; 599 + std::lock_guard<std::mutex>(this->queuedSessionRemovalsMutex);
610 600
611 - if (session->hasExpired()) 601 + auto it = queuedSessionRemovals.begin();
  602 + while (it != queuedSessionRemovals.end())
612 { 603 {
613 - logger->logf(LOG_DEBUG, "Removing expired session from store %s", session->getClientId().c_str());  
614 - std::shared_ptr<Publish> &will = session->getWill();  
615 - if (will) 604 + QueuedSessionRemoval &qsr = *it;
  605 + std::shared_ptr<Session> session = (*it).getSession();
  606 + if (session)
616 { 607 {
617 - will->will_delay = 0;  
618 - queueWillMessage(will); 608 + if (qsr.getExpiresAt() > now)
  609 + {
  610 + logger->logf(LOG_DEBUG, "Breaking from sorted list of queued session removals. %d left in the future.", queuedSessionRemovals.size());
  611 + break;
  612 + }
  613 +
  614 + // A session could have been picked up again, so we have to verify its expiration status.
  615 + if (session->hasExpired())
  616 + {
  617 + removeSession(session);
  618 + }
619 } 619 }
620 - session_it = sessionsById.erase(session_it); 620 + it = queuedSessionRemovals.erase(it);
621 } 621 }
622 - else  
623 - session_it++;  
624 } 622 }
625 623
626 - if (lastTreeCleanup + std::chrono::minutes(30) < std::chrono::steady_clock::now()) 624 + if (lastTreeCleanup + std::chrono::minutes(30) < now)
627 { 625 {
  626 + RWLockGuard lock_guard(&subscriptionsRwlock);
  627 + lock_guard.wrlock();
  628 +
628 logger->logf(LOG_NOTICE, "Rebuilding subscription tree"); 629 logger->logf(LOG_NOTICE, "Rebuilding subscription tree");
629 root.cleanSubscriptions(); 630 root.cleanSubscriptions();
630 - lastTreeCleanup = std::chrono::steady_clock::now(); 631 + lastTreeCleanup = now;
631 } 632 }
632 } 633 }
633 634
  635 +void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &session)
  636 +{
  637 + QueuedSessionRemoval qsr(session);
  638 +
  639 + auto comp = [](const QueuedSessionRemoval &a, const QueuedSessionRemoval &b)
  640 + {
  641 + return a.getExpiresAt() < b.getExpiresAt();
  642 + };
  643 +
  644 + std::lock_guard<std::mutex>(this->queuedSessionRemovalsMutex);
  645 + auto pos = std::upper_bound(this->queuedSessionRemovals.begin(), this->queuedSessionRemovals.end(), qsr, comp);
  646 + this->queuedSessionRemovals.insert(pos, qsr);
  647 +}
  648 +
634 int64_t SubscriptionStore::getRetainedMessageCount() const 649 int64_t SubscriptionStore::getRetainedMessageCount() const
635 { 650 {
636 return retainedMessageCount; 651 return retainedMessageCount;
@@ -927,3 +942,20 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &amp;subtopi @@ -927,3 +942,20 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &amp;subtopi
927 return it->second.get(); 942 return it->second.get();
928 return nullptr; 943 return nullptr;
929 } 944 }
  945 +
  946 +QueuedSessionRemoval::QueuedSessionRemoval(const std::shared_ptr<Session> &session) :
  947 + session(session),
  948 + expiresAt(std::chrono::steady_clock::now() + std::chrono::seconds(session->getSessionExpiryInterval()))
  949 +{
  950 +
  951 +}
  952 +
  953 +std::chrono::time_point<std::chrono::steady_clock> QueuedSessionRemoval::getExpiresAt() const
  954 +{
  955 + return this->expiresAt;
  956 +}
  957 +
  958 +std::shared_ptr<Session> QueuedSessionRemoval::getSession() const
  959 +{
  960 + return session.lock();
  961 +}
subscriptionstore.h
@@ -84,6 +84,17 @@ class RetainedMessageNode @@ -84,6 +84,17 @@ class RetainedMessageNode
84 RetainedMessageNode *getChildren(const std::string &subtopic) const; 84 RetainedMessageNode *getChildren(const std::string &subtopic) const;
85 }; 85 };
86 86
  87 +class QueuedSessionRemoval
  88 +{
  89 + std::weak_ptr<Session> session;
  90 + std::chrono::time_point<std::chrono::steady_clock> expiresAt;
  91 +
  92 +public:
  93 + QueuedSessionRemoval(const std::shared_ptr<Session> &session);
  94 + std::chrono::time_point<std::chrono::steady_clock> getExpiresAt() const;
  95 + std::shared_ptr<Session> getSession() const;
  96 +};
  97 +
87 class SubscriptionStore 98 class SubscriptionStore
88 { 99 {
89 #ifdef TESTING 100 #ifdef TESTING
@@ -96,6 +107,9 @@ class SubscriptionStore @@ -96,6 +107,9 @@ class SubscriptionStore
96 std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById; 107 std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById;
97 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst; 108 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst;
98 109
  110 + std::mutex queuedSessionRemovalsMutex;
  111 + std::list<QueuedSessionRemoval> queuedSessionRemovals;
  112 +
99 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 113 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
100 RetainedMessageNode retainedMessagesRoot; 114 RetainedMessageNode retainedMessagesRoot;
101 RetainedMessageNode retainedMessagesRootDollar; 115 RetainedMessageNode retainedMessagesRootDollar;
@@ -121,8 +135,6 @@ class SubscriptionStore @@ -121,8 +135,6 @@ class SubscriptionStore
121 void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const; 135 void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const;
122 136
123 SubscriptionNode *getDeepestNode(const std::string &topic, const std::vector<std::string> &subtopics); 137 SubscriptionNode *getDeepestNode(const std::string &topic, const std::vector<std::string> &subtopics);
124 -  
125 - void purgeEmptyWills();  
126 public: 138 public:
127 SubscriptionStore(); 139 SubscriptionStore();
128 140
@@ -133,14 +145,14 @@ public: @@ -133,14 +145,14 @@ public:
133 bool sessionPresent(const std::string &clientid); 145 bool sessionPresent(const std::string &clientid);
134 146
135 void sendQueuedWillMessages(); 147 void sendQueuedWillMessages();
136 - void queueWillMessage(std::shared_ptr<Publish> &willMessage); 148 + void queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow = false);
137 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false); 149 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false);
138 uint64_t giveClientRetainedMessages(const std::shared_ptr<Client> &client, const std::shared_ptr<Session> &ses, 150 uint64_t giveClientRetainedMessages(const std::shared_ptr<Client> &client, const std::shared_ptr<Session> &ses,
139 const std::vector<std::string> &subscribeSubtopics, char max_qos); 151 const std::vector<std::string> &subscribeSubtopics, char max_qos);
140 152
141 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos); 153 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos);
142 154
143 - void removeSession(const std::string &clientid); 155 + void removeSession(const std::shared_ptr<Session> &session);
144 void removeExpiredSessionsClients(); 156 void removeExpiredSessionsClients();
145 157
146 int64_t getRetainedMessageCount() const; 158 int64_t getRetainedMessageCount() const;
@@ -152,6 +164,8 @@ public: @@ -152,6 +164,8 @@ public:
152 164
153 void saveSessionsAndSubscriptions(const std::string &filePath); 165 void saveSessionsAndSubscriptions(const std::string &filePath);
154 void loadSessionsAndSubscriptions(const std::string &filePath); 166 void loadSessionsAndSubscriptions(const std::string &filePath);
  167 +
  168 + void queueSessionRemoval(const std::shared_ptr<Session> &session);
155 }; 169 };
156 170
157 #endif // SUBSCRIPTIONSTORE_H 171 #endif // SUBSCRIPTIONSTORE_H
types.cpp
@@ -134,12 +134,6 @@ void PublishBase::setClientSpecificProperties() @@ -134,12 +134,6 @@ void PublishBase::setClientSpecificProperties()
134 propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); 134 propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count());
135 } 135 }
136 136
137 -bool PublishBase::hasExpired() const  
138 -{  
139 - auto now = std::chrono::steady_clock::now();  
140 - return (createdAt + expiresAfter) > now;  
141 -}  
142 -  
143 Publish::Publish(const Publish &other) : 137 Publish::Publish(const Publish &other) :
144 PublishBase(other) 138 PublishBase(other)
145 { 139 {
@@ -209,7 +209,6 @@ public: @@ -209,7 +209,6 @@ public:
209 PublishBase(const std::string &topic, const std::string &payload, char qos); 209 PublishBase(const std::string &topic, const std::string &payload, char qos);
210 size_t getLengthWithoutFixedHeader() const; 210 size_t getLengthWithoutFixedHeader() const;
211 void setClientSpecificProperties(); 211 void setClientSpecificProperties();
212 - bool hasExpired() const;  
213 }; 212 };
214 213
215 class Publish : public PublishBase 214 class Publish : public PublishBase