Commit 940d7edcfc79174cff791b336000dbc2025a58c1

Authored by Wiebe Cazemier
1 parent 592e12eb

Replace the thread_local global subtopic memory

It caused typical global variable issues, showing in the retained
messages recursive alghorithm breaking, because the referenced subtopics
changed half way (see previous commit of the test for it).

I need to perform some benchmarks to see if I need to devise an
alternative.
mqttpacket.cpp
@@ -24,10 +24,6 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. @@ -24,10 +24,6 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
24 #include "utils.h" 24 #include "utils.h"
25 #include "threadauth.h" 25 #include "threadauth.h"
26 26
27 -// We can void constant reallocation of space for parsed subtopics by using this. But, beware to only use it during handling of the current  
28 -// packet. Don't access it for a stored packet, because then it will have changed.  
29 -thread_local std::vector<std::string> gSubtopics;  
30 -  
31 RemainingLength::RemainingLength() 27 RemainingLength::RemainingLength()
32 { 28 {
33 memset(bytes, 0, 4); 29 memset(bytes, 0, 4);
@@ -110,8 +106,7 @@ MqttPacket::MqttPacket(const Publish &amp;publish) : @@ -110,8 +106,7 @@ MqttPacket::MqttPacket(const Publish &amp;publish) :
110 } 106 }
111 107
112 this->topic = publish.topic; 108 this->topic = publish.topic;
113 - this->subtopics = &gSubtopics;  
114 - splitTopic(this->topic, gSubtopics); 109 + splitTopic(this->topic, subtopics);
115 110
116 packetType = PacketType::PUBLISH; 111 packetType = PacketType::PUBLISH;
117 this->qos = publish.qos; 112 this->qos = publish.qos;
@@ -417,7 +412,6 @@ void MqttPacket::handleDisconnect() @@ -417,7 +412,6 @@ void MqttPacket::handleDisconnect()
417 412
418 void MqttPacket::handleSubscribe() 413 void MqttPacket::handleSubscribe()
419 { 414 {
420 - this->subtopics = &gSubtopics;  
421 const char firstByteFirstNibble = (first_byte & 0x0F); 415 const char firstByteFirstNibble = (first_byte & 0x0F);
422 416
423 if (firstByteFirstNibble != 2) 417 if (firstByteFirstNibble != 2)
@@ -449,11 +443,11 @@ void MqttPacket::handleSubscribe() @@ -449,11 +443,11 @@ void MqttPacket::handleSubscribe()
449 if (qos > 2) 443 if (qos > 2)
450 throw ProtocolError("QoS is greater than 2, and/or reserved bytes in QoS field are not 0."); 444 throw ProtocolError("QoS is greater than 2, and/or reserved bytes in QoS field are not 0.");
451 445
452 - splitTopic(topic, *subtopics);  
453 - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, *subtopics, AclAccess::subscribe, qos, false) == AuthResult::success) 446 + splitTopic(topic, subtopics);
  447 + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false) == AuthResult::success)
454 { 448 {
455 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos); 449 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos);
456 - sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, *subtopics, qos); 450 + sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos);
457 subs_reponse_codes.push_back(qos); 451 subs_reponse_codes.push_back(qos);
458 } 452 }
459 else 453 else
@@ -477,7 +471,6 @@ void MqttPacket::handleSubscribe() @@ -477,7 +471,6 @@ void MqttPacket::handleSubscribe()
477 SubAck subAck(packet_id, subs_reponse_codes); 471 SubAck subAck(packet_id, subs_reponse_codes);
478 MqttPacket response(subAck); 472 MqttPacket response(subAck);
479 sender->writeMqttPacket(response); 473 sender->writeMqttPacket(response);
480 - this->subtopics = nullptr;  
481 } 474 }
482 475
483 void MqttPacket::handleUnsubscribe() 476 void MqttPacket::handleUnsubscribe()
@@ -540,8 +533,7 @@ void MqttPacket::handlePublish() @@ -540,8 +533,7 @@ void MqttPacket::handlePublish()
540 throw ProtocolError("Duplicate flag is set for QoS 0 packet. This is illegal."); 533 throw ProtocolError("Duplicate flag is set for QoS 0 packet. This is illegal.");
541 534
542 topic = std::string(readBytes(variable_header_length), variable_header_length); 535 topic = std::string(readBytes(variable_header_length), variable_header_length);
543 - subtopics = &gSubtopics;  
544 - splitTopic(topic, gSubtopics); 536 + splitTopic(topic, subtopics);
545 537
546 if (!isValidUtf8(topic, true)) 538 if (!isValidUtf8(topic, true))
547 { 539 {
@@ -593,12 +585,12 @@ void MqttPacket::handlePublish() @@ -593,12 +585,12 @@ void MqttPacket::handlePublish()
593 payloadStart = pos; 585 payloadStart = pos;
594 586
595 Authentication &authentication = *ThreadAuth::getAuth(); 587 Authentication &authentication = *ThreadAuth::getAuth();
596 - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, *subtopics, AclAccess::write, qos, retain) == AuthResult::success) 588 + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::write, qos, retain) == AuthResult::success)
597 { 589 {
598 if (retain) 590 if (retain)
599 { 591 {
600 std::string payload(readBytes(payloadLen), payloadLen); 592 std::string payload(readBytes(payloadLen), payloadLen);
601 - sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(topic, *subtopics, payload, qos); 593 + sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(topic, subtopics, payload, qos);
602 } 594 }
603 595
604 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3]. 596 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3].
@@ -607,9 +599,8 @@ void MqttPacket::handlePublish() @@ -607,9 +599,8 @@ void MqttPacket::handlePublish()
607 first_byte = bites[0]; 599 first_byte = bites[0];
608 600
609 // For the existing clients, we can just write the same packet back out, with our small alterations. 601 // For the existing clients, we can just write the same packet back out, with our small alterations.
610 - sender->getThreadData()->getSubscriptionStore()->queuePacketAtSubscribers(*subtopics, *this); 602 + sender->getThreadData()->getSubscriptionStore()->queuePacketAtSubscribers(subtopics, *this);
611 } 603 }
612 - this->subtopics = nullptr;  
613 } 604 }
614 605
615 void MqttPacket::handlePubAck() 606 void MqttPacket::handlePubAck()
@@ -767,7 +758,7 @@ const std::string &amp;MqttPacket::getTopic() const @@ -767,7 +758,7 @@ const std::string &amp;MqttPacket::getTopic() const
767 * @brief MqttPacket::getSubtopics returns a pointer to the parsed subtopics. Use with care! 758 * @brief MqttPacket::getSubtopics returns a pointer to the parsed subtopics. Use with care!
768 * @return a pointer to a vector of subtopics that will be overwritten the next packet! 759 * @return a pointer to a vector of subtopics that will be overwritten the next packet!
769 */ 760 */
770 -const std::vector<std::string> *MqttPacket::getSubtopics() const 761 +const std::vector<std::string> &MqttPacket::getSubtopics() const
771 { 762 {
772 return this->subtopics; 763 return this->subtopics;
773 } 764 }
mqttpacket.h
@@ -48,7 +48,7 @@ class MqttPacket @@ -48,7 +48,7 @@ class MqttPacket
48 #endif 48 #endif
49 49
50 std::string topic; 50 std::string topic;
51 - std::vector<std::string> *subtopics = nullptr; 51 + std::vector<std::string> subtopics;
52 std::vector<char> bites; 52 std::vector<char> bites;
53 size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header. 53 size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header.
54 RemainingLength remainingLength; 54 RemainingLength remainingLength;
@@ -108,7 +108,7 @@ public: @@ -108,7 +108,7 @@ public:
108 const std::vector<char> &getBites() const { return bites; } 108 const std::vector<char> &getBites() const { return bites; }
109 char getQos() const { return qos; } 109 char getQos() const { return qos; }
110 const std::string &getTopic() const; 110 const std::string &getTopic() const;
111 - const std::vector<std::string> *getSubtopics() const; 111 + const std::vector<std::string> &getSubtopics() const;
112 std::shared_ptr<Client> getSender() const; 112 std::shared_ptr<Client> getSender() const;
113 void setSender(const std::shared_ptr<Client> &value); 113 void setSender(const std::shared_ptr<Client> &value);
114 bool containsFixedHeader() const; 114 bool containsFixedHeader() const;
session.cpp
@@ -129,7 +129,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos, bool retain, u @@ -129,7 +129,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos, bool retain, u
129 Authentication *_auth = ThreadAuth::getAuth(); 129 Authentication *_auth = ThreadAuth::getAuth();
130 assert(_auth); 130 assert(_auth);
131 Authentication &auth = *_auth; 131 Authentication &auth = *_auth;
132 - if (auth.aclCheck(client_id, username, packet.getTopic(), *packet.getSubtopics(), AclAccess::read, qos, retain) == AuthResult::success) 132 + if (auth.aclCheck(client_id, username, packet.getTopic(), packet.getSubtopics(), AclAccess::read, qos, retain) == AuthResult::success)
133 { 133 {
134 if (qos == 0) 134 if (qos == 0)
135 { 135 {