Commit bf2193f961a3a00a605c1ea8e47c81a4bf02151d

Authored by Wiebe Cazemier
1 parent 44de6670

Move message counting to the end of the call stack

This makes much more sense than returning the amount of messages sent
all the way up the call stack.
FlashMQTests/tst_maintests.cpp
@@ -1033,15 +1033,13 @@ void MainTests::testSavingSessions() @@ -1033,15 +1033,13 @@ void MainTests::testSavingSessions()
1033 splitTopic(topic4, subtopics); 1033 splitTopic(topic4, subtopics);
1034 store->addSubscription(c2, topic4, subtopics, 0); 1034 store->addSubscription(c2, topic4, subtopics, 0);
1035 1035
1036 - uint64_t count = 0;  
1037 -  
1038 Publish publish("a/b/c", "Hello Barry", 1); 1036 Publish publish("a/b/c", "Hello Barry", 1);
1039 1037
1040 std::shared_ptr<Session> c1ses = c1->getSession(); 1038 std::shared_ptr<Session> c1ses = c1->getSession();
1041 c1.reset(); 1039 c1.reset();
1042 MqttPacket publishPacket(ProtocolVersion::Mqtt311, publish); 1040 MqttPacket publishPacket(ProtocolVersion::Mqtt311, publish);
1043 PublishCopyFactory fac(&publishPacket); 1041 PublishCopyFactory fac(&publishPacket);
1044 - c1ses->writePacket(fac, 1, count); 1042 + c1ses->writePacket(fac, 1);
1045 1043
1046 store->saveSessionsAndSubscriptions("/tmp/flashmqtests_sessions.db"); 1044 store->saveSessionsAndSubscriptions("/tmp/flashmqtests_sessions.db");
1047 1045
client.cpp
@@ -191,7 +191,7 @@ void Client::writeText(const std::string &amp;text) @@ -191,7 +191,7 @@ void Client::writeText(const std::string &amp;text)
191 setReadyForWriting(true); 191 setReadyForWriting(true);
192 } 192 }
193 193
194 -int Client::writeMqttPacket(const MqttPacket &packet) 194 +void Client::writeMqttPacket(const MqttPacket &packet)
195 { 195 {
196 const size_t packetSize = packet.getSizeIncludingNonPresentHeader(); 196 const size_t packetSize = packet.getSizeIncludingNonPresentHeader();
197 197
@@ -199,7 +199,7 @@ int Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -199,7 +199,7 @@ int Client::writeMqttPacket(const MqttPacket &amp;packet)
199 // sending that Application Message [MQTT-3.1.2-25]." 199 // sending that Application Message [MQTT-3.1.2-25]."
200 if (packetSize > this->maxOutgoingPacketSize) 200 if (packetSize > this->maxOutgoingPacketSize)
201 { 201 {
202 - return 0; 202 + return;
203 } 203 }
204 204
205 std::lock_guard<std::mutex> locker(writeBufMutex); 205 std::lock_guard<std::mutex> locker(writeBufMutex);
@@ -215,19 +215,23 @@ int Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -215,19 +215,23 @@ int Client::writeMqttPacket(const MqttPacket &amp;packet)
215 // QoS packet are queued and limited elsewhere. 215 // QoS packet are queued and limited elsewhere.
216 if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packetSize > writebuf.freeSpace()) 216 if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packetSize > writebuf.freeSpace())
217 { 217 {
218 - return 0; 218 + return;
219 } 219 }
220 220
221 packet.readIntoBuf(writebuf); 221 packet.readIntoBuf(writebuf);
222 222
223 - if (packet.packetType == PacketType::DISCONNECT) 223 + if (packet.packetType == PacketType::PUBLISH)
  224 + {
  225 + ThreadData *td = ThreadGlobals::getThreadData();
  226 + td->incrementSentMessageCount(1);
  227 + }
  228 + else if (packet.packetType == PacketType::DISCONNECT)
224 setReadyForDisconnect(); 229 setReadyForDisconnect();
225 230
226 setReadyForWriting(true); 231 setReadyForWriting(true);
227 - return 1;  
228 } 232 }
229 233
230 -int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory &copyFactory, char max_qos, uint16_t packet_id) 234 +void Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory &copyFactory, char max_qos, uint16_t packet_id)
231 { 235 {
232 uint16_t topic_alias = 0; 236 uint16_t topic_alias = 0;
233 bool skip_topic = false; 237 bool skip_topic = false;
@@ -256,22 +260,20 @@ int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory &amp;copyFactory, c @@ -256,22 +260,20 @@ int Client::writeMqttPacketAndBlameThisClient(PublishCopyFactory &amp;copyFactory, c
256 p->setQos(max_qos); 260 p->setQos(max_qos);
257 } 261 }
258 262
259 - return writeMqttPacketAndBlameThisClient(*p); 263 + writeMqttPacketAndBlameThisClient(*p);
260 } 264 }
261 265
262 // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected. 266 // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected.
263 -int Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet) 267 +void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet)
264 { 268 {
265 try 269 try
266 { 270 {
267 - return this->writeMqttPacket(packet); 271 + this->writeMqttPacket(packet);
268 } 272 }
269 catch (std::exception &ex) 273 catch (std::exception &ex)
270 { 274 {
271 threadData->removeClientQueued(fd); 275 threadData->removeClientQueued(fd);
272 } 276 }
273 -  
274 - return 0;  
275 } 277 }
276 278
277 // Ping responses are always the same, so hardcoding it for optimization. 279 // Ping responses are always the same, so hardcoding it for optimization.
client.h
@@ -151,9 +151,9 @@ public: @@ -151,9 +151,9 @@ public:
151 151
152 void writeText(const std::string &text); 152 void writeText(const std::string &text);
153 void writePingResp(); 153 void writePingResp();
154 - int writeMqttPacket(const MqttPacket &packet);  
155 - int writeMqttPacketAndBlameThisClient(PublishCopyFactory &copyFactory, char max_qos, uint16_t packet_id);  
156 - int writeMqttPacketAndBlameThisClient(const MqttPacket &packet); 154 + void writeMqttPacket(const MqttPacket &packet);
  155 + void writeMqttPacketAndBlameThisClient(PublishCopyFactory &copyFactory, char max_qos, uint16_t packet_id);
  156 + void writeMqttPacketAndBlameThisClient(const MqttPacket &packet);
157 bool writeBufIntoFd(); 157 bool writeBufIntoFd();
158 bool isBeingDisconnected() const { return disconnectWhenBytesWritten; } 158 bool isBeingDisconnected() const { return disconnectWhenBytesWritten; }
159 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; } 159 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; }
session.cpp
@@ -130,7 +130,7 @@ void Session::assignActiveConnection(std::shared_ptr&lt;Client&gt; &amp;client) @@ -130,7 +130,7 @@ void Session::assignActiveConnection(std::shared_ptr&lt;Client&gt; &amp;client)
130 * @param retain. Keep MQTT-3.3.1-9 in mind: existing subscribers don't get retain=1 on packets. 130 * @param retain. Keep MQTT-3.3.1-9 in mind: existing subscribers don't get retain=1 on packets.
131 * @param count. Reference value is updated. It's for statistics. 131 * @param count. Reference value is updated. It's for statistics.
132 */ 132 */
133 -void Session::writePacket(PublishCopyFactory &copyFactory, const char max_qos, uint64_t &count) 133 +void Session::writePacket(PublishCopyFactory &copyFactory, const char max_qos)
134 { 134 {
135 assert(max_qos <= 2); 135 assert(max_qos <= 2);
136 136
@@ -148,7 +148,7 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u @@ -148,7 +148,7 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u
148 { 148 {
149 if (c) 149 if (c)
150 { 150 {
151 - count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, 0); 151 + c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, 0);
152 } 152 }
153 } 153 }
154 else if (effectiveQos > 0) 154 else if (effectiveQos > 0)
@@ -174,7 +174,7 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u @@ -174,7 +174,7 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u
174 174
175 if (c) 175 if (c)
176 { 176 {
177 - count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId); 177 + c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId);
178 } 178 }
179 } 179 }
180 } 180 }
@@ -212,7 +212,6 @@ bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds) @@ -212,7 +212,6 @@ bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds)
212 212
213 /** 213 /**
214 * @brief Session::sendAllPendingQosData sends pending publishes and QoS2 control packets. 214 * @brief Session::sendAllPendingQosData sends pending publishes and QoS2 control packets.
215 - * @return the amount of messages/packets published.  
216 * 215 *
217 * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST 216 * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST
218 * re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This 217 * re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This
@@ -225,10 +224,8 @@ bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds) @@ -225,10 +224,8 @@ bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds)
225 * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher 224 * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher
226 * and subscriber. Users are required to implement something themselves. 225 * and subscriber. Users are required to implement something themselves.
227 */ 226 */
228 -uint64_t Session::sendAllPendingQosData() 227 +void Session::sendAllPendingQosData()
229 { 228 {
230 - uint64_t count = 0;  
231 -  
232 std::shared_ptr<Client> c = makeSharedClient(); 229 std::shared_ptr<Client> c = makeSharedClient();
233 if (c) 230 if (c)
234 { 231 {
@@ -259,7 +256,7 @@ uint64_t Session::sendAllPendingQosData() @@ -259,7 +256,7 @@ uint64_t Session::sendAllPendingQosData()
259 p.setPacketId(queuedPublish.getPacketId()); 256 p.setPacketId(queuedPublish.getPacketId());
260 //p.setDuplicate(); // TODO: this is wrong. Until we have a retransmission system, no packets can have the DUP bit set. 257 //p.setDuplicate(); // TODO: this is wrong. Until we have a retransmission system, no packets can have the DUP bit set.
261 258
262 - count += c->writeMqttPacketAndBlameThisClient(p); 259 + c->writeMqttPacketAndBlameThisClient(p);
263 260
264 pos++; 261 pos++;
265 } 262 }
@@ -268,11 +265,9 @@ uint64_t Session::sendAllPendingQosData() @@ -268,11 +265,9 @@ uint64_t Session::sendAllPendingQosData()
268 { 265 {
269 PubResponse pubRel(c->getProtocolVersion(), PacketType::PUBREL, ReasonCodes::Success, packet_id); 266 PubResponse pubRel(c->getProtocolVersion(), PacketType::PUBREL, ReasonCodes::Success, packet_id);
270 MqttPacket packet(pubRel); 267 MqttPacket packet(pubRel);
271 - count += c->writeMqttPacketAndBlameThisClient(packet); 268 + c->writeMqttPacketAndBlameThisClient(packet);
272 } 269 }
273 } 270 }
274 -  
275 - return count;  
276 } 271 }
277 272
278 bool Session::hasActiveClient() const 273 bool Session::hasActiveClient() const
session.h
@@ -78,9 +78,9 @@ public: @@ -78,9 +78,9 @@ public:
78 const std::string &getClientId() const { return client_id; } 78 const std::string &getClientId() const { return client_id; }
79 std::shared_ptr<Client> makeSharedClient() const; 79 std::shared_ptr<Client> makeSharedClient() const;
80 void assignActiveConnection(std::shared_ptr<Client> &client); 80 void assignActiveConnection(std::shared_ptr<Client> &client);
81 - void writePacket(PublishCopyFactory &copyFactory, const char max_qos, uint64_t &count); 81 + void writePacket(PublishCopyFactory &copyFactory, const char max_qos);
82 bool clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds); 82 bool clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds);
83 - uint64_t sendAllPendingQosData(); 83 + void sendAllPendingQosData();
84 bool hasActiveClient() const; 84 bool hasActiveClient() const;
85 void clearWill(); 85 void clearWill();
86 std::shared_ptr<WillPublish> &getWill(); 86 std::shared_ptr<WillPublish> &getWill();
subscriptionstore.cpp
@@ -147,8 +147,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr&lt;Client&gt; &amp;client, const s @@ -147,8 +147,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr&lt;Client&gt; &amp;client, const s
147 const std::shared_ptr<Session> &ses = session_it->second; 147 const std::shared_ptr<Session> &ses = session_it->second;
148 deepestNode->addSubscriber(ses, qos); 148 deepestNode->addSubscriber(ses, qos);
149 lock_guard.unlock(); 149 lock_guard.unlock();
150 - uint64_t count = giveClientRetainedMessages(ses, subtopics, qos);  
151 - client->getThreadData()->incrementSentMessageCount(count); 150 + giveClientRetainedMessages(ses, subtopics, qos);
152 } 151 }
153 } 152 }
154 } 153 }
@@ -264,8 +263,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -264,8 +263,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
264 session->assignActiveConnection(client); 263 session->assignActiveConnection(client);
265 client->assignSession(session); 264 client->assignSession(session);
266 session->setSessionProperties(clientReceiveMax, sessionExpiryInterval, clean_start, client->getProtocolVersion()); 265 session->setSessionProperties(clientReceiveMax, sessionExpiryInterval, clean_start, client->getProtocolVersion());
267 - uint64_t count = session->sendAllPendingQosData();  
268 - client->getThreadData()->incrementSentMessageCount(count); 266 + session->sendAllPendingQosData();
269 } 267 }
270 268
271 /** 269 /**
@@ -451,7 +449,6 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory @@ -451,7 +449,6 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory
451 { 449 {
452 SubscriptionNode *startNode = dollar ? &rootDollar : &root; 450 SubscriptionNode *startNode = dollar ? &rootDollar : &root;
453 451
454 - uint64_t count = 0;  
455 std::forward_list<ReceivingSubscriber> subscriberSessions; 452 std::forward_list<ReceivingSubscriber> subscriberSessions;
456 453
457 { 454 {
@@ -463,13 +460,7 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory @@ -463,13 +460,7 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory
463 460
464 for(const ReceivingSubscriber &x : subscriberSessions) 461 for(const ReceivingSubscriber &x : subscriberSessions)
465 { 462 {
466 - x.session->writePacket(copyFactory, x.qos, count);  
467 - }  
468 -  
469 - std::shared_ptr<Client> sender = copyFactory.getSender();  
470 - if (sender)  
471 - {  
472 - sender->getThreadData()->incrementSentMessageCount(count); 463 + x.session->writePacket(copyFactory, x.qos);
473 } 464 }
474 } 465 }
475 466
@@ -524,11 +515,9 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s @@ -524,11 +515,9 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s
524 } 515 }
525 } 516 }
526 517
527 -uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses,  
528 - const std::vector<std::string> &subscribeSubtopics, char max_qos) 518 +void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
  519 + const std::vector<std::string> &subscribeSubtopics, char max_qos)
529 { 520 {
530 - uint64_t count = 0;  
531 -  
532 RetainedMessageNode *startNode = &retainedMessagesRoot; 521 RetainedMessageNode *startNode = &retainedMessagesRoot;
533 if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$') 522 if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$')
534 startNode = &retainedMessagesRootDollar; 523 startNode = &retainedMessagesRootDollar;
@@ -544,10 +533,8 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Ses @@ -544,10 +533,8 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Ses
544 for(Publish &publish : packetList) 533 for(Publish &publish : packetList)
545 { 534 {
546 PublishCopyFactory copyFactory(&publish); 535 PublishCopyFactory copyFactory(&publish);
547 - ses->writePacket(copyFactory, max_qos, count); 536 + ses->writePacket(copyFactory, max_qos);
548 } 537 }
549 -  
550 - return count;  
551 } 538 }
552 539
553 void SubscriptionStore::setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics) 540 void SubscriptionStore::setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics)
subscriptionstore.h
@@ -150,8 +150,8 @@ public: @@ -150,8 +150,8 @@ public:
150 void sendQueuedWillMessages(); 150 void sendQueuedWillMessages();
151 void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false); 151 void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false);
152 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false); 152 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false);
153 - uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses,  
154 - const std::vector<std::string> &subscribeSubtopics, char max_qos); 153 + void giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
  154 + const std::vector<std::string> &subscribeSubtopics, char max_qos);
155 155
156 void setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics); 156 void setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics);
157 157