Commit 3022c275e8f778c89ee97c58b2e0df8d04fdb904

Authored by Wiebe Cazemier
1 parent 9dab7a48

Initiate topic alias to subscribers

I think it's correct, but mosquitto_sub doesn't seem to support it, so I
can't test.

Also some other stuff I happen to see.
client.cpp
@@ -208,7 +208,23 @@ int Client::writeMqttPacket(const MqttPacket &packet) @@ -208,7 +208,23 @@ int Client::writeMqttPacket(const MqttPacket &packet)
208 208
209 int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory &copyFactory, char max_qos, uint16_t packet_id) 209 int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory &copyFactory, char max_qos, uint16_t packet_id)
210 { 210 {
211 - MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion); 211 + const Settings *settings = ThreadGlobals::getSettings();
  212 + uint16_t topic_alias = 0;
  213 + bool skip_topic = false;
  214 +
  215 + if (protocolVersion >= ProtocolVersion::Mqtt5 && settings->maxOutgoingTopicAliases > this->curOutgoingTopicAlias)
  216 + {
  217 + uint16_t &id = this->outgoingTopicAliases[copyFactory.getTopic()];
  218 +
  219 + if (id > 0)
  220 + skip_topic = true;
  221 + else
  222 + id = ++this->curOutgoingTopicAlias;
  223 +
  224 + topic_alias = id;
  225 + }
  226 +
  227 + MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion, topic_alias, skip_topic);
212 228
213 assert(p->getQos() <= max_qos); 229 assert(p->getQos() <= max_qos);
214 230
client.h
@@ -84,6 +84,9 @@ class Client @@ -84,6 +84,9 @@ class Client
84 84
85 std::unordered_map<uint16_t, std::string> topicAliases; 85 std::unordered_map<uint16_t, std::string> topicAliases;
86 86
  87 + uint16_t curOutgoingTopicAlias = 0;
  88 + std::unordered_map<std::string, uint16_t> outgoingTopicAliases;
  89 +
87 Logger *logger = Logger::getInstance(); 90 Logger *logger = Logger::getInstance();
88 91
89 void setReadyForWriting(bool val); 92 void setReadyForWriting(bool val);
mqtt5properties.cpp
@@ -125,6 +125,11 @@ void Mqtt5PropertyBuilder::writeCorrelationData(const std::string &amp;correlationDa @@ -125,6 +125,11 @@ void Mqtt5PropertyBuilder::writeCorrelationData(const std::string &amp;correlationDa
125 writeStr(Mqtt5Properties::CorrelationData, correlationData); 125 writeStr(Mqtt5Properties::CorrelationData, correlationData);
126 } 126 }
127 127
  128 +void Mqtt5PropertyBuilder::writeTopicAlias(const uint16_t id)
  129 +{
  130 + writeUint16(Mqtt5Properties::TopicAlias, id, clientSpecificBytes);
  131 +}
  132 +
128 void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties) 133 void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties)
129 { 134 {
130 assert(!this->userProperties); 135 assert(!this->userProperties);
@@ -134,7 +139,7 @@ void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr&lt;std::vecto @@ -134,7 +139,7 @@ void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr&lt;std::vecto
134 this->userProperties = userProperties; 139 this->userProperties = userProperties;
135 } 140 }
136 141
137 -void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector<char> &target) 142 +void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector<char> &target) const
138 { 143 {
139 size_t pos = target.size(); 144 size_t pos = target.size();
140 const size_t newSize = pos + 5; 145 const size_t newSize = pos + 5;
@@ -154,16 +159,21 @@ void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, s @@ -154,16 +159,21 @@ void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, s
154 159
155 void Mqtt5PropertyBuilder::writeUint16(Mqtt5Properties prop, const uint16_t x) 160 void Mqtt5PropertyBuilder::writeUint16(Mqtt5Properties prop, const uint16_t x)
156 { 161 {
157 - size_t pos = genericBytes.size(); 162 + writeUint16(prop, x, this->genericBytes);
  163 +}
  164 +
  165 +void Mqtt5PropertyBuilder::writeUint16(Mqtt5Properties prop, const uint16_t x, std::vector<char> &target) const
  166 +{
  167 + size_t pos = target.size();
158 const size_t newSize = pos + 3; 168 const size_t newSize = pos + 3;
159 - genericBytes.resize(newSize); 169 + target.resize(newSize);
160 170
161 const uint8_t a = static_cast<uint8_t>(x >> 8); 171 const uint8_t a = static_cast<uint8_t>(x >> 8);
162 const uint8_t b = static_cast<uint8_t>(x); 172 const uint8_t b = static_cast<uint8_t>(x);
163 173
164 - genericBytes[pos++] = static_cast<uint8_t>(prop);  
165 - genericBytes[pos++] = a;  
166 - genericBytes[pos] = b; 174 + target[pos++] = static_cast<uint8_t>(prop);
  175 + target[pos++] = a;
  176 + target[pos] = b;
167 } 177 }
168 178
169 void Mqtt5PropertyBuilder::writeUint8(Mqtt5Properties prop, const uint8_t x) 179 void Mqtt5PropertyBuilder::writeUint8(Mqtt5Properties prop, const uint8_t x)
mqtt5properties.h
@@ -13,8 +13,9 @@ class Mqtt5PropertyBuilder @@ -13,8 +13,9 @@ class Mqtt5PropertyBuilder
13 std::shared_ptr<std::vector<std::pair<std::string, std::string>>> userProperties; 13 std::shared_ptr<std::vector<std::pair<std::string, std::string>>> userProperties;
14 VariableByteInt length; 14 VariableByteInt length;
15 15
16 - void writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector<char> &target); 16 + void writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector<char> &target) const;
17 void writeUint16(Mqtt5Properties prop, const uint16_t x); 17 void writeUint16(Mqtt5Properties prop, const uint16_t x);
  18 + void writeUint16(Mqtt5Properties prop, const uint16_t x, std::vector<char> &target) const;
18 void writeUint8(Mqtt5Properties prop, const uint8_t x); 19 void writeUint8(Mqtt5Properties prop, const uint8_t x);
19 void writeStr(Mqtt5Properties prop, const std::string &str); 20 void writeStr(Mqtt5Properties prop, const std::string &str);
20 void write2Str(Mqtt5Properties prop, const std::string &one, const std::string &two); 21 void write2Str(Mqtt5Properties prop, const std::string &one, const std::string &two);
@@ -43,6 +44,7 @@ public: @@ -43,6 +44,7 @@ public:
43 void writeResponseTopic(const std::string &str); 44 void writeResponseTopic(const std::string &str);
44 void writeUserProperty(std::string &&key, std::string &&value); 45 void writeUserProperty(std::string &&key, std::string &&value);
45 void writeCorrelationData(const std::string &correlationData); 46 void writeCorrelationData(const std::string &correlationData);
  47 + void writeTopicAlias(const uint16_t id);
46 void setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties); 48 void setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties);
47 }; 49 };
48 50
mqttpacket.cpp
@@ -108,7 +108,8 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &amp;_pu @@ -108,7 +108,8 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &amp;_pu
108 108
109 this->protocolVersion = protocolVersion; 109 this->protocolVersion = protocolVersion;
110 110
111 - this->publishData.topic = _publish.topic; 111 + if (!_publish.skipTopic)
  112 + this->publishData.topic = _publish.topic;
112 113
113 if (_publish.splitTopic) 114 if (_publish.splitTopic)
114 splitTopic(this->publishData.topic, this->publishData.subtopics); 115 splitTopic(this->publishData.topic, this->publishData.subtopics);
@@ -133,7 +134,7 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &amp;_pu @@ -133,7 +134,7 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &amp;_pu
133 if (protocolVersion >= ProtocolVersion::Mqtt5) 134 if (protocolVersion >= ProtocolVersion::Mqtt5)
134 { 135 {
135 // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic. 136 // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic.
136 - if (_publish.propertyBuilder) 137 + if (_publish.propertyBuilder) // TODO: only do this when there are user properties. Otherwise we don't need it.
137 { 138 {
138 this->publishData.constructPropertyBuilder(); 139 this->publishData.constructPropertyBuilder();
139 this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties()); 140 this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties());
@@ -343,7 +344,7 @@ void MqttPacket::handleConnect() @@ -343,7 +344,7 @@ void MqttPacket::handleConnect()
343 uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; 344 uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient;
344 uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits<uint32_t>::max(); 345 uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits<uint32_t>::max();
345 uint32_t max_packet_size = settings.maxPacketSize; 346 uint32_t max_packet_size = settings.maxPacketSize;
346 - uint16_t max_topic_aliases = settings.maxTopicAliases; 347 + uint16_t max_topic_aliases = settings.maxOutgoingTopicAliases;
347 bool request_response_information = false; 348 bool request_response_information = false;
348 bool request_problem_information = false; 349 bool request_problem_information = false;
349 350
@@ -820,6 +821,7 @@ void MqttPacket::handlePublish() @@ -820,6 +821,7 @@ void MqttPacket::handlePublish()
820 case Mqtt5Properties::MessageExpiryInterval: 821 case Mqtt5Properties::MessageExpiryInterval:
821 publishData.createdAt = std::chrono::steady_clock::now(); 822 publishData.createdAt = std::chrono::steady_clock::now();
822 publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32()); 823 publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32());
  824 + break;
823 case Mqtt5Properties::TopicAlias: 825 case Mqtt5Properties::TopicAlias:
824 { 826 {
825 const uint16_t alias_id = readTwoBytesToUInt16(); 827 const uint16_t alias_id = readTwoBytesToUInt16();
publishcopyfactory.cpp
@@ -17,15 +17,17 @@ PublishCopyFactory::PublishCopyFactory(Publish *publish) : @@ -17,15 +17,17 @@ PublishCopyFactory::PublishCopyFactory(Publish *publish) :
17 17
18 } 18 }
19 19
20 -MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion) 20 +MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic)
21 { 21 {
22 if (packet) 22 if (packet)
23 { 23 {
24 - if (packet->containsClientSpecificProperties()) 24 + if (protocolVersion >= ProtocolVersion::Mqtt5 && (packet->containsClientSpecificProperties() || topic_alias > 0))
25 { 25 {
26 Publish newPublish(packet->getPublishData()); 26 Publish newPublish(packet->getPublishData());
27 newPublish.splitTopic = false; 27 newPublish.splitTopic = false;
28 newPublish.qos = max_qos; 28 newPublish.qos = max_qos;
  29 + newPublish.topicAlias = topic_alias;
  30 + newPublish.skipTopic = skip_topic;
29 newPublish.setClientSpecificProperties(); 31 newPublish.setClientSpecificProperties();
30 this->oneShotPacket = std::make_unique<MqttPacket>(protocolVersion, newPublish); 32 this->oneShotPacket = std::make_unique<MqttPacket>(protocolVersion, newPublish);
31 return this->oneShotPacket.get(); 33 return this->oneShotPacket.get();
publishcopyfactory.h
@@ -29,7 +29,7 @@ public: @@ -29,7 +29,7 @@ public:
29 PublishCopyFactory(const PublishCopyFactory &other) = delete; 29 PublishCopyFactory(const PublishCopyFactory &other) = delete;
30 PublishCopyFactory(PublishCopyFactory &&other) = delete; 30 PublishCopyFactory(PublishCopyFactory &&other) = delete;
31 31
32 - MqttPacket *getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion); 32 + MqttPacket *getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic);
33 char getEffectiveQos(char max_qos) const; 33 char getEffectiveQos(char max_qos) const;
34 const std::string &getTopic() const; 34 const std::string &getTopic() const;
35 const std::vector<std::string> &getSubtopics(); 35 const std::vector<std::string> &getSubtopics();
settings.h
@@ -42,7 +42,8 @@ public: @@ -42,7 +42,8 @@ public:
42 bool authPluginSerializeAuthChecks = false; 42 bool authPluginSerializeAuthChecks = false;
43 int clientInitialBufferSize = 1024; // Must be power of 2 43 int clientInitialBufferSize = 1024; // Must be power of 2
44 int maxPacketSize = 268435461; // 256 MB + 5 44 int maxPacketSize = 268435461; // 256 MB + 5
45 - uint16_t maxTopicAliases = 65535; 45 + uint16_t maxIncomingTopicAliases = 65535;
  46 + uint16_t maxOutgoingTopicAliases = 0; // TODO: setting, when I can confirm with clients that support it that it works.
46 #ifdef TESTING 47 #ifdef TESTING
47 bool logDebug = true; 48 bool logDebug = true;
48 #else 49 #else
types.cpp
@@ -117,7 +117,8 @@ PublishBase::PublishBase(const std::string &amp;topic, const std::string &amp;payload, c @@ -117,7 +117,8 @@ PublishBase::PublishBase(const std::string &amp;topic, const std::string &amp;payload, c
117 117
118 size_t PublishBase::getLengthWithoutFixedHeader() const 118 size_t PublishBase::getLengthWithoutFixedHeader() const
119 { 119 {
120 - int result = topic.length() + payload.length() + 2; 120 + const int topicLength = this->skipTopic ? 0 : topic.length();
  121 + int result = topicLength + payload.length() + 2;
121 122
122 if (qos) 123 if (qos)
123 result += 2; 124 result += 2;
@@ -131,14 +132,23 @@ size_t PublishBase::getLengthWithoutFixedHeader() const @@ -131,14 +132,23 @@ size_t PublishBase::getLengthWithoutFixedHeader() const
131 */ 132 */
132 void PublishBase::setClientSpecificProperties() 133 void PublishBase::setClientSpecificProperties()
133 { 134 {
  135 + if (this->createdAt.time_since_epoch().count() && this->topicAlias == 0)
  136 + return;
  137 +
134 if (propertyBuilder) 138 if (propertyBuilder)
135 propertyBuilder->clearClientSpecificBytes(); 139 propertyBuilder->clearClientSpecificBytes();
  140 + else
  141 + propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
136 142
137 - auto now = std::chrono::steady_clock::now();  
138 - std::chrono::seconds newExpiresAfter = std::chrono::duration_cast<std::chrono::seconds>(now - createdAt);  
139 -  
140 - if (newExpiresAfter.count() > 0) 143 + if (createdAt.time_since_epoch().count() > 0)
  144 + {
  145 + auto now = std::chrono::steady_clock::now();
  146 + std::chrono::seconds newExpiresAfter = std::chrono::duration_cast<std::chrono::seconds>(now - createdAt);
141 propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); 147 propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count());
  148 + }
  149 +
  150 + if (topicAlias > 0)
  151 + propertyBuilder->writeTopicAlias(this->topicAlias);
142 } 152 }
143 153
144 void PublishBase::constructPropertyBuilder() 154 void PublishBase::constructPropertyBuilder()
@@ -206,6 +206,8 @@ public: @@ -206,6 +206,8 @@ public:
206 bool splitTopic = true; 206 bool splitTopic = true;
207 std::chrono::time_point<std::chrono::steady_clock> createdAt; 207 std::chrono::time_point<std::chrono::steady_clock> createdAt;
208 std::chrono::seconds expiresAfter; 208 std::chrono::seconds expiresAfter;
  209 + uint16_t topicAlias = 0;
  210 + bool skipTopic = false;
209 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving 211 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving
210 212
211 PublishBase() = default; 213 PublishBase() = default;