From a0fff92aa3466eec04937ec3d597cf400d07589e Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sat, 12 Mar 2022 11:37:33 +0100 Subject: [PATCH] WIP on will messages --- client.cpp | 10 ++++------ client.h | 6 ++---- mqtt5properties.cpp | 25 +++++++++++++++++++++++++ mqtt5properties.h | 5 +++++ mqttpacket.cpp | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- mqttpacket.h | 4 +++- types.h | 7 +++++++ 7 files changed, 124 insertions(+), 20 deletions(-) diff --git a/client.cpp b/client.cpp index b17d8d2..123c175 100644 --- a/client.cpp +++ b/client.cpp @@ -53,7 +53,7 @@ Client::~Client() std::shared_ptr &store = getThreadData()->getSubscriptionStore(); // Will payload can be empty, apparently. - if (!will_topic.empty()) + if (willPublish) { Publish will(will_topic, will_payload, will_qos); will.retain = will_retain; @@ -436,12 +436,10 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str this->maxTopicAliases = maxTopicAliases; } -void Client::setWill(const std::string &topic, const std::string &payload, bool retain, char qos) +void Client::setWill(Publish &&willPublish) { - this->will_topic = topic; - this->will_payload = payload; - this->will_retain = retain; - this->will_qos = qos; + this->willPublish = std::make_shared(std::move(willPublish)); + // TODO: also session. Or only the session? } void Client::assignSession(std::shared_ptr &session) diff --git a/client.h b/client.h index 16577ce..04e9ef6 100644 --- a/client.h +++ b/client.h @@ -75,10 +75,7 @@ class Client std::string username; uint16_t keepalive = 0; - std::string will_topic; - std::string will_payload; - bool will_retain = false; - char will_qos = 0; + std::shared_ptr willPublish; std::shared_ptr threadData; std::mutex writeBufMutex; @@ -111,6 +108,7 @@ public: void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, uint32_t maxPacketSize, uint16_t maxTopicAliases); void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); + void setWill(Publish &&willPublish); void clearWill(); void setAuthenticated(bool value) { authenticated = value;} bool getAuthenticated() { return authenticated; } diff --git a/mqtt5properties.cpp b/mqtt5properties.cpp index 89d8d4b..a02d926 100644 --- a/mqtt5properties.cpp +++ b/mqtt5properties.cpp @@ -59,11 +59,36 @@ void Mqtt5PropertyBuilder::writeWildcardSubscriptionAvailable(uint8_t val) writeUint8(Mqtt5Properties::WildcardSubscriptionAvailable, val); } +void Mqtt5PropertyBuilder::writeSubscriptionIdentifiersAvailable(uint8_t val) +{ + writeUint8(Mqtt5Properties::SubscriptionIdentifierAvailable, val); +} + void Mqtt5PropertyBuilder::writeSharedSubscriptionAvailable(uint8_t val) { writeUint8(Mqtt5Properties::SharedSubscriptionAvailable, val); } +void Mqtt5PropertyBuilder::writeContentType(const std::string &format) +{ + writeStr(Mqtt5Properties::ContentType, format); +} + +void Mqtt5PropertyBuilder::writePayloadFormatIndicator(uint8_t val) +{ + writeUint8(Mqtt5Properties::PayloadFormatIndicator, val); +} + +void Mqtt5PropertyBuilder::writeMessageExpiryInterval(uint32_t val) +{ + writeUint32(Mqtt5Properties::MessageExpiryInterval, val); +} + +void Mqtt5PropertyBuilder::writeResponseTopic(const std::string &str) +{ + writeStr(Mqtt5Properties::ResponseTopic, str); +} + void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x) { size_t pos = bites.size(); diff --git a/mqtt5properties.h b/mqtt5properties.h index 2650847..afa6e0c 100644 --- a/mqtt5properties.h +++ b/mqtt5properties.h @@ -29,7 +29,12 @@ public: void writeAssignedClientId(const std::string &clientid); void writeMaxTopicAliases(uint16_t val); void writeWildcardSubscriptionAvailable(uint8_t val); + void writeSubscriptionIdentifiersAvailable(uint8_t val); void writeSharedSubscriptionAvailable(uint8_t val); + void writeContentType(const std::string &format); + void writePayloadFormatIndicator(uint8_t val); + void writeMessageExpiryInterval(uint32_t val); + void writeResponseTopic(const std::string &str); }; #endif // MQTT5PROPERTIES_H diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 87cdf4a..7ed74c4 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -152,16 +152,29 @@ MqttPacket::MqttPacket(const UnsubAck &unsubAck) : calculateRemainingLength(); } -MqttPacket::MqttPacket(const Publish &publish) : - bites(publish.getLengthWithoutFixedHeader()) +size_t MqttPacket::getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publish) const +{ + size_t result = publish.getLengthWithoutFixedHeader(); + if (protocolVersion >= ProtocolVersion::Mqtt5) + { + const size_t proplen = publish.propertyBuilder ? publish.propertyBuilder->getLength() : 1; + result += proplen; + } + return result; +} + +MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &publish) : + bites(getRequiredSizeForPublish(protocolVersion, publish)) { if (publish.topic.length() > 0xFFFF) { throw ProtocolError("Topic path too long."); } + this->protocolVersion = protocolVersion; + this->topic = publish.topic; - splitTopic(this->topic, subtopics); + splitTopic(this->topic, subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics. packetType = PacketType::PUBLISH; this->qos = publish.qos; @@ -180,6 +193,12 @@ MqttPacket::MqttPacket(const Publish &publish) : writeBytes(zero, 2); } + if (protocolVersion >= ProtocolVersion::Mqtt5) + { + // TODO: first write a new expiry interval into propertybuilder. + writeProperties(publish.propertyBuilder); + } + payloadStart = pos; payloadLen = publish.payload.length(); @@ -429,16 +448,65 @@ void MqttPacket::handleConnect() std::string username; std::string password; - std::string will_topic; - std::string will_payload; + + Publish willpublish; + willpublish.qos = will_qos; + willpublish.retain = will_retain; if (will_flag) { + if (protocolVersion == ProtocolVersion::Mqtt5) + { + willpublish.propertyBuilder = std::make_shared(); + + const size_t proplen = decodeVariableByteIntAtPos(); + const size_t prop_end_at = pos + proplen; + + while (pos < prop_end_at) + { + const Mqtt5Properties prop = static_cast(readByte()); + + switch (prop) + { + case Mqtt5Properties::WillDelayInterval: + willpublish.will_delay = readFourBytesToUint32(); + break; + case Mqtt5Properties::PayloadFormatIndicator: + willpublish.propertyBuilder->writePayloadFormatIndicator(readByte()); + break; + case Mqtt5Properties::ContentType: + { + const uint16_t len = readTwoBytesToUInt16(); + const std::string contentType(readBytes(len), len); + willpublish.propertyBuilder->writeContentType(contentType); + break; + } + case Mqtt5Properties::ResponseTopic: + { + const uint16_t len = readTwoBytesToUInt16(); + const std::string responseTopic(readBytes(len), len); + willpublish.propertyBuilder->writeResponseTopic(responseTopic); + break; + } + case Mqtt5Properties::MessageExpiryInterval: + { + willpublish.createdAt = std::chrono::steady_clock::now(); + uint32_t expiresAfter = readFourBytesToUint32(); + willpublish.expiresAfter = std::chrono::seconds(expiresAfter); + break; + } + default: + break; + //throw ProtocolError("Invalid will property in connect."); + } + } + } + uint16_t will_topic_length = readTwoBytesToUInt16(); - will_topic = std::string(readBytes(will_topic_length), will_topic_length); + willpublish.topic = std::string(readBytes(will_topic_length), will_topic_length); uint16_t will_payload_length = readTwoBytesToUInt16(); - will_payload = std::string(readBytes(will_payload_length), will_payload_length); + willpublish.payload = std::string(readBytes(will_payload_length), will_payload_length); } if (user_name_flag) { @@ -455,7 +523,7 @@ void MqttPacket::handleConnect() } // The specs don't really say what to do when client id not UTF8, so including here. - if (!isValidUtf8(client_id) || !isValidUtf8(username) || !isValidUtf8(password) || !isValidUtf8(will_topic)) + if (!isValidUtf8(client_id) || !isValidUtf8(username) || !isValidUtf8(password) || !isValidUtf8(willpublish.topic)) { ConnAck connAck(protocolVersion, ReasonCodes::BadUserNameOrPassword); MqttPacket response(connAck); @@ -502,7 +570,7 @@ void MqttPacket::handleConnect() } sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_packet_size, max_topic_aliases); - sender->setWill(will_topic, will_payload, will_retain, will_qos); + sender->setWill(std::move(willpublish)); bool accessGranted = false; std::string denyLogMsg; @@ -542,6 +610,7 @@ void MqttPacket::handleConnect() connAck.propertyBuilder->writeAssignedClientId(client_id); connAck.propertyBuilder->writeMaxTopicAliases(max_topic_aliases); connAck.propertyBuilder->writeWildcardSubscriptionAvailable(1); + connAck.propertyBuilder->writeSubscriptionIdentifiersAvailable(0); connAck.propertyBuilder->writeSharedSubscriptionAvailable(0); } diff --git a/mqttpacket.h b/mqttpacket.h index ca2fe2b..69b0ebb 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -82,11 +82,13 @@ public: std::shared_ptr getCopy(char new_max_qos) const; + size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publish) const; + // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance. MqttPacket(const ConnAck &connAck); MqttPacket(const SubAck &subAck); MqttPacket(const UnsubAck &unsubAck); - MqttPacket(const Publish &publish); + MqttPacket(const ProtocolVersion protocolVersion, const Publish &publish); MqttPacket(const PubAck &pubAck); MqttPacket(const PubRec &pubRec); MqttPacket(const PubComp &pubComp); diff --git a/types.h b/types.h index e883e2e..df61223 100644 --- a/types.h +++ b/types.h @@ -22,6 +22,7 @@ License along with FlashMQ. If not, see . #include #include #include +#include #include "forward_declarations.h" @@ -194,6 +195,12 @@ public: std::string payload; char qos = 0; bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] + uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class + std::chrono::time_point createdAt; + std::chrono::seconds expiresAfter; + std::shared_ptr propertyBuilder; + + Publish(); Publish(const std::string &topic, const std::string &payload, char qos); size_t getLengthWithoutFixedHeader() const; }; -- libgit2 0.21.4