Commit b18af18a2f4e0292acdd9995920e4df5b7da8220

Authored by Wiebe Cazemier
1 parent 71ffc7d5

Queue wills in ordered map

This is fast(er).
subscriptionstore.cpp
@@ -288,8 +288,6 @@ std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clien @@ -288,8 +288,6 @@ std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clien
288 /** 288 /**
289 * @brief SubscriptionStore::sendQueuedWillMessages sends queued will messages. 289 * @brief SubscriptionStore::sendQueuedWillMessages sends queued will messages.
290 * 290 *
291 - * The list of pendingWillMessages is sorted. This allows for fast insertion and dequeueing of wills that have expired.  
292 - *  
293 * The expiry interval as set in the properties of the will message is not used to check for expiration here. To 291 * The expiry interval as set in the properties of the will message is not used to check for expiration here. To
294 * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as 292 * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as
295 * the Publication Expiry Interval when the Server publishes the Will Message." 293 * the Publication Expiry Interval when the Server publishes the Will Message."
@@ -305,39 +303,48 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -305,39 +303,48 @@ void SubscriptionStore::sendQueuedWillMessages()
305 auto it = pendingWillMessages.begin(); 303 auto it = pendingWillMessages.begin();
306 while (it != pendingWillMessages.end()) 304 while (it != pendingWillMessages.end())
307 { 305 {
308 - QueuedWill &qw = *it; 306 + const std::chrono::time_point<std::chrono::steady_clock> &sendAt = it->first;
309 307
310 - std::shared_ptr<Publish> p = qw.getWill().lock();  
311 - if (p)  
312 - {  
313 - if (qw.getSendAt() > now)  
314 - break; 308 + if (sendAt > now)
  309 + break;
315 310
316 - std::shared_ptr<Session> s = qw.getSession(); 311 + std::vector<QueuedWill> &willsOfSlot = it->second;
  312 +
  313 + for(QueuedWill &will : willsOfSlot)
  314 + {
  315 + std::shared_ptr<Publish> p = will.getWill().lock();
317 316
318 - if (!s || s->hasActiveClient()) 317 + // If sessions get a new will, or the will is cleared from a new connecting client, this entry
  318 + // will be null and we can ignore it.
  319 + if (p)
319 { 320 {
320 - it = pendingWillMessages.erase(it);  
321 - continue;  
322 - } 321 + std::shared_ptr<Session> s = will.getSession();
323 322
324 - logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() );  
325 - PublishCopyFactory factory(p.get());  
326 - queuePacketAtSubscribers(factory); 323 + // Check for stale wills, or sessions that have become active again.
  324 + if (s && !s->hasActiveClient())
  325 + {
  326 + logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() );
  327 + PublishCopyFactory factory(p.get());
  328 + queuePacketAtSubscribers(factory);
327 329
328 - if (p->retain)  
329 - setRetainedMessage(*p.get(), (*p.get()).subtopics); 330 + if (p->retain)
  331 + setRetainedMessage(*p, p->subtopics);
330 332
331 - s->clearWill(); 333 + s->clearWill();
  334 + }
  335 + }
332 } 336 }
333 it = pendingWillMessages.erase(it); 337 it = pendingWillMessages.erase(it);
334 } 338 }
335 } 339 }
336 340
337 /** 341 /**
338 - * @brief SubscriptionStore::queueWillMessage queues the will message by bin-searching its place in the sorted list. 342 + * @brief SubscriptionStore::queueWillMessage queues the will message in a sorted map.
339 * @param willMessage 343 * @param willMessage
340 * @param forceNow 344 * @param forceNow
  345 + *
  346 + * The queued will is only valid for that time. Should a new will be placed in the map for a session, the original shared_ptr
  347 + * will be cleared and the previously queued entry is void (but still there, so it needs to be checked).
341 */ 348 */
342 void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow) 349 void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow)
343 { 350 {
@@ -365,10 +372,10 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;WillPublish&gt; &amp;wil @@ -365,10 +372,10 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;WillPublish&gt; &amp;wil
365 willMessage->setQueuedAt(); 372 willMessage->setQueuedAt();
366 373
367 QueuedWill queuedWill(willMessage, session); 374 QueuedWill queuedWill(willMessage, session);
  375 + const std::chrono::time_point<std::chrono::steady_clock> sendWillAt = std::chrono::steady_clock::now() + std::chrono::seconds(willMessage->will_delay);
368 376
369 std::lock_guard<std::mutex> locker(this->pendingWillsMutex); 377 std::lock_guard<std::mutex> locker(this->pendingWillsMutex);
370 - auto pos = std::upper_bound(this->pendingWillMessages.begin(), this->pendingWillMessages.end(), willMessage, willDelayCompare);  
371 - this->pendingWillMessages.insert(pos, queuedWill); 378 + this->pendingWillMessages[sendWillAt].push_back(queuedWill);
372 } 379 }
373 380
374 void SubscriptionStore::publishNonRecursively(const std::unordered_map<std::string, Subscription> &subscribers, 381 void SubscriptionStore::publishNonRecursively(const std::unordered_map<std::string, Subscription> &subscribers,
@@ -1025,8 +1032,7 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &amp;subtopi @@ -1025,8 +1032,7 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &amp;subtopi
1025 1032
1026 QueuedWill::QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session) : 1033 QueuedWill::QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session) :
1027 will(will), 1034 will(will),
1028 - session(session),  
1029 - sendAt(std::chrono::steady_clock::now() + std::chrono::seconds(will->will_delay)) 1035 + session(session)
1030 { 1036 {
1031 1037
1032 } 1038 }
@@ -1036,23 +1042,9 @@ const std::weak_ptr&lt;WillPublish&gt; &amp;QueuedWill::getWill() const @@ -1036,23 +1042,9 @@ const std::weak_ptr&lt;WillPublish&gt; &amp;QueuedWill::getWill() const
1036 return this->will; 1042 return this->will;
1037 } 1043 }
1038 1044
1039 -std::chrono::time_point<std::chrono::steady_clock> QueuedWill::getSendAt() const  
1040 -{  
1041 - return this->sendAt;  
1042 -}  
1043 -  
1044 std::shared_ptr<Session> QueuedWill::getSession() 1045 std::shared_ptr<Session> QueuedWill::getSession()
1045 { 1046 {
1046 return this->session.lock(); 1047 return this->session.lock();
1047 } 1048 }
1048 1049
1049 -bool willDelayCompare(const std::shared_ptr<WillPublish> &a, const QueuedWill &b)  
1050 -{  
1051 - std::shared_ptr<WillPublish> _b = b.getWill().lock();  
1052 -  
1053 - if (!_b)  
1054 - return true;  
1055 -  
1056 - return a->will_delay < _b->will_delay;  
1057 -};  
1058 1050
subscriptionstore.h
@@ -90,13 +90,11 @@ class QueuedWill @@ -90,13 +90,11 @@ class QueuedWill
90 { 90 {
91 std::weak_ptr<WillPublish> will; 91 std::weak_ptr<WillPublish> will;
92 std::weak_ptr<Session> session; 92 std::weak_ptr<Session> session;
93 - std::chrono::time_point<std::chrono::steady_clock> sendAt;  
94 93
95 public: 94 public:
96 QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session); 95 QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session);
97 96
98 const std::weak_ptr<WillPublish> &getWill() const; 97 const std::weak_ptr<WillPublish> &getWill() const;
99 - std::chrono::time_point<std::chrono::steady_clock> getSendAt() const;  
100 std::shared_ptr<Session> getSession(); 98 std::shared_ptr<Session> getSession();
101 }; 99 };
102 100
@@ -121,7 +119,7 @@ class SubscriptionStore @@ -121,7 +119,7 @@ class SubscriptionStore
121 int64_t retainedMessageCount = 0; 119 int64_t retainedMessageCount = 0;
122 120
123 std::mutex pendingWillsMutex; 121 std::mutex pendingWillsMutex;
124 - std::list<QueuedWill> pendingWillMessages; 122 + std::map<std::chrono::time_point<std::chrono::steady_clock>, std::vector<QueuedWill>> pendingWillMessages;
125 123
126 std::chrono::time_point<std::chrono::steady_clock> lastTreeCleanup; 124 std::chrono::time_point<std::chrono::steady_clock> lastTreeCleanup;
127 125
@@ -173,6 +171,4 @@ public: @@ -173,6 +171,4 @@ public:
173 void queueSessionRemoval(const std::shared_ptr<Session> &session); 171 void queueSessionRemoval(const std::shared_ptr<Session> &session);
174 }; 172 };
175 173
176 -bool willDelayCompare(const std::shared_ptr<WillPublish> &a, const QueuedWill &b);  
177 -  
178 #endif // SUBSCRIPTIONSTORE_H 174 #endif // SUBSCRIPTIONSTORE_H