Commit ae1fbb00a48d1a9bdda7e7e05f03d389819c15c9

Authored by Wiebe Cazemier
1 parent 339a55fe

Don't count dropped publish message in stats

client.cpp
@@ -177,7 +177,7 @@ void Client::writeText(const std::string &text) @@ -177,7 +177,7 @@ void Client::writeText(const std::string &text)
177 setReadyForWriting(true); 177 setReadyForWriting(true);
178 } 178 }
179 179
180 -void Client::writeMqttPacket(const MqttPacket &packet, const char qos) 180 +int Client::writeMqttPacket(const MqttPacket &packet, const char qos)
181 { 181 {
182 std::lock_guard<std::mutex> locker(writeBufMutex); 182 std::lock_guard<std::mutex> locker(writeBufMutex);
183 183
@@ -192,7 +192,7 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet, const char qos) @@ -192,7 +192,7 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet, const char qos)
192 // QoS packet are queued and limited elsewhere. 192 // QoS packet are queued and limited elsewhere.
193 if (packet.packetType == PacketType::PUBLISH && qos == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) 193 if (packet.packetType == PacketType::PUBLISH && qos == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace())
194 { 194 {
195 - return; 195 + return 0;
196 } 196 }
197 197
198 writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader()); 198 writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader());
@@ -211,19 +211,22 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet, const char qos) @@ -211,19 +211,22 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet, const char qos)
211 setReadyForDisconnect(); 211 setReadyForDisconnect();
212 212
213 setReadyForWriting(true); 213 setReadyForWriting(true);
  214 + return 1;
214 } 215 }
215 216
216 // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected. 217 // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected.
217 -void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos) 218 +int Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos)
218 { 219 {
219 try 220 try
220 { 221 {
221 - this->writeMqttPacket(packet, qos); 222 + return this->writeMqttPacket(packet, qos);
222 } 223 }
223 catch (std::exception &ex) 224 catch (std::exception &ex)
224 { 225 {
225 threadData->removeClientQueued(fd); 226 threadData->removeClientQueued(fd);
226 } 227 }
  228 +
  229 + return 0;
227 } 230 }
228 231
229 // Ping responses are always the same, so hardcoding it for optimization. 232 // Ping responses are always the same, so hardcoding it for optimization.
client.h
@@ -121,8 +121,8 @@ public: @@ -121,8 +121,8 @@ public:
121 121
122 void writeText(const std::string &text); 122 void writeText(const std::string &text);
123 void writePingResp(); 123 void writePingResp();
124 - void writeMqttPacket(const MqttPacket &packet, const char qos = 0);  
125 - void writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos); 124 + int writeMqttPacket(const MqttPacket &packet, const char qos = 0);
  125 + int writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos);
126 bool writeBufIntoFd(); 126 bool writeBufIntoFd();
127 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; } 127 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; }
128 128
session.cpp
@@ -137,8 +137,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos, bool retain, u @@ -137,8 +137,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos, bool retain, u
137 137
138 if (c) 138 if (c)
139 { 139 {
140 - c->writeMqttPacketAndBlameThisClient(packet, qos);  
141 - count++; 140 + count += c->writeMqttPacketAndBlameThisClient(packet, qos);
142 } 141 }
143 } 142 }
144 else if (qos > 0) 143 else if (qos > 0)
@@ -165,9 +164,8 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos, bool retain, u @@ -165,9 +164,8 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos, bool retain, u
165 std::shared_ptr<Client> c = makeSharedClient(); 164 std::shared_ptr<Client> c = makeSharedClient();
166 if (c) 165 if (c)
167 { 166 {
168 - c->writeMqttPacketAndBlameThisClient(*copyPacket.get(), qos); 167 + count += c->writeMqttPacketAndBlameThisClient(*copyPacket.get(), qos);
169 copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate. 168 copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
170 - count++;  
171 } 169 }
172 } 170 }
173 } 171 }
@@ -201,16 +199,15 @@ uint64_t Session::sendPendingQosMessages() @@ -201,16 +199,15 @@ uint64_t Session::sendPendingQosMessages()
201 std::lock_guard<std::mutex> locker(qosQueueMutex); 199 std::lock_guard<std::mutex> locker(qosQueueMutex);
202 for (const std::shared_ptr<MqttPacket> &qosMessage : qosPacketQueue) 200 for (const std::shared_ptr<MqttPacket> &qosMessage : qosPacketQueue)
203 { 201 {
204 - c->writeMqttPacketAndBlameThisClient(*qosMessage.get(), qosMessage->getQos()); 202 + count += c->writeMqttPacketAndBlameThisClient(*qosMessage.get(), qosMessage->getQos());
205 qosMessage->setDuplicate(); // Any dealings with this packet from here will be a duplicate. 203 qosMessage->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
206 - count++;  
207 } 204 }
208 205
209 for (const uint16_t packet_id : outgoingQoS2MessageIds) 206 for (const uint16_t packet_id : outgoingQoS2MessageIds)
210 { 207 {
211 PubRel pubRel(packet_id); 208 PubRel pubRel(packet_id);
212 MqttPacket packet(pubRel); 209 MqttPacket packet(pubRel);
213 - c->writeMqttPacketAndBlameThisClient(packet, 2); 210 + count += c->writeMqttPacketAndBlameThisClient(packet, 2);
214 } 211 }
215 } 212 }
216 213