From 03fd22bd47dd54ffb2e3b2127c585f20c7099f4b Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Thu, 7 Apr 2022 19:53:56 +0200 Subject: [PATCH] Check for publish message expiration --- qospacketqueue.cpp | 5 +++++ qospacketqueue.h | 1 + session.cpp | 34 +++++++++++++++++++++++++++------- subscriptionstore.cpp | 14 ++++++++++++++ types.cpp | 6 ++++++ types.h | 1 + 6 files changed, 54 insertions(+), 7 deletions(-) diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp index 21fe206..5a21343 100644 --- a/qospacketqueue.cpp +++ b/qospacketqueue.cpp @@ -51,6 +51,11 @@ void QoSPublishQueue::erase(const uint16_t packet_id) } } +std::list::const_iterator QoSPublishQueue::erase(std::list::const_iterator pos) +{ + return this->queue.erase(pos); +} + size_t QoSPublishQueue::size() const { return queue.size(); diff --git a/qospacketqueue.h b/qospacketqueue.h index df950bd..30d7805 100644 --- a/qospacketqueue.h +++ b/qospacketqueue.h @@ -31,6 +31,7 @@ class QoSPublishQueue public: void erase(const uint16_t packet_id); + std::list::const_iterator erase(std::list::const_iterator pos); size_t size() const; size_t getByteSize() const; void queuePublish(PublishCopyFactory ©Factory, uint16_t id, char new_max_qos); diff --git a/session.cpp b/session.cpp index 842e69b..edd50bb 100644 --- a/session.cpp +++ b/session.cpp @@ -210,12 +210,22 @@ void Session::clearQosMessage(uint16_t packet_id) } } -// [MQTT-4.4.0-1]: "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST re-send any -// unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This -// is the only circumstance where a Client or Server is REQUIRED to redeliver messages." -// -// There is a bit of a hole there, I think. When we write out a packet to a receiver, it may decide to drop it, if its buffers -// are full, for instance. We are not required to (periodically) retry. TODO Perhaps I will implement that retry anyway. + +/** + * @brief Session::sendPendingQosMessages sends pending publishes and QoS2 control packets. + * @return the amount of messages/packets published. + * + * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST + * re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This + * is the only circumstance where a Client or Server is REQUIRED to redeliver messages." + * + * Only MQTT 3.1 requires retransmission. MQTT 3.1.1 and MQTT 5 only send on reconnect. At time of writing this comment, + * FlashMQ doesn't have a retransmission system. I don't think I want to implement one for the sake of 3.1 compliance, + * because it's just not that great an idea in terms of server load and quality of modern TCP. However, receiving clients + * can still decide to drop packets, like when their buffers are full. The clients from where the packet originates will + * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher + * and subscriber. Users are required to implement something themselves. + */ uint64_t Session::sendPendingQosMessages() { uint64_t count = 0; @@ -224,8 +234,18 @@ uint64_t Session::sendPendingQosMessages() if (c) { std::lock_guard locker(qosQueueMutex); - for (const QueuedPublish &queuedPublish : qosPacketQueue) + + auto pos = qosPacketQueue.begin(); + while (pos != qosPacketQueue.end()) { + const QueuedPublish &queuedPublish = *pos; + + if (queuedPublish.getPublish().hasExpired()) + { + pos = qosPacketQueue.erase(pos); + continue; + } + MqttPacket p(c->getProtocolVersion(), queuedPublish.getPublish()); p.setDuplicate(); diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 737382f..717eebd 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -272,6 +272,15 @@ std::shared_ptr SubscriptionStore::lockSession(const std::string &clien return std::shared_ptr(); } +/** + * @brief SubscriptionStore::sendQueuedWillMessages sends queued will messages. + * + * The list of pendingWillMessages is sorted. This allows for fast insertion and dequeueing of wills that have expired. + * + * The expiry interval as set in the properties of the will message is not used to check for expiration here. To + * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as + * the Publication Expiry Interval when the Server publishes the Will Message." + */ void SubscriptionStore::sendQueuedWillMessages() { const auto now = std::chrono::steady_clock::now(); @@ -294,6 +303,11 @@ void SubscriptionStore::sendQueuedWillMessages() } } +/** + * @brief SubscriptionStore::queueWillMessage queues the will message by bin-searching its place in the sorted list. + * @param willMessage + * @param forceNow + */ void SubscriptionStore::queueWillMessage(std::shared_ptr &willMessage, bool forceNow) { if (!willMessage) diff --git a/types.cpp b/types.cpp index 14bde2f..6f77687 100644 --- a/types.cpp +++ b/types.cpp @@ -170,6 +170,12 @@ bool PublishBase::hasUserProperties() const return this->propertyBuilder.operator bool() && this->propertyBuilder->getUserProperties().operator bool(); } +bool PublishBase::hasExpired() const +{ + const std::chrono::seconds age = std::chrono::duration_cast(std::chrono::steady_clock::now() - this->createdAt); + return (expiresAfter > age); +} + Publish::Publish(const Publish &other) : PublishBase(other) { diff --git a/types.h b/types.h index c5cf5e2..680143c 100644 --- a/types.h +++ b/types.h @@ -211,6 +211,7 @@ public: void setClientSpecificProperties(); void constructPropertyBuilder(); bool hasUserProperties() const; + bool hasExpired() const; }; class Publish : public PublishBase -- libgit2 0.21.4