diff --git a/mqtt5properties.cpp b/mqtt5properties.cpp index ab01aca..2a543de 100644 --- a/mqtt5properties.cpp +++ b/mqtt5properties.cpp @@ -152,7 +152,7 @@ void Mqtt5PropertyBuilder::writeStr(Mqtt5Properties prop, const std::string &str const uint16_t strlen = str.length(); size_t pos = genericBytes.size(); - const size_t newSize = pos + strlen + 2; + const size_t newSize = pos + strlen + 3; genericBytes.resize(newSize); const uint8_t a = static_cast(strlen >> 8); diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 9a58834..18d2f03 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -33,6 +33,7 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt assert(packet_len > 0); buf.read(bites.data(), packet_len); + protocolVersion = sender->getProtocolVersion(); first_byte = bites[0]; unsigned char _packetType = (first_byte & 0xF0) >> 4; packetType = (PacketType)_packetType; @@ -758,10 +759,12 @@ void MqttPacket::handlePublish() const size_t proplen = decodeVariableByteIntAtPos(); const size_t prop_end_at = pos + proplen; + if (proplen > 0) + publishData.propertyBuilder = std::make_shared(); + while (pos < prop_end_at) { const Mqtt5Properties prop = static_cast(readByte()); - publishData.propertyBuilder = std::make_shared(); switch (prop) { @@ -1131,6 +1134,21 @@ const Publish &MqttPacket::getPublishData() return publishData; } +bool MqttPacket::containsClientSpecificProperties() const +{ + assert(packetType == PacketType::PUBLISH); + + if (protocolVersion <= ProtocolVersion::Mqtt311 || !publishData.propertyBuilder) + return false; + + if (publishData.createdAt.time_since_epoch().count() == 0) // TODO: better + { + return true; + } + + return false; +} + void MqttPacket::readIntoBuf(CirBuf &buf) const { assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publishData.qos); diff --git a/mqttpacket.h b/mqttpacket.h index ade9f89..b98b8dd 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -122,6 +122,7 @@ public: bool getRetain() const; void setRetain(); const Publish &getPublishData(); + bool containsClientSpecificProperties() const; }; #endif // MQTTPACKET_H diff --git a/publishcopyfactory.cpp b/publishcopyfactory.cpp index 47e85f9..bf3a60a 100644 --- a/publishcopyfactory.cpp +++ b/publishcopyfactory.cpp @@ -21,7 +21,16 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto { if (packet) { - // TODO: you can't do this when there are client specific mqtt5 properties + if (packet->containsClientSpecificProperties()) + { + Publish newPublish(packet->getPublishData()); + newPublish.splitTopic = false; + newPublish.qos = max_qos; + newPublish.setClientSpecificProperties(); + this->oneShotPacket = std::make_unique(protocolVersion, newPublish); + return this->oneShotPacket.get(); + } + if (packet->getProtocolVersion() == protocolVersion && orgQos == max_qos) { assert(orgQos == packet->getQos()); diff --git a/types.cpp b/types.cpp index 02a5ee4..3ee73bc 100644 --- a/types.cpp +++ b/types.cpp @@ -126,7 +126,18 @@ void PublishBase::setClientSpecificProperties() { if (propertyBuilder) propertyBuilder->clearClientSpecificBytes(); - // TODO. Expires at? + + auto now = std::chrono::steady_clock::now(); + std::chrono::seconds newExpiresAfter = std::chrono::duration_cast(now - createdAt); + + if (newExpiresAfter.count() > 0) + propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); +} + +bool PublishBase::hasExpired() const +{ + auto now = std::chrono::steady_clock::now(); + return (createdAt + expiresAfter) > now; } Publish::Publish(const Publish &other) : diff --git a/types.h b/types.h index a155290..595472e 100644 --- a/types.h +++ b/types.h @@ -67,7 +67,7 @@ enum class Mqtt5Properties SubscriptionIdentifier = 11, SessionExpiryInterval = 17, AssignedClientIdentifier = 18, - ServerKeepAlive = 13, + ServerKeepAlive = 19, AuthenticationMethod = 21, AuthenticationData = 22, RequestProblemInformation = 23, @@ -203,12 +203,13 @@ public: bool splitTopic = true; std::chrono::time_point createdAt; std::chrono::seconds expiresAfter; - std::shared_ptr propertyBuilder; + std::shared_ptr propertyBuilder; // Only contains data for sending, not receiving PublishBase() = default; PublishBase(const std::string &topic, const std::string &payload, char qos); size_t getLengthWithoutFixedHeader() const; void setClientSpecificProperties(); + bool hasExpired() const; }; class Publish : public PublishBase