Commit 03fd22bd47dd54ffb2e3b2127c585f20c7099f4b

Authored by Wiebe Cazemier
1 parent 34929680

Check for publish message expiration

qospacketqueue.cpp
@@ -51,6 +51,11 @@ void QoSPublishQueue::erase(const uint16_t packet_id) @@ -51,6 +51,11 @@ void QoSPublishQueue::erase(const uint16_t packet_id)
51 } 51 }
52 } 52 }
53 53
  54 +std::list<QueuedPublish>::const_iterator QoSPublishQueue::erase(std::list<QueuedPublish>::const_iterator pos)
  55 +{
  56 + return this->queue.erase(pos);
  57 +}
  58 +
54 size_t QoSPublishQueue::size() const 59 size_t QoSPublishQueue::size() const
55 { 60 {
56 return queue.size(); 61 return queue.size();
qospacketqueue.h
@@ -31,6 +31,7 @@ class QoSPublishQueue @@ -31,6 +31,7 @@ class QoSPublishQueue
31 31
32 public: 32 public:
33 void erase(const uint16_t packet_id); 33 void erase(const uint16_t packet_id);
  34 + std::list<QueuedPublish>::const_iterator erase(std::list<QueuedPublish>::const_iterator pos);
34 size_t size() const; 35 size_t size() const;
35 size_t getByteSize() const; 36 size_t getByteSize() const;
36 void queuePublish(PublishCopyFactory &copyFactory, uint16_t id, char new_max_qos); 37 void queuePublish(PublishCopyFactory &copyFactory, uint16_t id, char new_max_qos);
session.cpp
@@ -210,12 +210,22 @@ void Session::clearQosMessage(uint16_t packet_id) @@ -210,12 +210,22 @@ void Session::clearQosMessage(uint16_t packet_id)
210 } 210 }
211 } 211 }
212 212
213 -// [MQTT-4.4.0-1]: "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST re-send any  
214 -// unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This  
215 -// is the only circumstance where a Client or Server is REQUIRED to redeliver messages."  
216 -//  
217 -// There is a bit of a hole there, I think. When we write out a packet to a receiver, it may decide to drop it, if its buffers  
218 -// are full, for instance. We are not required to (periodically) retry. TODO Perhaps I will implement that retry anyway. 213 +
  214 +/**
  215 + * @brief Session::sendPendingQosMessages sends pending publishes and QoS2 control packets.
  216 + * @return the amount of messages/packets published.
  217 + *
  218 + * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST
  219 + * re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This
  220 + * is the only circumstance where a Client or Server is REQUIRED to redeliver messages."
  221 + *
  222 + * Only MQTT 3.1 requires retransmission. MQTT 3.1.1 and MQTT 5 only send on reconnect. At time of writing this comment,
  223 + * FlashMQ doesn't have a retransmission system. I don't think I want to implement one for the sake of 3.1 compliance,
  224 + * because it's just not that great an idea in terms of server load and quality of modern TCP. However, receiving clients
  225 + * can still decide to drop packets, like when their buffers are full. The clients from where the packet originates will
  226 + * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher
  227 + * and subscriber. Users are required to implement something themselves.
  228 + */
219 uint64_t Session::sendPendingQosMessages() 229 uint64_t Session::sendPendingQosMessages()
220 { 230 {
221 uint64_t count = 0; 231 uint64_t count = 0;
@@ -224,8 +234,18 @@ uint64_t Session::sendPendingQosMessages() @@ -224,8 +234,18 @@ uint64_t Session::sendPendingQosMessages()
224 if (c) 234 if (c)
225 { 235 {
226 std::lock_guard<std::mutex> locker(qosQueueMutex); 236 std::lock_guard<std::mutex> locker(qosQueueMutex);
227 - for (const QueuedPublish &queuedPublish : qosPacketQueue) 237 +
  238 + auto pos = qosPacketQueue.begin();
  239 + while (pos != qosPacketQueue.end())
228 { 240 {
  241 + const QueuedPublish &queuedPublish = *pos;
  242 +
  243 + if (queuedPublish.getPublish().hasExpired())
  244 + {
  245 + pos = qosPacketQueue.erase(pos);
  246 + continue;
  247 + }
  248 +
229 MqttPacket p(c->getProtocolVersion(), queuedPublish.getPublish()); 249 MqttPacket p(c->getProtocolVersion(), queuedPublish.getPublish());
230 p.setDuplicate(); 250 p.setDuplicate();
231 251
subscriptionstore.cpp
@@ -272,6 +272,15 @@ std::shared_ptr&lt;Session&gt; SubscriptionStore::lockSession(const std::string &amp;clien @@ -272,6 +272,15 @@ std::shared_ptr&lt;Session&gt; SubscriptionStore::lockSession(const std::string &amp;clien
272 return std::shared_ptr<Session>(); 272 return std::shared_ptr<Session>();
273 } 273 }
274 274
  275 +/**
  276 + * @brief SubscriptionStore::sendQueuedWillMessages sends queued will messages.
  277 + *
  278 + * The list of pendingWillMessages is sorted. This allows for fast insertion and dequeueing of wills that have expired.
  279 + *
  280 + * The expiry interval as set in the properties of the will message is not used to check for expiration here. To
  281 + * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as
  282 + * the Publication Expiry Interval when the Server publishes the Will Message."
  283 + */
275 void SubscriptionStore::sendQueuedWillMessages() 284 void SubscriptionStore::sendQueuedWillMessages()
276 { 285 {
277 const auto now = std::chrono::steady_clock::now(); 286 const auto now = std::chrono::steady_clock::now();
@@ -294,6 +303,11 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -294,6 +303,11 @@ void SubscriptionStore::sendQueuedWillMessages()
294 } 303 }
295 } 304 }
296 305
  306 +/**
  307 + * @brief SubscriptionStore::queueWillMessage queues the will message by bin-searching its place in the sorted list.
  308 + * @param willMessage
  309 + * @param forceNow
  310 + */
297 void SubscriptionStore::queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow) 311 void SubscriptionStore::queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow)
298 { 312 {
299 if (!willMessage) 313 if (!willMessage)
types.cpp
@@ -170,6 +170,12 @@ bool PublishBase::hasUserProperties() const @@ -170,6 +170,12 @@ bool PublishBase::hasUserProperties() const
170 return this->propertyBuilder.operator bool() && this->propertyBuilder->getUserProperties().operator bool(); 170 return this->propertyBuilder.operator bool() && this->propertyBuilder->getUserProperties().operator bool();
171 } 171 }
172 172
  173 +bool PublishBase::hasExpired() const
  174 +{
  175 + const std::chrono::seconds age = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - this->createdAt);
  176 + return (expiresAfter > age);
  177 +}
  178 +
173 Publish::Publish(const Publish &other) : 179 Publish::Publish(const Publish &other) :
174 PublishBase(other) 180 PublishBase(other)
175 { 181 {
@@ -211,6 +211,7 @@ public: @@ -211,6 +211,7 @@ public:
211 void setClientSpecificProperties(); 211 void setClientSpecificProperties();
212 void constructPropertyBuilder(); 212 void constructPropertyBuilder();
213 bool hasUserProperties() const; 213 bool hasUserProperties() const;
  214 + bool hasExpired() const;
214 }; 215 };
215 216
216 class Publish : public PublishBase 217 class Publish : public PublishBase