Commit c32f063dc5f86f81eb5e6155aae653442e1a6485
1 parent
a6d7395e
more progress
Showing
6 changed files
with
46 additions
and
6 deletions
mqtt5properties.cpp
| @@ -152,7 +152,7 @@ void Mqtt5PropertyBuilder::writeStr(Mqtt5Properties prop, const std::string &str | @@ -152,7 +152,7 @@ void Mqtt5PropertyBuilder::writeStr(Mqtt5Properties prop, const std::string &str | ||
| 152 | const uint16_t strlen = str.length(); | 152 | const uint16_t strlen = str.length(); |
| 153 | 153 | ||
| 154 | size_t pos = genericBytes.size(); | 154 | size_t pos = genericBytes.size(); |
| 155 | - const size_t newSize = pos + strlen + 2; | 155 | + const size_t newSize = pos + strlen + 3; |
| 156 | genericBytes.resize(newSize); | 156 | genericBytes.resize(newSize); |
| 157 | 157 | ||
| 158 | const uint8_t a = static_cast<uint8_t>(strlen >> 8); | 158 | const uint8_t a = static_cast<uint8_t>(strlen >> 8); |
mqttpacket.cpp
| @@ -33,6 +33,7 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt | @@ -33,6 +33,7 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt | ||
| 33 | assert(packet_len > 0); | 33 | assert(packet_len > 0); |
| 34 | buf.read(bites.data(), packet_len); | 34 | buf.read(bites.data(), packet_len); |
| 35 | 35 | ||
| 36 | + protocolVersion = sender->getProtocolVersion(); | ||
| 36 | first_byte = bites[0]; | 37 | first_byte = bites[0]; |
| 37 | unsigned char _packetType = (first_byte & 0xF0) >> 4; | 38 | unsigned char _packetType = (first_byte & 0xF0) >> 4; |
| 38 | packetType = (PacketType)_packetType; | 39 | packetType = (PacketType)_packetType; |
| @@ -758,10 +759,12 @@ void MqttPacket::handlePublish() | @@ -758,10 +759,12 @@ void MqttPacket::handlePublish() | ||
| 758 | const size_t proplen = decodeVariableByteIntAtPos(); | 759 | const size_t proplen = decodeVariableByteIntAtPos(); |
| 759 | const size_t prop_end_at = pos + proplen; | 760 | const size_t prop_end_at = pos + proplen; |
| 760 | 761 | ||
| 762 | + if (proplen > 0) | ||
| 763 | + publishData.propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>(); | ||
| 764 | + | ||
| 761 | while (pos < prop_end_at) | 765 | while (pos < prop_end_at) |
| 762 | { | 766 | { |
| 763 | const Mqtt5Properties prop = static_cast<Mqtt5Properties>(readByte()); | 767 | const Mqtt5Properties prop = static_cast<Mqtt5Properties>(readByte()); |
| 764 | - publishData.propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>(); | ||
| 765 | 768 | ||
| 766 | switch (prop) | 769 | switch (prop) |
| 767 | { | 770 | { |
| @@ -1131,6 +1134,21 @@ const Publish &MqttPacket::getPublishData() | @@ -1131,6 +1134,21 @@ const Publish &MqttPacket::getPublishData() | ||
| 1131 | return publishData; | 1134 | return publishData; |
| 1132 | } | 1135 | } |
| 1133 | 1136 | ||
| 1137 | +bool MqttPacket::containsClientSpecificProperties() const | ||
| 1138 | +{ | ||
| 1139 | + assert(packetType == PacketType::PUBLISH); | ||
| 1140 | + | ||
| 1141 | + if (protocolVersion <= ProtocolVersion::Mqtt311 || !publishData.propertyBuilder) | ||
| 1142 | + return false; | ||
| 1143 | + | ||
| 1144 | + if (publishData.createdAt.time_since_epoch().count() == 0) // TODO: better | ||
| 1145 | + { | ||
| 1146 | + return true; | ||
| 1147 | + } | ||
| 1148 | + | ||
| 1149 | + return false; | ||
| 1150 | +} | ||
| 1151 | + | ||
| 1134 | void MqttPacket::readIntoBuf(CirBuf &buf) const | 1152 | void MqttPacket::readIntoBuf(CirBuf &buf) const |
| 1135 | { | 1153 | { |
| 1136 | assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publishData.qos); | 1154 | assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publishData.qos); |
mqttpacket.h
| @@ -122,6 +122,7 @@ public: | @@ -122,6 +122,7 @@ public: | ||
| 122 | bool getRetain() const; | 122 | bool getRetain() const; |
| 123 | void setRetain(); | 123 | void setRetain(); |
| 124 | const Publish &getPublishData(); | 124 | const Publish &getPublishData(); |
| 125 | + bool containsClientSpecificProperties() const; | ||
| 125 | }; | 126 | }; |
| 126 | 127 | ||
| 127 | #endif // MQTTPACKET_H | 128 | #endif // MQTTPACKET_H |
publishcopyfactory.cpp
| @@ -21,7 +21,16 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto | @@ -21,7 +21,16 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto | ||
| 21 | { | 21 | { |
| 22 | if (packet) | 22 | if (packet) |
| 23 | { | 23 | { |
| 24 | - // TODO: you can't do this when there are client specific mqtt5 properties | 24 | + if (packet->containsClientSpecificProperties()) |
| 25 | + { | ||
| 26 | + Publish newPublish(packet->getPublishData()); | ||
| 27 | + newPublish.splitTopic = false; | ||
| 28 | + newPublish.qos = max_qos; | ||
| 29 | + newPublish.setClientSpecificProperties(); | ||
| 30 | + this->oneShotPacket = std::make_unique<MqttPacket>(protocolVersion, newPublish); | ||
| 31 | + return this->oneShotPacket.get(); | ||
| 32 | + } | ||
| 33 | + | ||
| 25 | if (packet->getProtocolVersion() == protocolVersion && orgQos == max_qos) | 34 | if (packet->getProtocolVersion() == protocolVersion && orgQos == max_qos) |
| 26 | { | 35 | { |
| 27 | assert(orgQos == packet->getQos()); | 36 | assert(orgQos == packet->getQos()); |
types.cpp
| @@ -126,7 +126,18 @@ void PublishBase::setClientSpecificProperties() | @@ -126,7 +126,18 @@ void PublishBase::setClientSpecificProperties() | ||
| 126 | { | 126 | { |
| 127 | if (propertyBuilder) | 127 | if (propertyBuilder) |
| 128 | propertyBuilder->clearClientSpecificBytes(); | 128 | propertyBuilder->clearClientSpecificBytes(); |
| 129 | - // TODO. Expires at? | 129 | + |
| 130 | + auto now = std::chrono::steady_clock::now(); | ||
| 131 | + std::chrono::seconds newExpiresAfter = std::chrono::duration_cast<std::chrono::seconds>(now - createdAt); | ||
| 132 | + | ||
| 133 | + if (newExpiresAfter.count() > 0) | ||
| 134 | + propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); | ||
| 135 | +} | ||
| 136 | + | ||
| 137 | +bool PublishBase::hasExpired() const | ||
| 138 | +{ | ||
| 139 | + auto now = std::chrono::steady_clock::now(); | ||
| 140 | + return (createdAt + expiresAfter) > now; | ||
| 130 | } | 141 | } |
| 131 | 142 | ||
| 132 | Publish::Publish(const Publish &other) : | 143 | Publish::Publish(const Publish &other) : |
types.h
| @@ -67,7 +67,7 @@ enum class Mqtt5Properties | @@ -67,7 +67,7 @@ enum class Mqtt5Properties | ||
| 67 | SubscriptionIdentifier = 11, | 67 | SubscriptionIdentifier = 11, |
| 68 | SessionExpiryInterval = 17, | 68 | SessionExpiryInterval = 17, |
| 69 | AssignedClientIdentifier = 18, | 69 | AssignedClientIdentifier = 18, |
| 70 | - ServerKeepAlive = 13, | 70 | + ServerKeepAlive = 19, |
| 71 | AuthenticationMethod = 21, | 71 | AuthenticationMethod = 21, |
| 72 | AuthenticationData = 22, | 72 | AuthenticationData = 22, |
| 73 | RequestProblemInformation = 23, | 73 | RequestProblemInformation = 23, |
| @@ -203,12 +203,13 @@ public: | @@ -203,12 +203,13 @@ public: | ||
| 203 | bool splitTopic = true; | 203 | bool splitTopic = true; |
| 204 | std::chrono::time_point<std::chrono::steady_clock> createdAt; | 204 | std::chrono::time_point<std::chrono::steady_clock> createdAt; |
| 205 | std::chrono::seconds expiresAfter; | 205 | std::chrono::seconds expiresAfter; |
| 206 | - std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; | 206 | + std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving |
| 207 | 207 | ||
| 208 | PublishBase() = default; | 208 | PublishBase() = default; |
| 209 | PublishBase(const std::string &topic, const std::string &payload, char qos); | 209 | PublishBase(const std::string &topic, const std::string &payload, char qos); |
| 210 | size_t getLengthWithoutFixedHeader() const; | 210 | size_t getLengthWithoutFixedHeader() const; |
| 211 | void setClientSpecificProperties(); | 211 | void setClientSpecificProperties(); |
| 212 | + bool hasExpired() const; | ||
| 212 | }; | 213 | }; |
| 213 | 214 | ||
| 214 | class Publish : public PublishBase | 215 | class Publish : public PublishBase |