diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index e0907da..a5e85b0 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -288,8 +288,6 @@ std::shared_ptr SubscriptionStore::lockSession(const std::string &clien /** * @brief SubscriptionStore::sendQueuedWillMessages sends queued will messages. * - * The list of pendingWillMessages is sorted. This allows for fast insertion and dequeueing of wills that have expired. - * * The expiry interval as set in the properties of the will message is not used to check for expiration here. To * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as * the Publication Expiry Interval when the Server publishes the Will Message." @@ -305,39 +303,48 @@ void SubscriptionStore::sendQueuedWillMessages() auto it = pendingWillMessages.begin(); while (it != pendingWillMessages.end()) { - QueuedWill &qw = *it; + const std::chrono::time_point &sendAt = it->first; - std::shared_ptr p = qw.getWill().lock(); - if (p) - { - if (qw.getSendAt() > now) - break; + if (sendAt > now) + break; - std::shared_ptr s = qw.getSession(); + std::vector &willsOfSlot = it->second; + + for(QueuedWill &will : willsOfSlot) + { + std::shared_ptr p = will.getWill().lock(); - if (!s || s->hasActiveClient()) + // If sessions get a new will, or the will is cleared from a new connecting client, this entry + // will be null and we can ignore it. + if (p) { - it = pendingWillMessages.erase(it); - continue; - } + std::shared_ptr s = will.getSession(); - logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); - PublishCopyFactory factory(p.get()); - queuePacketAtSubscribers(factory); + // Check for stale wills, or sessions that have become active again. + if (s && !s->hasActiveClient()) + { + logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); + PublishCopyFactory factory(p.get()); + queuePacketAtSubscribers(factory); - if (p->retain) - setRetainedMessage(*p.get(), (*p.get()).subtopics); + if (p->retain) + setRetainedMessage(*p, p->subtopics); - s->clearWill(); + s->clearWill(); + } + } } it = pendingWillMessages.erase(it); } } /** - * @brief SubscriptionStore::queueWillMessage queues the will message by bin-searching its place in the sorted list. + * @brief SubscriptionStore::queueWillMessage queues the will message in a sorted map. * @param willMessage * @param forceNow + * + * The queued will is only valid for that time. Should a new will be placed in the map for a session, the original shared_ptr + * will be cleared and the previously queued entry is void (but still there, so it needs to be checked). */ void SubscriptionStore::queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow) { @@ -365,10 +372,10 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr &wil willMessage->setQueuedAt(); QueuedWill queuedWill(willMessage, session); + const std::chrono::time_point sendWillAt = std::chrono::steady_clock::now() + std::chrono::seconds(willMessage->will_delay); std::lock_guard locker(this->pendingWillsMutex); - auto pos = std::upper_bound(this->pendingWillMessages.begin(), this->pendingWillMessages.end(), willMessage, willDelayCompare); - this->pendingWillMessages.insert(pos, queuedWill); + this->pendingWillMessages[sendWillAt].push_back(queuedWill); } void SubscriptionStore::publishNonRecursively(const std::unordered_map &subscribers, @@ -1025,8 +1032,7 @@ RetainedMessageNode *RetainedMessageNode::getChildren(const std::string &subtopi QueuedWill::QueuedWill(const std::shared_ptr &will, const std::shared_ptr &session) : will(will), - session(session), - sendAt(std::chrono::steady_clock::now() + std::chrono::seconds(will->will_delay)) + session(session) { } @@ -1036,23 +1042,9 @@ const std::weak_ptr &QueuedWill::getWill() const return this->will; } -std::chrono::time_point QueuedWill::getSendAt() const -{ - return this->sendAt; -} - std::shared_ptr QueuedWill::getSession() { return this->session.lock(); } -bool willDelayCompare(const std::shared_ptr &a, const QueuedWill &b) -{ - std::shared_ptr _b = b.getWill().lock(); - - if (!_b) - return true; - - return a->will_delay < _b->will_delay; -}; diff --git a/subscriptionstore.h b/subscriptionstore.h index fb88ab6..0ce5651 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -90,13 +90,11 @@ class QueuedWill { std::weak_ptr will; std::weak_ptr session; - std::chrono::time_point sendAt; public: QueuedWill(const std::shared_ptr &will, const std::shared_ptr &session); const std::weak_ptr &getWill() const; - std::chrono::time_point getSendAt() const; std::shared_ptr getSession(); }; @@ -121,7 +119,7 @@ class SubscriptionStore int64_t retainedMessageCount = 0; std::mutex pendingWillsMutex; - std::list pendingWillMessages; + std::map, std::vector> pendingWillMessages; std::chrono::time_point lastTreeCleanup; @@ -173,6 +171,4 @@ public: void queueSessionRemoval(const std::shared_ptr &session); }; -bool willDelayCompare(const std::shared_ptr &a, const QueuedWill &b); - #endif // SUBSCRIPTIONSTORE_H