From bf2193f961a3a00a605c1ea8e47c81a4bf02151d Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Tue, 14 Jun 2022 02:46:21 +0200 Subject: [PATCH] Move message counting to the end of the call stack --- FlashMQTests/tst_maintests.cpp | 4 +--- client.cpp | 24 +++++++++++++----------- client.h | 6 +++--- session.cpp | 17 ++++++----------- session.h | 4 ++-- subscriptionstore.cpp | 25 ++++++------------------- subscriptionstore.h | 4 ++-- 7 files changed, 33 insertions(+), 51 deletions(-) diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index 891acf8..e83843c 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -1033,15 +1033,13 @@ void MainTests::testSavingSessions() splitTopic(topic4, subtopics); store->addSubscription(c2, topic4, subtopics, 0); - uint64_t count = 0; - Publish publish("a/b/c", "Hello Barry", 1); std::shared_ptr c1ses = c1->getSession(); c1.reset(); MqttPacket publishPacket(ProtocolVersion::Mqtt311, publish); PublishCopyFactory fac(&publishPacket); - c1ses->writePacket(fac, 1, count); + c1ses->writePacket(fac, 1); store->saveSessionsAndSubscriptions("/tmp/flashmqtests_sessions.db"); diff --git a/client.cpp b/client.cpp index 5b0c543..370e21f 100644 --- a/client.cpp +++ b/client.cpp @@ -191,7 +191,7 @@ void Client::writeText(const std::string &text) setReadyForWriting(true); } -int Client::writeMqttPacket(const MqttPacket &packet) +void Client::writeMqttPacket(const MqttPacket &packet) { const size_t packetSize = packet.getSizeIncludingNonPresentHeader(); @@ -199,7 +199,7 @@ int Client::writeMqttPacket(const MqttPacket &packet) // sending that Application Message [MQTT-3.1.2-25]." if (packetSize > this->maxOutgoingPacketSize) { - return 0; + return; } std::lock_guard locker(writeBufMutex); @@ -215,19 +215,23 @@ int Client::writeMqttPacket(const MqttPacket &packet) // QoS packet are queued and limited elsewhere. if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packetSize > writebuf.freeSpace()) { - return 0; + return; } packet.readIntoBuf(writebuf); - if (packet.packetType == PacketType::DISCONNECT) + if (packet.packetType == PacketType::PUBLISH) + { + ThreadData *td = ThreadGlobals::getThreadData(); + td->incrementSentMessageCount(1); + } + else if (packet.packetType == PacketType::DISCONNECT) setReadyForDisconnect(); setReadyForWriting(true); - return 1; } -int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, char max_qos, uint16_t packet_id) +void Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, char max_qos, uint16_t packet_id) { uint16_t topic_alias = 0; bool skip_topic = false; @@ -256,22 +260,20 @@ int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, c p->setQos(max_qos); } - return writeMqttPacketAndBlameThisClient(*p); + writeMqttPacketAndBlameThisClient(*p); } // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected. -int Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet) +void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet) { try { - return this->writeMqttPacket(packet); + this->writeMqttPacket(packet); } catch (std::exception &ex) { threadData->removeClientQueued(fd); } - - return 0; } // Ping responses are always the same, so hardcoding it for optimization. diff --git a/client.h b/client.h index 6577173..dd66abb 100644 --- a/client.h +++ b/client.h @@ -151,9 +151,9 @@ public: void writeText(const std::string &text); void writePingResp(); - int writeMqttPacket(const MqttPacket &packet); - int writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, char max_qos, uint16_t packet_id); - int writeMqttPacketAndBlameThisClient(const MqttPacket &packet); + void writeMqttPacket(const MqttPacket &packet); + void writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, char max_qos, uint16_t packet_id); + void writeMqttPacketAndBlameThisClient(const MqttPacket &packet); bool writeBufIntoFd(); bool isBeingDisconnected() const { return disconnectWhenBytesWritten; } bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; } diff --git a/session.cpp b/session.cpp index cd7d358..b7770e7 100644 --- a/session.cpp +++ b/session.cpp @@ -130,7 +130,7 @@ void Session::assignActiveConnection(std::shared_ptr &client) * @param retain. Keep MQTT-3.3.1-9 in mind: existing subscribers don't get retain=1 on packets. * @param count. Reference value is updated. It's for statistics. */ -void Session::writePacket(PublishCopyFactory ©Factory, const char max_qos, uint64_t &count) +void Session::writePacket(PublishCopyFactory ©Factory, const char max_qos) { assert(max_qos <= 2); @@ -148,7 +148,7 @@ void Session::writePacket(PublishCopyFactory ©Factory, const char max_qos, u { if (c) { - count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, 0); + c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, 0); } } else if (effectiveQos > 0) @@ -174,7 +174,7 @@ void Session::writePacket(PublishCopyFactory ©Factory, const char max_qos, u if (c) { - count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId); + c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId); } } } @@ -212,7 +212,6 @@ bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds) /** * @brief Session::sendAllPendingQosData sends pending publishes and QoS2 control packets. - * @return the amount of messages/packets published. * * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST * re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This @@ -225,10 +224,8 @@ bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds) * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher * and subscriber. Users are required to implement something themselves. */ -uint64_t Session::sendAllPendingQosData() +void Session::sendAllPendingQosData() { - uint64_t count = 0; - std::shared_ptr c = makeSharedClient(); if (c) { @@ -259,7 +256,7 @@ uint64_t Session::sendAllPendingQosData() p.setPacketId(queuedPublish.getPacketId()); //p.setDuplicate(); // TODO: this is wrong. Until we have a retransmission system, no packets can have the DUP bit set. - count += c->writeMqttPacketAndBlameThisClient(p); + c->writeMqttPacketAndBlameThisClient(p); pos++; } @@ -268,11 +265,9 @@ uint64_t Session::sendAllPendingQosData() { PubResponse pubRel(c->getProtocolVersion(), PacketType::PUBREL, ReasonCodes::Success, packet_id); MqttPacket packet(pubRel); - count += c->writeMqttPacketAndBlameThisClient(packet); + c->writeMqttPacketAndBlameThisClient(packet); } } - - return count; } bool Session::hasActiveClient() const diff --git a/session.h b/session.h index d4bd962..45720e5 100644 --- a/session.h +++ b/session.h @@ -78,9 +78,9 @@ public: const std::string &getClientId() const { return client_id; } std::shared_ptr makeSharedClient() const; void assignActiveConnection(std::shared_ptr &client); - void writePacket(PublishCopyFactory ©Factory, const char max_qos, uint64_t &count); + void writePacket(PublishCopyFactory ©Factory, const char max_qos); bool clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds); - uint64_t sendAllPendingQosData(); + void sendAllPendingQosData(); bool hasActiveClient() const; void clearWill(); std::shared_ptr &getWill(); diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 2d200d3..a7ca0f1 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -147,8 +147,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr &client, const s const std::shared_ptr &ses = session_it->second; deepestNode->addSubscriber(ses, qos); lock_guard.unlock(); - uint64_t count = giveClientRetainedMessages(ses, subtopics, qos); - client->getThreadData()->incrementSentMessageCount(count); + giveClientRetainedMessages(ses, subtopics, qos); } } } @@ -264,8 +263,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr session->assignActiveConnection(client); client->assignSession(session); session->setSessionProperties(clientReceiveMax, sessionExpiryInterval, clean_start, client->getProtocolVersion()); - uint64_t count = session->sendAllPendingQosData(); - client->getThreadData()->incrementSentMessageCount(count); + session->sendAllPendingQosData(); } /** @@ -451,7 +449,6 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory ©Factory { SubscriptionNode *startNode = dollar ? &rootDollar : &root; - uint64_t count = 0; std::forward_list subscriberSessions; { @@ -463,13 +460,7 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory ©Factory for(const ReceivingSubscriber &x : subscriberSessions) { - x.session->writePacket(copyFactory, x.qos, count); - } - - std::shared_ptr sender = copyFactory.getSender(); - if (sender) - { - sender->getThreadData()->incrementSentMessageCount(count); + x.session->writePacket(copyFactory, x.qos); } } @@ -524,11 +515,9 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector &ses, - const std::vector &subscribeSubtopics, char max_qos) +void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr &ses, + const std::vector &subscribeSubtopics, char max_qos) { - uint64_t count = 0; - RetainedMessageNode *startNode = &retainedMessagesRoot; if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$') startNode = &retainedMessagesRootDollar; @@ -544,10 +533,8 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptrwritePacket(copyFactory, max_qos, count); + ses->writePacket(copyFactory, max_qos); } - - return count; } void SubscriptionStore::setRetainedMessage(const Publish &publish, const std::vector &subtopics) diff --git a/subscriptionstore.h b/subscriptionstore.h index 1dc359d..3584576 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -150,8 +150,8 @@ public: void sendQueuedWillMessages(); void queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow = false); void queuePacketAtSubscribers(PublishCopyFactory ©Factory, bool dollar = false); - uint64_t giveClientRetainedMessages(const std::shared_ptr &ses, - const std::vector &subscribeSubtopics, char max_qos); + void giveClientRetainedMessages(const std::shared_ptr &ses, + const std::vector &subscribeSubtopics, char max_qos); void setRetainedMessage(const Publish &publish, const std::vector &subtopics); -- libgit2 0.21.4