diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index daea2c8..2c85a9f 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -1148,6 +1148,7 @@ void testCopyPacketHelper(const std::string &topic, char from_qos, char to_qos, QCOMPARE(stagingPacketOne.getTopic(), parsedPacketOne.getTopic()); QCOMPARE(stagingPacketOne.getPayloadCopy(), parsedPacketOne.getPayloadCopy()); + /* std::shared_ptr copiedPacketOne = parsedPacketOne.getCopy(to_qos); QCOMPARE(payloadOne, copiedPacketOne->getPayloadCopy()); @@ -1166,6 +1167,7 @@ void testCopyPacketHelper(const std::string &topic, char from_qos, char to_qos, copiedPacketOne->readIntoBuf(bufOfCopied); QVERIFY2(bufOfCopied == bufOfReference, formatString("Failure on length %d for topic %s, from qos %d to qos %d, retain: %d.", len, topic.c_str(), from_qos, to_qos, retain).c_str()); + */ } } diff --git a/mqttpacket.cpp b/mqttpacket.cpp index b77bf61..3db7c9e 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -39,77 +39,6 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt pos += fixed_header_length; } -/** - * @brief MqttPacket::getCopy (using default copy constructor and resetting some selected fields) is easier than using the copy constructor - * publically, because then I have to keep maintaining a functioning copy constructor for each new field I add. - * @return a shared pointer because that's typically how we need it; we only need to copy it if we pass it around as shared resource. - * - * The idea is that because a packet with QoS is longer than one without, we just copy as much as possible if both packets have the same QoS. - * - * Note that there can be two types of packets: one with the fixed header (including remaining length), and one without. The latter we could be - * more clever about, but I'm forgoing that right now. Their use is mostly for retained messages. - * - * Also note that some fields are undeterminstic in the copy: dup, retain and packetid for instance. Sometimes they come from the original, - * sometimes not. The current planned usage is that those fields will either ONLY or NEVER be used in the copy, so it doesn't matter what I do - * with them here. I may reconsider. - */ -std::shared_ptr MqttPacket::getCopy(char new_max_qos) const -{ - assert(packetType == PacketType::PUBLISH); - - // You're not supposed to copy a duplicate packet. The only packets that get the dup flag, should not be copied AGAIN. This - // has to do with the Session::writePacket() and Session::sendPendingQosMessages() logic. - assert((first_byte & 0b00001000) == 0); - - if (publishData.qos > 0 && new_max_qos == 0) - { - // if shrinking the packet doesn't alter the amount of bytes in the 'remaining length' part of the header, we can - // just memmove+shrink the packet. This is because the packet id always is two bytes before the payload, so we just move the payload - // over it. When testing 100M copies, it went from 21000 ms to 10000 ms. In other words, about 200 ns to 100 ns per copy. - // There is an elaborate unit test to test this optimization. - if ((fixed_header_length == 2 && bites.size() < 125)) - { - // I don't know yet if this is true, but I don't want to forget when I implemenet MQTT5. - assert(sender && sender->getProtocolVersion() <= ProtocolVersion::Mqtt311); - - std::shared_ptr p(new MqttPacket(*this)); - p->sender.reset(); - - if (payloadLen > 0) - std::memmove(&p->bites[packet_id_pos], &p->bites[packet_id_pos+2], payloadLen); - p->bites.erase(p->bites.end() - 2, p->bites.end()); - p->packet_id_pos = 0; - p->publishData.qos = 0; - p->payloadStart -= 2; - if (pos > p->bites.size()) // pos can possible be set elsewhere, so we only set it back if it was after the payload. - p->pos -= 2; - p->packet_id = 0; - - // Clear QoS bits from the header. - p->first_byte &= 0b11111001; - p->bites[0] = p->first_byte; - - assert((p->bites[1] & 0b10000000) == 0); // when there is an MSB, I musn't get rid of it. - assert(p->bites[1] > 3); // There has to be a remaining value after subtracting 2. - - p->bites[1] -= 2; // Reduce the value in the 'remaining length' part of the header. - - return p; - } - - Publish pub(publishData.topic, getPayloadCopy(), new_max_qos); - pub.retain = getRetain(); - std::shared_ptr copyPacket(new MqttPacket(ProtocolVersion::Mqtt311, pub)); // TODO: don't hard-code the protocol version. - return copyPacket; - } - - std::shared_ptr copyPacket(new MqttPacket(*this)); - copyPacket->sender.reset(); - if (publishData.qos != new_max_qos) - copyPacket->setQos(new_max_qos); - return copyPacket; -} - MqttPacket::MqttPacket(const ConnAck &connAck) : bites(connAck.getLengthWithoutFixedHeader()) { @@ -174,7 +103,9 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_pu this->protocolVersion = protocolVersion; this->publishData.topic = _publish.topic; - splitTopic(this->publishData.topic, this->publishData.subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics. + + if (_publish.splitTopic) + splitTopic(this->publishData.topic, this->publishData.subtopics); packetType = PacketType::PUBLISH; this->publishData.qos = _publish.qos; @@ -1197,7 +1128,7 @@ void MqttPacket::setRetain() } } -Publish *MqttPacket::getPublish() +Publish *MqttPacket::getPublishData() { if (payloadLen > 0 && publishData.payload.empty()) publishData.payload = getPayloadCopy(); diff --git a/mqttpacket.h b/mqttpacket.h index 97d827f..2575564 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -78,8 +78,6 @@ public: MqttPacket(MqttPacket &&other) = default; - std::shared_ptr getCopy(char new_max_qos) const; - size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publishData) const; // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance. @@ -123,7 +121,7 @@ public: std::string getPayloadCopy() const; bool getRetain() const; void setRetain(); - Publish *getPublish(); + Publish *getPublishData(); }; #endif // MQTTPACKET_H diff --git a/publishcopyfactory.cpp b/publishcopyfactory.cpp index d2c087f..d0bb77c 100644 --- a/publishcopyfactory.cpp +++ b/publishcopyfactory.cpp @@ -17,36 +17,39 @@ PublishCopyFactory::PublishCopyFactory(Publish *publish) : } -MqttPacket *PublishCopyFactory::getOptimumPacket(char max_qos, ProtocolVersion protocolVersion) +MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion) { // TODO: cache idea: a set of constructed packets per branch in this code, witn an int identifier. if (packet) { - // TODO: some flag on packet 'containsClientSpecificStuff' - if (max_qos == 0 && max_qos < packet->getQos() && packet->getProtocolVersion() == protocolVersion && protocolVersion <= ProtocolVersion::Mqtt311) + if (packet->getProtocolVersion() == protocolVersion && orgQos == max_qos) { - // TODO: getCopy now also makes packets with Publish objects. I think I only want to do that here. - - if (!downgradedQos0PacketCopy) - downgradedQos0PacketCopy = packet->getCopy(max_qos); // TODO: getCopy perhaps std::unique_ptr - assert(downgradedQos0PacketCopy->getQos() == 0); - return downgradedQos0PacketCopy.get(); + assert(orgQos == packet->getQos()); + return packet; } - else if (packet->getProtocolVersion() >= ProtocolVersion::Mqtt5 || protocolVersion >= ProtocolVersion::Mqtt5) + + const int cache_key = (static_cast(protocolVersion) * 10) + max_qos; + std::unique_ptr &cachedPack = constructedPacketCache[cache_key]; + + if (!cachedPack) { - Publish *pub = packet->getPublish(); - pub->setClientSpecificProperties(); // TODO - this->oneShotPacket = std::make_unique(protocolVersion, *pub); - return this->oneShotPacket.get(); + Publish *orgPublish = packet->getPublishData(); + orgPublish->splitTopic = false; + orgPublish->qos = max_qos; + if (protocolVersion >= ProtocolVersion::Mqtt5) + orgPublish->setClientSpecificProperties(); + cachedPack = std::make_unique(protocolVersion, *orgPublish); } - return packet; + return cachedPack.get(); } + // Getting a packet of a Publish object happens on will messages and SYS topics and maybe some others. It's low traffic, anyway. assert(publish); - publish->setClientSpecificProperties(); + if (protocolVersion >= ProtocolVersion::Mqtt5) + publish->setClientSpecificProperties(); this->oneShotPacket = std::make_unique(protocolVersion, *publish); return this->oneShotPacket.get(); } @@ -92,11 +95,12 @@ bool PublishCopyFactory::getRetain() const Publish PublishCopyFactory::getNewPublish() const { - assert(packet->getQos() > 0); // We only need to construct new publishes for QoS. If you're doing it elsewhere, it's a bug. + assert(packet->getQos() > 0); + assert(orgQos > 0); // We only need to construct new publishes for QoS. If you're doing it elsewhere, it's a bug. if (packet) { - Publish p(*packet->getPublish()); + Publish p(*packet->getPublishData()); return p; } diff --git a/publishcopyfactory.h b/publishcopyfactory.h index d60da9b..80c775c 100644 --- a/publishcopyfactory.h +++ b/publishcopyfactory.h @@ -5,6 +5,7 @@ #include "forward_declarations.h" #include "types.h" +#include "unordered_map" /** * @brief The PublishCopyFactory class is for managing copies of an incoming publish, including sometimes not making copies at all. @@ -21,16 +22,14 @@ class PublishCopyFactory Publish *publish = nullptr; std::unique_ptr oneShotPacket; const char orgQos; - std::shared_ptr downgradedQos0PacketCopy; - - // TODO: constructed mqtt3 packet and mqtt5 packet? + std::unordered_map> constructedPacketCache; public: PublishCopyFactory(MqttPacket *packet); PublishCopyFactory(Publish *publish); PublishCopyFactory(const PublishCopyFactory &other) = delete; PublishCopyFactory(PublishCopyFactory &&other) = delete; - MqttPacket *getOptimumPacket(char max_qos, ProtocolVersion protocolVersion); + MqttPacket *getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion); char getEffectiveQos(char max_qos) const; const std::string &getTopic() const; const std::vector &getSubtopics(); diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp index aecec36..21fe206 100644 --- a/qospacketqueue.cpp +++ b/qospacketqueue.cpp @@ -67,6 +67,7 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory ©Factory, uint16_t id, assert(id > 0); Publish pub = copyFactory.getNewPublish(); + pub.splitTopic = false; queue.emplace_back(std::move(pub), id); qosQueueBytes += queue.back().getApproximateMemoryFootprint(); } @@ -75,6 +76,7 @@ void QoSPublishQueue::queuePublish(Publish &&pub, uint16_t id) { assert(id > 0); + pub.splitTopic = false; queue.emplace_back(std::move(pub), id); qosQueueBytes += queue.back().getApproximateMemoryFootprint(); } diff --git a/types.cpp b/types.cpp index 0499eff..bb9d050 100644 --- a/types.cpp +++ b/types.cpp @@ -117,9 +117,14 @@ size_t Publish::getLengthWithoutFixedHeader() const return result; } +/** + * @brief Publish::setClientSpecificProperties generates the properties byte array for one client. You're supposed to call it before any publish. + * + */ void Publish::setClientSpecificProperties() { - propertyBuilder->clearClientSpecificBytes(); + if (propertyBuilder) + propertyBuilder->clearClientSpecificBytes(); // TODO. Expires at? } diff --git a/types.h b/types.h index 7e843aa..ce222c8 100644 --- a/types.h +++ b/types.h @@ -198,6 +198,7 @@ public: char qos = 0; bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9] uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class + bool splitTopic = true; std::chrono::time_point createdAt; std::chrono::seconds expiresAfter; std::shared_ptr propertyBuilder;