diff --git a/mqttpacket.cpp b/mqttpacket.cpp
index 5ffa6cd..6417ed3 100644
--- a/mqttpacket.cpp
+++ b/mqttpacket.cpp
@@ -434,7 +434,7 @@ void MqttPacket::handleConnect()
{
case Mqtt5Properties::WillDelayInterval:
willpublish.will_delay = readFourBytesToUint32();
- willpublish.createdAt = std::chrono::steady_clock::now();
+ willpublish.setCreatedAt(std::chrono::steady_clock::now());
break;
case Mqtt5Properties::PayloadFormatIndicator:
willpublish.propertyBuilder->writePayloadFormatIndicator(readByte());
@@ -455,9 +455,8 @@ void MqttPacket::handleConnect()
}
case Mqtt5Properties::MessageExpiryInterval:
{
- willpublish.createdAt = std::chrono::steady_clock::now();
- uint32_t expiresAfter = readFourBytesToUint32();
- willpublish.expiresAfter = std::chrono::seconds(expiresAfter);
+ const uint32_t expiresAfter = readFourBytesToUint32();
+ willpublish.setExpireAfter(expiresAfter);
break;
}
case Mqtt5Properties::CorrelationData:
@@ -827,8 +826,7 @@ void MqttPacket::parsePublishData()
publishData.propertyBuilder->writePayloadFormatIndicator(readByte());
break;
case Mqtt5Properties::MessageExpiryInterval:
- publishData.createdAt = std::chrono::steady_clock::now();
- publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32());
+ publishData.setExpireAfter(readFourBytesToUint32());
break;
case Mqtt5Properties::TopicAlias:
{
@@ -1328,7 +1326,8 @@ bool MqttPacket::containsClientSpecificProperties() const
if (protocolVersion <= ProtocolVersion::Mqtt311 || !publishData.propertyBuilder)
return false;
- if (publishData.createdAt.time_since_epoch().count() == 0 || this->hasTopicAlias) // TODO: better
+ // TODO: for the on-line clients, even with expire info, we can just copy the same packet. So, that case can be excluded.
+ if (publishData.getHasExpireInfo() || this->hasTopicAlias)
{
return true;
}
diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp
index 8a0e0ef..5d65923 100644
--- a/sessionsandsubscriptionsdb.cpp
+++ b/sessionsandsubscriptionsdb.cpp
@@ -18,6 +18,7 @@ License along with FlashMQ. If not, see .
#include "sessionsandsubscriptionsdb.h"
#include "mqttpacket.h"
#include "threadglobals.h"
+#include "utils.h"
#include "cassert"
@@ -113,6 +114,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
{
const uint16_t fixed_header_length = readUint16(eofFound);
const uint16_t id = readUint16(eofFound);
+ const uint32_t originalPubAge = readUint32(eofFound);
const uint32_t packlen = readUint32(eofFound);
assert(id > 0);
@@ -127,7 +129,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
pack.parsePublishData();
Publish pub(pack.getPublishData());
- // TODO: update the pub.createdAt
+ const uint32_t newPubAge = persistence_state_age + originalPubAge;
+ pub.setCreatedAt(timepointFromAge(newPubAge));
logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
ses->qosPacketQueue.queuePublish(std::move(pub), id);
@@ -250,8 +253,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorlogf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
- pub.clearClientSpecificProperties(); // TODO: unnecessary? Unwanted even? I need to store the expiration interval. And how to load it?
-
MqttPacket pack(ProtocolVersion::Mqtt5, pub);
pack.setPacketId(p.getPacketId());
const uint32_t packSize = pack.getSizeIncludingNonPresentHeader();
@@ -259,10 +260,11 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector p = (*it).lock();
if (p)
{
- if (p->createdAt + std::chrono::seconds(p->will_delay) > now)
+ if (p->getCreatedAt() + std::chrono::seconds(p->will_delay) > now)
break;
logger->logf(LOG_DEBUG, "Sending delayed will on topic '%s'.", p->topic.c_str() );
diff --git a/types.cpp b/types.cpp
index 01fd761..33dddea 100644
--- a/types.cpp
+++ b/types.cpp
@@ -138,7 +138,7 @@ size_t PublishBase::getLengthWithoutFixedHeader() const
*/
void PublishBase::setClientSpecificProperties()
{
- if (this->createdAt.time_since_epoch().count() && this->topicAlias == 0)
+ if (!hasExpireInfo && this->topicAlias == 0)
return;
if (propertyBuilder)
@@ -146,11 +146,13 @@ void PublishBase::setClientSpecificProperties()
else
propertyBuilder = std::make_shared();
- if (createdAt.time_since_epoch().count() > 0)
+ if (hasExpireInfo)
{
auto now = std::chrono::steady_clock::now();
- std::chrono::seconds newExpiresAfter = std::chrono::duration_cast(now - createdAt);
- propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count());
+ std::chrono::seconds delay = std::chrono::duration_cast(now - createdAt);
+ int32_t newExpire = (this->expiresAfter - delay).count();
+ if (newExpire > 0)
+ propertyBuilder->writeMessageExpiryInterval(newExpire);
}
if (topicAlias > 0)
@@ -178,10 +180,35 @@ bool PublishBase::hasUserProperties() const
bool PublishBase::hasExpired() const
{
+ if (!hasExpireInfo)
+ return false;
+
const std::chrono::seconds age = std::chrono::duration_cast(std::chrono::steady_clock::now() - this->createdAt);
return (expiresAfter > age);
}
+void PublishBase::setCreatedAt(std::chrono::time_point t)
+{
+ this->createdAt = t;
+}
+
+void PublishBase::setExpireAfter(uint32_t s)
+{
+ this->createdAt = std::chrono::steady_clock::now();
+ this->expiresAfter = std::chrono::seconds(s);
+ this->hasExpireInfo = true;
+}
+
+bool PublishBase::getHasExpireInfo() const
+{
+ return this->hasExpireInfo;
+}
+
+const std::chrono::time_point PublishBase::getCreatedAt() const
+{
+ return this->createdAt;
+}
+
Publish::Publish(const Publish &other) :
PublishBase(other)
{
diff --git a/types.h b/types.h
index 7b2df7e..2c7265c 100644
--- a/types.h
+++ b/types.h
@@ -192,6 +192,10 @@ public:
*/
class PublishBase
{
+ bool hasExpireInfo = false;
+ std::chrono::time_point createdAt;
+ std::chrono::seconds expiresAfter;
+
public:
std::string topic;
std::string payload;
@@ -199,8 +203,6 @@ public:
bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class
bool splitTopic = true;
- std::chrono::time_point createdAt;
- std::chrono::seconds expiresAfter;
uint16_t topicAlias = 0;
bool skipTopic = false;
std::shared_ptr propertyBuilder; // Only contains data for sending, not receiving
@@ -213,6 +215,11 @@ public:
void constructPropertyBuilder();
bool hasUserProperties() const;
bool hasExpired() const;
+
+ void setCreatedAt(std::chrono::time_point t);
+ void setExpireAfter(uint32_t s);
+ bool getHasExpireInfo() const;
+ const std::chrono::time_point getCreatedAt() const;
};
class Publish : public PublishBase
diff --git a/utils.cpp b/utils.cpp
index 0deadaf..4bd759b 100644
--- a/utils.cpp
+++ b/utils.cpp
@@ -668,3 +668,17 @@ const std::string protocolVersionString(ProtocolVersion p)
return "unknown";
}
}
+
+uint32_t ageFromTimePoint(const std::chrono::time_point &point)
+{
+ auto duration = std::chrono::steady_clock::now() - point;
+ auto seconds = std::chrono::duration_cast(duration);
+ return seconds.count();
+}
+
+std::chrono::time_point timepointFromAge(const uint32_t age)
+{
+ std::chrono::seconds seconds(age);
+ std::chrono::time_point newPoint = std::chrono::steady_clock::now() + seconds;
+ return newPoint;
+}
diff --git a/utils.h b/utils.h
index 746c433..f99b34a 100644
--- a/utils.h
+++ b/utils.h
@@ -127,5 +127,8 @@ const std::string websocketCloseCodeToString(uint16_t code);
const std::string protocolVersionString(ProtocolVersion p);
+uint32_t ageFromTimePoint(const std::chrono::time_point &point);
+std::chrono::time_point timepointFromAge(const uint32_t age);
+
#endif // UTILS_H