From 7870dc8ab7805e676fb7fb784ae4b9d8257a37d2 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Thu, 28 Apr 2022 23:27:41 +0200 Subject: [PATCH] Flow control based on receive maximum --- client.cpp | 8 ++++---- client.h | 6 +++--- configfileparser.cpp | 4 ++-- mqttpacket.cpp | 47 +++++++++++++++++++++++++++++++++++------------ qospacketqueue.cpp | 7 ++++++- qospacketqueue.h | 2 +- session.cpp | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------------------- session.h | 22 +++++++++++++++------- sessionsandsubscriptionsdb.cpp | 5 +---- subscriptionstore.cpp | 8 ++++---- subscriptionstore.h | 2 +- types.h | 1 + 12 files changed, 143 insertions(+), 97 deletions(-) diff --git a/client.cpp b/client.cpp index ae35344..3a457ee 100644 --- a/client.cpp +++ b/client.cpp @@ -27,9 +27,9 @@ License along with FlashMQ. If not, see . #include "utils.h" #include "threadglobals.h" -StowedClientRegistrationData::StowedClientRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) : +StowedClientRegistrationData::StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval) : clean_start(clean_start), - maxQosPackets(maxQosPackets), + clientReceiveMax(clientReceiveMax), sessionExpiryInterval(sessionExpiryInterval) { @@ -448,9 +448,9 @@ void Client::serverInitiatedDisconnect(ReasonCodes reason) * @param maxQosPackets * @param sessionExpiryInterval */ -void Client::setRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) +void Client::setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval) { - this->registrationData = std::make_unique(clean_start, maxQosPackets, sessionExpiryInterval); + this->registrationData = std::make_unique(clean_start, client_receive_max, sessionExpiryInterval); } const std::unique_ptr &Client::getRegistrationData() const diff --git a/client.h b/client.h index ecde99a..7a2561a 100644 --- a/client.h +++ b/client.h @@ -48,10 +48,10 @@ License along with FlashMQ. If not, see . struct StowedClientRegistrationData { const bool clean_start; - const uint16_t maxQosPackets; + const uint16_t clientReceiveMax; const uint32_t sessionExpiryInterval; - StowedClientRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); + StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval); }; class Client @@ -171,7 +171,7 @@ public: void sendOrQueueWill(); void serverInitiatedDisconnect(ReasonCodes reason); - void setRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); + void setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval); const std::unique_ptr &getRegistrationData() const; void clearRegistrationData(); diff --git a/configfileparser.cpp b/configfileparser.cpp index 458f3cd..ca2702d 100644 --- a/configfileparser.cpp +++ b/configfileparser.cpp @@ -445,9 +445,9 @@ void ConfigFileParser::loadFile(bool test) if (key == "max_qos_msg_pending_per_client") { int newVal = std::stoi(value); - if (newVal < 32 || newVal > 65530) + if (newVal < 32 || newVal > 65535) { - throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65530.", newVal)); + throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65535.", newVal)); } tmpSettings->maxQosMsgPendingPerClient = newVal; } diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 23b7a91..eda83d1 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -385,7 +385,7 @@ void MqttPacket::handleConnect() uint16_t keep_alive = readTwoBytesToUInt16(); - uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; + uint16_t client_receive_max = settings.maxQosMsgPendingPerClient; uint32_t session_expire = settings.getExpireSessionAfterSeconds(); uint32_t max_outgoing_packet_size = settings.maxPacketSize; uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases @@ -410,7 +410,7 @@ void MqttPacket::handleConnect() session_expire = std::min(readFourBytesToUint32(), session_expire); break; case Mqtt5Properties::ReceiveMaximum: - max_qos_packets = std::min(readTwoBytesToUInt16(), max_qos_packets); + client_receive_max = std::min(readTwoBytesToUInt16(), client_receive_max); break; case Mqtt5Properties::MaximumPacketSize: max_outgoing_packet_size = std::min(readFourBytesToUint32(), max_outgoing_packet_size); @@ -445,6 +445,11 @@ void MqttPacket::handleConnect() } } + if (client_receive_max == 0 || max_outgoing_packet_size == 0) + { + throw ProtocolError("Receive max or max outgoing packet size can't be 0.", ReasonCodes::ProtocolError); + } + std::string client_id = readBytesToString(); std::string username; @@ -610,7 +615,7 @@ void MqttPacket::handleConnect() { connAck->propertyBuilder = std::make_shared(); connAck->propertyBuilder->writeSessionExpiry(session_expire); - connAck->propertyBuilder->writeReceiveMax(max_qos_packets); + connAck->propertyBuilder->writeReceiveMax(settings.maxQosMsgPendingPerClient); connAck->propertyBuilder->writeRetainAvailable(1); connAck->propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize()); if (clientIdGenerated) @@ -629,7 +634,7 @@ void MqttPacket::handleConnect() sender->stageConnack(std::move(connAck)); } - sender->setRegistrationData(clean_start, max_qos_packets, session_expire); + sender->setRegistrationData(clean_start, client_receive_max, session_expire); Authentication &authentication = *ThreadGlobals::getAuth(); AuthResult authResult = AuthResult::login_denied; @@ -1176,7 +1181,7 @@ void MqttPacket::handlePublish() void MqttPacket::handlePubAck() { uint16_t packet_id = readTwoBytesToUInt16(); - sender->getSession()->clearQosMessage(packet_id); + sender->getSession()->clearQosMessage(packet_id, true); } /** @@ -1185,12 +1190,29 @@ void MqttPacket::handlePubAck() void MqttPacket::handlePubRec() { const uint16_t packet_id = readTwoBytesToUInt16(); - sender->getSession()->clearQosMessage(packet_id); - sender->getSession()->addOutgoingQoS2MessageId(packet_id); - PubResponse pubRel(this->protocolVersion, PacketType::PUBREL, ReasonCodes::Success, packet_id); - MqttPacket response(pubRel); - sender->writeMqttPacket(response); + ReasonCodes reasonCode = ReasonCodes::Success; // Default when not specified, or MQTT3 + + if (!atEnd()) + { + reasonCode = static_cast(readByte()); + } + + const bool publishTerminatesHere = reasonCode >= ReasonCodes::UnspecifiedError; + const bool foundAndRemoved = sender->getSession()->clearQosMessage(packet_id, publishTerminatesHere); + + // "If it has sent a PUBREC with a Reason Code of 0x80 or greater, the receiver MUST treat any subsequent PUBLISH packet + // that contains that Packet Identifier as being a new Application Message." + if (!publishTerminatesHere) + { + sender->getSession()->addOutgoingQoS2MessageId(packet_id); + + // MQTT5: "[The sender] MUST send a PUBREL packet when it receives a PUBREC packet from the receiver with a Reason Code value less than 0x80" + const ReasonCodes reason = foundAndRemoved ? ReasonCodes::Success : ReasonCodes::PacketIdentifierNotFound; + PubResponse pubRel(this->protocolVersion, PacketType::PUBREL, reason, packet_id); + MqttPacket response(pubRel); + sender->writeMqttPacket(response); + } } /** @@ -1203,9 +1225,10 @@ void MqttPacket::handlePubRel() throw ProtocolError("PUBREL first byte LSB must be 0010.", ReasonCodes::MalformedPacket); const uint16_t packet_id = readTwoBytesToUInt16(); - sender->getSession()->removeIncomingQoS2MessageId(packet_id); + const bool foundAndRemoved = sender->getSession()->removeIncomingQoS2MessageId(packet_id); + const ReasonCodes reason = foundAndRemoved ? ReasonCodes::Success : ReasonCodes::PacketIdentifierNotFound; - PubResponse pubcomp(this->protocolVersion, PacketType::PUBCOMP, ReasonCodes::Success, packet_id); + PubResponse pubcomp(this->protocolVersion, PacketType::PUBCOMP, reason, packet_id); MqttPacket response(pubcomp); sender->writeMqttPacket(response); } diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp index fffc3f1..7bb2f1c 100644 --- a/qospacketqueue.cpp +++ b/qospacketqueue.cpp @@ -27,8 +27,10 @@ size_t QueuedPublish::getApproximateMemoryFootprint() const } -void QoSPublishQueue::erase(const uint16_t packet_id) +bool QoSPublishQueue::erase(const uint16_t packet_id) { + bool result = false; + auto it = queue.begin(); auto end = queue.end(); while (it != end) @@ -43,12 +45,15 @@ void QoSPublishQueue::erase(const uint16_t packet_id) qosQueueBytes = 0; queue.erase(it); + result = true; break; } it++; } + + return result; } std::list::iterator QoSPublishQueue::erase(std::list::iterator pos) diff --git a/qospacketqueue.h b/qospacketqueue.h index febe263..d577ba9 100644 --- a/qospacketqueue.h +++ b/qospacketqueue.h @@ -30,7 +30,7 @@ class QoSPublishQueue ssize_t qosQueueBytes = 0; public: - void erase(const uint16_t packet_id); + bool erase(const uint16_t packet_id); std::list::iterator erase(std::list::iterator pos); size_t size() const; size_t getByteSize() const; diff --git a/session.cpp b/session.cpp index 853dbb3..cd7d358 100644 --- a/session.cpp +++ b/session.cpp @@ -27,12 +27,20 @@ Session::Session() const Settings &settings = *ThreadGlobals::getSettings(); // Sessions also get defaults from the handleConnect() method, but when you create sessions elsewhere, we do need some sensible defaults. - this->maxQosMsgPending = settings.maxQosMsgPendingPerClient; + this->flowControlQuota = settings.maxQosMsgPendingPerClient; this->sessionExpiryInterval = settings.expireSessionsAfterSeconds; } -bool Session::requiresPacketRetransmission() const +void Session::increaseFlowControlQuota() { + flowControlQuota++; + this->flowControlQuota = std::min(flowControlQuota, flowControlCealing); +} + +bool Session::requiresQoSQueueing() const +{ + return true; + const std::shared_ptr client = makeSharedClient(); if (!client) @@ -49,8 +57,7 @@ bool Session::requiresPacketRetransmission() const void Session::increasePacketId() { nextPacketId++; - if (nextPacketId == 0) - nextPacketId++; + nextPacketId = std::max(nextPacketId, 1); } /** @@ -146,81 +153,65 @@ void Session::writePacket(PublishCopyFactory ©Factory, const char max_qos, u } else if (effectiveQos > 0) { - const bool requiresRetransmission = requiresPacketRetransmission(); + std::unique_lock locker(qosQueueMutex); - if (requiresRetransmission) + if (this->flowControlQuota <= 0 || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0)) { - std::unique_lock locker(qosQueueMutex); - - const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size(); - if (totalQosPacketsInTransit >= maxQosMsgPending - || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0)) + if (QoSLogPrintedAtId != nextPacketId) { - if (QoSLogPrintedAtId != nextPacketId) - { - logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because max in-transit packet count reached.", client_id.c_str()); - QoSLogPrintedAtId = nextPacketId; - } - return; + logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it hasn't seen enough PUBACK/PUBCOMP/PUBRECs to release places " + "or it exceeded 'max_qos_bytes_pending_per_client'.", client_id.c_str()); + QoSLogPrintedAtId = nextPacketId; } + return; + } - increasePacketId(); + increasePacketId(); + flowControlQuota--; + if (requiresQoSQueueing()) qosPacketQueue.queuePublish(copyFactory, nextPacketId, effectiveQos); - if (c) - { - count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId); - } - } - else + if (c) { - // We don't need to make a copy of the packet in this branch, because: - // - The packet to give the client won't shrink in size because source and client have a packet_id. - // - We don't have to store the copy in the session for retransmission, see Session::requiresPacketRetransmission() - // So, we just keep altering the original published packet. - - std::unique_lock locker(qosQueueMutex); - - if (qosInFlightCounter >= 65530) // Includes a small safety margin. - { - if (QoSLogPrintedAtId != nextPacketId) - { - logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it hasn't seen enough PUBACKs to release places.", client_id.c_str()); - QoSLogPrintedAtId = nextPacketId; - } - return; - } - - increasePacketId(); - - qosInFlightCounter++; - assert(c); // with requiresRetransmission==false, there must be a client. count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId); } } } } -void Session::clearQosMessage(uint16_t packet_id) +/** + * @brief Session::clearQosMessage clears a QOS message from the queue. Note that in QoS 2, that doesn't complete the handshake. + * @param packet_id + * @param qosHandshakeEnds can be set to true when you know the QoS handshake ends, (like) when PUBREC contains an error. + * @return whether the packet_id in question was found. + */ +bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds) { #ifndef NDEBUG logger->logf(LOG_DEBUG, "Clearing QoS message for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, qosPacketQueue.size()); #endif + bool result = false; + std::lock_guard locker(qosQueueMutex); - if (requiresPacketRetransmission()) - qosPacketQueue.erase(packet_id); + if (requiresQoSQueueing()) + result = qosPacketQueue.erase(packet_id); else { - qosInFlightCounter--; - qosInFlightCounter = std::max(0, qosInFlightCounter); // Should never happen, but in case we receive too many PUBACKs. + result = true; } -} + if (qosHandshakeEnds) + { + increaseFlowControlQuota(); + } + + return result; +} /** - * @brief Session::sendPendingQosMessages sends pending publishes and QoS2 control packets. + * @brief Session::sendAllPendingQosData sends pending publishes and QoS2 control packets. * @return the amount of messages/packets published. * * [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST @@ -234,7 +225,7 @@ void Session::clearQosMessage(uint16_t packet_id) * never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher * and subscriber. Users are required to implement something themselves. */ -uint64_t Session::sendPendingQosMessages() +uint64_t Session::sendAllPendingQosData() { uint64_t count = 0; @@ -254,12 +245,23 @@ uint64_t Session::sendPendingQosMessages() pos = qosPacketQueue.erase(pos); continue; } - pos++; + + if (flowControlQuota <= 0) + { + logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it exceeds its receive maximum.", client_id.c_str()); + pos = qosPacketQueue.erase(pos); + continue; + } + + flowControlQuota--; MqttPacket p(c->getProtocolVersion(), pub); - p.setDuplicate(); + p.setPacketId(queuedPublish.getPacketId()); + //p.setDuplicate(); // TODO: this is wrong. Until we have a retransmission system, no packets can have the DUP bit set. count += c->writeMqttPacketAndBlameThisClient(p); + + pos++; } for (const uint16_t packet_id : outgoingQoS2MessageIds) @@ -310,7 +312,7 @@ bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id) return it != incomingQoS2MessageIds.end(); } -void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) +bool Session::removeIncomingQoS2MessageId(u_int16_t packet_id) { assert(packet_id > 0); @@ -320,9 +322,16 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size()); #endif + bool result = false; + const auto it = incomingQoS2MessageIds.find(packet_id); if (it != incomingQoS2MessageIds.end()) + { incomingQoS2MessageIds.erase(it); + result = true; + } + + return result; } void Session::addOutgoingQoS2MessageId(uint16_t packet_id) @@ -342,6 +351,8 @@ void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id) const auto it = outgoingQoS2MessageIds.find(packet_id); if (it != outgoingQoS2MessageIds.end()) outgoingQoS2MessageIds.erase(it); + + increaseFlowControlQuota(); } /** @@ -355,9 +366,10 @@ bool Session::getDestroyOnDisconnect() const return destroyOnDisconnect; } -void Session::setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version) +void Session::setSessionProperties(uint16_t clientReceiveMax, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version) { - this->maxQosMsgPending = maxQosPackets; + this->flowControlQuota = clientReceiveMax; + this->flowControlCealing = clientReceiveMax; this->sessionExpiryInterval = sessionExpiryInterval; if (protocol_version <= ProtocolVersion::Mqtt311 && clean_start) diff --git a/session.h b/session.h index f21ef41..d4bd962 100644 --- a/session.h +++ b/session.h @@ -45,9 +45,15 @@ class Session std::set outgoingQoS2MessageIds; std::mutex qosQueueMutex; uint16_t nextPacketId = 0; - uint16_t qosInFlightCounter = 0; + + /** + * Even though flow control data is not part of the session state, I'm keeping it here because there are already + * mutexes that they can be placed under, saving additional synchronization. + */ + int flowControlCealing = 0xFFFF; + int flowControlQuota = 0xFFFF; + uint32_t sessionExpiryInterval = 0; - uint16_t maxQosMsgPending; uint16_t QoSLogPrintedAtId = 0; bool destroyOnDisconnect = false; std::shared_ptr willPublish; @@ -55,7 +61,9 @@ class Session std::chrono::time_point removalQueuedAt; Logger *logger = Logger::getInstance(); - bool requiresPacketRetransmission() const; + void increaseFlowControlQuota(); + + bool requiresQoSQueueing() const; void increasePacketId(); Session(const Session &other); @@ -71,8 +79,8 @@ public: std::shared_ptr makeSharedClient() const; void assignActiveConnection(std::shared_ptr &client); void writePacket(PublishCopyFactory ©Factory, const char max_qos, uint64_t &count); - void clearQosMessage(uint16_t packet_id); - uint64_t sendPendingQosMessages(); + bool clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds); + uint64_t sendAllPendingQosData(); bool hasActiveClient() const; void clearWill(); std::shared_ptr &getWill(); @@ -80,14 +88,14 @@ public: void addIncomingQoS2MessageId(uint16_t packet_id); bool incomingQoS2MessageIdInTransit(uint16_t packet_id); - void removeIncomingQoS2MessageId(u_int16_t packet_id); + bool removeIncomingQoS2MessageId(u_int16_t packet_id); void addOutgoingQoS2MessageId(uint16_t packet_id); void removeOutgoingQoS2MessageId(u_int16_t packet_id); bool getDestroyOnDisconnect() const; - void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version); + void setSessionProperties(uint16_t clientReceiveMax, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version); void setSessionExpiryInterval(uint32_t newVal); void setQueuedRemovalAt(); uint32_t getSessionExpiryInterval() const; diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 823613e..87f001f 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -161,12 +161,10 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() const uint32_t compensatedSessionExpiry = persistence_state_age > originalSessionExpiryInterval ? 0 : originalSessionExpiryInterval - persistence_state_age; const uint32_t sessionExpiryInterval = std::min(compensatedSessionExpiry, settings->getExpireSessionAfterSeconds()); - const uint16_t maxQosPending = std::min(readUint16(eofFound), settings->maxQosMsgPendingPerClient); - // We will set the session expiry interval as it would have had time continued. If a connection picks up session, it will update // it with a more relevant value. // The protocol version 5 is just dummy, to get the behavior I want. - ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); + ses->setSessionProperties(0xFFFF, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); const uint16_t hasWill = readUint16(eofFound); @@ -314,7 +312,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectornextPacketId); writeUint32(ses->getCurrentSessionExpiryInterval()); - writeUint16(ses->maxQosMsgPending); const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0; writeUint16(static_cast(hasWillThatShouldSurviveRestart)); diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 96d70be..5c0e6dd 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -213,7 +213,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr if (registrationData) { - registerClientAndKickExistingOne(client, registrationData->clean_start, registrationData->maxQosPackets, registrationData->sessionExpiryInterval); + registerClientAndKickExistingOne(client, registrationData->clean_start, registrationData->clientReceiveMax, registrationData->sessionExpiryInterval); client->clearRegistrationData(); } else @@ -224,7 +224,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr } // Removes an existing client when it already exists [MQTT-3.1.4-2]. -void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) +void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval) { RWLockGuard lock_guard(&subscriptionsRwlock); lock_guard.wrlock(); @@ -261,8 +261,8 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr session->assignActiveConnection(client); client->assignSession(session); - session->setSessionProperties(maxQosPackets, sessionExpiryInterval, clean_start, client->getProtocolVersion()); - uint64_t count = session->sendPendingQosMessages(); + session->setSessionProperties(clientReceiveMax, sessionExpiryInterval, clean_start, client->getProtocolVersion()); + uint64_t count = session->sendAllPendingQosData(); client->getThreadData()->incrementSentMessageCount(count); } diff --git a/subscriptionstore.h b/subscriptionstore.h index db46f61..daf3e35 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -161,7 +161,7 @@ public: void addSubscription(std::shared_ptr &client, const std::string &topic, const std::vector &subtopics, char qos); void removeSubscription(std::shared_ptr &client, const std::string &topic); void registerClientAndKickExistingOne(std::shared_ptr &client); - void registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); + void registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval); std::shared_ptr lockSession(const std::string &clientid); void sendQueuedWillMessages(); diff --git a/types.h b/types.h index 8678be8..193dfc4 100644 --- a/types.h +++ b/types.h @@ -132,6 +132,7 @@ enum class ReasonCodes TopicFilterInvalid = 143, TopicNameInvalid = 144, PacketIdentifierInUse = 145, + PacketIdentifierNotFound = 146, ReceiveMaximumExceeded = 147, TopicAliasInvalid = 148, PacketTooLarge = 149, -- libgit2 0.21.4