From 3022c275e8f778c89ee97c58b2e0df8d04fdb904 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Mon, 28 Mar 2022 21:24:05 +0200 Subject: [PATCH] Initiate topic alias to subscribers --- client.cpp | 18 +++++++++++++++++- client.h | 3 +++ mqtt5properties.cpp | 22 ++++++++++++++++------ mqtt5properties.h | 4 +++- mqttpacket.cpp | 8 +++++--- publishcopyfactory.cpp | 6 ++++-- publishcopyfactory.h | 2 +- settings.h | 3 ++- types.cpp | 20 +++++++++++++++----- types.h | 2 ++ 10 files changed, 68 insertions(+), 20 deletions(-) diff --git a/client.cpp b/client.cpp index c114f1a..a720d1e 100644 --- a/client.cpp +++ b/client.cpp @@ -208,7 +208,23 @@ int Client::writeMqttPacket(const MqttPacket &packet) int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, char max_qos, uint16_t packet_id) { - MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion); + const Settings *settings = ThreadGlobals::getSettings(); + uint16_t topic_alias = 0; + bool skip_topic = false; + + if (protocolVersion >= ProtocolVersion::Mqtt5 && settings->maxOutgoingTopicAliases > this->curOutgoingTopicAlias) + { + uint16_t &id = this->outgoingTopicAliases[copyFactory.getTopic()]; + + if (id > 0) + skip_topic = true; + else + id = ++this->curOutgoingTopicAlias; + + topic_alias = id; + } + + MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion, topic_alias, skip_topic); assert(p->getQos() <= max_qos); diff --git a/client.h b/client.h index 737b64b..d47654e 100644 --- a/client.h +++ b/client.h @@ -84,6 +84,9 @@ class Client std::unordered_map topicAliases; + uint16_t curOutgoingTopicAlias = 0; + std::unordered_map outgoingTopicAliases; + Logger *logger = Logger::getInstance(); void setReadyForWriting(bool val); diff --git a/mqtt5properties.cpp b/mqtt5properties.cpp index d66693d..d6a6c96 100644 --- a/mqtt5properties.cpp +++ b/mqtt5properties.cpp @@ -125,6 +125,11 @@ void Mqtt5PropertyBuilder::writeCorrelationData(const std::string &correlationDa writeStr(Mqtt5Properties::CorrelationData, correlationData); } +void Mqtt5PropertyBuilder::writeTopicAlias(const uint16_t id) +{ + writeUint16(Mqtt5Properties::TopicAlias, id, clientSpecificBytes); +} + void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr>> &userProperties) { assert(!this->userProperties); @@ -134,7 +139,7 @@ void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptruserProperties = userProperties; } -void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector &target) +void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector &target) const { size_t pos = target.size(); const size_t newSize = pos + 5; @@ -154,16 +159,21 @@ void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, s void Mqtt5PropertyBuilder::writeUint16(Mqtt5Properties prop, const uint16_t x) { - size_t pos = genericBytes.size(); + writeUint16(prop, x, this->genericBytes); +} + +void Mqtt5PropertyBuilder::writeUint16(Mqtt5Properties prop, const uint16_t x, std::vector &target) const +{ + size_t pos = target.size(); const size_t newSize = pos + 3; - genericBytes.resize(newSize); + target.resize(newSize); const uint8_t a = static_cast(x >> 8); const uint8_t b = static_cast(x); - genericBytes[pos++] = static_cast(prop); - genericBytes[pos++] = a; - genericBytes[pos] = b; + target[pos++] = static_cast(prop); + target[pos++] = a; + target[pos] = b; } void Mqtt5PropertyBuilder::writeUint8(Mqtt5Properties prop, const uint8_t x) diff --git a/mqtt5properties.h b/mqtt5properties.h index bdc3cc4..7560eb8 100644 --- a/mqtt5properties.h +++ b/mqtt5properties.h @@ -13,8 +13,9 @@ class Mqtt5PropertyBuilder std::shared_ptr>> userProperties; VariableByteInt length; - void writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector &target); + void writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector &target) const; void writeUint16(Mqtt5Properties prop, const uint16_t x); + void writeUint16(Mqtt5Properties prop, const uint16_t x, std::vector &target) const; void writeUint8(Mqtt5Properties prop, const uint8_t x); void writeStr(Mqtt5Properties prop, const std::string &str); void write2Str(Mqtt5Properties prop, const std::string &one, const std::string &two); @@ -43,6 +44,7 @@ public: void writeResponseTopic(const std::string &str); void writeUserProperty(std::string &&key, std::string &&value); void writeCorrelationData(const std::string &correlationData); + void writeTopicAlias(const uint16_t id); void setNewUserProperties(const std::shared_ptr>> &userProperties); }; diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 689bc53..6ed226c 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -108,7 +108,8 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_pu this->protocolVersion = protocolVersion; - this->publishData.topic = _publish.topic; + if (!_publish.skipTopic) + this->publishData.topic = _publish.topic; if (_publish.splitTopic) splitTopic(this->publishData.topic, this->publishData.subtopics); @@ -133,7 +134,7 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_pu if (protocolVersion >= ProtocolVersion::Mqtt5) { // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic. - if (_publish.propertyBuilder) + if (_publish.propertyBuilder) // TODO: only do this when there are user properties. Otherwise we don't need it. { this->publishData.constructPropertyBuilder(); this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties()); @@ -343,7 +344,7 @@ void MqttPacket::handleConnect() uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits::max(); uint32_t max_packet_size = settings.maxPacketSize; - uint16_t max_topic_aliases = settings.maxTopicAliases; + uint16_t max_topic_aliases = settings.maxOutgoingTopicAliases; bool request_response_information = false; bool request_problem_information = false; @@ -820,6 +821,7 @@ void MqttPacket::handlePublish() case Mqtt5Properties::MessageExpiryInterval: publishData.createdAt = std::chrono::steady_clock::now(); publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32()); + break; case Mqtt5Properties::TopicAlias: { const uint16_t alias_id = readTwoBytesToUInt16(); diff --git a/publishcopyfactory.cpp b/publishcopyfactory.cpp index cc759cc..2e21bf0 100644 --- a/publishcopyfactory.cpp +++ b/publishcopyfactory.cpp @@ -17,15 +17,17 @@ PublishCopyFactory::PublishCopyFactory(Publish *publish) : } -MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion) +MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic) { if (packet) { - if (packet->containsClientSpecificProperties()) + if (protocolVersion >= ProtocolVersion::Mqtt5 && (packet->containsClientSpecificProperties() || topic_alias > 0)) { Publish newPublish(packet->getPublishData()); newPublish.splitTopic = false; newPublish.qos = max_qos; + newPublish.topicAlias = topic_alias; + newPublish.skipTopic = skip_topic; newPublish.setClientSpecificProperties(); this->oneShotPacket = std::make_unique(protocolVersion, newPublish); return this->oneShotPacket.get(); diff --git a/publishcopyfactory.h b/publishcopyfactory.h index b35e3db..38734e6 100644 --- a/publishcopyfactory.h +++ b/publishcopyfactory.h @@ -29,7 +29,7 @@ public: PublishCopyFactory(const PublishCopyFactory &other) = delete; PublishCopyFactory(PublishCopyFactory &&other) = delete; - MqttPacket *getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion); + MqttPacket *getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic); char getEffectiveQos(char max_qos) const; const std::string &getTopic() const; const std::vector &getSubtopics(); diff --git a/settings.h b/settings.h index e60c506..1cc0f84 100644 --- a/settings.h +++ b/settings.h @@ -42,7 +42,8 @@ public: bool authPluginSerializeAuthChecks = false; int clientInitialBufferSize = 1024; // Must be power of 2 int maxPacketSize = 268435461; // 256 MB + 5 - uint16_t maxTopicAliases = 65535; + uint16_t maxIncomingTopicAliases = 65535; + uint16_t maxOutgoingTopicAliases = 0; // TODO: setting, when I can confirm with clients that support it that it works. #ifdef TESTING bool logDebug = true; #else diff --git a/types.cpp b/types.cpp index d6866d1..76b88eb 100644 --- a/types.cpp +++ b/types.cpp @@ -117,7 +117,8 @@ PublishBase::PublishBase(const std::string &topic, const std::string &payload, c size_t PublishBase::getLengthWithoutFixedHeader() const { - int result = topic.length() + payload.length() + 2; + const int topicLength = this->skipTopic ? 0 : topic.length(); + int result = topicLength + payload.length() + 2; if (qos) result += 2; @@ -131,14 +132,23 @@ size_t PublishBase::getLengthWithoutFixedHeader() const */ void PublishBase::setClientSpecificProperties() { + if (this->createdAt.time_since_epoch().count() && this->topicAlias == 0) + return; + if (propertyBuilder) propertyBuilder->clearClientSpecificBytes(); + else + propertyBuilder = std::make_shared(); - auto now = std::chrono::steady_clock::now(); - std::chrono::seconds newExpiresAfter = std::chrono::duration_cast(now - createdAt); - - if (newExpiresAfter.count() > 0) + if (createdAt.time_since_epoch().count() > 0) + { + auto now = std::chrono::steady_clock::now(); + std::chrono::seconds newExpiresAfter = std::chrono::duration_cast(now - createdAt); propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count()); + } + + if (topicAlias > 0) + propertyBuilder->writeTopicAlias(this->topicAlias); } void PublishBase::constructPropertyBuilder() diff --git a/types.h b/types.h index 503b61a..6c7de0c 100644 --- a/types.h +++ b/types.h @@ -206,6 +206,8 @@ public: bool splitTopic = true; std::chrono::time_point createdAt; std::chrono::seconds expiresAfter; + uint16_t topicAlias = 0; + bool skipTopic = false; std::shared_ptr propertyBuilder; // Only contains data for sending, not receiving PublishBase() = default; -- libgit2 0.21.4