Commit 7b907c84c793e707f3ff7c208881260e6759c4b1

Authored by Wiebe Cazemier
1 parent 08ec7f20

Almost works

I need to decide what to do with getPublishData

and that disabled test needs repurposing.
FlashMQTests/tst_maintests.cpp
... ... @@ -1148,6 +1148,7 @@ void testCopyPacketHelper(const std::string &topic, char from_qos, char to_qos,
1148 1148 QCOMPARE(stagingPacketOne.getTopic(), parsedPacketOne.getTopic());
1149 1149 QCOMPARE(stagingPacketOne.getPayloadCopy(), parsedPacketOne.getPayloadCopy());
1150 1150  
  1151 + /*
1151 1152 std::shared_ptr<MqttPacket> copiedPacketOne = parsedPacketOne.getCopy(to_qos);
1152 1153  
1153 1154 QCOMPARE(payloadOne, copiedPacketOne->getPayloadCopy());
... ... @@ -1166,6 +1167,7 @@ void testCopyPacketHelper(const std::string &amp;topic, char from_qos, char to_qos,
1166 1167 copiedPacketOne->readIntoBuf(bufOfCopied);
1167 1168 QVERIFY2(bufOfCopied == bufOfReference, formatString("Failure on length %d for topic %s, from qos %d to qos %d, retain: %d.",
1168 1169 len, topic.c_str(), from_qos, to_qos, retain).c_str());
  1170 + */
1169 1171 }
1170 1172 }
1171 1173  
... ...
mqttpacket.cpp
... ... @@ -39,77 +39,6 @@ MqttPacket::MqttPacket(CirBuf &amp;buf, size_t packet_len, size_t fixed_header_lengt
39 39 pos += fixed_header_length;
40 40 }
41 41  
42   -/**
43   - * @brief MqttPacket::getCopy (using default copy constructor and resetting some selected fields) is easier than using the copy constructor
44   - * publically, because then I have to keep maintaining a functioning copy constructor for each new field I add.
45   - * @return a shared pointer because that's typically how we need it; we only need to copy it if we pass it around as shared resource.
46   - *
47   - * The idea is that because a packet with QoS is longer than one without, we just copy as much as possible if both packets have the same QoS.
48   - *
49   - * Note that there can be two types of packets: one with the fixed header (including remaining length), and one without. The latter we could be
50   - * more clever about, but I'm forgoing that right now. Their use is mostly for retained messages.
51   - *
52   - * Also note that some fields are undeterminstic in the copy: dup, retain and packetid for instance. Sometimes they come from the original,
53   - * sometimes not. The current planned usage is that those fields will either ONLY or NEVER be used in the copy, so it doesn't matter what I do
54   - * with them here. I may reconsider.
55   - */
56   -std::shared_ptr<MqttPacket> MqttPacket::getCopy(char new_max_qos) const
57   -{
58   - assert(packetType == PacketType::PUBLISH);
59   -
60   - // You're not supposed to copy a duplicate packet. The only packets that get the dup flag, should not be copied AGAIN. This
61   - // has to do with the Session::writePacket() and Session::sendPendingQosMessages() logic.
62   - assert((first_byte & 0b00001000) == 0);
63   -
64   - if (publishData.qos > 0 && new_max_qos == 0)
65   - {
66   - // if shrinking the packet doesn't alter the amount of bytes in the 'remaining length' part of the header, we can
67   - // just memmove+shrink the packet. This is because the packet id always is two bytes before the payload, so we just move the payload
68   - // over it. When testing 100M copies, it went from 21000 ms to 10000 ms. In other words, about 200 ns to 100 ns per copy.
69   - // There is an elaborate unit test to test this optimization.
70   - if ((fixed_header_length == 2 && bites.size() < 125))
71   - {
72   - // I don't know yet if this is true, but I don't want to forget when I implemenet MQTT5.
73   - assert(sender && sender->getProtocolVersion() <= ProtocolVersion::Mqtt311);
74   -
75   - std::shared_ptr<MqttPacket> p(new MqttPacket(*this));
76   - p->sender.reset();
77   -
78   - if (payloadLen > 0)
79   - std::memmove(&p->bites[packet_id_pos], &p->bites[packet_id_pos+2], payloadLen);
80   - p->bites.erase(p->bites.end() - 2, p->bites.end());
81   - p->packet_id_pos = 0;
82   - p->publishData.qos = 0;
83   - p->payloadStart -= 2;
84   - if (pos > p->bites.size()) // pos can possible be set elsewhere, so we only set it back if it was after the payload.
85   - p->pos -= 2;
86   - p->packet_id = 0;
87   -
88   - // Clear QoS bits from the header.
89   - p->first_byte &= 0b11111001;
90   - p->bites[0] = p->first_byte;
91   -
92   - assert((p->bites[1] & 0b10000000) == 0); // when there is an MSB, I musn't get rid of it.
93   - assert(p->bites[1] > 3); // There has to be a remaining value after subtracting 2.
94   -
95   - p->bites[1] -= 2; // Reduce the value in the 'remaining length' part of the header.
96   -
97   - return p;
98   - }
99   -
100   - Publish pub(publishData.topic, getPayloadCopy(), new_max_qos);
101   - pub.retain = getRetain();
102   - std::shared_ptr<MqttPacket> copyPacket(new MqttPacket(ProtocolVersion::Mqtt311, pub)); // TODO: don't hard-code the protocol version.
103   - return copyPacket;
104   - }
105   -
106   - std::shared_ptr<MqttPacket> copyPacket(new MqttPacket(*this));
107   - copyPacket->sender.reset();
108   - if (publishData.qos != new_max_qos)
109   - copyPacket->setQos(new_max_qos);
110   - return copyPacket;
111   -}
112   -
113 42 MqttPacket::MqttPacket(const ConnAck &connAck) :
114 43 bites(connAck.getLengthWithoutFixedHeader())
115 44 {
... ... @@ -174,7 +103,9 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &amp;_pu
174 103 this->protocolVersion = protocolVersion;
175 104  
176 105 this->publishData.topic = _publish.topic;
177   - splitTopic(this->publishData.topic, this->publishData.subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics.
  106 +
  107 + if (_publish.splitTopic)
  108 + splitTopic(this->publishData.topic, this->publishData.subtopics);
178 109  
179 110 packetType = PacketType::PUBLISH;
180 111 this->publishData.qos = _publish.qos;
... ... @@ -1197,7 +1128,7 @@ void MqttPacket::setRetain()
1197 1128 }
1198 1129 }
1199 1130  
1200   -Publish *MqttPacket::getPublish()
  1131 +Publish *MqttPacket::getPublishData()
1201 1132 {
1202 1133 if (payloadLen > 0 && publishData.payload.empty())
1203 1134 publishData.payload = getPayloadCopy();
... ...
mqttpacket.h
... ... @@ -78,8 +78,6 @@ public:
78 78  
79 79 MqttPacket(MqttPacket &&other) = default;
80 80  
81   - std::shared_ptr<MqttPacket> getCopy(char new_max_qos) const;
82   -
83 81 size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publishData) const;
84 82  
85 83 // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance.
... ... @@ -123,7 +121,7 @@ public:
123 121 std::string getPayloadCopy() const;
124 122 bool getRetain() const;
125 123 void setRetain();
126   - Publish *getPublish();
  124 + Publish *getPublishData();
127 125 };
128 126  
129 127 #endif // MQTTPACKET_H
... ...
publishcopyfactory.cpp
... ... @@ -17,36 +17,39 @@ PublishCopyFactory::PublishCopyFactory(Publish *publish) :
17 17  
18 18 }
19 19  
20   -MqttPacket *PublishCopyFactory::getOptimumPacket(char max_qos, ProtocolVersion protocolVersion)
  20 +MqttPacket *PublishCopyFactory::getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion)
21 21 {
22 22 // TODO: cache idea: a set of constructed packets per branch in this code, witn an int identifier.
23 23  
24 24 if (packet)
25 25 {
26   - // TODO: some flag on packet 'containsClientSpecificStuff'
27   - if (max_qos == 0 && max_qos < packet->getQos() && packet->getProtocolVersion() == protocolVersion && protocolVersion <= ProtocolVersion::Mqtt311)
  26 + if (packet->getProtocolVersion() == protocolVersion && orgQos == max_qos)
28 27 {
29   - // TODO: getCopy now also makes packets with Publish objects. I think I only want to do that here.
30   -
31   - if (!downgradedQos0PacketCopy)
32   - downgradedQos0PacketCopy = packet->getCopy(max_qos); // TODO: getCopy perhaps std::unique_ptr
33   - assert(downgradedQos0PacketCopy->getQos() == 0);
34   - return downgradedQos0PacketCopy.get();
  28 + assert(orgQos == packet->getQos());
  29 + return packet;
35 30 }
36   - else if (packet->getProtocolVersion() >= ProtocolVersion::Mqtt5 || protocolVersion >= ProtocolVersion::Mqtt5)
  31 +
  32 + const int cache_key = (static_cast<uint8_t>(protocolVersion) * 10) + max_qos;
  33 + std::unique_ptr<MqttPacket> &cachedPack = constructedPacketCache[cache_key];
  34 +
  35 + if (!cachedPack)
37 36 {
38   - Publish *pub = packet->getPublish();
39   - pub->setClientSpecificProperties(); // TODO
40   - this->oneShotPacket = std::make_unique<MqttPacket>(protocolVersion, *pub);
41   - return this->oneShotPacket.get();
  37 + Publish *orgPublish = packet->getPublishData();
  38 + orgPublish->splitTopic = false;
  39 + orgPublish->qos = max_qos;
  40 + if (protocolVersion >= ProtocolVersion::Mqtt5)
  41 + orgPublish->setClientSpecificProperties();
  42 + cachedPack = std::make_unique<MqttPacket>(protocolVersion, *orgPublish);
42 43 }
43 44  
44   - return packet;
  45 + return cachedPack.get();
45 46 }
46 47  
  48 + // Getting a packet of a Publish object happens on will messages and SYS topics and maybe some others. It's low traffic, anyway.
47 49 assert(publish);
48 50  
49   - publish->setClientSpecificProperties();
  51 + if (protocolVersion >= ProtocolVersion::Mqtt5)
  52 + publish->setClientSpecificProperties();
50 53 this->oneShotPacket = std::make_unique<MqttPacket>(protocolVersion, *publish);
51 54 return this->oneShotPacket.get();
52 55 }
... ... @@ -92,11 +95,12 @@ bool PublishCopyFactory::getRetain() const
92 95  
93 96 Publish PublishCopyFactory::getNewPublish() const
94 97 {
95   - assert(packet->getQos() > 0); // We only need to construct new publishes for QoS. If you're doing it elsewhere, it's a bug.
  98 + assert(packet->getQos() > 0);
  99 + assert(orgQos > 0); // We only need to construct new publishes for QoS. If you're doing it elsewhere, it's a bug.
96 100  
97 101 if (packet)
98 102 {
99   - Publish p(*packet->getPublish());
  103 + Publish p(*packet->getPublishData());
100 104 return p;
101 105 }
102 106  
... ...
publishcopyfactory.h
... ... @@ -5,6 +5,7 @@
5 5  
6 6 #include "forward_declarations.h"
7 7 #include "types.h"
  8 +#include "unordered_map"
8 9  
9 10 /**
10 11 * @brief The PublishCopyFactory class is for managing copies of an incoming publish, including sometimes not making copies at all.
... ... @@ -21,16 +22,14 @@ class PublishCopyFactory
21 22 Publish *publish = nullptr;
22 23 std::unique_ptr<MqttPacket> oneShotPacket;
23 24 const char orgQos;
24   - std::shared_ptr<MqttPacket> downgradedQos0PacketCopy;
25   -
26   - // TODO: constructed mqtt3 packet and mqtt5 packet?
  25 + std::unordered_map<uint8_t, std::unique_ptr<MqttPacket>> constructedPacketCache;
27 26 public:
28 27 PublishCopyFactory(MqttPacket *packet);
29 28 PublishCopyFactory(Publish *publish);
30 29 PublishCopyFactory(const PublishCopyFactory &other) = delete;
31 30 PublishCopyFactory(PublishCopyFactory &&other) = delete;
32 31  
33   - MqttPacket *getOptimumPacket(char max_qos, ProtocolVersion protocolVersion);
  32 + MqttPacket *getOptimumPacket(const char max_qos, const ProtocolVersion protocolVersion);
34 33 char getEffectiveQos(char max_qos) const;
35 34 const std::string &getTopic() const;
36 35 const std::vector<std::string> &getSubtopics();
... ...
qospacketqueue.cpp
... ... @@ -67,6 +67,7 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory &amp;copyFactory, uint16_t id,
67 67 assert(id > 0);
68 68  
69 69 Publish pub = copyFactory.getNewPublish();
  70 + pub.splitTopic = false;
70 71 queue.emplace_back(std::move(pub), id);
71 72 qosQueueBytes += queue.back().getApproximateMemoryFootprint();
72 73 }
... ... @@ -75,6 +76,7 @@ void QoSPublishQueue::queuePublish(Publish &amp;&amp;pub, uint16_t id)
75 76 {
76 77 assert(id > 0);
77 78  
  79 + pub.splitTopic = false;
78 80 queue.emplace_back(std::move(pub), id);
79 81 qosQueueBytes += queue.back().getApproximateMemoryFootprint();
80 82 }
... ...
types.cpp
... ... @@ -117,9 +117,14 @@ size_t Publish::getLengthWithoutFixedHeader() const
117 117 return result;
118 118 }
119 119  
  120 +/**
  121 + * @brief Publish::setClientSpecificProperties generates the properties byte array for one client. You're supposed to call it before any publish.
  122 + *
  123 + */
120 124 void Publish::setClientSpecificProperties()
121 125 {
122   - propertyBuilder->clearClientSpecificBytes();
  126 + if (propertyBuilder)
  127 + propertyBuilder->clearClientSpecificBytes();
123 128 // TODO. Expires at?
124 129 }
125 130  
... ...
... ... @@ -198,6 +198,7 @@ public:
198 198 char qos = 0;
199 199 bool retain = false; // Note: existing subscribers don't get publishes of retained messages with retain=1. [MQTT-3.3.1-9]
200 200 uint32_t will_delay = 0; // if will, this is the delay. Just storing here, to avoid having to make a WillMessage class
  201 + bool splitTopic = true;
201 202 std::chrono::time_point<std::chrono::steady_clock> createdAt;
202 203 std::chrono::seconds expiresAfter;
203 204 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder;
... ...