Commit e61b7fbfe35eaefdbe767039a7524179f1a3cef1

Authored by Wiebe Cazemier
1 parent 9a946d14

Change topic splitting to lazy evaluation

Only split when they're needed, instead of pre-determining whether I'll
need the subtopics.

This makes the coming refactor of authentication easier.

Also treated user properties with the same brush a bit. Downside: it now
always assigns user properties to newly constructed publish objects,
even if they are not needed (because the generated packet may only be
needed for writing to the client's output buffer). But determining the
flow of when and when not they'll be needed is impossible with the
coming authentication refactor.
mqttpacket.cpp
@@ -141,10 +141,6 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish) @@ -141,10 +141,6 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish)
141 if (!_publish.skipTopic) 141 if (!_publish.skipTopic)
142 this->publishData.topic = _publish.topic; 142 this->publishData.topic = _publish.topic;
143 143
144 - // We often don't need to split because we already did the ACL checks and subscriber searching. But we do split on fresh publishes like wills and $SYS messages.  
145 - if (_publish.splitTopic)  
146 - splitTopic(this->publishData.topic, this->publishData.subtopics);  
147 -  
148 packetType = PacketType::PUBLISH; 144 packetType = PacketType::PUBLISH;
149 this->publishData.qos = _publish.qos; 145 this->publishData.qos = _publish.qos;
150 first_byte = static_cast<char>(packetType) << 4; 146 first_byte = static_cast<char>(packetType) << 4;
@@ -164,7 +160,7 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &amp;_publish) @@ -164,7 +160,7 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &amp;_publish)
164 if (protocolVersion >= ProtocolVersion::Mqtt5) 160 if (protocolVersion >= ProtocolVersion::Mqtt5)
165 { 161 {
166 // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic (only ACL checking at this point). 162 // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic (only ACL checking at this point).
167 - if (_publish.splitTopic && _publish.hasUserProperties()) 163 + if (_publish.hasUserProperties())
168 { 164 {
169 this->publishData.constructPropertyBuilder(); 165 this->publishData.constructPropertyBuilder();
170 this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties()); 166 this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties());
@@ -1266,14 +1262,12 @@ void MqttPacket::handlePublish() @@ -1266,14 +1262,12 @@ void MqttPacket::handlePublish()
1266 if (publishData.qos == 2) 1262 if (publishData.qos == 2)
1267 sender->getSession()->addIncomingQoS2MessageId(_packet_id); 1263 sender->getSession()->addIncomingQoS2MessageId(_packet_id);
1268 1264
1269 - splitTopic(publishData.topic, publishData.subtopics);  
1270 -  
1271 - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.subtopics, AclAccess::write, publishData.qos, publishData.retain, getUserProperties()) == AuthResult::success) 1265 + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.getSubtopics(), AclAccess::write, publishData.qos, publishData.retain, getUserProperties()) == AuthResult::success)
1272 { 1266 {
1273 if (publishData.retain) 1267 if (publishData.retain)
1274 { 1268 {
1275 publishData.payload = getPayloadCopy(); 1269 publishData.payload = getPayloadCopy();
1276 - MainApp::getMainApp()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.subtopics); 1270 + MainApp::getMainApp()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.getSubtopics());
1277 } 1271 }
1278 1272
1279 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3]. 1273 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3].
@@ -1556,16 +1550,11 @@ const std::string &amp;MqttPacket::getTopic() const @@ -1556,16 +1550,11 @@ const std::string &amp;MqttPacket::getTopic() const
1556 return this->publishData.topic; 1550 return this->publishData.topic;
1557 } 1551 }
1558 1552
1559 -/**  
1560 - * @brief MqttPacket::getSubtopics returns a pointer to the parsed subtopics. Use with care!  
1561 - * @return a pointer to a vector of subtopics that will be overwritten the next packet!  
1562 - */  
1563 -const std::vector<std::string> &MqttPacket::getSubtopics() const 1553 +const std::vector<std::string> &MqttPacket::getSubtopics()
1564 { 1554 {
1565 - return this->publishData.subtopics; 1555 + return this->publishData.getSubtopics();
1566 } 1556 }
1567 1557
1568 -  
1569 std::shared_ptr<Client> MqttPacket::getSender() const 1558 std::shared_ptr<Client> MqttPacket::getSender() const
1570 { 1559 {
1571 return sender; 1560 return sender;
@@ -1734,10 +1723,7 @@ std::string MqttPacket::readBytesToString(bool validateUtf8, bool alsoCheckInval @@ -1734,10 +1723,7 @@ std::string MqttPacket::readBytesToString(bool validateUtf8, bool alsoCheckInval
1734 1723
1735 const std::vector<std::pair<std::string, std::string>> *MqttPacket::getUserProperties() const 1724 const std::vector<std::pair<std::string, std::string>> *MqttPacket::getUserProperties() const
1736 { 1725 {
1737 - if (this->publishData.propertyBuilder)  
1738 - return this->publishData.propertyBuilder->getUserProperties().get();  
1739 -  
1740 - return nullptr; 1726 + return this->publishData.getUserProperties();
1741 } 1727 }
1742 1728
1743 bool MqttPacket::getRetain() const 1729 bool MqttPacket::getRetain() const
mqttpacket.h
@@ -147,7 +147,7 @@ public: @@ -147,7 +147,7 @@ public:
147 void setQos(const char new_qos); 147 void setQos(const char new_qos);
148 ProtocolVersion getProtocolVersion() const { return protocolVersion;} 148 ProtocolVersion getProtocolVersion() const { return protocolVersion;}
149 const std::string &getTopic() const; 149 const std::string &getTopic() const;
150 - const std::vector<std::string> &getSubtopics() const; 150 + const std::vector<std::string> &getSubtopics();
151 std::shared_ptr<Client> getSender() const; 151 std::shared_ptr<Client> getSender() const;
152 void setSender(const std::shared_ptr<Client> &value); 152 void setSender(const std::shared_ptr<Client> &value);
153 bool containsFixedHeader() const; 153 bool containsFixedHeader() const;
publishcopyfactory.cpp
@@ -24,7 +24,6 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto @@ -24,7 +24,6 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto
24 if (protocolVersion >= ProtocolVersion::Mqtt5 && (packet->containsClientSpecificProperties() || topic_alias > 0)) 24 if (protocolVersion >= ProtocolVersion::Mqtt5 && (packet->containsClientSpecificProperties() || topic_alias > 0))
25 { 25 {
26 Publish newPublish(packet->getPublishData()); 26 Publish newPublish(packet->getPublishData());
27 - newPublish.splitTopic = false;  
28 newPublish.qos = max_qos; 27 newPublish.qos = max_qos;
29 newPublish.topicAlias = topic_alias; 28 newPublish.topicAlias = topic_alias;
30 newPublish.skipTopic = skip_topic; 29 newPublish.skipTopic = skip_topic;
@@ -44,7 +43,6 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto @@ -44,7 +43,6 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto
44 if (!cachedPack) 43 if (!cachedPack)
45 { 44 {
46 Publish newPublish(packet->getPublishData()); 45 Publish newPublish(packet->getPublishData());
47 - newPublish.splitTopic = false;  
48 newPublish.qos = max_qos; 46 newPublish.qos = max_qos;
49 cachedPack = std::make_unique<MqttPacket>(protocolVersion, newPublish); 47 cachedPack = std::make_unique<MqttPacket>(protocolVersion, newPublish);
50 } 48 }
@@ -77,14 +75,11 @@ const std::vector&lt;std::string&gt; &amp;PublishCopyFactory::getSubtopics() @@ -77,14 +75,11 @@ const std::vector&lt;std::string&gt; &amp;PublishCopyFactory::getSubtopics()
77 { 75 {
78 if (packet) 76 if (packet)
79 { 77 {
80 - assert(!packet->getSubtopics().empty());  
81 return packet->getSubtopics(); 78 return packet->getSubtopics();
82 } 79 }
83 else if (publish) 80 else if (publish)
84 { 81 {
85 - if (publish->subtopics.empty())  
86 - splitTopic(publish->topic, publish->subtopics);  
87 - return publish->subtopics; 82 + return publish->getSubtopics();
88 } 83 }
89 84
90 throw std::runtime_error("Bug in &PublishCopyFactory::getSubtopics()"); 85 throw std::runtime_error("Bug in &PublishCopyFactory::getSubtopics()");
@@ -131,10 +126,5 @@ const std::vector&lt;std::pair&lt;std::string, std::string&gt; &gt; *PublishCopyFactory::get @@ -131,10 +126,5 @@ const std::vector&lt;std::pair&lt;std::string, std::string&gt; &gt; *PublishCopyFactory::get
131 126
132 assert(publish); 127 assert(publish);
133 128
134 - if (publish->propertyBuilder)  
135 - {  
136 - return publish->propertyBuilder->getUserProperties().get();  
137 - }  
138 -  
139 - return nullptr; 129 + return publish->getUserProperties();
140 } 130 }
qospacketqueue.cpp
@@ -77,7 +77,6 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory &amp;copyFactory, uint16_t id, @@ -77,7 +77,6 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory &amp;copyFactory, uint16_t id,
77 assert(id > 0); 77 assert(id > 0);
78 78
79 Publish pub = copyFactory.getNewPublish(); 79 Publish pub = copyFactory.getNewPublish();
80 - pub.splitTopic = false;  
81 queue.emplace_back(std::move(pub), id); 80 queue.emplace_back(std::move(pub), id);
82 qosQueueBytes += queue.back().getApproximateMemoryFootprint(); 81 qosQueueBytes += queue.back().getApproximateMemoryFootprint();
83 } 82 }
@@ -86,7 +85,6 @@ void QoSPublishQueue::queuePublish(Publish &amp;&amp;pub, uint16_t id) @@ -86,7 +85,6 @@ void QoSPublishQueue::queuePublish(Publish &amp;&amp;pub, uint16_t id)
86 { 85 {
87 assert(id > 0); 86 assert(id > 0);
88 87
89 - pub.splitTopic = false;  
90 queue.emplace_back(std::move(pub), id); 88 queue.emplace_back(std::move(pub), id);
91 qosQueueBytes += queue.back().getApproximateMemoryFootprint(); 89 qosQueueBytes += queue.back().getApproximateMemoryFootprint();
92 } 90 }
retainedmessage.cpp
@@ -21,7 +21,6 @@ RetainedMessage::RetainedMessage(const Publish &amp;publish) : @@ -21,7 +21,6 @@ RetainedMessage::RetainedMessage(const Publish &amp;publish) :
21 publish(publish) 21 publish(publish)
22 { 22 {
23 this->publish.retain = true; 23 this->publish.retain = true;
24 - this->publish.splitTopic = false;  
25 } 24 }
26 25
27 bool RetainedMessage::operator==(const RetainedMessage &rhs) const 26 bool RetainedMessage::operator==(const RetainedMessage &rhs) const
sessionsandsubscriptionsdb.cpp
@@ -270,7 +270,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -270,7 +270,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
270 270
271 Publish &pub = p.getPublish(); 271 Publish &pub = p.getPublish();
272 272
273 - assert(!pub.splitTopic);  
274 assert(!pub.skipTopic); 273 assert(!pub.skipTopic);
275 assert(pub.topicAlias == 0); 274 assert(pub.topicAlias == 0);
276 275
subscriptionstore.cpp
@@ -329,7 +329,7 @@ void SubscriptionStore::sendQueuedWillMessages() @@ -329,7 +329,7 @@ void SubscriptionStore::sendQueuedWillMessages()
329 queuePacketAtSubscribers(factory); 329 queuePacketAtSubscribers(factory);
330 330
331 if (p->retain) 331 if (p->retain)
332 - setRetainedMessage(*p, p->subtopics); 332 + setRetainedMessage(*p, p->getSubtopics());
333 333
334 s->clearWill(); 334 s->clearWill();
335 } 335 }
@@ -361,7 +361,7 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;WillPublish&gt; &amp;wil @@ -361,7 +361,7 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;WillPublish&gt; &amp;wil
361 queuePacketAtSubscribers(factory); 361 queuePacketAtSubscribers(factory);
362 362
363 if (willMessage->retain) 363 if (willMessage->retain)
364 - setRetainedMessage(*willMessage.get(), (*willMessage.get()).subtopics); 364 + setRetainedMessage(*willMessage.get(), (*willMessage).getSubtopics());
365 365
366 // Avoid sending two immediate wills when a session is destroyed with the client disconnect. 366 // Avoid sending two immediate wills when a session is destroyed with the client disconnect.
367 if (session) // session is null when you're destroying a client before a session is assigned. 367 if (session) // session is null when you're destroying a client before a session is assigned.
@@ -876,8 +876,7 @@ void SubscriptionStore::loadRetainedMessages(const std::string &amp;filePath) @@ -876,8 +876,7 @@ void SubscriptionStore::loadRetainedMessages(const std::string &amp;filePath)
876 std::vector<std::string> subtopics; 876 std::vector<std::string> subtopics;
877 for (RetainedMessage &rm : messages) 877 for (RetainedMessage &rm : messages)
878 { 878 {
879 - splitTopic(rm.publish.topic, rm.publish.subtopics);  
880 - setRetainedMessage(rm.publish, rm.publish.subtopics); 879 + setRetainedMessage(rm.publish, rm.publish.getSubtopics());
881 } 880 }
882 } 881 }
883 catch (PersistenceFileCantBeOpened &ex) 882 catch (PersistenceFileCantBeOpened &ex)
types.cpp
@@ -21,6 +21,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -21,6 +21,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
21 #include "mqtt5properties.h" 21 #include "mqtt5properties.h"
22 #include "mqttpacket.h" 22 #include "mqttpacket.h"
23 23
  24 +#include "utils.h"
  25 +
24 ConnAck::ConnAck(const ProtocolVersion protVersion, ReasonCodes return_code, bool session_present) : 26 ConnAck::ConnAck(const ProtocolVersion protVersion, ReasonCodes return_code, bool session_present) :
25 protocol_version(protVersion), 27 protocol_version(protVersion),
26 session_present(session_present) 28 session_present(session_present)
@@ -195,6 +197,14 @@ bool PublishBase::hasExpired() const @@ -195,6 +197,14 @@ bool PublishBase::hasExpired() const
195 return (age > expiresAfter); 197 return (age > expiresAfter);
196 } 198 }
197 199
  200 +const std::vector<std::pair<std::string, std::string>> *PublishBase::getUserProperties() const
  201 +{
  202 + if (this->propertyBuilder)
  203 + return this->propertyBuilder->getUserProperties().get();
  204 +
  205 + return nullptr;
  206 +}
  207 +
198 void PublishBase::setExpireAfter(uint32_t s) 208 void PublishBase::setExpireAfter(uint32_t s)
199 { 209 {
200 this->createdAt = std::chrono::steady_clock::now(); 210 this->createdAt = std::chrono::steady_clock::now();
@@ -224,6 +234,14 @@ Publish::Publish(const std::string &amp;topic, const std::string &amp;payload, char qos) @@ -224,6 +234,14 @@ Publish::Publish(const std::string &amp;topic, const std::string &amp;payload, char qos)
224 234
225 } 235 }
226 236
  237 +const std::vector<std::string> &Publish::getSubtopics()
  238 +{
  239 + if (subtopics.empty())
  240 + splitTopic(this->topic, this->subtopics);
  241 +
  242 + return this->subtopics;
  243 +}
  244 +
227 WillPublish::WillPublish(const Publish &other) : 245 WillPublish::WillPublish(const Publish &other) :
228 Publish(other) 246 Publish(other)
229 { 247 {
@@ -203,7 +203,6 @@ public: @@ -203,7 +203,6 @@ public:
203 std::string payload; 203 std::string payload;
204 char qos = 0; 204 char qos = 0;
205 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] 205 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
206 - bool splitTopic = true;  
207 uint16_t topicAlias = 0; 206 uint16_t topicAlias = 0;
208 bool skipTopic = false; 207 bool skipTopic = false;
209 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving 208 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; // Only contains data for sending, not receiving
@@ -216,6 +215,7 @@ public: @@ -216,6 +215,7 @@ public:
216 void constructPropertyBuilder(); 215 void constructPropertyBuilder();
217 bool hasUserProperties() const; 216 bool hasUserProperties() const;
218 bool hasExpired() const; 217 bool hasExpired() const;
  218 + const std::vector<std::pair<std::string, std::string>> *getUserProperties() const;
219 219
220 void setExpireAfter(uint32_t s); 220 void setExpireAfter(uint32_t s);
221 bool getHasExpireInfo() const; 221 bool getHasExpireInfo() const;
@@ -224,12 +224,14 @@ public: @@ -224,12 +224,14 @@ public:
224 224
225 class Publish : public PublishBase 225 class Publish : public PublishBase
226 { 226 {
227 -public:  
228 std::vector<std::string> subtopics; 227 std::vector<std::string> subtopics;
229 228
  229 +public:
230 Publish() = default; 230 Publish() = default;
231 Publish(const Publish &other); 231 Publish(const Publish &other);
232 Publish(const std::string &topic, const std::string &payload, char qos); 232 Publish(const std::string &topic, const std::string &payload, char qos);
  233 +
  234 + const std::vector<std::string> &getSubtopics();
233 }; 235 };
234 236
235 class WillPublish : public Publish 237 class WillPublish : public Publish