Commit ec518d7a7a4574f9cc1e0ccae03a9cd5c0674b3b

Authored by Wiebe Cazemier
1 parent 7be65e95

Load and store createdAt time of stored QoS packets

Also fixes a bug in new interval calculation.
mqttpacket.cpp
@@ -434,7 +434,7 @@ void MqttPacket::handleConnect() @@ -434,7 +434,7 @@ void MqttPacket::handleConnect()
434 { 434 {
435 case Mqtt5Properties::WillDelayInterval: 435 case Mqtt5Properties::WillDelayInterval:
436 willpublish.will_delay = readFourBytesToUint32(); 436 willpublish.will_delay = readFourBytesToUint32();
437 - willpublish.createdAt = std::chrono::steady_clock::now(); 437 + willpublish.setCreatedAt(std::chrono::steady_clock::now());
438 break; 438 break;
439 case Mqtt5Properties::PayloadFormatIndicator: 439 case Mqtt5Properties::PayloadFormatIndicator:
440 willpublish.propertyBuilder->writePayloadFormatIndicator(readByte()); 440 willpublish.propertyBuilder->writePayloadFormatIndicator(readByte());
@@ -455,9 +455,8 @@ void MqttPacket::handleConnect() @@ -455,9 +455,8 @@ void MqttPacket::handleConnect()
455 } 455 }
456 case Mqtt5Properties::MessageExpiryInterval: 456 case Mqtt5Properties::MessageExpiryInterval:
457 { 457 {
458 - willpublish.createdAt = std::chrono::steady_clock::now();  
459 - uint32_t expiresAfter = readFourBytesToUint32();  
460 - willpublish.expiresAfter = std::chrono::seconds(expiresAfter); 458 + const uint32_t expiresAfter = readFourBytesToUint32();
  459 + willpublish.setExpireAfter(expiresAfter);
461 break; 460 break;
462 } 461 }
463 case Mqtt5Properties::CorrelationData: 462 case Mqtt5Properties::CorrelationData:
@@ -827,8 +826,7 @@ void MqttPacket::parsePublishData() @@ -827,8 +826,7 @@ void MqttPacket::parsePublishData()
827 publishData.propertyBuilder->writePayloadFormatIndicator(readByte()); 826 publishData.propertyBuilder->writePayloadFormatIndicator(readByte());
828 break; 827 break;
829 case Mqtt5Properties::MessageExpiryInterval: 828 case Mqtt5Properties::MessageExpiryInterval:
830 - publishData.createdAt = std::chrono::steady_clock::now();  
831 - publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32()); 829 + publishData.setExpireAfter(readFourBytesToUint32());
832 break; 830 break;
833 case Mqtt5Properties::TopicAlias: 831 case Mqtt5Properties::TopicAlias:
834 { 832 {
@@ -1328,7 +1326,8 @@ bool MqttPacket::containsClientSpecificProperties() const @@ -1328,7 +1326,8 @@ bool MqttPacket::containsClientSpecificProperties() const
1328 if (protocolVersion <= ProtocolVersion::Mqtt311 || !publishData.propertyBuilder) 1326 if (protocolVersion <= ProtocolVersion::Mqtt311 || !publishData.propertyBuilder)
1329 return false; 1327 return false;
1330 1328
1331 - if (publishData.createdAt.time_since_epoch().count() == 0 || this->hasTopicAlias) // TODO: better 1329 + // TODO: for the on-line clients, even with expire info, we can just copy the same packet. So, that case can be excluded.
  1330 + if (publishData.getHasExpireInfo() || this->hasTopicAlias)
1332 { 1331 {
1333 return true; 1332 return true;
1334 } 1333 }
sessionsandsubscriptionsdb.cpp
@@ -18,6 +18,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -18,6 +18,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
18 #include "sessionsandsubscriptionsdb.h" 18 #include "sessionsandsubscriptionsdb.h"
19 #include "mqttpacket.h" 19 #include "mqttpacket.h"
20 #include "threadglobals.h" 20 #include "threadglobals.h"
  21 +#include "utils.h"
21 22
22 #include "cassert" 23 #include "cassert"
23 24
@@ -113,6 +114,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -113,6 +114,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
113 { 114 {
114 const uint16_t fixed_header_length = readUint16(eofFound); 115 const uint16_t fixed_header_length = readUint16(eofFound);
115 const uint16_t id = readUint16(eofFound); 116 const uint16_t id = readUint16(eofFound);
  117 + const uint32_t originalPubAge = readUint32(eofFound);
116 const uint32_t packlen = readUint32(eofFound); 118 const uint32_t packlen = readUint32(eofFound);
117 119
118 assert(id > 0); 120 assert(id > 0);
@@ -127,7 +129,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -127,7 +129,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
127 pack.parsePublishData(); 129 pack.parsePublishData();
128 Publish pub(pack.getPublishData()); 130 Publish pub(pack.getPublishData());
129 131
130 - // TODO: update the pub.createdAt 132 + const uint32_t newPubAge = persistence_state_age + originalPubAge;
  133 + pub.setCreatedAt(timepointFromAge(newPubAge));
131 134
132 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); 135 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
133 ses->qosPacketQueue.queuePublish(std::move(pub), id); 136 ses->qosPacketQueue.queuePublish(std::move(pub), id);
@@ -250,8 +253,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -250,8 +253,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
250 253
251 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); 254 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
252 255
253 - pub.clearClientSpecificProperties(); // TODO: unnecessary? Unwanted even? I need to store the expiration interval. And how to load it?  
254 -  
255 MqttPacket pack(ProtocolVersion::Mqtt5, pub); 256 MqttPacket pack(ProtocolVersion::Mqtt5, pub);
256 pack.setPacketId(p.getPacketId()); 257 pack.setPacketId(p.getPacketId());
257 const uint32_t packSize = pack.getSizeIncludingNonPresentHeader(); 258 const uint32_t packSize = pack.getSizeIncludingNonPresentHeader();
@@ -259,10 +260,11 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -259,10 +260,11 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
259 cirbuf.ensureFreeSpace(packSize + 32); 260 cirbuf.ensureFreeSpace(packSize + 32);
260 pack.readIntoBuf(cirbuf); 261 pack.readIntoBuf(cirbuf);
261 262
262 - // TODO: save age 263 + const uint32_t pubAge = ageFromTimePoint(pub.getCreatedAt());
263 264
264 writeUint16(pack.getFixedHeaderLength()); 265 writeUint16(pack.getFixedHeaderLength());
265 writeUint16(p.getPacketId()); 266 writeUint16(p.getPacketId());
  267 + writeUint32(pubAge);
266 writeUint32(packSize); 268 writeUint32(packSize);
267 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); 269 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);
268 } 270 }
subscriptionstore.cpp
@@ -292,7 +292,7 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -292,7 +292,7 @@ void SubscriptionStore::sendQueuedWillMessages()
292 std::shared_ptr<Publish> p = (*it).lock(); 292 std::shared_ptr<Publish> p = (*it).lock();
293 if (p) 293 if (p)
294 { 294 {
295 - if (p->createdAt + std::chrono::seconds(p->will_delay) > now) 295 + if (p->getCreatedAt() + std::chrono::seconds(p->will_delay) > now)
296 break; 296 break;
297 297
298 logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); 298 logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() );
types.cpp
@@ -138,7 +138,7 @@ size_t PublishBase::getLengthWithoutFixedHeader() const @@ -138,7 +138,7 @@ size_t PublishBase::getLengthWithoutFixedHeader() const
138 */ 138 */
139 void PublishBase::setClientSpecificProperties() 139 void PublishBase::setClientSpecificProperties()
140 { 140 {
141 - if (this->createdAt.time_since_epoch().count() && this->topicAlias == 0) 141 + if (!hasExpireInfo && this->topicAlias == 0)
142 return; 142 return;
143 143
144 if (propertyBuilder) 144 if (propertyBuilder)
@@ -146,11 +146,13 @@ void PublishBase::setClientSpecificProperties() @@ -146,11 +146,13 @@ void PublishBase::setClientSpecificProperties()
146 else 146 else
147 propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>(); 147 propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
148 148
149 - if (createdAt.time_since_epoch().count() > 0) 149 + if (hasExpireInfo)
150 { 150 {
151 auto now = std::chrono::steady_clock::now(); 151 auto now = std::chrono::steady_clock::now();
152 - std::chrono::seconds newExpiresAfter = std::chrono::duration_cast<std::chrono::seconds>(now - createdAt);  
153 - propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); 152 + std::chrono::seconds delay = std::chrono::duration_cast<std::chrono::seconds>(now - createdAt);
  153 + int32_t newExpire = (this->expiresAfter - delay).count();
  154 + if (newExpire > 0)
  155 + propertyBuilder->writeMessageExpiryInterval(newExpire);
154 } 156 }
155 157
156 if (topicAlias > 0) 158 if (topicAlias > 0)
@@ -178,10 +180,35 @@ bool PublishBase::hasUserProperties() const @@ -178,10 +180,35 @@ bool PublishBase::hasUserProperties() const
178 180
179 bool PublishBase::hasExpired() const 181 bool PublishBase::hasExpired() const
180 { 182 {
  183 + if (!hasExpireInfo)
  184 + return false;
  185 +
181 const std::chrono::seconds age = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - this->createdAt); 186 const std::chrono::seconds age = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - this->createdAt);
182 return (expiresAfter > age); 187 return (expiresAfter > age);
183 } 188 }
184 189
  190 +void PublishBase::setCreatedAt(std::chrono::time_point<std::chrono::steady_clock> t)
  191 +{
  192 + this->createdAt = t;
  193 +}
  194 +
  195 +void PublishBase::setExpireAfter(uint32_t s)
  196 +{
  197 + this->createdAt = std::chrono::steady_clock::now();
  198 + this->expiresAfter = std::chrono::seconds(s);
  199 + this->hasExpireInfo = true;
  200 +}
  201 +
  202 +bool PublishBase::getHasExpireInfo() const
  203 +{
  204 + return this->hasExpireInfo;
  205 +}
  206 +
  207 +const std::chrono::time_point<std::chrono::steady_clock> PublishBase::getCreatedAt() const
  208 +{
  209 + return this->createdAt;
  210 +}
  211 +
185 Publish::Publish(const Publish &other) : 212 Publish::Publish(const Publish &other) :
186 PublishBase(other) 213 PublishBase(other)
187 { 214 {
@@ -192,6 +192,10 @@ public: @@ -192,6 +192,10 @@ public:
192 */ 192 */
193 class PublishBase 193 class PublishBase
194 { 194 {
  195 + bool hasExpireInfo = false;
  196 + std::chrono::time_point<std::chrono::steady_clock> createdAt;
  197 + std::chrono::seconds expiresAfter;
  198 +
195 public: 199 public:
196 std::string topic; 200 std::string topic;
197 std::string payload; 201 std::string payload;
@@ -199,8 +203,6 @@ public: @@ -199,8 +203,6 @@ public:
199 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] 203 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
200 uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class 204 uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class
201 bool splitTopic = true; 205 bool splitTopic = true;
202 - std::chrono::time_point<std::chrono::steady_clock> createdAt;  
203 - std::chrono::seconds expiresAfter;  
204 uint16_t topicAlias = 0; 206 uint16_t topicAlias = 0;
205 bool skipTopic = false; 207 bool skipTopic = false;
206 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving 208 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving
@@ -213,6 +215,11 @@ public: @@ -213,6 +215,11 @@ public:
213 void constructPropertyBuilder(); 215 void constructPropertyBuilder();
214 bool hasUserProperties() const; 216 bool hasUserProperties() const;
215 bool hasExpired() const; 217 bool hasExpired() const;
  218 +
  219 + void setCreatedAt(std::chrono::time_point<std::chrono::steady_clock> t);
  220 + void setExpireAfter(uint32_t s);
  221 + bool getHasExpireInfo() const;
  222 + const std::chrono::time_point<std::chrono::steady_clock> getCreatedAt() const;
216 }; 223 };
217 224
218 class Publish : public PublishBase 225 class Publish : public PublishBase
utils.cpp
@@ -668,3 +668,17 @@ const std::string protocolVersionString(ProtocolVersion p) @@ -668,3 +668,17 @@ const std::string protocolVersionString(ProtocolVersion p)
668 return "unknown"; 668 return "unknown";
669 } 669 }
670 } 670 }
  671 +
  672 +uint32_t ageFromTimePoint(const std::chrono::time_point<std::chrono::steady_clock> &point)
  673 +{
  674 + auto duration = std::chrono::steady_clock::now() - point;
  675 + auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
  676 + return seconds.count();
  677 +}
  678 +
  679 +std::chrono::time_point<std::chrono::steady_clock> timepointFromAge(const uint32_t age)
  680 +{
  681 + std::chrono::seconds seconds(age);
  682 + std::chrono::time_point<std::chrono::steady_clock> newPoint = std::chrono::steady_clock::now() + seconds;
  683 + return newPoint;
  684 +}
@@ -127,5 +127,8 @@ const std::string websocketCloseCodeToString(uint16_t code); @@ -127,5 +127,8 @@ const std::string websocketCloseCodeToString(uint16_t code);
127 127
128 const std::string protocolVersionString(ProtocolVersion p); 128 const std::string protocolVersionString(ProtocolVersion p);
129 129
  130 +uint32_t ageFromTimePoint(const std::chrono::time_point<std::chrono::steady_clock> &point);
  131 +std::chrono::time_point<std::chrono::steady_clock> timepointFromAge(const uint32_t age);
  132 +
130 133
131 #endif // UTILS_H 134 #endif // UTILS_H