From 6fed4d58b7d71da026140b2e99b171b31e9a58f2 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sat, 9 Apr 2022 09:12:44 +0200 Subject: [PATCH] Half idea of using the circular buffer to store packets --- mqttpacket.cpp | 10 +++++++--- mqttpacket.h | 2 +- sessionsandsubscriptionsdb.cpp | 47 +++++++++++++++++++++++++++-------------------- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 27a9883..d42916e 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -776,7 +776,7 @@ void MqttPacket::handleUnsubscribe() sender->writeMqttPacket(response); } -void MqttPacket::handlePublish() +void MqttPacket::handlePublish(const bool stopAfterParsing) { const uint16_t variable_header_length = readTwoBytesToUInt16(); @@ -900,11 +900,15 @@ void MqttPacket::handlePublish() logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publishData.topic.c_str(), qos, retain, dup); #endif - sender->getThreadData()->incrementReceivedMessageCount(); - payloadLen = remainingAfterPos(); payloadStart = pos; + sender->getThreadData()->incrementReceivedMessageCount(); + + // TODO: or maybe create a function parsePublishData(). + if (stopAfterParsing) + return; + Authentication &authentication = *ThreadGlobals::getAuth(); // Working with a local copy because the subscribing action will modify this->packet_id. See the PublishCopyFactory. diff --git a/mqttpacket.h b/mqttpacket.h index cbdea49..f71a0be 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -96,7 +96,7 @@ public: void handleSubscribe(); void handleUnsubscribe(); void handlePing(); - void handlePublish(); + void handlePublish(const bool stopAfterParsing = false); void handlePubAck(); void handlePubRec(); void handlePubRel(); diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index fcefe86..d8aa974 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -82,6 +82,12 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() continue; std::vector reserved(RESERVED_SPACE_SESSIONS_DB_V2); + CirBuf cirbuf(1024); + + // TODO: all that settings and thread data needs to be removed from Client. + std::shared_ptr dummyThreadData; + std::shared_ptr dummySettings(new Settings()); // TODO: this is wrong: these are not from config file + std::shared_ptr dummyClient(new Client(0, dummyThreadData, nullptr, false, nullptr, dummySettings, false)); for (uint32_t i = 0; i < nrOfSessions; i++) { @@ -106,22 +112,20 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() for (uint32_t i = 0; i < nrOfQueuedQoSPackets; i++) { const uint16_t id = readUint16(eofFound); - const uint32_t topicSize = readUint32(eofFound); - const uint32_t payloadSize = readUint32(eofFound); + const uint32_t packlen = readUint32(eofFound); assert(id > 0); - readCheck(buf.data(), 1, 1, f); - const unsigned char qos = buf[0]; + cirbuf.reset(); + cirbuf.ensureFreeSpace(packlen + 32); - readCheck(buf.data(), 1, topicSize, f); - const std::string topic(buf.data(), topicSize); + readCheck(cirbuf.headPtr(), 1, packlen, f); + cirbuf.advanceHead(packlen); + MqttPacket pack(cirbuf, packlen, 2, dummyClient); // TODO: store the 2 in the file - makeSureBufSize(payloadSize); - readCheck(buf.data(), 1, payloadSize, f); - const std::string payload(buf.data(), payloadSize); + pack.handlePublish(true); + Publish pub(pack.getPublishData()); - Publish pub(topic, payload, qos); logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); ses->qosPacketQueue.queuePublish(std::move(pub), id); } @@ -211,6 +215,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector &ses : sessions) { logger->logf(LOG_DEBUG, "Saving session '%s'.", ses->getClientId().c_str()); @@ -231,22 +237,23 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorqosPacketQueue) { + qosPacketsCounted++; + const Publish &pub = p.getPublish(); + assert(!pub.splitTopic); + assert(pub.topicAlias == 0); logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); - qosPacketsCounted++; + const MqttPacket pack(ProtocolVersion::Mqtt5, pub); + const uint32_t packSize = pack.getSizeIncludingNonPresentHeader(); + cirbuf.reset(); + cirbuf.ensureFreeSpace(packSize + 32); + pack.readIntoBuf(cirbuf); writeUint16(p.getPacketId()); - - writeUint32(pub.topic.length()); - writeUint32(pub.payload.size()); - - const char qos = pub.qos; - writeCheck(&qos, 1, 1, f); - - writeCheck(pub.topic.c_str(), 1, pub.topic.length(), f); - writeCheck(pub.payload.c_str(), 1, pub.payload.length(), f); + writeUint32(packSize); + writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); } assert(qosPacketsExpected == qosPacketsCounted); -- libgit2 0.21.4