From ec518d7a7a4574f9cc1e0ccae03a9cd5c0674b3b Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sun, 10 Apr 2022 12:40:52 +0200 Subject: [PATCH] Load and store createdAt time of stored QoS packets --- mqttpacket.cpp | 13 ++++++------- sessionsandsubscriptionsdb.cpp | 10 ++++++---- subscriptionstore.cpp | 2 +- types.cpp | 35 +++++++++++++++++++++++++++++++---- types.h | 11 +++++++++-- utils.cpp | 14 ++++++++++++++ utils.h | 3 +++ 7 files changed, 70 insertions(+), 18 deletions(-) diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 5ffa6cd..6417ed3 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -434,7 +434,7 @@ void MqttPacket::handleConnect() { case Mqtt5Properties::WillDelayInterval: willpublish.will_delay = readFourBytesToUint32(); - willpublish.createdAt = std::chrono::steady_clock::now(); + willpublish.setCreatedAt(std::chrono::steady_clock::now()); break; case Mqtt5Properties::PayloadFormatIndicator: willpublish.propertyBuilder->writePayloadFormatIndicator(readByte()); @@ -455,9 +455,8 @@ void MqttPacket::handleConnect() } case Mqtt5Properties::MessageExpiryInterval: { - willpublish.createdAt = std::chrono::steady_clock::now(); - uint32_t expiresAfter = readFourBytesToUint32(); - willpublish.expiresAfter = std::chrono::seconds(expiresAfter); + const uint32_t expiresAfter = readFourBytesToUint32(); + willpublish.setExpireAfter(expiresAfter); break; } case Mqtt5Properties::CorrelationData: @@ -827,8 +826,7 @@ void MqttPacket::parsePublishData() publishData.propertyBuilder->writePayloadFormatIndicator(readByte()); break; case Mqtt5Properties::MessageExpiryInterval: - publishData.createdAt = std::chrono::steady_clock::now(); - publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32()); + publishData.setExpireAfter(readFourBytesToUint32()); break; case Mqtt5Properties::TopicAlias: { @@ -1328,7 +1326,8 @@ bool MqttPacket::containsClientSpecificProperties() const if (protocolVersion <= ProtocolVersion::Mqtt311 || !publishData.propertyBuilder) return false; - if (publishData.createdAt.time_since_epoch().count() == 0 || this->hasTopicAlias) // TODO: better + // TODO: for the on-line clients, even with expire info, we can just copy the same packet. So, that case can be excluded. + if (publishData.getHasExpireInfo() || this->hasTopicAlias) { return true; } diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 8a0e0ef..5d65923 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -18,6 +18,7 @@ License along with FlashMQ. If not, see . #include "sessionsandsubscriptionsdb.h" #include "mqttpacket.h" #include "threadglobals.h" +#include "utils.h" #include "cassert" @@ -113,6 +114,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() { const uint16_t fixed_header_length = readUint16(eofFound); const uint16_t id = readUint16(eofFound); + const uint32_t originalPubAge = readUint32(eofFound); const uint32_t packlen = readUint32(eofFound); assert(id > 0); @@ -127,7 +129,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() pack.parsePublishData(); Publish pub(pack.getPublishData()); - // TODO: update the pub.createdAt + const uint32_t newPubAge = persistence_state_age + originalPubAge; + pub.setCreatedAt(timepointFromAge(newPubAge)); logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); ses->qosPacketQueue.queuePublish(std::move(pub), id); @@ -250,8 +253,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorlogf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); - pub.clearClientSpecificProperties(); // TODO: unnecessary? Unwanted even? I need to store the expiration interval. And how to load it? - MqttPacket pack(ProtocolVersion::Mqtt5, pub); pack.setPacketId(p.getPacketId()); const uint32_t packSize = pack.getSizeIncludingNonPresentHeader(); @@ -259,10 +260,11 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector p = (*it).lock(); if (p) { - if (p->createdAt + std::chrono::seconds(p->will_delay) > now) + if (p->getCreatedAt() + std::chrono::seconds(p->will_delay) > now) break; logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() ); diff --git a/types.cpp b/types.cpp index 01fd761..33dddea 100644 --- a/types.cpp +++ b/types.cpp @@ -138,7 +138,7 @@ size_t PublishBase::getLengthWithoutFixedHeader() const */ void PublishBase::setClientSpecificProperties() { - if (this->createdAt.time_since_epoch().count() && this->topicAlias == 0) + if (!hasExpireInfo && this->topicAlias == 0) return; if (propertyBuilder) @@ -146,11 +146,13 @@ void PublishBase::setClientSpecificProperties() else propertyBuilder = std::make_shared(); - if (createdAt.time_since_epoch().count() > 0) + if (hasExpireInfo) { auto now = std::chrono::steady_clock::now(); - std::chrono::seconds newExpiresAfter = std::chrono::duration_cast(now - createdAt); - propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); + std::chrono::seconds delay = std::chrono::duration_cast(now - createdAt); + int32_t newExpire = (this->expiresAfter - delay).count(); + if (newExpire > 0) + propertyBuilder->writeMessageExpiryInterval(newExpire); } if (topicAlias > 0) @@ -178,10 +180,35 @@ bool PublishBase::hasUserProperties() const bool PublishBase::hasExpired() const { + if (!hasExpireInfo) + return false; + const std::chrono::seconds age = std::chrono::duration_cast(std::chrono::steady_clock::now() - this->createdAt); return (expiresAfter > age); } +void PublishBase::setCreatedAt(std::chrono::time_point t) +{ + this->createdAt = t; +} + +void PublishBase::setExpireAfter(uint32_t s) +{ + this->createdAt = std::chrono::steady_clock::now(); + this->expiresAfter = std::chrono::seconds(s); + this->hasExpireInfo = true; +} + +bool PublishBase::getHasExpireInfo() const +{ + return this->hasExpireInfo; +} + +const std::chrono::time_point PublishBase::getCreatedAt() const +{ + return this->createdAt; +} + Publish::Publish(const Publish &other) : PublishBase(other) { diff --git a/types.h b/types.h index 7b2df7e..2c7265c 100644 --- a/types.h +++ b/types.h @@ -192,6 +192,10 @@ public: */ class PublishBase { + bool hasExpireInfo = false; + std::chrono::time_point createdAt; + std::chrono::seconds expiresAfter; + public: std::string topic; std::string payload; @@ -199,8 +203,6 @@ public: bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class bool splitTopic = true; - std::chrono::time_point createdAt; - std::chrono::seconds expiresAfter; uint16_t topicAlias = 0; bool skipTopic = false; std::shared_ptr propertyBuilder; // Only contains data for sending, not receiving @@ -213,6 +215,11 @@ public: void constructPropertyBuilder(); bool hasUserProperties() const; bool hasExpired() const; + + void setCreatedAt(std::chrono::time_point t); + void setExpireAfter(uint32_t s); + bool getHasExpireInfo() const; + const std::chrono::time_point getCreatedAt() const; }; class Publish : public PublishBase diff --git a/utils.cpp b/utils.cpp index 0deadaf..4bd759b 100644 --- a/utils.cpp +++ b/utils.cpp @@ -668,3 +668,17 @@ const std::string protocolVersionString(ProtocolVersion p) return "unknown"; } } + +uint32_t ageFromTimePoint(const std::chrono::time_point &point) +{ + auto duration = std::chrono::steady_clock::now() - point; + auto seconds = std::chrono::duration_cast(duration); + return seconds.count(); +} + +std::chrono::time_point timepointFromAge(const uint32_t age) +{ + std::chrono::seconds seconds(age); + std::chrono::time_point newPoint = std::chrono::steady_clock::now() + seconds; + return newPoint; +} diff --git a/utils.h b/utils.h index 746c433..f99b34a 100644 --- a/utils.h +++ b/utils.h @@ -127,5 +127,8 @@ const std::string websocketCloseCodeToString(uint16_t code); const std::string protocolVersionString(ProtocolVersion p); +uint32_t ageFromTimePoint(const std::chrono::time_point &point); +std::chrono::time_point timepointFromAge(const uint32_t age); + #endif // UTILS_H -- libgit2 0.21.4