diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 25a5830..f285373 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -141,10 +141,6 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish) if (!_publish.skipTopic) this->publishData.topic = _publish.topic; - // We often don't need to split because we already did the ACL checks and subscriber searching. But we do split on fresh publishes like wills and $SYS messages. - if (_publish.splitTopic) - splitTopic(this->publishData.topic, this->publishData.subtopics); - packetType = PacketType::PUBLISH; this->publishData.qos = _publish.qos; first_byte = static_cast(packetType) << 4; @@ -164,7 +160,7 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish) if (protocolVersion >= ProtocolVersion::Mqtt5) { // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic (only ACL checking at this point). - if (_publish.splitTopic && _publish.hasUserProperties()) + if (_publish.hasUserProperties()) { this->publishData.constructPropertyBuilder(); this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties()); @@ -1266,14 +1262,12 @@ void MqttPacket::handlePublish() if (publishData.qos == 2) sender->getSession()->addIncomingQoS2MessageId(_packet_id); - splitTopic(publishData.topic, publishData.subtopics); - - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.subtopics, AclAccess::write, publishData.qos, publishData.retain, getUserProperties()) == AuthResult::success) + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.getSubtopics(), AclAccess::write, publishData.qos, publishData.retain, getUserProperties()) == AuthResult::success) { if (publishData.retain) { publishData.payload = getPayloadCopy(); - MainApp::getMainApp()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.subtopics); + MainApp::getMainApp()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.getSubtopics()); } // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3]. @@ -1556,16 +1550,11 @@ const std::string &MqttPacket::getTopic() const return this->publishData.topic; } -/** - * @brief MqttPacket::getSubtopics returns a pointer to the parsed subtopics. Use with care! - * @return a pointer to a vector of subtopics that will be overwritten the next packet! - */ -const std::vector &MqttPacket::getSubtopics() const +const std::vector &MqttPacket::getSubtopics() { - return this->publishData.subtopics; + return this->publishData.getSubtopics(); } - std::shared_ptr MqttPacket::getSender() const { return sender; @@ -1734,10 +1723,7 @@ std::string MqttPacket::readBytesToString(bool validateUtf8, bool alsoCheckInval const std::vector> *MqttPacket::getUserProperties() const { - if (this->publishData.propertyBuilder) - return this->publishData.propertyBuilder->getUserProperties().get(); - - return nullptr; + return this->publishData.getUserProperties(); } bool MqttPacket::getRetain() const diff --git a/mqttpacket.h b/mqttpacket.h index 5984158..6c67509 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -147,7 +147,7 @@ public: void setQos(const char new_qos); ProtocolVersion getProtocolVersion() const { return protocolVersion;} const std::string &getTopic() const; - const std::vector &getSubtopics() const; + const std::vector &getSubtopics(); std::shared_ptr getSender() const; void setSender(const std::shared_ptr &value); bool containsFixedHeader() const; diff --git a/publishcopyfactory.cpp b/publishcopyfactory.cpp index d75044f..81de4ae 100644 --- a/publishcopyfactory.cpp +++ b/publishcopyfactory.cpp @@ -24,7 +24,6 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto if (protocolVersion >= ProtocolVersion::Mqtt5 && (packet->containsClientSpecificProperties() || topic_alias > 0)) { Publish newPublish(packet->getPublishData()); - newPublish.splitTopic = false; newPublish.qos = max_qos; newPublish.topicAlias = topic_alias; newPublish.skipTopic = skip_topic; @@ -44,7 +43,6 @@ MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const Proto if (!cachedPack) { Publish newPublish(packet->getPublishData()); - newPublish.splitTopic = false; newPublish.qos = max_qos; cachedPack = std::make_unique(protocolVersion, newPublish); } @@ -77,14 +75,11 @@ const std::vector &PublishCopyFactory::getSubtopics() { if (packet) { - assert(!packet->getSubtopics().empty()); return packet->getSubtopics(); } else if (publish) { - if (publish->subtopics.empty()) - splitTopic(publish->topic, publish->subtopics); - return publish->subtopics; + return publish->getSubtopics(); } throw std::runtime_error("Bug in &PublishCopyFactory::getSubtopics()"); @@ -131,10 +126,5 @@ const std::vector > *PublishCopyFactory::get assert(publish); - if (publish->propertyBuilder) - { - return publish->propertyBuilder->getUserProperties().get(); - } - - return nullptr; + return publish->getUserProperties(); } diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp index 7bb2f1c..84fb967 100644 --- a/qospacketqueue.cpp +++ b/qospacketqueue.cpp @@ -77,7 +77,6 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory ©Factory, uint16_t id, assert(id > 0); Publish pub = copyFactory.getNewPublish(); - pub.splitTopic = false; queue.emplace_back(std::move(pub), id); qosQueueBytes += queue.back().getApproximateMemoryFootprint(); } @@ -86,7 +85,6 @@ void QoSPublishQueue::queuePublish(Publish &&pub, uint16_t id) { assert(id > 0); - pub.splitTopic = false; queue.emplace_back(std::move(pub), id); qosQueueBytes += queue.back().getApproximateMemoryFootprint(); } diff --git a/retainedmessage.cpp b/retainedmessage.cpp index afd403b..c4cd5cd 100644 --- a/retainedmessage.cpp +++ b/retainedmessage.cpp @@ -21,7 +21,6 @@ RetainedMessage::RetainedMessage(const Publish &publish) : publish(publish) { this->publish.retain = true; - this->publish.splitTopic = false; } bool RetainedMessage::operator==(const RetainedMessage &rhs) const diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 87f001f..cb39e24 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -270,7 +270,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorretain) - setRetainedMessage(*p, p->subtopics); + setRetainedMessage(*p, p->getSubtopics()); s->clearWill(); } @@ -361,7 +361,7 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr &wil queuePacketAtSubscribers(factory); if (willMessage->retain) - setRetainedMessage(*willMessage.get(), (*willMessage.get()).subtopics); + setRetainedMessage(*willMessage.get(), (*willMessage).getSubtopics()); // Avoid sending two immediate wills when a session is destroyed with the client disconnect. if (session) // session is null when you're destroying a client before a session is assigned. @@ -876,8 +876,7 @@ void SubscriptionStore::loadRetainedMessages(const std::string &filePath) std::vector subtopics; for (RetainedMessage &rm : messages) { - splitTopic(rm.publish.topic, rm.publish.subtopics); - setRetainedMessage(rm.publish, rm.publish.subtopics); + setRetainedMessage(rm.publish, rm.publish.getSubtopics()); } } catch (PersistenceFileCantBeOpened &ex) diff --git a/types.cpp b/types.cpp index d17ffaa..b46cf52 100644 --- a/types.cpp +++ b/types.cpp @@ -21,6 +21,8 @@ License along with FlashMQ. If not, see . #include "mqtt5properties.h" #include "mqttpacket.h" +#include "utils.h" + ConnAck::ConnAck(const ProtocolVersion protVersion, ReasonCodes return_code, bool session_present) : protocol_version(protVersion), session_present(session_present) @@ -195,6 +197,14 @@ bool PublishBase::hasExpired() const return (age > expiresAfter); } +const std::vector> *PublishBase::getUserProperties() const +{ + if (this->propertyBuilder) + return this->propertyBuilder->getUserProperties().get(); + + return nullptr; +} + void PublishBase::setExpireAfter(uint32_t s) { this->createdAt = std::chrono::steady_clock::now(); @@ -224,6 +234,14 @@ Publish::Publish(const std::string &topic, const std::string &payload, char qos) } +const std::vector &Publish::getSubtopics() +{ + if (subtopics.empty()) + splitTopic(this->topic, this->subtopics); + + return this->subtopics; +} + WillPublish::WillPublish(const Publish &other) : Publish(other) { diff --git a/types.h b/types.h index c421a96..74f39da 100644 --- a/types.h +++ b/types.h @@ -203,7 +203,6 @@ public: std::string payload; char qos = 0; bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] - bool splitTopic = true; uint16_t topicAlias = 0; bool skipTopic = false; std::shared_ptr propertyBuilder; // Only contains data for sending, not receiving @@ -216,6 +215,7 @@ public: void constructPropertyBuilder(); bool hasUserProperties() const; bool hasExpired() const; + const std::vector> *getUserProperties() const; void setExpireAfter(uint32_t s); bool getHasExpireInfo() const; @@ -224,12 +224,14 @@ public: class Publish : public PublishBase { -public: std::vector subtopics; +public: Publish() = default; Publish(const Publish &other); Publish(const std::string &topic, const std::string &payload, char qos); + + const std::vector &getSubtopics(); }; class WillPublish : public Publish