Commit a0fff92aa3466eec04937ec3d597cf400d07589e

Authored by Wiebe Cazemier
1 parent 313b3346

WIP on will messages

which also has implications on how I think to handle 'queuePacketAtSub'
client.cpp
@@ -53,7 +53,7 @@ Client::~Client() @@ -53,7 +53,7 @@ Client::~Client()
53 std::shared_ptr<SubscriptionStore> &store = getThreadData()->getSubscriptionStore(); 53 std::shared_ptr<SubscriptionStore> &store = getThreadData()->getSubscriptionStore();
54 54
55 // Will payload can be empty, apparently. 55 // Will payload can be empty, apparently.
56 - if (!will_topic.empty()) 56 + if (willPublish)
57 { 57 {
58 Publish will(will_topic, will_payload, will_qos); 58 Publish will(will_topic, will_payload, will_qos);
59 will.retain = will_retain; 59 will.retain = will_retain;
@@ -436,12 +436,10 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str @@ -436,12 +436,10 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str
436 this->maxTopicAliases = maxTopicAliases; 436 this->maxTopicAliases = maxTopicAliases;
437 } 437 }
438 438
439 -void Client::setWill(const std::string &topic, const std::string &payload, bool retain, char qos) 439 +void Client::setWill(Publish &&willPublish)
440 { 440 {
441 - this->will_topic = topic;  
442 - this->will_payload = payload;  
443 - this->will_retain = retain;  
444 - this->will_qos = qos; 441 + this->willPublish = std::make_shared<Publish>(std::move(willPublish));
  442 + // TODO: also session. Or only the session?
445 } 443 }
446 444
447 void Client::assignSession(std::shared_ptr<Session> &session) 445 void Client::assignSession(std::shared_ptr<Session> &session)
client.h
@@ -75,10 +75,7 @@ class Client @@ -75,10 +75,7 @@ class Client
75 std::string username; 75 std::string username;
76 uint16_t keepalive = 0; 76 uint16_t keepalive = 0;
77 77
78 - std::string will_topic;  
79 - std::string will_payload;  
80 - bool will_retain = false;  
81 - char will_qos = 0; 78 + std::shared_ptr<Publish> willPublish;
82 79
83 std::shared_ptr<ThreadData> threadData; 80 std::shared_ptr<ThreadData> threadData;
84 std::mutex writeBufMutex; 81 std::mutex writeBufMutex;
@@ -111,6 +108,7 @@ public: @@ -111,6 +108,7 @@ public:
111 void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, 108 void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive,
112 uint32_t maxPacketSize, uint16_t maxTopicAliases); 109 uint32_t maxPacketSize, uint16_t maxTopicAliases);
113 void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); 110 void setWill(const std::string &topic, const std::string &payload, bool retain, char qos);
  111 + void setWill(Publish &&willPublish);
114 void clearWill(); 112 void clearWill();
115 void setAuthenticated(bool value) { authenticated = value;} 113 void setAuthenticated(bool value) { authenticated = value;}
116 bool getAuthenticated() { return authenticated; } 114 bool getAuthenticated() { return authenticated; }
mqtt5properties.cpp
@@ -59,11 +59,36 @@ void Mqtt5PropertyBuilder::writeWildcardSubscriptionAvailable(uint8_t val) @@ -59,11 +59,36 @@ void Mqtt5PropertyBuilder::writeWildcardSubscriptionAvailable(uint8_t val)
59 writeUint8(Mqtt5Properties::WildcardSubscriptionAvailable, val); 59 writeUint8(Mqtt5Properties::WildcardSubscriptionAvailable, val);
60 } 60 }
61 61
  62 +void Mqtt5PropertyBuilder::writeSubscriptionIdentifiersAvailable(uint8_t val)
  63 +{
  64 + writeUint8(Mqtt5Properties::SubscriptionIdentifierAvailable, val);
  65 +}
  66 +
62 void Mqtt5PropertyBuilder::writeSharedSubscriptionAvailable(uint8_t val) 67 void Mqtt5PropertyBuilder::writeSharedSubscriptionAvailable(uint8_t val)
63 { 68 {
64 writeUint8(Mqtt5Properties::SharedSubscriptionAvailable, val); 69 writeUint8(Mqtt5Properties::SharedSubscriptionAvailable, val);
65 } 70 }
66 71
  72 +void Mqtt5PropertyBuilder::writeContentType(const std::string &format)
  73 +{
  74 + writeStr(Mqtt5Properties::ContentType, format);
  75 +}
  76 +
  77 +void Mqtt5PropertyBuilder::writePayloadFormatIndicator(uint8_t val)
  78 +{
  79 + writeUint8(Mqtt5Properties::PayloadFormatIndicator, val);
  80 +}
  81 +
  82 +void Mqtt5PropertyBuilder::writeMessageExpiryInterval(uint32_t val)
  83 +{
  84 + writeUint32(Mqtt5Properties::MessageExpiryInterval, val);
  85 +}
  86 +
  87 +void Mqtt5PropertyBuilder::writeResponseTopic(const std::string &str)
  88 +{
  89 + writeStr(Mqtt5Properties::ResponseTopic, str);
  90 +}
  91 +
67 void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x) 92 void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x)
68 { 93 {
69 size_t pos = bites.size(); 94 size_t pos = bites.size();
mqtt5properties.h
@@ -29,7 +29,12 @@ public: @@ -29,7 +29,12 @@ public:
29 void writeAssignedClientId(const std::string &clientid); 29 void writeAssignedClientId(const std::string &clientid);
30 void writeMaxTopicAliases(uint16_t val); 30 void writeMaxTopicAliases(uint16_t val);
31 void writeWildcardSubscriptionAvailable(uint8_t val); 31 void writeWildcardSubscriptionAvailable(uint8_t val);
  32 + void writeSubscriptionIdentifiersAvailable(uint8_t val);
32 void writeSharedSubscriptionAvailable(uint8_t val); 33 void writeSharedSubscriptionAvailable(uint8_t val);
  34 + void writeContentType(const std::string &format);
  35 + void writePayloadFormatIndicator(uint8_t val);
  36 + void writeMessageExpiryInterval(uint32_t val);
  37 + void writeResponseTopic(const std::string &str);
33 }; 38 };
34 39
35 #endif // MQTT5PROPERTIES_H 40 #endif // MQTT5PROPERTIES_H
mqttpacket.cpp
@@ -152,16 +152,29 @@ MqttPacket::MqttPacket(const UnsubAck &amp;unsubAck) : @@ -152,16 +152,29 @@ MqttPacket::MqttPacket(const UnsubAck &amp;unsubAck) :
152 calculateRemainingLength(); 152 calculateRemainingLength();
153 } 153 }
154 154
155 -MqttPacket::MqttPacket(const Publish &publish) :  
156 - bites(publish.getLengthWithoutFixedHeader()) 155 +size_t MqttPacket::getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publish) const
  156 +{
  157 + size_t result = publish.getLengthWithoutFixedHeader();
  158 + if (protocolVersion >= ProtocolVersion::Mqtt5)
  159 + {
  160 + const size_t proplen = publish.propertyBuilder ? publish.propertyBuilder->getLength() : 1;
  161 + result += proplen;
  162 + }
  163 + return result;
  164 +}
  165 +
  166 +MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &publish) :
  167 + bites(getRequiredSizeForPublish(protocolVersion, publish))
157 { 168 {
158 if (publish.topic.length() > 0xFFFF) 169 if (publish.topic.length() > 0xFFFF)
159 { 170 {
160 throw ProtocolError("Topic path too long."); 171 throw ProtocolError("Topic path too long.");
161 } 172 }
162 173
  174 + this->protocolVersion = protocolVersion;
  175 +
163 this->topic = publish.topic; 176 this->topic = publish.topic;
164 - splitTopic(this->topic, subtopics); 177 + splitTopic(this->topic, subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics.
165 178
166 packetType = PacketType::PUBLISH; 179 packetType = PacketType::PUBLISH;
167 this->qos = publish.qos; 180 this->qos = publish.qos;
@@ -180,6 +193,12 @@ MqttPacket::MqttPacket(const Publish &amp;publish) : @@ -180,6 +193,12 @@ MqttPacket::MqttPacket(const Publish &amp;publish) :
180 writeBytes(zero, 2); 193 writeBytes(zero, 2);
181 } 194 }
182 195
  196 + if (protocolVersion >= ProtocolVersion::Mqtt5)
  197 + {
  198 + // TODO: first write a new expiry interval into propertybuilder.
  199 + writeProperties(publish.propertyBuilder);
  200 + }
  201 +
183 payloadStart = pos; 202 payloadStart = pos;
184 payloadLen = publish.payload.length(); 203 payloadLen = publish.payload.length();
185 204
@@ -429,16 +448,65 @@ void MqttPacket::handleConnect() @@ -429,16 +448,65 @@ void MqttPacket::handleConnect()
429 448
430 std::string username; 449 std::string username;
431 std::string password; 450 std::string password;
432 - std::string will_topic;  
433 - std::string will_payload; 451 +
  452 + Publish willpublish;
  453 + willpublish.qos = will_qos;
  454 + willpublish.retain = will_retain;
434 455
435 if (will_flag) 456 if (will_flag)
436 { 457 {
  458 + if (protocolVersion == ProtocolVersion::Mqtt5)
  459 + {
  460 + willpublish.propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
  461 +
  462 + const size_t proplen = decodeVariableByteIntAtPos();
  463 + const size_t prop_end_at = pos + proplen;
  464 +
  465 + while (pos < prop_end_at)
  466 + {
  467 + const Mqtt5Properties prop = static_cast<Mqtt5Properties>(readByte());
  468 +
  469 + switch (prop)
  470 + {
  471 + case Mqtt5Properties::WillDelayInterval:
  472 + willpublish.will_delay = readFourBytesToUint32();
  473 + break;
  474 + case Mqtt5Properties::PayloadFormatIndicator:
  475 + willpublish.propertyBuilder->writePayloadFormatIndicator(readByte());
  476 + break;
  477 + case Mqtt5Properties::ContentType:
  478 + {
  479 + const uint16_t len = readTwoBytesToUInt16();
  480 + const std::string contentType(readBytes(len), len);
  481 + willpublish.propertyBuilder->writeContentType(contentType);
  482 + break;
  483 + }
  484 + case Mqtt5Properties::ResponseTopic:
  485 + {
  486 + const uint16_t len = readTwoBytesToUInt16();
  487 + const std::string responseTopic(readBytes(len), len);
  488 + willpublish.propertyBuilder->writeResponseTopic(responseTopic);
  489 + break;
  490 + }
  491 + case Mqtt5Properties::MessageExpiryInterval:
  492 + {
  493 + willpublish.createdAt = std::chrono::steady_clock::now();
  494 + uint32_t expiresAfter = readFourBytesToUint32();
  495 + willpublish.expiresAfter = std::chrono::seconds(expiresAfter);
  496 + break;
  497 + }
  498 + default:
  499 + break;
  500 + //throw ProtocolError("Invalid will property in connect.");
  501 + }
  502 + }
  503 + }
  504 +
437 uint16_t will_topic_length = readTwoBytesToUInt16(); 505 uint16_t will_topic_length = readTwoBytesToUInt16();
438 - will_topic = std::string(readBytes(will_topic_length), will_topic_length); 506 + willpublish.topic = std::string(readBytes(will_topic_length), will_topic_length);
439 507
440 uint16_t will_payload_length = readTwoBytesToUInt16(); 508 uint16_t will_payload_length = readTwoBytesToUInt16();
441 - will_payload = std::string(readBytes(will_payload_length), will_payload_length); 509 + willpublish.payload = std::string(readBytes(will_payload_length), will_payload_length);
442 } 510 }
443 if (user_name_flag) 511 if (user_name_flag)
444 { 512 {
@@ -455,7 +523,7 @@ void MqttPacket::handleConnect() @@ -455,7 +523,7 @@ void MqttPacket::handleConnect()
455 } 523 }
456 524
457 // The specs don't really say what to do when client id not UTF8, so including here. 525 // The specs don't really say what to do when client id not UTF8, so including here.
458 - if (!isValidUtf8(client_id) || !isValidUtf8(username) || !isValidUtf8(password) || !isValidUtf8(will_topic)) 526 + if (!isValidUtf8(client_id) || !isValidUtf8(username) || !isValidUtf8(password) || !isValidUtf8(willpublish.topic))
459 { 527 {
460 ConnAck connAck(protocolVersion, ReasonCodes::BadUserNameOrPassword); 528 ConnAck connAck(protocolVersion, ReasonCodes::BadUserNameOrPassword);
461 MqttPacket response(connAck); 529 MqttPacket response(connAck);
@@ -502,7 +570,7 @@ void MqttPacket::handleConnect() @@ -502,7 +570,7 @@ void MqttPacket::handleConnect()
502 } 570 }
503 571
504 sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_packet_size, max_topic_aliases); 572 sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_packet_size, max_topic_aliases);
505 - sender->setWill(will_topic, will_payload, will_retain, will_qos); 573 + sender->setWill(std::move(willpublish));
506 574
507 bool accessGranted = false; 575 bool accessGranted = false;
508 std::string denyLogMsg; 576 std::string denyLogMsg;
@@ -542,6 +610,7 @@ void MqttPacket::handleConnect() @@ -542,6 +610,7 @@ void MqttPacket::handleConnect()
542 connAck.propertyBuilder->writeAssignedClientId(client_id); 610 connAck.propertyBuilder->writeAssignedClientId(client_id);
543 connAck.propertyBuilder->writeMaxTopicAliases(max_topic_aliases); 611 connAck.propertyBuilder->writeMaxTopicAliases(max_topic_aliases);
544 connAck.propertyBuilder->writeWildcardSubscriptionAvailable(1); 612 connAck.propertyBuilder->writeWildcardSubscriptionAvailable(1);
  613 + connAck.propertyBuilder->writeSubscriptionIdentifiersAvailable(0);
545 connAck.propertyBuilder->writeSharedSubscriptionAvailable(0); 614 connAck.propertyBuilder->writeSharedSubscriptionAvailable(0);
546 } 615 }
547 616
mqttpacket.h
@@ -82,11 +82,13 @@ public: @@ -82,11 +82,13 @@ public:
82 82
83 std::shared_ptr<MqttPacket> getCopy(char new_max_qos) const; 83 std::shared_ptr<MqttPacket> getCopy(char new_max_qos) const;
84 84
  85 + size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publish) const;
  86 +
85 // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance. 87 // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance.
86 MqttPacket(const ConnAck &connAck); 88 MqttPacket(const ConnAck &connAck);
87 MqttPacket(const SubAck &subAck); 89 MqttPacket(const SubAck &subAck);
88 MqttPacket(const UnsubAck &unsubAck); 90 MqttPacket(const UnsubAck &unsubAck);
89 - MqttPacket(const Publish &publish); 91 + MqttPacket(const ProtocolVersion protocolVersion, const Publish &publish);
90 MqttPacket(const PubAck &pubAck); 92 MqttPacket(const PubAck &pubAck);
91 MqttPacket(const PubRec &pubRec); 93 MqttPacket(const PubRec &pubRec);
92 MqttPacket(const PubComp &pubComp); 94 MqttPacket(const PubComp &pubComp);
@@ -22,6 +22,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -22,6 +22,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
22 #include <list> 22 #include <list>
23 #include <string> 23 #include <string>
24 #include <memory> 24 #include <memory>
  25 +#include <chrono>
25 26
26 #include "forward_declarations.h" 27 #include "forward_declarations.h"
27 28
@@ -194,6 +195,12 @@ public: @@ -194,6 +195,12 @@ public:
194 std::string payload; 195 std::string payload;
195 char qos = 0; 196 char qos = 0;
196 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] 197 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
  198 + uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class
  199 + std::chrono::time_point<std::chrono::steady_clock> createdAt;
  200 + std::chrono::seconds expiresAfter;
  201 + std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder;
  202 +
  203 + Publish();
197 Publish(const std::string &topic, const std::string &payload, char qos); 204 Publish(const std::string &topic, const std::string &payload, char qos);
198 size_t getLengthWithoutFixedHeader() const; 205 size_t getLengthWithoutFixedHeader() const;
199 }; 206 };