Commit ebe7c845c4b537055ea543c08cc8a7aa8cbeb813

Authored by Wiebe Cazemier
1 parent 58ec60b2

Support saving wills into the session db

This required a special type WillPublish to make this easier and more
logical.
client.cpp
@@ -509,9 +509,9 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str @@ -509,9 +509,9 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str
509 this->maxOutgoingTopicAliasValue = maxOutgoingTopicAliasValue; 509 this->maxOutgoingTopicAliasValue = maxOutgoingTopicAliasValue;
510 } 510 }
511 511
512 -void Client::setWill(Publish &&willPublish) 512 +void Client::setWill(WillPublish &&willPublish)
513 { 513 {
514 - this->willPublish = std::make_shared<Publish>(std::move(willPublish)); 514 + this->willPublish = std::make_shared<WillPublish>(std::move(willPublish));
515 } 515 }
516 516
517 void Client::assignSession(std::shared_ptr<Session> &session) 517 void Client::assignSession(std::shared_ptr<Session> &session)
client.h
@@ -77,7 +77,7 @@ class Client @@ -77,7 +77,7 @@ class Client
77 std::string username; 77 std::string username;
78 uint16_t keepalive = 0; 78 uint16_t keepalive = 0;
79 79
80 - std::shared_ptr<Publish> willPublish; 80 + std::shared_ptr<WillPublish> willPublish;
81 81
82 std::shared_ptr<ThreadData> threadData; 82 std::shared_ptr<ThreadData> threadData;
83 std::mutex writeBufMutex; 83 std::mutex writeBufMutex;
@@ -115,7 +115,7 @@ public: @@ -115,7 +115,7 @@ public:
115 void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, 115 void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive,
116 uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue); 116 uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue);
117 void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); 117 void setWill(const std::string &topic, const std::string &payload, bool retain, char qos);
118 - void setWill(Publish &&willPublish); 118 + void setWill(WillPublish &&willPublish);
119 void clearWill(); 119 void clearWill();
120 void setAuthenticated(bool value) { authenticated = value;} 120 void setAuthenticated(bool value) { authenticated = value;}
121 bool getAuthenticated() { return authenticated; } 121 bool getAuthenticated() { return authenticated; }
@@ -123,7 +123,7 @@ public: @@ -123,7 +123,7 @@ public:
123 std::shared_ptr<ThreadData> getThreadData() { return threadData; } 123 std::shared_ptr<ThreadData> getThreadData() { return threadData; }
124 std::string &getClientId() { return this->clientid; } 124 std::string &getClientId() { return this->clientid; }
125 const std::string &getUsername() const { return this->username; } 125 const std::string &getUsername() const { return this->username; }
126 - std::shared_ptr<Publish> &getWill() { return this->willPublish; } 126 + std::shared_ptr<WillPublish> &getWill() { return this->willPublish; }
127 void assignSession(std::shared_ptr<Session> &session); 127 void assignSession(std::shared_ptr<Session> &session);
128 std::shared_ptr<Session> getSession(); 128 std::shared_ptr<Session> getSession();
129 void setDisconnectReason(const std::string &reason); 129 void setDisconnectReason(const std::string &reason);
mqttpacket.cpp
@@ -423,7 +423,7 @@ void MqttPacket::handleConnect() @@ -423,7 +423,7 @@ void MqttPacket::handleConnect()
423 std::string username; 423 std::string username;
424 std::string password; 424 std::string password;
425 425
426 - Publish willpublish; 426 + WillPublish willpublish;
427 willpublish.qos = will_qos; 427 willpublish.qos = will_qos;
428 willpublish.retain = will_retain; 428 willpublish.retain = will_retain;
429 429
session.cpp
@@ -71,6 +71,7 @@ Session::Session(const Session &amp;other) @@ -71,6 +71,7 @@ Session::Session(const Session &amp;other)
71 this->outgoingQoS2MessageIds = other.outgoingQoS2MessageIds; 71 this->outgoingQoS2MessageIds = other.outgoingQoS2MessageIds;
72 this->nextPacketId = other.nextPacketId; 72 this->nextPacketId = other.nextPacketId;
73 this->sessionExpiryInterval = other.sessionExpiryInterval; 73 this->sessionExpiryInterval = other.sessionExpiryInterval;
  74 + this->willPublish = other.willPublish;
74 75
75 // TODO: see git history for a change here. We now copy the whole queued publish. Do we want to address that? 76 // TODO: see git history for a change here. We now copy the whole queued publish. Do we want to address that?
76 this->qosPacketQueue = other.qosPacketQueue; 77 this->qosPacketQueue = other.qosPacketQueue;
@@ -276,11 +277,16 @@ void Session::clearWill() @@ -276,11 +277,16 @@ void Session::clearWill()
276 this->willPublish.reset(); 277 this->willPublish.reset();
277 } 278 }
278 279
279 -std::shared_ptr<Publish> &Session::getWill() 280 +std::shared_ptr<WillPublish> &Session::getWill()
280 { 281 {
281 return this->willPublish; 282 return this->willPublish;
282 } 283 }
283 284
  285 +void Session::setWill(WillPublish &&pub)
  286 +{
  287 + this->willPublish = std::make_shared<WillPublish>(std::move(pub));
  288 +}
  289 +
284 void Session::addIncomingQoS2MessageId(uint16_t packet_id) 290 void Session::addIncomingQoS2MessageId(uint16_t packet_id)
285 { 291 {
286 assert(packet_id > 0); 292 assert(packet_id > 0);
session.h
@@ -50,7 +50,7 @@ class Session @@ -50,7 +50,7 @@ class Session
50 uint16_t maxQosMsgPending; 50 uint16_t maxQosMsgPending;
51 uint16_t QoSLogPrintedAtId = 0; 51 uint16_t QoSLogPrintedAtId = 0;
52 bool destroyOnDisconnect = false; 52 bool destroyOnDisconnect = false;
53 - std::shared_ptr<Publish> willPublish; 53 + std::shared_ptr<WillPublish> willPublish;
54 Logger *logger = Logger::getInstance(); 54 Logger *logger = Logger::getInstance();
55 55
56 bool requiresPacketRetransmission() const; 56 bool requiresPacketRetransmission() const;
@@ -73,7 +73,8 @@ public: @@ -73,7 +73,8 @@ public:
73 uint64_t sendPendingQosMessages(); 73 uint64_t sendPendingQosMessages();
74 bool hasActiveClient() const; 74 bool hasActiveClient() const;
75 void clearWill(); 75 void clearWill();
76 - std::shared_ptr<Publish> &getWill(); 76 + std::shared_ptr<WillPublish> &getWill();
  77 + void setWill(WillPublish &&pub);
77 78
78 void addIncomingQoS2MessageId(uint16_t packet_id); 79 void addIncomingQoS2MessageId(uint16_t packet_id);
79 bool incomingQoS2MessageIdInTransit(uint16_t packet_id); 80 bool incomingQoS2MessageIdInTransit(uint16_t packet_id);
sessionsandsubscriptionsdb.cpp
@@ -167,6 +167,32 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -167,6 +167,32 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
167 // it with a more relevant value. 167 // it with a more relevant value.
168 // The protocol version 5 is just dummy, to get the behavior I want. 168 // The protocol version 5 is just dummy, to get the behavior I want.
169 ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); 169 ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5);
  170 +
  171 + const uint16_t hasWill = readUint16(eofFound);
  172 +
  173 + if (hasWill)
  174 + {
  175 + const uint16_t fixed_header_length = readUint16(eofFound);
  176 + const uint32_t originalWillDelay = readUint32(eofFound);
  177 + const uint32_t originalWillQueueAge = readUint32(eofFound);
  178 + const uint32_t newWillDelayAfterMaybeAlreadyBeingQueued = originalWillQueueAge < originalWillDelay ? originalWillDelay - originalWillQueueAge : 0;
  179 + const uint32_t packlen = readUint32(eofFound);
  180 +
  181 + const uint32_t stateAgecompensatedWillDelay =
  182 + persistence_state_age > newWillDelayAfterMaybeAlreadyBeingQueued ? 0 : newWillDelayAfterMaybeAlreadyBeingQueued - persistence_state_age;
  183 +
  184 + cirbuf.reset();
  185 + cirbuf.ensureFreeSpace(packlen + 32);
  186 +
  187 + readCheck(cirbuf.headPtr(), 1, packlen, f);
  188 + cirbuf.advanceHead(packlen);
  189 + MqttPacket publishpack(cirbuf, packlen, fixed_header_length, dummyClient);
  190 + publishpack.parsePublishData();
  191 + WillPublish willPublish = publishpack.getPublishData();
  192 + willPublish.will_delay = stateAgecompensatedWillDelay;
  193 +
  194 + ses->setWill(std::move(willPublish));
  195 + }
170 } 196 }
171 197
172 const uint32_t nrOfSubscriptions = readUint32(eofFound); 198 const uint32_t nrOfSubscriptions = readUint32(eofFound);
@@ -289,6 +315,30 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -289,6 +315,30 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
289 315
290 writeUint32(ses->sessionExpiryInterval); 316 writeUint32(ses->sessionExpiryInterval);
291 writeUint16(ses->maxQosMsgPending); 317 writeUint16(ses->maxQosMsgPending);
  318 +
  319 + const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0;
  320 + writeUint16(static_cast<uint16_t>(hasWillThatShouldSurviveRestart));
  321 +
  322 + if (hasWillThatShouldSurviveRestart)
  323 + {
  324 + WillPublish &will = *ses->getWill().get();
  325 + MqttPacket willpacket(ProtocolVersion::Mqtt5, will);
  326 +
  327 + // Dummy, to please the parser on reading.
  328 + if (will.qos > 0)
  329 + willpacket.setPacketId(666);
  330 +
  331 + const uint32_t packSize = willpacket.getSizeIncludingNonPresentHeader();
  332 + cirbuf.reset();
  333 + cirbuf.ensureFreeSpace(packSize + 32);
  334 + willpacket.readIntoBuf(cirbuf);
  335 +
  336 + writeUint16(willpacket.getFixedHeaderLength());
  337 + writeUint32(will.will_delay);
  338 + writeUint32(will.getQueuedAtAge());
  339 + writeUint32(packSize);
  340 + writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);
  341 + }
292 } 342 }
293 343
294 writeUint32(subscriptions.size()); 344 writeUint32(subscriptions.size());
subscriptionstore.cpp
@@ -326,7 +326,7 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -326,7 +326,7 @@ void SubscriptionStore::sendQueuedWillMessages()
326 * @param willMessage 326 * @param willMessage
327 * @param forceNow 327 * @param forceNow
328 */ 328 */
329 -void SubscriptionStore::queueWillMessage(const std::shared_ptr<Publish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow) 329 +void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow)
330 { 330 {
331 if (!willMessage) 331 if (!willMessage)
332 return; 332 return;
@@ -349,6 +349,8 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;Publish&gt; &amp;willMes @@ -349,6 +349,8 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;Publish&gt; &amp;willMes
349 return; 349 return;
350 } 350 }
351 351
  352 + willMessage->setQueuedAt();
  353 +
352 QueuedWill queuedWill(willMessage, session); 354 QueuedWill queuedWill(willMessage, session);
353 355
354 std::lock_guard<std::mutex>(this->pendingWillsMutex); 356 std::lock_guard<std::mutex>(this->pendingWillsMutex);
@@ -613,7 +615,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr&lt;Session&gt; &amp;session) @@ -613,7 +615,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr&lt;Session&gt; &amp;session)
613 const std::string &clientid = session->getClientId(); 615 const std::string &clientid = session->getClientId();
614 logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str()); 616 logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str());
615 617
616 - std::shared_ptr<Publish> &will = session->getWill(); 618 + std::shared_ptr<WillPublish> &will = session->getWill();
617 if (will) 619 if (will)
618 { 620 {
619 queueWillMessage(will, session, true); 621 queueWillMessage(will, session, true);
@@ -905,6 +907,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &amp;filePath @@ -905,6 +907,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &amp;filePath
905 { 907 {
906 sessionsById[session->getClientId()] = session; 908 sessionsById[session->getClientId()] = session;
907 queueSessionRemoval(session); 909 queueSessionRemoval(session);
  910 + queueWillMessage(session->getWill(), session);
908 } 911 }
909 912
910 std::vector<std::string> subtopics; 913 std::vector<std::string> subtopics;
@@ -1013,7 +1016,7 @@ std::shared_ptr&lt;Session&gt; QueuedSessionRemoval::getSession() const @@ -1013,7 +1016,7 @@ std::shared_ptr&lt;Session&gt; QueuedSessionRemoval::getSession() const
1013 return session.lock(); 1016 return session.lock();
1014 } 1017 }
1015 1018
1016 -QueuedWill::QueuedWill(const std::shared_ptr<Publish> &will, const std::shared_ptr<Session> &session) : 1019 +QueuedWill::QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session) :
1017 will(will), 1020 will(will),
1018 session(session), 1021 session(session),
1019 sendAt(std::chrono::steady_clock::now() + std::chrono::seconds(will->will_delay)) 1022 sendAt(std::chrono::steady_clock::now() + std::chrono::seconds(will->will_delay))
@@ -1021,7 +1024,7 @@ QueuedWill::QueuedWill(const std::shared_ptr&lt;Publish&gt; &amp;will, const std::shared_p @@ -1021,7 +1024,7 @@ QueuedWill::QueuedWill(const std::shared_ptr&lt;Publish&gt; &amp;will, const std::shared_p
1021 1024
1022 } 1025 }
1023 1026
1024 -const std::weak_ptr<Publish> &QueuedWill::getWill() const 1027 +const std::weak_ptr<WillPublish> &QueuedWill::getWill() const
1025 { 1028 {
1026 return this->will; 1029 return this->will;
1027 } 1030 }
@@ -1036,9 +1039,9 @@ std::shared_ptr&lt;Session&gt; QueuedWill::getSession() @@ -1036,9 +1039,9 @@ std::shared_ptr&lt;Session&gt; QueuedWill::getSession()
1036 return this->session.lock(); 1039 return this->session.lock();
1037 } 1040 }
1038 1041
1039 -bool willDelayCompare(const std::shared_ptr<Publish> &a, const QueuedWill &b) 1042 +bool willDelayCompare(const std::shared_ptr<WillPublish> &a, const QueuedWill &b)
1040 { 1043 {
1041 - std::shared_ptr<Publish> _b = b.getWill().lock(); 1044 + std::shared_ptr<WillPublish> _b = b.getWill().lock();
1042 1045
1043 if (!_b) 1046 if (!_b)
1044 return true; 1047 return true;
subscriptionstore.h
@@ -103,14 +103,14 @@ public: @@ -103,14 +103,14 @@ public:
103 103
104 class QueuedWill 104 class QueuedWill
105 { 105 {
106 - std::weak_ptr<Publish> will; 106 + std::weak_ptr<WillPublish> will;
107 std::weak_ptr<Session> session; 107 std::weak_ptr<Session> session;
108 std::chrono::time_point<std::chrono::steady_clock> sendAt; 108 std::chrono::time_point<std::chrono::steady_clock> sendAt;
109 109
110 public: 110 public:
111 - QueuedWill(const std::shared_ptr<Publish> &will, const std::shared_ptr<Session> &session); 111 + QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session);
112 112
113 - const std::weak_ptr<Publish> &getWill() const; 113 + const std::weak_ptr<WillPublish> &getWill() const;
114 std::chrono::time_point<std::chrono::steady_clock> getSendAt() const; 114 std::chrono::time_point<std::chrono::steady_clock> getSendAt() const;
115 std::shared_ptr<Session> getSession(); 115 std::shared_ptr<Session> getSession();
116 }; 116 };
@@ -165,7 +165,7 @@ public: @@ -165,7 +165,7 @@ public:
165 std::shared_ptr<Session> lockSession(const std::string &clientid); 165 std::shared_ptr<Session> lockSession(const std::string &clientid);
166 166
167 void sendQueuedWillMessages(); 167 void sendQueuedWillMessages();
168 - void queueWillMessage(const std::shared_ptr<Publish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false); 168 + void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false);
169 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false); 169 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false);
170 uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses, 170 uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
171 const std::vector<std::string> &subscribeSubtopics, char max_qos); 171 const std::vector<std::string> &subscribeSubtopics, char max_qos);
@@ -188,6 +188,6 @@ public: @@ -188,6 +188,6 @@ public:
188 void queueSessionRemoval(const std::shared_ptr<Session> &session); 188 void queueSessionRemoval(const std::shared_ptr<Session> &session);
189 }; 189 };
190 190
191 -bool willDelayCompare(const std::shared_ptr<Publish> &a, const QueuedWill &b); 191 +bool willDelayCompare(const std::shared_ptr<WillPublish> &a, const QueuedWill &b);
192 192
193 #endif // SUBSCRIPTIONSTORE_H 193 #endif // SUBSCRIPTIONSTORE_H
types.cpp
@@ -216,6 +216,34 @@ Publish::Publish(const std::string &amp;topic, const std::string &amp;payload, char qos) @@ -216,6 +216,34 @@ Publish::Publish(const std::string &amp;topic, const std::string &amp;payload, char qos)
216 216
217 } 217 }
218 218
  219 +WillPublish::WillPublish(const Publish &other) :
  220 + Publish(other)
  221 +{
  222 +
  223 +}
  224 +
  225 +void WillPublish::setQueuedAt()
  226 +{
  227 + this->isQueued = true;
  228 + this->queuedAt = std::chrono::steady_clock::now();
  229 +}
  230 +
  231 +/**
  232 + * @brief WillPublish::getQueuedAtAge gets the time ago in seconds when this will was queued. The time is set externally by the queue action.
  233 + * @return
  234 + *
  235 + * This age is required when saving wills to disk, because the new will delay to set on load is not the original will delay, but minus the
  236 + * elapsed time after queueing.
  237 + */
  238 +uint32_t WillPublish::getQueuedAtAge() const
  239 +{
  240 + if (!isQueued)
  241 + return 0;
  242 +
  243 + const std::chrono::seconds age = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - this->queuedAt);
  244 + return age.count();
  245 +}
  246 +
219 PubResponse::PubResponse(const ProtocolVersion protVersion, const PacketType packet_type, ReasonCodes reason_code, uint16_t packet_id) : 247 PubResponse::PubResponse(const ProtocolVersion protVersion, const PacketType packet_type, ReasonCodes reason_code, uint16_t packet_id) :
220 packet_type(packet_type), 248 packet_type(packet_type),
221 protocol_version(protVersion), 249 protocol_version(protVersion),
@@ -203,7 +203,6 @@ public: @@ -203,7 +203,6 @@ public:
203 std::string payload; 203 std::string payload;
204 char qos = 0; 204 char qos = 0;
205 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] 205 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
206 - uint32_t will_delay = 0; // if will, this is the delay.  
207 bool splitTopic = true; 206 bool splitTopic = true;
208 uint16_t topicAlias = 0; 207 uint16_t topicAlias = 0;
209 bool skipTopic = false; 208 bool skipTopic = false;
@@ -233,6 +232,18 @@ public: @@ -233,6 +232,18 @@ public:
233 Publish(const std::string &topic, const std::string &payload, char qos); 232 Publish(const std::string &topic, const std::string &payload, char qos);
234 }; 233 };
235 234
  235 +class WillPublish : public Publish
  236 +{
  237 + bool isQueued = false;
  238 + std::chrono::time_point<std::chrono::steady_clock> queuedAt;
  239 +public:
  240 + uint32_t will_delay = 0;
  241 + WillPublish() = default;
  242 + WillPublish(const Publish &other);
  243 + void setQueuedAt();
  244 + uint32_t getQueuedAtAge() const;
  245 +};
  246 +
236 class PubResponse 247 class PubResponse
237 { 248 {
238 public: 249 public: