You need to sign in before continuing.

Commit 7870dc8ab7805e676fb7fb784ae4b9d8257a37d2

Authored by Wiebe Cazemier
1 parent ac53fea0

Flow control based on receive maximum

It merely drops packets when they exceed it. The specs are unclear about
whether you're supposed to delay transmission until the quota is
non-negative again. I decided against it because of increased
complexity, and because on a continously overloaded client, this makes
no sense.

Effectively, this formalizes the 'max qos pending' mechanism that was
already in place.

It also includes PUBACK/PUBREL/PUBCOMP error handling, because that
needed to be done for proper quota control.
client.cpp
@@ -27,9 +27,9 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. @@ -27,9 +27,9 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
27 #include "utils.h" 27 #include "utils.h"
28 #include "threadglobals.h" 28 #include "threadglobals.h"
29 29
30 -StowedClientRegistrationData::StowedClientRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) : 30 +StowedClientRegistrationData::StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval) :
31 clean_start(clean_start), 31 clean_start(clean_start),
32 - maxQosPackets(maxQosPackets), 32 + clientReceiveMax(clientReceiveMax),
33 sessionExpiryInterval(sessionExpiryInterval) 33 sessionExpiryInterval(sessionExpiryInterval)
34 { 34 {
35 35
@@ -448,9 +448,9 @@ void Client::serverInitiatedDisconnect(ReasonCodes reason) @@ -448,9 +448,9 @@ void Client::serverInitiatedDisconnect(ReasonCodes reason)
448 * @param maxQosPackets 448 * @param maxQosPackets
449 * @param sessionExpiryInterval 449 * @param sessionExpiryInterval
450 */ 450 */
451 -void Client::setRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) 451 +void Client::setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval)
452 { 452 {
453 - this->registrationData = std::make_unique<StowedClientRegistrationData>(clean_start, maxQosPackets, sessionExpiryInterval); 453 + this->registrationData = std::make_unique<StowedClientRegistrationData>(clean_start, client_receive_max, sessionExpiryInterval);
454 } 454 }
455 455
456 const std::unique_ptr<StowedClientRegistrationData> &Client::getRegistrationData() const 456 const std::unique_ptr<StowedClientRegistrationData> &Client::getRegistrationData() const
client.h
@@ -48,10 +48,10 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -48,10 +48,10 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
48 struct StowedClientRegistrationData 48 struct StowedClientRegistrationData
49 { 49 {
50 const bool clean_start; 50 const bool clean_start;
51 - const uint16_t maxQosPackets; 51 + const uint16_t clientReceiveMax;
52 const uint32_t sessionExpiryInterval; 52 const uint32_t sessionExpiryInterval;
53 53
54 - StowedClientRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); 54 + StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval);
55 }; 55 };
56 56
57 class Client 57 class Client
@@ -171,7 +171,7 @@ public: @@ -171,7 +171,7 @@ public:
171 void sendOrQueueWill(); 171 void sendOrQueueWill();
172 void serverInitiatedDisconnect(ReasonCodes reason); 172 void serverInitiatedDisconnect(ReasonCodes reason);
173 173
174 - void setRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); 174 + void setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval);
175 const std::unique_ptr<StowedClientRegistrationData> &getRegistrationData() const; 175 const std::unique_ptr<StowedClientRegistrationData> &getRegistrationData() const;
176 void clearRegistrationData(); 176 void clearRegistrationData();
177 177
configfileparser.cpp
@@ -445,9 +445,9 @@ void ConfigFileParser::loadFile(bool test) @@ -445,9 +445,9 @@ void ConfigFileParser::loadFile(bool test)
445 if (key == "max_qos_msg_pending_per_client") 445 if (key == "max_qos_msg_pending_per_client")
446 { 446 {
447 int newVal = std::stoi(value); 447 int newVal = std::stoi(value);
448 - if (newVal < 32 || newVal > 65530) 448 + if (newVal < 32 || newVal > 65535)
449 { 449 {
450 - throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65530.", newVal)); 450 + throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65535.", newVal));
451 } 451 }
452 tmpSettings->maxQosMsgPendingPerClient = newVal; 452 tmpSettings->maxQosMsgPendingPerClient = newVal;
453 } 453 }
mqttpacket.cpp
@@ -385,7 +385,7 @@ void MqttPacket::handleConnect() @@ -385,7 +385,7 @@ void MqttPacket::handleConnect()
385 385
386 uint16_t keep_alive = readTwoBytesToUInt16(); 386 uint16_t keep_alive = readTwoBytesToUInt16();
387 387
388 - uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; 388 + uint16_t client_receive_max = settings.maxQosMsgPendingPerClient;
389 uint32_t session_expire = settings.getExpireSessionAfterSeconds(); 389 uint32_t session_expire = settings.getExpireSessionAfterSeconds();
390 uint32_t max_outgoing_packet_size = settings.maxPacketSize; 390 uint32_t max_outgoing_packet_size = settings.maxPacketSize;
391 uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases 391 uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases
@@ -410,7 +410,7 @@ void MqttPacket::handleConnect() @@ -410,7 +410,7 @@ void MqttPacket::handleConnect()
410 session_expire = std::min<uint32_t>(readFourBytesToUint32(), session_expire); 410 session_expire = std::min<uint32_t>(readFourBytesToUint32(), session_expire);
411 break; 411 break;
412 case Mqtt5Properties::ReceiveMaximum: 412 case Mqtt5Properties::ReceiveMaximum:
413 - max_qos_packets = std::min<int16_t>(readTwoBytesToUInt16(), max_qos_packets); 413 + client_receive_max = std::min<int16_t>(readTwoBytesToUInt16(), client_receive_max);
414 break; 414 break;
415 case Mqtt5Properties::MaximumPacketSize: 415 case Mqtt5Properties::MaximumPacketSize:
416 max_outgoing_packet_size = std::min<uint32_t>(readFourBytesToUint32(), max_outgoing_packet_size); 416 max_outgoing_packet_size = std::min<uint32_t>(readFourBytesToUint32(), max_outgoing_packet_size);
@@ -445,6 +445,11 @@ void MqttPacket::handleConnect() @@ -445,6 +445,11 @@ void MqttPacket::handleConnect()
445 } 445 }
446 } 446 }
447 447
  448 + if (client_receive_max == 0 || max_outgoing_packet_size == 0)
  449 + {
  450 + throw ProtocolError("Receive max or max outgoing packet size can't be 0.", ReasonCodes::ProtocolError);
  451 + }
  452 +
448 std::string client_id = readBytesToString(); 453 std::string client_id = readBytesToString();
449 454
450 std::string username; 455 std::string username;
@@ -610,7 +615,7 @@ void MqttPacket::handleConnect() @@ -610,7 +615,7 @@ void MqttPacket::handleConnect()
610 { 615 {
611 connAck->propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>(); 616 connAck->propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
612 connAck->propertyBuilder->writeSessionExpiry(session_expire); 617 connAck->propertyBuilder->writeSessionExpiry(session_expire);
613 - connAck->propertyBuilder->writeReceiveMax(max_qos_packets); 618 + connAck->propertyBuilder->writeReceiveMax(settings.maxQosMsgPendingPerClient);
614 connAck->propertyBuilder->writeRetainAvailable(1); 619 connAck->propertyBuilder->writeRetainAvailable(1);
615 connAck->propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize()); 620 connAck->propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize());
616 if (clientIdGenerated) 621 if (clientIdGenerated)
@@ -629,7 +634,7 @@ void MqttPacket::handleConnect() @@ -629,7 +634,7 @@ void MqttPacket::handleConnect()
629 sender->stageConnack(std::move(connAck)); 634 sender->stageConnack(std::move(connAck));
630 } 635 }
631 636
632 - sender->setRegistrationData(clean_start, max_qos_packets, session_expire); 637 + sender->setRegistrationData(clean_start, client_receive_max, session_expire);
633 638
634 Authentication &authentication = *ThreadGlobals::getAuth(); 639 Authentication &authentication = *ThreadGlobals::getAuth();
635 AuthResult authResult = AuthResult::login_denied; 640 AuthResult authResult = AuthResult::login_denied;
@@ -1176,7 +1181,7 @@ void MqttPacket::handlePublish() @@ -1176,7 +1181,7 @@ void MqttPacket::handlePublish()
1176 void MqttPacket::handlePubAck() 1181 void MqttPacket::handlePubAck()
1177 { 1182 {
1178 uint16_t packet_id = readTwoBytesToUInt16(); 1183 uint16_t packet_id = readTwoBytesToUInt16();
1179 - sender->getSession()->clearQosMessage(packet_id); 1184 + sender->getSession()->clearQosMessage(packet_id, true);
1180 } 1185 }
1181 1186
1182 /** 1187 /**
@@ -1185,12 +1190,29 @@ void MqttPacket::handlePubAck() @@ -1185,12 +1190,29 @@ void MqttPacket::handlePubAck()
1185 void MqttPacket::handlePubRec() 1190 void MqttPacket::handlePubRec()
1186 { 1191 {
1187 const uint16_t packet_id = readTwoBytesToUInt16(); 1192 const uint16_t packet_id = readTwoBytesToUInt16();
1188 - sender->getSession()->clearQosMessage(packet_id);  
1189 - sender->getSession()->addOutgoingQoS2MessageId(packet_id);  
1190 1193
1191 - PubResponse pubRel(this->protocolVersion, PacketType::PUBREL, ReasonCodes::Success, packet_id);  
1192 - MqttPacket response(pubRel);  
1193 - sender->writeMqttPacket(response); 1194 + ReasonCodes reasonCode = ReasonCodes::Success; // Default when not specified, or MQTT3
  1195 +
  1196 + if (!atEnd())
  1197 + {
  1198 + reasonCode = static_cast<ReasonCodes>(readByte());
  1199 + }
  1200 +
  1201 + const bool publishTerminatesHere = reasonCode >= ReasonCodes::UnspecifiedError;
  1202 + const bool foundAndRemoved = sender->getSession()->clearQosMessage(packet_id, publishTerminatesHere);
  1203 +
  1204 + // "If it has sent a PUBREC with a Reason Code of 0x80 or greater, the receiver MUST treat any subsequent PUBLISH packet
  1205 + // that contains that Packet Identifier as being a new Application Message."
  1206 + if (!publishTerminatesHere)
  1207 + {
  1208 + sender->getSession()->addOutgoingQoS2MessageId(packet_id);
  1209 +
  1210 + // MQTT5: "[The sender] MUST send a PUBREL packet when it receives a PUBREC packet from the receiver with a Reason Code value less than 0x80"
  1211 + const ReasonCodes reason = foundAndRemoved ? ReasonCodes::Success : ReasonCodes::PacketIdentifierNotFound;
  1212 + PubResponse pubRel(this->protocolVersion, PacketType::PUBREL, reason, packet_id);
  1213 + MqttPacket response(pubRel);
  1214 + sender->writeMqttPacket(response);
  1215 + }
1194 } 1216 }
1195 1217
1196 /** 1218 /**
@@ -1203,9 +1225,10 @@ void MqttPacket::handlePubRel() @@ -1203,9 +1225,10 @@ void MqttPacket::handlePubRel()
1203 throw ProtocolError("PUBREL first byte LSB must be 0010.", ReasonCodes::MalformedPacket); 1225 throw ProtocolError("PUBREL first byte LSB must be 0010.", ReasonCodes::MalformedPacket);
1204 1226
1205 const uint16_t packet_id = readTwoBytesToUInt16(); 1227 const uint16_t packet_id = readTwoBytesToUInt16();
1206 - sender->getSession()->removeIncomingQoS2MessageId(packet_id); 1228 + const bool foundAndRemoved = sender->getSession()->removeIncomingQoS2MessageId(packet_id);
  1229 + const ReasonCodes reason = foundAndRemoved ? ReasonCodes::Success : ReasonCodes::PacketIdentifierNotFound;
1207 1230
1208 - PubResponse pubcomp(this->protocolVersion, PacketType::PUBCOMP, ReasonCodes::Success, packet_id); 1231 + PubResponse pubcomp(this->protocolVersion, PacketType::PUBCOMP, reason, packet_id);
1209 MqttPacket response(pubcomp); 1232 MqttPacket response(pubcomp);
1210 sender->writeMqttPacket(response); 1233 sender->writeMqttPacket(response);
1211 } 1234 }
qospacketqueue.cpp
@@ -27,8 +27,10 @@ size_t QueuedPublish::getApproximateMemoryFootprint() const @@ -27,8 +27,10 @@ size_t QueuedPublish::getApproximateMemoryFootprint() const
27 } 27 }
28 28
29 29
30 -void QoSPublishQueue::erase(const uint16_t packet_id) 30 +bool QoSPublishQueue::erase(const uint16_t packet_id)
31 { 31 {
  32 + bool result = false;
  33 +
32 auto it = queue.begin(); 34 auto it = queue.begin();
33 auto end = queue.end(); 35 auto end = queue.end();
34 while (it != end) 36 while (it != end)
@@ -43,12 +45,15 @@ void QoSPublishQueue::erase(const uint16_t packet_id) @@ -43,12 +45,15 @@ void QoSPublishQueue::erase(const uint16_t packet_id)
43 qosQueueBytes = 0; 45 qosQueueBytes = 0;
44 46
45 queue.erase(it); 47 queue.erase(it);
  48 + result = true;
46 49
47 break; 50 break;
48 } 51 }
49 52
50 it++; 53 it++;
51 } 54 }
  55 +
  56 + return result;
52 } 57 }
53 58
54 std::list<QueuedPublish>::iterator QoSPublishQueue::erase(std::list<QueuedPublish>::iterator pos) 59 std::list<QueuedPublish>::iterator QoSPublishQueue::erase(std::list<QueuedPublish>::iterator pos)
qospacketqueue.h
@@ -30,7 +30,7 @@ class QoSPublishQueue @@ -30,7 +30,7 @@ class QoSPublishQueue
30 ssize_t qosQueueBytes = 0; 30 ssize_t qosQueueBytes = 0;
31 31
32 public: 32 public:
33 - void erase(const uint16_t packet_id); 33 + bool erase(const uint16_t packet_id);
34 std::list<QueuedPublish>::iterator erase(std::list<QueuedPublish>::iterator pos); 34 std::list<QueuedPublish>::iterator erase(std::list<QueuedPublish>::iterator pos);
35 size_t size() const; 35 size_t size() const;
36 size_t getByteSize() const; 36 size_t getByteSize() const;
session.cpp
@@ -27,12 +27,20 @@ Session::Session() @@ -27,12 +27,20 @@ Session::Session()
27 const Settings &settings = *ThreadGlobals::getSettings(); 27 const Settings &settings = *ThreadGlobals::getSettings();
28 28
29 // Sessions also get defaults from the handleConnect() method, but when you create sessions elsewhere, we do need some sensible defaults. 29 // Sessions also get defaults from the handleConnect() method, but when you create sessions elsewhere, we do need some sensible defaults.
30 - this->maxQosMsgPending = settings.maxQosMsgPendingPerClient; 30 + this->flowControlQuota = settings.maxQosMsgPendingPerClient;
31 this->sessionExpiryInterval = settings.expireSessionsAfterSeconds; 31 this->sessionExpiryInterval = settings.expireSessionsAfterSeconds;
32 } 32 }
33 33
34 -bool Session::requiresPacketRetransmission() const 34 +void Session::increaseFlowControlQuota()
35 { 35 {
  36 + flowControlQuota++;
  37 + this->flowControlQuota = std::min<int>(flowControlQuota, flowControlCealing);
  38 +}
  39 +
  40 +bool Session::requiresQoSQueueing() const
  41 +{
  42 + return true;
  43 +
36 const std::shared_ptr<Client> client = makeSharedClient(); 44 const std::shared_ptr<Client> client = makeSharedClient();
37 45
38 if (!client) 46 if (!client)
@@ -49,8 +57,7 @@ bool Session::requiresPacketRetransmission() const @@ -49,8 +57,7 @@ bool Session::requiresPacketRetransmission() const
49 void Session::increasePacketId() 57 void Session::increasePacketId()
50 { 58 {
51 nextPacketId++; 59 nextPacketId++;
52 - if (nextPacketId == 0)  
53 - nextPacketId++; 60 + nextPacketId = std::max<uint16_t>(nextPacketId, 1);
54 } 61 }
55 62
56 /** 63 /**
@@ -146,81 +153,65 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u @@ -146,81 +153,65 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u
146 } 153 }
147 else if (effectiveQos > 0) 154 else if (effectiveQos > 0)
148 { 155 {
149 - const bool requiresRetransmission = requiresPacketRetransmission(); 156 + std::unique_lock<std::mutex> locker(qosQueueMutex);
150 157
151 - if (requiresRetransmission) 158 + if (this->flowControlQuota <= 0 || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0))
152 { 159 {
153 - std::unique_lock<std::mutex> locker(qosQueueMutex);  
154 -  
155 - const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size();  
156 - if (totalQosPacketsInTransit >= maxQosMsgPending  
157 - || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0)) 160 + if (QoSLogPrintedAtId != nextPacketId)
158 { 161 {
159 - if (QoSLogPrintedAtId != nextPacketId)  
160 - {  
161 - logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because max in-transit packet count reached.", client_id.c_str());  
162 - QoSLogPrintedAtId = nextPacketId;  
163 - }  
164 - return; 162 + logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it hasn't seen enough PUBACK/PUBCOMP/PUBRECs to release places "
  163 + "or it exceeded 'max_qos_bytes_pending_per_client'.", client_id.c_str());
  164 + QoSLogPrintedAtId = nextPacketId;
165 } 165 }
  166 + return;
  167 + }
166 168
167 - increasePacketId(); 169 + increasePacketId();
  170 + flowControlQuota--;
168 171
  172 + if (requiresQoSQueueing())
169 qosPacketQueue.queuePublish(copyFactory, nextPacketId, effectiveQos); 173 qosPacketQueue.queuePublish(copyFactory, nextPacketId, effectiveQos);
170 174
171 - if (c)  
172 - {  
173 - count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId);  
174 - }  
175 - }  
176 - else 175 + if (c)
177 { 176 {
178 - // We don't need to make a copy of the packet in this branch, because:  
179 - // - The packet to give the client won't shrink in size because source and client have a packet_id.  
180 - // - We don't have to store the copy in the session for retransmission, see Session::requiresPacketRetransmission()  
181 - // So, we just keep altering the original published packet.  
182 -  
183 - std::unique_lock<std::mutex> locker(qosQueueMutex);  
184 -  
185 - if (qosInFlightCounter >= 65530) // Includes a small safety margin.  
186 - {  
187 - if (QoSLogPrintedAtId != nextPacketId)  
188 - {  
189 - logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it hasn't seen enough PUBACKs to release places.", client_id.c_str());  
190 - QoSLogPrintedAtId = nextPacketId;  
191 - }  
192 - return;  
193 - }  
194 -  
195 - increasePacketId();  
196 -  
197 - qosInFlightCounter++;  
198 - assert(c); // with requiresRetransmission==false, there must be a client.  
199 count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId); 177 count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId);
200 } 178 }
201 } 179 }
202 } 180 }
203 } 181 }
204 182
205 -void Session::clearQosMessage(uint16_t packet_id) 183 +/**
  184 + * @brief Session::clearQosMessage clears a QOS message from the queue. Note that in QoS 2, that doesn't complete the handshake.
  185 + * @param packet_id
  186 + * @param qosHandshakeEnds can be set to true when you know the QoS handshake ends, (like) when PUBREC contains an error.
  187 + * @return whether the packet_id in question was found.
  188 + */
  189 +bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds)
206 { 190 {
207 #ifndef NDEBUG 191 #ifndef NDEBUG
208 logger->logf(LOG_DEBUG, "Clearing QoS message for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, qosPacketQueue.size()); 192 logger->logf(LOG_DEBUG, "Clearing QoS message for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, qosPacketQueue.size());
209 #endif 193 #endif
210 194
  195 + bool result = false;
  196 +
211 std::lock_guard<std::mutex> locker(qosQueueMutex); 197 std::lock_guard<std::mutex> locker(qosQueueMutex);
212 - if (requiresPacketRetransmission())  
213 - qosPacketQueue.erase(packet_id); 198 + if (requiresQoSQueueing())
  199 + result = qosPacketQueue.erase(packet_id);
214 else 200 else
215 { 201 {
216 - qosInFlightCounter--;  
217 - qosInFlightCounter = std::max<int>(0, qosInFlightCounter); // Should never happen, but in case we receive too many PUBACKs. 202 + result = true;
218 } 203 }
219 -}  
220 204
  205 + if (qosHandshakeEnds)
  206 + {
  207 + increaseFlowControlQuota();
  208 + }
  209 +
  210 + return result;
  211 +}
221 212
222 /** 213 /**
223 - * @brief Session::sendPendingQosMessages sends pending publishes and QoS2 control packets. 214 + * @brief Session::sendAllPendingQosData sends pending publishes and QoS2 control packets.
224 * @return the amount of messages/packets published. 215 * @return the amount of messages/packets published.
225 * 216 *
226 * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST 217 * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST
@@ -234,7 +225,7 @@ void Session::clearQosMessage(uint16_t packet_id) @@ -234,7 +225,7 @@ void Session::clearQosMessage(uint16_t packet_id)
234 * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher 225 * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher
235 * and subscriber. Users are required to implement something themselves. 226 * and subscriber. Users are required to implement something themselves.
236 */ 227 */
237 -uint64_t Session::sendPendingQosMessages() 228 +uint64_t Session::sendAllPendingQosData()
238 { 229 {
239 uint64_t count = 0; 230 uint64_t count = 0;
240 231
@@ -254,12 +245,23 @@ uint64_t Session::sendPendingQosMessages() @@ -254,12 +245,23 @@ uint64_t Session::sendPendingQosMessages()
254 pos = qosPacketQueue.erase(pos); 245 pos = qosPacketQueue.erase(pos);
255 continue; 246 continue;
256 } 247 }
257 - pos++; 248 +
  249 + if (flowControlQuota <= 0)
  250 + {
  251 + logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it exceeds its receive maximum.", client_id.c_str());
  252 + pos = qosPacketQueue.erase(pos);
  253 + continue;
  254 + }
  255 +
  256 + flowControlQuota--;
258 257
259 MqttPacket p(c->getProtocolVersion(), pub); 258 MqttPacket p(c->getProtocolVersion(), pub);
260 - p.setDuplicate(); 259 + p.setPacketId(queuedPublish.getPacketId());
  260 + //p.setDuplicate(); // TODO: this is wrong. Until we have a retransmission system, no packets can have the DUP bit set.
261 261
262 count += c->writeMqttPacketAndBlameThisClient(p); 262 count += c->writeMqttPacketAndBlameThisClient(p);
  263 +
  264 + pos++;
263 } 265 }
264 266
265 for (const uint16_t packet_id : outgoingQoS2MessageIds) 267 for (const uint16_t packet_id : outgoingQoS2MessageIds)
@@ -310,7 +312,7 @@ bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id) @@ -310,7 +312,7 @@ bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id)
310 return it != incomingQoS2MessageIds.end(); 312 return it != incomingQoS2MessageIds.end();
311 } 313 }
312 314
313 -void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) 315 +bool Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
314 { 316 {
315 assert(packet_id > 0); 317 assert(packet_id > 0);
316 318
@@ -320,9 +322,16 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) @@ -320,9 +322,16 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
320 logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size()); 322 logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size());
321 #endif 323 #endif
322 324
  325 + bool result = false;
  326 +
323 const auto it = incomingQoS2MessageIds.find(packet_id); 327 const auto it = incomingQoS2MessageIds.find(packet_id);
324 if (it != incomingQoS2MessageIds.end()) 328 if (it != incomingQoS2MessageIds.end())
  329 + {
325 incomingQoS2MessageIds.erase(it); 330 incomingQoS2MessageIds.erase(it);
  331 + result = true;
  332 + }
  333 +
  334 + return result;
326 } 335 }
327 336
328 void Session::addOutgoingQoS2MessageId(uint16_t packet_id) 337 void Session::addOutgoingQoS2MessageId(uint16_t packet_id)
@@ -342,6 +351,8 @@ void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id) @@ -342,6 +351,8 @@ void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id)
342 const auto it = outgoingQoS2MessageIds.find(packet_id); 351 const auto it = outgoingQoS2MessageIds.find(packet_id);
343 if (it != outgoingQoS2MessageIds.end()) 352 if (it != outgoingQoS2MessageIds.end())
344 outgoingQoS2MessageIds.erase(it); 353 outgoingQoS2MessageIds.erase(it);
  354 +
  355 + increaseFlowControlQuota();
345 } 356 }
346 357
347 /** 358 /**
@@ -355,9 +366,10 @@ bool Session::getDestroyOnDisconnect() const @@ -355,9 +366,10 @@ bool Session::getDestroyOnDisconnect() const
355 return destroyOnDisconnect; 366 return destroyOnDisconnect;
356 } 367 }
357 368
358 -void Session::setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version) 369 +void Session::setSessionProperties(uint16_t clientReceiveMax, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version)
359 { 370 {
360 - this->maxQosMsgPending = maxQosPackets; 371 + this->flowControlQuota = clientReceiveMax;
  372 + this->flowControlCealing = clientReceiveMax;
361 this->sessionExpiryInterval = sessionExpiryInterval; 373 this->sessionExpiryInterval = sessionExpiryInterval;
362 374
363 if (protocol_version <= ProtocolVersion::Mqtt311 && clean_start) 375 if (protocol_version <= ProtocolVersion::Mqtt311 && clean_start)
session.h
@@ -45,9 +45,15 @@ class Session @@ -45,9 +45,15 @@ class Session
45 std::set<uint16_t> outgoingQoS2MessageIds; 45 std::set<uint16_t> outgoingQoS2MessageIds;
46 std::mutex qosQueueMutex; 46 std::mutex qosQueueMutex;
47 uint16_t nextPacketId = 0; 47 uint16_t nextPacketId = 0;
48 - uint16_t qosInFlightCounter = 0; 48 +
  49 + /**
  50 + * Even though flow control data is not part of the session state, I'm keeping it here because there are already
  51 + * mutexes that they can be placed under, saving additional synchronization.
  52 + */
  53 + int flowControlCealing = 0xFFFF;
  54 + int flowControlQuota = 0xFFFF;
  55 +
49 uint32_t sessionExpiryInterval = 0; 56 uint32_t sessionExpiryInterval = 0;
50 - uint16_t maxQosMsgPending;  
51 uint16_t QoSLogPrintedAtId = 0; 57 uint16_t QoSLogPrintedAtId = 0;
52 bool destroyOnDisconnect = false; 58 bool destroyOnDisconnect = false;
53 std::shared_ptr<WillPublish> willPublish; 59 std::shared_ptr<WillPublish> willPublish;
@@ -55,7 +61,9 @@ class Session @@ -55,7 +61,9 @@ class Session
55 std::chrono::time_point<std::chrono::steady_clock> removalQueuedAt; 61 std::chrono::time_point<std::chrono::steady_clock> removalQueuedAt;
56 Logger *logger = Logger::getInstance(); 62 Logger *logger = Logger::getInstance();
57 63
58 - bool requiresPacketRetransmission() const; 64 + void increaseFlowControlQuota();
  65 +
  66 + bool requiresQoSQueueing() const;
59 void increasePacketId(); 67 void increasePacketId();
60 68
61 Session(const Session &other); 69 Session(const Session &other);
@@ -71,8 +79,8 @@ public: @@ -71,8 +79,8 @@ public:
71 std::shared_ptr<Client> makeSharedClient() const; 79 std::shared_ptr<Client> makeSharedClient() const;
72 void assignActiveConnection(std::shared_ptr<Client> &client); 80 void assignActiveConnection(std::shared_ptr<Client> &client);
73 void writePacket(PublishCopyFactory &copyFactory, const char max_qos, uint64_t &count); 81 void writePacket(PublishCopyFactory &copyFactory, const char max_qos, uint64_t &count);
74 - void clearQosMessage(uint16_t packet_id);  
75 - uint64_t sendPendingQosMessages(); 82 + bool clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds);
  83 + uint64_t sendAllPendingQosData();
76 bool hasActiveClient() const; 84 bool hasActiveClient() const;
77 void clearWill(); 85 void clearWill();
78 std::shared_ptr<WillPublish> &getWill(); 86 std::shared_ptr<WillPublish> &getWill();
@@ -80,14 +88,14 @@ public: @@ -80,14 +88,14 @@ public:
80 88
81 void addIncomingQoS2MessageId(uint16_t packet_id); 89 void addIncomingQoS2MessageId(uint16_t packet_id);
82 bool incomingQoS2MessageIdInTransit(uint16_t packet_id); 90 bool incomingQoS2MessageIdInTransit(uint16_t packet_id);
83 - void removeIncomingQoS2MessageId(u_int16_t packet_id); 91 + bool removeIncomingQoS2MessageId(u_int16_t packet_id);
84 92
85 void addOutgoingQoS2MessageId(uint16_t packet_id); 93 void addOutgoingQoS2MessageId(uint16_t packet_id);
86 void removeOutgoingQoS2MessageId(u_int16_t packet_id); 94 void removeOutgoingQoS2MessageId(u_int16_t packet_id);
87 95
88 bool getDestroyOnDisconnect() const; 96 bool getDestroyOnDisconnect() const;
89 97
90 - void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version); 98 + void setSessionProperties(uint16_t clientReceiveMax, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version);
91 void setSessionExpiryInterval(uint32_t newVal); 99 void setSessionExpiryInterval(uint32_t newVal);
92 void setQueuedRemovalAt(); 100 void setQueuedRemovalAt();
93 uint32_t getSessionExpiryInterval() const; 101 uint32_t getSessionExpiryInterval() const;
sessionsandsubscriptionsdb.cpp
@@ -161,12 +161,10 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -161,12 +161,10 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
161 const uint32_t compensatedSessionExpiry = persistence_state_age > originalSessionExpiryInterval ? 0 : originalSessionExpiryInterval - persistence_state_age; 161 const uint32_t compensatedSessionExpiry = persistence_state_age > originalSessionExpiryInterval ? 0 : originalSessionExpiryInterval - persistence_state_age;
162 const uint32_t sessionExpiryInterval = std::min<uint32_t>(compensatedSessionExpiry, settings->getExpireSessionAfterSeconds()); 162 const uint32_t sessionExpiryInterval = std::min<uint32_t>(compensatedSessionExpiry, settings->getExpireSessionAfterSeconds());
163 163
164 - const uint16_t maxQosPending = std::min<uint16_t>(readUint16(eofFound), settings->maxQosMsgPendingPerClient);  
165 -  
166 // We will set the session expiry interval as it would have had time continued. If a connection picks up session, it will update 164 // We will set the session expiry interval as it would have had time continued. If a connection picks up session, it will update
167 // it with a more relevant value. 165 // it with a more relevant value.
168 // The protocol version 5 is just dummy, to get the behavior I want. 166 // The protocol version 5 is just dummy, to get the behavior I want.
169 - ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); 167 + ses->setSessionProperties(0xFFFF, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5);
170 168
171 const uint16_t hasWill = readUint16(eofFound); 169 const uint16_t hasWill = readUint16(eofFound);
172 170
@@ -314,7 +312,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -314,7 +312,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
314 writeUint16(ses->nextPacketId); 312 writeUint16(ses->nextPacketId);
315 313
316 writeUint32(ses->getCurrentSessionExpiryInterval()); 314 writeUint32(ses->getCurrentSessionExpiryInterval());
317 - writeUint16(ses->maxQosMsgPending);  
318 315
319 const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0; 316 const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0;
320 writeUint16(static_cast<uint16_t>(hasWillThatShouldSurviveRestart)); 317 writeUint16(static_cast<uint16_t>(hasWillThatShouldSurviveRestart));
subscriptionstore.cpp
@@ -213,7 +213,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -213,7 +213,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
213 213
214 if (registrationData) 214 if (registrationData)
215 { 215 {
216 - registerClientAndKickExistingOne(client, registrationData->clean_start, registrationData->maxQosPackets, registrationData->sessionExpiryInterval); 216 + registerClientAndKickExistingOne(client, registrationData->clean_start, registrationData->clientReceiveMax, registrationData->sessionExpiryInterval);
217 client->clearRegistrationData(); 217 client->clearRegistrationData();
218 } 218 }
219 else 219 else
@@ -224,7 +224,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -224,7 +224,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
224 } 224 }
225 225
226 // Removes an existing client when it already exists [MQTT-3.1.4-2]. 226 // Removes an existing client when it already exists [MQTT-3.1.4-2].
227 -void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) 227 +void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval)
228 { 228 {
229 RWLockGuard lock_guard(&subscriptionsRwlock); 229 RWLockGuard lock_guard(&subscriptionsRwlock);
230 lock_guard.wrlock(); 230 lock_guard.wrlock();
@@ -261,8 +261,8 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -261,8 +261,8 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
261 261
262 session->assignActiveConnection(client); 262 session->assignActiveConnection(client);
263 client->assignSession(session); 263 client->assignSession(session);
264 - session->setSessionProperties(maxQosPackets, sessionExpiryInterval, clean_start, client->getProtocolVersion());  
265 - uint64_t count = session->sendPendingQosMessages(); 264 + session->setSessionProperties(clientReceiveMax, sessionExpiryInterval, clean_start, client->getProtocolVersion());
  265 + uint64_t count = session->sendAllPendingQosData();
266 client->getThreadData()->incrementSentMessageCount(count); 266 client->getThreadData()->incrementSentMessageCount(count);
267 } 267 }
268 268
subscriptionstore.h
@@ -161,7 +161,7 @@ public: @@ -161,7 +161,7 @@ public:
161 void addSubscription(std::shared_ptr<Client> &client, const std::string &topic, const std::vector<std::string> &subtopics, char qos); 161 void addSubscription(std::shared_ptr<Client> &client, const std::string &topic, const std::vector<std::string> &subtopics, char qos);
162 void removeSubscription(std::shared_ptr<Client> &client, const std::string &topic); 162 void removeSubscription(std::shared_ptr<Client> &client, const std::string &topic);
163 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client); 163 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client);
164 - void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); 164 + void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval);
165 std::shared_ptr<Session> lockSession(const std::string &clientid); 165 std::shared_ptr<Session> lockSession(const std::string &clientid);
166 166
167 void sendQueuedWillMessages(); 167 void sendQueuedWillMessages();
@@ -132,6 +132,7 @@ enum class ReasonCodes @@ -132,6 +132,7 @@ enum class ReasonCodes
132 TopicFilterInvalid = 143, 132 TopicFilterInvalid = 143,
133 TopicNameInvalid = 144, 133 TopicNameInvalid = 144,
134 PacketIdentifierInUse = 145, 134 PacketIdentifierInUse = 145,
  135 + PacketIdentifierNotFound = 146,
135 ReceiveMaximumExceeded = 147, 136 ReceiveMaximumExceeded = 147,
136 TopicAliasInvalid = 148, 137 TopicAliasInvalid = 148,
137 PacketTooLarge = 149, 138 PacketTooLarge = 149,