Commit 7be65e9555d40b2b407ce314fd645eeade110106

Authored by Wiebe Cazemier
1 parent e69a2c35

Store fixed header length in saved queued packet

And the TODOs are for the next thing: saving the createdAt date.
mqttpacket.cpp
@@ -1074,14 +1074,28 @@ std::string MqttPacket::getPayloadCopy() const @@ -1074,14 +1074,28 @@ std::string MqttPacket::getPayloadCopy() const
1074 return payload; 1074 return payload;
1075 } 1075 }
1076 1076
  1077 +
  1078 +
  1079 +uint8_t MqttPacket::getFixedHeaderLength() const
  1080 +{
  1081 + size_t result = this->fixed_header_length;
  1082 +
  1083 + if (result == 0)
  1084 + {
  1085 + result++; // first byte it always there.
  1086 + result += remainingLength.getLen();
  1087 + }
  1088 +
  1089 + return result;
  1090 +}
  1091 +
1077 size_t MqttPacket::getSizeIncludingNonPresentHeader() const 1092 size_t MqttPacket::getSizeIncludingNonPresentHeader() const
1078 { 1093 {
1079 size_t total = bites.size(); 1094 size_t total = bites.size();
1080 1095
1081 if (fixed_header_length == 0) 1096 if (fixed_header_length == 0)
1082 { 1097 {
1083 - total++;  
1084 - total += remainingLength.getLen(); 1098 + total += getFixedHeaderLength();
1085 } 1099 }
1086 1100
1087 return total; 1101 return total;
mqttpacket.h
@@ -116,6 +116,7 @@ public: @@ -116,6 +116,7 @@ public:
116 void handlePubRel(); 116 void handlePubRel();
117 void handlePubComp(); 117 void handlePubComp();
118 118
  119 + uint8_t getFixedHeaderLength() const;
119 size_t getSizeIncludingNonPresentHeader() const; 120 size_t getSizeIncludingNonPresentHeader() const;
120 const std::vector<char> &getBites() const { return bites; } 121 const std::vector<char> &getBites() const { return bites; }
121 char getQos() const { return publishData.qos; } 122 char getQos() const { return publishData.qos; }
sessionsandsubscriptionsdb.cpp
@@ -111,6 +111,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -111,6 +111,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
111 const uint32_t nrOfQueuedQoSPackets = readUint32(eofFound); 111 const uint32_t nrOfQueuedQoSPackets = readUint32(eofFound);
112 for (uint32_t i = 0; i < nrOfQueuedQoSPackets; i++) 112 for (uint32_t i = 0; i < nrOfQueuedQoSPackets; i++)
113 { 113 {
  114 + const uint16_t fixed_header_length = readUint16(eofFound);
114 const uint16_t id = readUint16(eofFound); 115 const uint16_t id = readUint16(eofFound);
115 const uint32_t packlen = readUint32(eofFound); 116 const uint32_t packlen = readUint32(eofFound);
116 117
@@ -121,11 +122,13 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -121,11 +122,13 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
121 122
122 readCheck(cirbuf.headPtr(), 1, packlen, f); 123 readCheck(cirbuf.headPtr(), 1, packlen, f);
123 cirbuf.advanceHead(packlen); 124 cirbuf.advanceHead(packlen);
124 - MqttPacket pack(cirbuf, packlen, 2, dummyClient); // TODO: store the 2 in the file 125 + MqttPacket pack(cirbuf, packlen, fixed_header_length, dummyClient);
125 126
126 pack.parsePublishData(); 127 pack.parsePublishData();
127 Publish pub(pack.getPublishData()); 128 Publish pub(pack.getPublishData());
128 129
  130 + // TODO: update the pub.createdAt
  131 +
129 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); 132 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
130 ses->qosPacketQueue.queuePublish(std::move(pub), id); 133 ses->qosPacketQueue.queuePublish(std::move(pub), id);
131 } 134 }
@@ -247,7 +250,7 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -247,7 +250,7 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
247 250
248 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); 251 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
249 252
250 - pub.clearClientSpecificProperties(); 253 + pub.clearClientSpecificProperties(); // TODO: unnecessary? Unwanted even? I need to store the expiration interval. And how to load it?
251 254
252 MqttPacket pack(ProtocolVersion::Mqtt5, pub); 255 MqttPacket pack(ProtocolVersion::Mqtt5, pub);
253 pack.setPacketId(p.getPacketId()); 256 pack.setPacketId(p.getPacketId());
@@ -256,6 +259,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -256,6 +259,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
256 cirbuf.ensureFreeSpace(packSize + 32); 259 cirbuf.ensureFreeSpace(packSize + 32);
257 pack.readIntoBuf(cirbuf); 260 pack.readIntoBuf(cirbuf);
258 261
  262 + // TODO: save age
  263 +
  264 + writeUint16(pack.getFixedHeaderLength());
259 writeUint16(p.getPacketId()); 265 writeUint16(p.getPacketId());
260 writeUint32(packSize); 266 writeUint32(packSize);
261 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); 267 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);