Commit 9c61bff73ca87278ad04af6c5620ecf9fe21ee2a

Authored by Wiebe Cazemier
1 parent 794a28bb

Fix various will things

- Delay is properly counted from moment of disconnect.
- When a session is picked up again, the will is not sent.
- An actual fix to what I tried a few commits ago: fix sending will
  twice.

This logic should make storing wills also easier.
client.cpp
@@ -55,12 +55,18 @@ Client::~Client() @@ -55,12 +55,18 @@ Client::~Client()
55 if (!this->threadData) 55 if (!this->threadData)
56 return; 56 return;
57 57
58 - std::shared_ptr<SubscriptionStore> &store = this->threadData->getSubscriptionStore();  
59 -  
60 if (disconnectReason.empty()) 58 if (disconnectReason.empty())
61 disconnectReason = "not specified"; 59 disconnectReason = "not specified";
62 60
63 logger->logf(LOG_NOTICE, "Removing client '%s'. Reason(s): %s", repr().c_str(), disconnectReason.c_str()); 61 logger->logf(LOG_NOTICE, "Removing client '%s'. Reason(s): %s", repr().c_str(), disconnectReason.c_str());
  62 +
  63 + std::shared_ptr<SubscriptionStore> &store = this->threadData->getSubscriptionStore();
  64 +
  65 + if (willPublish)
  66 + {
  67 + store->queueWillMessage(willPublish, session);
  68 + }
  69 +
64 if (fd > 0) // this check is essentially for testing, when working with a dummy fd. 70 if (fd > 0) // this check is essentially for testing, when working with a dummy fd.
65 { 71 {
66 if (epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL) != 0) 72 if (epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL) != 0)
forward_declarations.h
@@ -27,6 +27,7 @@ class SubscriptionStore; @@ -27,6 +27,7 @@ class SubscriptionStore;
27 class Session; 27 class Session;
28 class Settings; 28 class Settings;
29 class Mqtt5PropertyBuilder; 29 class Mqtt5PropertyBuilder;
  30 +class SessionsAndSubscriptionsDB;
30 31
31 32
32 #endif // FORWARD_DECLARATIONS_H 33 #endif // FORWARD_DECLARATIONS_H
mqttpacket.cpp
@@ -447,7 +447,6 @@ void MqttPacket::handleConnect() @@ -447,7 +447,6 @@ void MqttPacket::handleConnect()
447 { 447 {
448 case Mqtt5Properties::WillDelayInterval: 448 case Mqtt5Properties::WillDelayInterval:
449 willpublish.will_delay = readFourBytesToUint32(); 449 willpublish.will_delay = readFourBytesToUint32();
450 - willpublish.setCreatedAt(std::chrono::steady_clock::now());  
451 break; 450 break;
452 case Mqtt5Properties::PayloadFormatIndicator: 451 case Mqtt5Properties::PayloadFormatIndicator:
453 willpublish.propertyBuilder->writePayloadFormatIndicator(readByte()); 452 willpublish.propertyBuilder->writePayloadFormatIndicator(readByte());
sessionsandsubscriptionsdb.cpp
@@ -130,7 +130,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -130,7 +130,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
130 Publish pub(pack.getPublishData()); 130 Publish pub(pack.getPublishData());
131 131
132 const uint32_t newPubAge = persistence_state_age + originalPubAge; 132 const uint32_t newPubAge = persistence_state_age + originalPubAge;
133 - pub.setCreatedAt(timepointFromAge(newPubAge)); 133 + pub.createdAt = timepointFromAge(newPubAge);
134 134
135 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s' for session '%s'.", pub.qos, pub.topic.c_str(), ses->getClientId().c_str()); 135 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s' for session '%s'.", pub.qos, pub.topic.c_str(), ses->getClientId().c_str());
136 ses->qosPacketQueue.queuePublish(std::move(pub), id); 136 ses->qosPacketQueue.queuePublish(std::move(pub), id);
subscriptionstore.cpp
@@ -280,6 +280,9 @@ std::shared_ptr&lt;Session&gt; SubscriptionStore::lockSession(const std::string &amp;clien @@ -280,6 +280,9 @@ std::shared_ptr&lt;Session&gt; SubscriptionStore::lockSession(const std::string &amp;clien
280 * The expiry interval as set in the properties of the will message is not used to check for expiration here. To 280 * The expiry interval as set in the properties of the will message is not used to check for expiration here. To
281 * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as 281 * quote the specs: "If present, the Four Byte value is the lifetime of the Will Message in seconds and is sent as
282 * the Publication Expiry Interval when the Server publishes the Will Message." 282 * the Publication Expiry Interval when the Server publishes the Will Message."
  283 + *
  284 + * If a new Network Connection to this Session is made before the Will Delay Interval has passed, the Server
  285 + * MUST NOT send the Will Message [MQTT-3.1.3-9].
283 */ 286 */
284 void SubscriptionStore::sendQueuedWillMessages() 287 void SubscriptionStore::sendQueuedWillMessages()
285 { 288 {
@@ -289,15 +292,27 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -289,15 +292,27 @@ void SubscriptionStore::sendQueuedWillMessages()
289 auto it = pendingWillMessages.begin(); 292 auto it = pendingWillMessages.begin();
290 while (it != pendingWillMessages.end()) 293 while (it != pendingWillMessages.end())
291 { 294 {
292 - std::shared_ptr<Publish> p = (*it).lock(); 295 + QueuedWill &qw = *it;
  296 +
  297 + std::shared_ptr<Publish> p = qw.getWill().lock();
293 if (p) 298 if (p)
294 { 299 {
295 - if (p->getCreatedAt() + std::chrono::seconds(p->will_delay) > now) 300 + if (qw.getSendAt() > now)
296 break; 301 break;
297 302
  303 + std::shared_ptr<Session> s = qw.getSession();
  304 +
  305 + if (!s || s->hasActiveClient())
  306 + {
  307 + it = pendingWillMessages.erase(it);
  308 + continue;
  309 + }
  310 +
298 logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); 311 logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() );
299 PublishCopyFactory factory(p.get()); 312 PublishCopyFactory factory(p.get());
300 queuePacketAtSubscribers(factory); 313 queuePacketAtSubscribers(factory);
  314 +
  315 + s->clearWill();
301 } 316 }
302 it = pendingWillMessages.erase(it); 317 it = pendingWillMessages.erase(it);
303 } 318 }
@@ -308,7 +323,7 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -308,7 +323,7 @@ void SubscriptionStore::sendQueuedWillMessages()
308 * @param willMessage 323 * @param willMessage
309 * @param forceNow 324 * @param forceNow
310 */ 325 */
311 -void SubscriptionStore::queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow) 326 +void SubscriptionStore::queueWillMessage(const std::shared_ptr<Publish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow)
312 { 327 {
313 if (!willMessage) 328 if (!willMessage)
314 return; 329 return;
@@ -320,12 +335,19 @@ void SubscriptionStore::queueWillMessage(std::shared_ptr&lt;Publish&gt; &amp;willMessage, @@ -320,12 +335,19 @@ void SubscriptionStore::queueWillMessage(std::shared_ptr&lt;Publish&gt; &amp;willMessage,
320 { 335 {
321 PublishCopyFactory factory(willMessage.get()); 336 PublishCopyFactory factory(willMessage.get());
322 queuePacketAtSubscribers(factory); 337 queuePacketAtSubscribers(factory);
  338 +
  339 + // Avoid sending two immediate wills when a session is destroyed with the client disconnect.
  340 + if (session) // session is null when you're destroying a client before a session is assigned.
  341 + session->clearWill();
  342 +
323 return; 343 return;
324 } 344 }
325 345
  346 + QueuedWill queuedWill(willMessage, session);
  347 +
326 std::lock_guard<std::mutex>(this->pendingWillsMutex); 348 std::lock_guard<std::mutex>(this->pendingWillsMutex);
327 - auto pos = std::upper_bound(this->pendingWillMessages.begin(), this->pendingWillMessages.end(), willMessage, WillDelayCompare);  
328 - this->pendingWillMessages.insert(pos, willMessage); 349 + auto pos = std::upper_bound(this->pendingWillMessages.begin(), this->pendingWillMessages.end(), willMessage, willDelayCompare);
  350 + this->pendingWillMessages.insert(pos, queuedWill);
329 } 351 }
330 352
331 void SubscriptionStore::publishNonRecursively(const std::unordered_map<std::string, Subscription> &subscribers, 353 void SubscriptionStore::publishNonRecursively(const std::unordered_map<std::string, Subscription> &subscribers,
@@ -588,7 +610,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr&lt;Session&gt; &amp;session) @@ -588,7 +610,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr&lt;Session&gt; &amp;session)
588 std::shared_ptr<Publish> &will = session->getWill(); 610 std::shared_ptr<Publish> &will = session->getWill();
589 if (will) 611 if (will)
590 { 612 {
591 - queueWillMessage(will, true); 613 + queueWillMessage(will, session, true);
592 } 614 }
593 615
594 RWLockGuard lock_guard(&subscriptionsRwlock); 616 RWLockGuard lock_guard(&subscriptionsRwlock);
@@ -984,3 +1006,36 @@ std::shared_ptr&lt;Session&gt; QueuedSessionRemoval::getSession() const @@ -984,3 +1006,36 @@ std::shared_ptr&lt;Session&gt; QueuedSessionRemoval::getSession() const
984 { 1006 {
985 return session.lock(); 1007 return session.lock();
986 } 1008 }
  1009 +
  1010 +QueuedWill::QueuedWill(const std::shared_ptr<Publish> &will, const std::shared_ptr<Session> &session) :
  1011 + will(will),
  1012 + session(session),
  1013 + sendAt(std::chrono::steady_clock::now() + std::chrono::seconds(will->will_delay))
  1014 +{
  1015 +
  1016 +}
  1017 +
  1018 +const std::weak_ptr<Publish> &QueuedWill::getWill() const
  1019 +{
  1020 + return this->will;
  1021 +}
  1022 +
  1023 +std::chrono::time_point<std::chrono::steady_clock> QueuedWill::getSendAt() const
  1024 +{
  1025 + return this->sendAt;
  1026 +}
  1027 +
  1028 +std::shared_ptr<Session> QueuedWill::getSession()
  1029 +{
  1030 + return this->session.lock();
  1031 +}
  1032 +
  1033 +bool willDelayCompare(const std::shared_ptr<Publish> &a, const QueuedWill &b)
  1034 +{
  1035 + std::shared_ptr<Publish> _b = b.getWill().lock();
  1036 +
  1037 + if (!_b)
  1038 + return true;
  1039 +
  1040 + return a->will_delay < _b->will_delay;
  1041 +};
subscriptionstore.h
@@ -101,6 +101,20 @@ public: @@ -101,6 +101,20 @@ public:
101 std::shared_ptr<Session> getSession() const; 101 std::shared_ptr<Session> getSession() const;
102 }; 102 };
103 103
  104 +class QueuedWill
  105 +{
  106 + std::weak_ptr<Publish> will;
  107 + std::weak_ptr<Session> session;
  108 + std::chrono::time_point<std::chrono::steady_clock> sendAt;
  109 +
  110 +public:
  111 + QueuedWill(const std::shared_ptr<Publish> &will, const std::shared_ptr<Session> &session);
  112 +
  113 + const std::weak_ptr<Publish> &getWill() const;
  114 + std::chrono::time_point<std::chrono::steady_clock> getSendAt() const;
  115 + std::shared_ptr<Session> getSession();
  116 +};
  117 +
104 class SubscriptionStore 118 class SubscriptionStore
105 { 119 {
106 #ifdef TESTING 120 #ifdef TESTING
@@ -122,7 +136,7 @@ class SubscriptionStore @@ -122,7 +136,7 @@ class SubscriptionStore
122 int64_t retainedMessageCount = 0; 136 int64_t retainedMessageCount = 0;
123 137
124 std::mutex pendingWillsMutex; 138 std::mutex pendingWillsMutex;
125 - std::list<std::weak_ptr<Publish>> pendingWillMessages; 139 + std::list<QueuedWill> pendingWillMessages;
126 140
127 std::chrono::time_point<std::chrono::steady_clock> lastTreeCleanup; 141 std::chrono::time_point<std::chrono::steady_clock> lastTreeCleanup;
128 142
@@ -151,7 +165,7 @@ public: @@ -151,7 +165,7 @@ public:
151 std::shared_ptr<Session> lockSession(const std::string &clientid); 165 std::shared_ptr<Session> lockSession(const std::string &clientid);
152 166
153 void sendQueuedWillMessages(); 167 void sendQueuedWillMessages();
154 - void queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow = false); 168 + void queueWillMessage(const std::shared_ptr<Publish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false);
155 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false); 169 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false);
156 uint64_t giveClientRetainedMessages(const std::shared_ptr<Client> &client, const std::shared_ptr<Session> &ses, 170 uint64_t giveClientRetainedMessages(const std::shared_ptr<Client> &client, const std::shared_ptr<Session> &ses,
157 const std::vector<std::string> &subscribeSubtopics, char max_qos); 171 const std::vector<std::string> &subscribeSubtopics, char max_qos);
@@ -174,4 +188,6 @@ public: @@ -174,4 +188,6 @@ public:
174 void queueSessionRemoval(const std::shared_ptr<Session> &session); 188 void queueSessionRemoval(const std::shared_ptr<Session> &session);
175 }; 189 };
176 190
  191 +bool willDelayCompare(const std::shared_ptr<Publish> &a, const QueuedWill &b);
  192 +
177 #endif // SUBSCRIPTIONSTORE_H 193 #endif // SUBSCRIPTIONSTORE_H
types.cpp
@@ -187,11 +187,6 @@ bool PublishBase::hasExpired() const @@ -187,11 +187,6 @@ bool PublishBase::hasExpired() const
187 return (expiresAfter > age); 187 return (expiresAfter > age);
188 } 188 }
189 189
190 -void PublishBase::setCreatedAt(std::chrono::time_point<std::chrono::steady_clock> t)  
191 -{  
192 - this->createdAt = t;  
193 -}  
194 -  
195 void PublishBase::setExpireAfter(uint32_t s) 190 void PublishBase::setExpireAfter(uint32_t s)
196 { 191 {
197 this->createdAt = std::chrono::steady_clock::now(); 192 this->createdAt = std::chrono::steady_clock::now();
@@ -221,16 +216,6 @@ Publish::Publish(const std::string &amp;topic, const std::string &amp;payload, char qos) @@ -221,16 +216,6 @@ Publish::Publish(const std::string &amp;topic, const std::string &amp;payload, char qos)
221 216
222 } 217 }
223 218
224 -bool WillDelayCompare(const std::shared_ptr<Publish> &a, const std::weak_ptr<Publish> &b)  
225 -{  
226 - std::shared_ptr<Publish> _b = b.lock();  
227 -  
228 - if (!_b)  
229 - return true;  
230 -  
231 - return a->will_delay < _b->will_delay;  
232 -};  
233 -  
234 PubResponse::PubResponse(const ProtocolVersion protVersion, const PacketType packet_type, ReasonCodes reason_code, uint16_t packet_id) : 219 PubResponse::PubResponse(const ProtocolVersion protVersion, const PacketType packet_type, ReasonCodes reason_code, uint16_t packet_id) :
235 packet_type(packet_type), 220 packet_type(packet_type),
236 protocol_version(protVersion), 221 protocol_version(protVersion),
@@ -192,6 +192,8 @@ public: @@ -192,6 +192,8 @@ public:
192 */ 192 */
193 class PublishBase 193 class PublishBase
194 { 194 {
  195 + friend class SessionsAndSubscriptionsDB;
  196 +
195 bool hasExpireInfo = false; 197 bool hasExpireInfo = false;
196 std::chrono::time_point<std::chrono::steady_clock> createdAt; 198 std::chrono::time_point<std::chrono::steady_clock> createdAt;
197 std::chrono::seconds expiresAfter; 199 std::chrono::seconds expiresAfter;
@@ -201,7 +203,7 @@ public: @@ -201,7 +203,7 @@ public:
201 std::string payload; 203 std::string payload;
202 char qos = 0; 204 char qos = 0;
203 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] 205 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
204 - uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class 206 + uint32_t will_delay = 0; // if will, this is the delay.
205 bool splitTopic = true; 207 bool splitTopic = true;
206 uint16_t topicAlias = 0; 208 uint16_t topicAlias = 0;
207 bool skipTopic = false; 209 bool skipTopic = false;
@@ -216,7 +218,6 @@ public: @@ -216,7 +218,6 @@ public:
216 bool hasUserProperties() const; 218 bool hasUserProperties() const;
217 bool hasExpired() const; 219 bool hasExpired() const;
218 220
219 - void setCreatedAt(std::chrono::time_point<std::chrono::steady_clock> t);  
220 void setExpireAfter(uint32_t s); 221 void setExpireAfter(uint32_t s);
221 bool getHasExpireInfo() const; 222 bool getHasExpireInfo() const;
222 const std::chrono::time_point<std::chrono::steady_clock> getCreatedAt() const; 223 const std::chrono::time_point<std::chrono::steady_clock> getCreatedAt() const;
@@ -232,8 +233,6 @@ public: @@ -232,8 +233,6 @@ public:
232 Publish(const std::string &topic, const std::string &payload, char qos); 233 Publish(const std::string &topic, const std::string &payload, char qos);
233 }; 234 };
234 235
235 -bool WillDelayCompare(const std::shared_ptr<Publish> &a, const std::weak_ptr<Publish> &b);  
236 -  
237 class PubResponse 236 class PubResponse
238 { 237 {
239 public: 238 public: