Commit 6fed4d58b7d71da026140b2e99b171b31e9a58f2

Authored by Wiebe Cazemier
1 parent 03fd22bd

Half idea of using the circular buffer to store packets

I'm not sure how I'm continueing. I need a safe point.
mqttpacket.cpp
@@ -776,7 +776,7 @@ void MqttPacket::handleUnsubscribe() @@ -776,7 +776,7 @@ void MqttPacket::handleUnsubscribe()
776 sender->writeMqttPacket(response); 776 sender->writeMqttPacket(response);
777 } 777 }
778 778
779 -void MqttPacket::handlePublish() 779 +void MqttPacket::handlePublish(const bool stopAfterParsing)
780 { 780 {
781 const uint16_t variable_header_length = readTwoBytesToUInt16(); 781 const uint16_t variable_header_length = readTwoBytesToUInt16();
782 782
@@ -900,11 +900,15 @@ void MqttPacket::handlePublish() @@ -900,11 +900,15 @@ void MqttPacket::handlePublish()
900 logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publishData.topic.c_str(), qos, retain, dup); 900 logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publishData.topic.c_str(), qos, retain, dup);
901 #endif 901 #endif
902 902
903 - sender->getThreadData()->incrementReceivedMessageCount();  
904 -  
905 payloadLen = remainingAfterPos(); 903 payloadLen = remainingAfterPos();
906 payloadStart = pos; 904 payloadStart = pos;
907 905
  906 + sender->getThreadData()->incrementReceivedMessageCount();
  907 +
  908 + // TODO: or maybe create a function parsePublishData().
  909 + if (stopAfterParsing)
  910 + return;
  911 +
908 Authentication &authentication = *ThreadGlobals::getAuth(); 912 Authentication &authentication = *ThreadGlobals::getAuth();
909 913
910 // Working with a local copy because the subscribing action will modify this->packet_id. See the PublishCopyFactory. 914 // Working with a local copy because the subscribing action will modify this->packet_id. See the PublishCopyFactory.
mqttpacket.h
@@ -96,7 +96,7 @@ public: @@ -96,7 +96,7 @@ public:
96 void handleSubscribe(); 96 void handleSubscribe();
97 void handleUnsubscribe(); 97 void handleUnsubscribe();
98 void handlePing(); 98 void handlePing();
99 - void handlePublish(); 99 + void handlePublish(const bool stopAfterParsing = false);
100 void handlePubAck(); 100 void handlePubAck();
101 void handlePubRec(); 101 void handlePubRec();
102 void handlePubRel(); 102 void handlePubRel();
sessionsandsubscriptionsdb.cpp
@@ -82,6 +82,12 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -82,6 +82,12 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
82 continue; 82 continue;
83 83
84 std::vector<char> reserved(RESERVED_SPACE_SESSIONS_DB_V2); 84 std::vector<char> reserved(RESERVED_SPACE_SESSIONS_DB_V2);
  85 + CirBuf cirbuf(1024);
  86 +
  87 + // TODO: all that settings and thread data needs to be removed from Client.
  88 + std::shared_ptr<ThreadData> dummyThreadData;
  89 + std::shared_ptr<Settings> dummySettings(new Settings()); // TODO: this is wrong: these are not from config file
  90 + std::shared_ptr<Client> dummyClient(new Client(0, dummyThreadData, nullptr, false, nullptr, dummySettings, false));
85 91
86 for (uint32_t i = 0; i < nrOfSessions; i++) 92 for (uint32_t i = 0; i < nrOfSessions; i++)
87 { 93 {
@@ -106,22 +112,20 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -106,22 +112,20 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
106 for (uint32_t i = 0; i < nrOfQueuedQoSPackets; i++) 112 for (uint32_t i = 0; i < nrOfQueuedQoSPackets; i++)
107 { 113 {
108 const uint16_t id = readUint16(eofFound); 114 const uint16_t id = readUint16(eofFound);
109 - const uint32_t topicSize = readUint32(eofFound);  
110 - const uint32_t payloadSize = readUint32(eofFound); 115 + const uint32_t packlen = readUint32(eofFound);
111 116
112 assert(id > 0); 117 assert(id > 0);
113 118
114 - readCheck(buf.data(), 1, 1, f);  
115 - const unsigned char qos = buf[0]; 119 + cirbuf.reset();
  120 + cirbuf.ensureFreeSpace(packlen + 32);
116 121
117 - readCheck(buf.data(), 1, topicSize, f);  
118 - const std::string topic(buf.data(), topicSize); 122 + readCheck(cirbuf.headPtr(), 1, packlen, f);
  123 + cirbuf.advanceHead(packlen);
  124 + MqttPacket pack(cirbuf, packlen, 2, dummyClient); // TODO: store the 2 in the file
119 125
120 - makeSureBufSize(payloadSize);  
121 - readCheck(buf.data(), 1, payloadSize, f);  
122 - const std::string payload(buf.data(), payloadSize); 126 + pack.handlePublish(true);
  127 + Publish pub(pack.getPublishData());
123 128
124 - Publish pub(topic, payload, qos);  
125 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); 129 logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
126 ses->qosPacketQueue.queuePublish(std::move(pub), id); 130 ses->qosPacketQueue.queuePublish(std::move(pub), id);
127 } 131 }
@@ -211,6 +215,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -211,6 +215,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
211 215
212 writeUint32(sessions.size()); 216 writeUint32(sessions.size());
213 217
  218 + CirBuf cirbuf(1024);
  219 +
214 for (const std::unique_ptr<Session> &ses : sessions) 220 for (const std::unique_ptr<Session> &ses : sessions)
215 { 221 {
216 logger->logf(LOG_DEBUG, "Saving session '%s'.", ses->getClientId().c_str()); 222 logger->logf(LOG_DEBUG, "Saving session '%s'.", ses->getClientId().c_str());
@@ -231,22 +237,23 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -231,22 +237,23 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
231 237
232 for (const QueuedPublish &p: ses->qosPacketQueue) 238 for (const QueuedPublish &p: ses->qosPacketQueue)
233 { 239 {
  240 + qosPacketsCounted++;
  241 +
234 const Publish &pub = p.getPublish(); 242 const Publish &pub = p.getPublish();
  243 + assert(!pub.splitTopic);
  244 + assert(pub.topicAlias == 0);
235 245
236 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); 246 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
237 247
238 - qosPacketsCounted++; 248 + const MqttPacket pack(ProtocolVersion::Mqtt5, pub);
  249 + const uint32_t packSize = pack.getSizeIncludingNonPresentHeader();
  250 + cirbuf.reset();
  251 + cirbuf.ensureFreeSpace(packSize + 32);
  252 + pack.readIntoBuf(cirbuf);
239 253
240 writeUint16(p.getPacketId()); 254 writeUint16(p.getPacketId());
241 -  
242 - writeUint32(pub.topic.length());  
243 - writeUint32(pub.payload.size());  
244 -  
245 - const char qos = pub.qos;  
246 - writeCheck(&qos, 1, 1, f);  
247 -  
248 - writeCheck(pub.topic.c_str(), 1, pub.topic.length(), f);  
249 - writeCheck(pub.payload.c_str(), 1, pub.payload.length(), f); 255 + writeUint32(packSize);
  256 + writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);
250 } 257 }
251 258
252 assert(qosPacketsExpected == qosPacketsCounted); 259 assert(qosPacketsExpected == qosPacketsCounted);