diff --git a/client.cpp b/client.cpp
index ae35344..3a457ee 100644
--- a/client.cpp
+++ b/client.cpp
@@ -27,9 +27,9 @@ License along with FlashMQ. If not, see .
#include "utils.h"
#include "threadglobals.h"
-StowedClientRegistrationData::StowedClientRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval) :
+StowedClientRegistrationData::StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval) :
clean_start(clean_start),
- maxQosPackets(maxQosPackets),
+ clientReceiveMax(clientReceiveMax),
sessionExpiryInterval(sessionExpiryInterval)
{
@@ -448,9 +448,9 @@ void Client::serverInitiatedDisconnect(ReasonCodes reason)
* @param maxQosPackets
* @param sessionExpiryInterval
*/
-void Client::setRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval)
+void Client::setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval)
{
- this->registrationData = std::make_unique(clean_start, maxQosPackets, sessionExpiryInterval);
+ this->registrationData = std::make_unique(clean_start, client_receive_max, sessionExpiryInterval);
}
const std::unique_ptr &Client::getRegistrationData() const
diff --git a/client.h b/client.h
index ecde99a..7a2561a 100644
--- a/client.h
+++ b/client.h
@@ -48,10 +48,10 @@ License along with FlashMQ. If not, see .
struct StowedClientRegistrationData
{
const bool clean_start;
- const uint16_t maxQosPackets;
+ const uint16_t clientReceiveMax;
const uint32_t sessionExpiryInterval;
- StowedClientRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
+ StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval);
};
class Client
@@ -171,7 +171,7 @@ public:
void sendOrQueueWill();
void serverInitiatedDisconnect(ReasonCodes reason);
- void setRegistrationData(bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
+ void setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval);
const std::unique_ptr &getRegistrationData() const;
void clearRegistrationData();
diff --git a/configfileparser.cpp b/configfileparser.cpp
index 458f3cd..ca2702d 100644
--- a/configfileparser.cpp
+++ b/configfileparser.cpp
@@ -445,9 +445,9 @@ void ConfigFileParser::loadFile(bool test)
if (key == "max_qos_msg_pending_per_client")
{
int newVal = std::stoi(value);
- if (newVal < 32 || newVal > 65530)
+ if (newVal < 32 || newVal > 65535)
{
- throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65530.", newVal));
+ throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65535.", newVal));
}
tmpSettings->maxQosMsgPendingPerClient = newVal;
}
diff --git a/mqttpacket.cpp b/mqttpacket.cpp
index 23b7a91..eda83d1 100644
--- a/mqttpacket.cpp
+++ b/mqttpacket.cpp
@@ -385,7 +385,7 @@ void MqttPacket::handleConnect()
uint16_t keep_alive = readTwoBytesToUInt16();
- uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient;
+ uint16_t client_receive_max = settings.maxQosMsgPendingPerClient;
uint32_t session_expire = settings.getExpireSessionAfterSeconds();
uint32_t max_outgoing_packet_size = settings.maxPacketSize;
uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases
@@ -410,7 +410,7 @@ void MqttPacket::handleConnect()
session_expire = std::min(readFourBytesToUint32(), session_expire);
break;
case Mqtt5Properties::ReceiveMaximum:
- max_qos_packets = std::min(readTwoBytesToUInt16(), max_qos_packets);
+ client_receive_max = std::min(readTwoBytesToUInt16(), client_receive_max);
break;
case Mqtt5Properties::MaximumPacketSize:
max_outgoing_packet_size = std::min(readFourBytesToUint32(), max_outgoing_packet_size);
@@ -445,6 +445,11 @@ void MqttPacket::handleConnect()
}
}
+ if (client_receive_max == 0 || max_outgoing_packet_size == 0)
+ {
+ throw ProtocolError("Receive max or max outgoing packet size can't be 0.", ReasonCodes::ProtocolError);
+ }
+
std::string client_id = readBytesToString();
std::string username;
@@ -610,7 +615,7 @@ void MqttPacket::handleConnect()
{
connAck->propertyBuilder = std::make_shared();
connAck->propertyBuilder->writeSessionExpiry(session_expire);
- connAck->propertyBuilder->writeReceiveMax(max_qos_packets);
+ connAck->propertyBuilder->writeReceiveMax(settings.maxQosMsgPendingPerClient);
connAck->propertyBuilder->writeRetainAvailable(1);
connAck->propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize());
if (clientIdGenerated)
@@ -629,7 +634,7 @@ void MqttPacket::handleConnect()
sender->stageConnack(std::move(connAck));
}
- sender->setRegistrationData(clean_start, max_qos_packets, session_expire);
+ sender->setRegistrationData(clean_start, client_receive_max, session_expire);
Authentication &authentication = *ThreadGlobals::getAuth();
AuthResult authResult = AuthResult::login_denied;
@@ -1176,7 +1181,7 @@ void MqttPacket::handlePublish()
void MqttPacket::handlePubAck()
{
uint16_t packet_id = readTwoBytesToUInt16();
- sender->getSession()->clearQosMessage(packet_id);
+ sender->getSession()->clearQosMessage(packet_id, true);
}
/**
@@ -1185,12 +1190,29 @@ void MqttPacket::handlePubAck()
void MqttPacket::handlePubRec()
{
const uint16_t packet_id = readTwoBytesToUInt16();
- sender->getSession()->clearQosMessage(packet_id);
- sender->getSession()->addOutgoingQoS2MessageId(packet_id);
- PubResponse pubRel(this->protocolVersion, PacketType::PUBREL, ReasonCodes::Success, packet_id);
- MqttPacket response(pubRel);
- sender->writeMqttPacket(response);
+ ReasonCodes reasonCode = ReasonCodes::Success; // Default when not specified, or MQTT3
+
+ if (!atEnd())
+ {
+ reasonCode = static_cast(readByte());
+ }
+
+ const bool publishTerminatesHere = reasonCode >= ReasonCodes::UnspecifiedError;
+ const bool foundAndRemoved = sender->getSession()->clearQosMessage(packet_id, publishTerminatesHere);
+
+ // "If it has sent a PUBREC with a Reason Code of 0x80 or greater, the receiver MUST treat any subsequent PUBLISH packet
+ // that contains that Packet Identifier as being a new Application Message."
+ if (!publishTerminatesHere)
+ {
+ sender->getSession()->addOutgoingQoS2MessageId(packet_id);
+
+ // MQTT5: "[The sender] MUST send a PUBREL packet when it receives a PUBREC packet from the receiver with a Reason Code value less than 0x80"
+ const ReasonCodes reason = foundAndRemoved ? ReasonCodes::Success : ReasonCodes::PacketIdentifierNotFound;
+ PubResponse pubRel(this->protocolVersion, PacketType::PUBREL, reason, packet_id);
+ MqttPacket response(pubRel);
+ sender->writeMqttPacket(response);
+ }
}
/**
@@ -1203,9 +1225,10 @@ void MqttPacket::handlePubRel()
throw ProtocolError("PUBREL first byte LSB must be 0010.", ReasonCodes::MalformedPacket);
const uint16_t packet_id = readTwoBytesToUInt16();
- sender->getSession()->removeIncomingQoS2MessageId(packet_id);
+ const bool foundAndRemoved = sender->getSession()->removeIncomingQoS2MessageId(packet_id);
+ const ReasonCodes reason = foundAndRemoved ? ReasonCodes::Success : ReasonCodes::PacketIdentifierNotFound;
- PubResponse pubcomp(this->protocolVersion, PacketType::PUBCOMP, ReasonCodes::Success, packet_id);
+ PubResponse pubcomp(this->protocolVersion, PacketType::PUBCOMP, reason, packet_id);
MqttPacket response(pubcomp);
sender->writeMqttPacket(response);
}
diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp
index fffc3f1..7bb2f1c 100644
--- a/qospacketqueue.cpp
+++ b/qospacketqueue.cpp
@@ -27,8 +27,10 @@ size_t QueuedPublish::getApproximateMemoryFootprint() const
}
-void QoSPublishQueue::erase(const uint16_t packet_id)
+bool QoSPublishQueue::erase(const uint16_t packet_id)
{
+ bool result = false;
+
auto it = queue.begin();
auto end = queue.end();
while (it != end)
@@ -43,12 +45,15 @@ void QoSPublishQueue::erase(const uint16_t packet_id)
qosQueueBytes = 0;
queue.erase(it);
+ result = true;
break;
}
it++;
}
+
+ return result;
}
std::list::iterator QoSPublishQueue::erase(std::list::iterator pos)
diff --git a/qospacketqueue.h b/qospacketqueue.h
index febe263..d577ba9 100644
--- a/qospacketqueue.h
+++ b/qospacketqueue.h
@@ -30,7 +30,7 @@ class QoSPublishQueue
ssize_t qosQueueBytes = 0;
public:
- void erase(const uint16_t packet_id);
+ bool erase(const uint16_t packet_id);
std::list::iterator erase(std::list::iterator pos);
size_t size() const;
size_t getByteSize() const;
diff --git a/session.cpp b/session.cpp
index 853dbb3..cd7d358 100644
--- a/session.cpp
+++ b/session.cpp
@@ -27,12 +27,20 @@ Session::Session()
const Settings &settings = *ThreadGlobals::getSettings();
// Sessions also get defaults from the handleConnect() method, but when you create sessions elsewhere, we do need some sensible defaults.
- this->maxQosMsgPending = settings.maxQosMsgPendingPerClient;
+ this->flowControlQuota = settings.maxQosMsgPendingPerClient;
this->sessionExpiryInterval = settings.expireSessionsAfterSeconds;
}
-bool Session::requiresPacketRetransmission() const
+void Session::increaseFlowControlQuota()
{
+ flowControlQuota++;
+ this->flowControlQuota = std::min(flowControlQuota, flowControlCealing);
+}
+
+bool Session::requiresQoSQueueing() const
+{
+ return true;
+
const std::shared_ptr client = makeSharedClient();
if (!client)
@@ -49,8 +57,7 @@ bool Session::requiresPacketRetransmission() const
void Session::increasePacketId()
{
nextPacketId++;
- if (nextPacketId == 0)
- nextPacketId++;
+ nextPacketId = std::max(nextPacketId, 1);
}
/**
@@ -146,81 +153,65 @@ void Session::writePacket(PublishCopyFactory ©Factory, const char max_qos, u
}
else if (effectiveQos > 0)
{
- const bool requiresRetransmission = requiresPacketRetransmission();
+ std::unique_lock locker(qosQueueMutex);
- if (requiresRetransmission)
+ if (this->flowControlQuota <= 0 || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0))
{
- std::unique_lock locker(qosQueueMutex);
-
- const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size();
- if (totalQosPacketsInTransit >= maxQosMsgPending
- || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0))
+ if (QoSLogPrintedAtId != nextPacketId)
{
- if (QoSLogPrintedAtId != nextPacketId)
- {
- logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because max in-transit packet count reached.", client_id.c_str());
- QoSLogPrintedAtId = nextPacketId;
- }
- return;
+ logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it hasn't seen enough PUBACK/PUBCOMP/PUBRECs to release places "
+ "or it exceeded 'max_qos_bytes_pending_per_client'.", client_id.c_str());
+ QoSLogPrintedAtId = nextPacketId;
}
+ return;
+ }
- increasePacketId();
+ increasePacketId();
+ flowControlQuota--;
+ if (requiresQoSQueueing())
qosPacketQueue.queuePublish(copyFactory, nextPacketId, effectiveQos);
- if (c)
- {
- count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId);
- }
- }
- else
+ if (c)
{
- // We don't need to make a copy of the packet in this branch, because:
- // - The packet to give the client won't shrink in size because source and client have a packet_id.
- // - We don't have to store the copy in the session for retransmission, see Session::requiresPacketRetransmission()
- // So, we just keep altering the original published packet.
-
- std::unique_lock locker(qosQueueMutex);
-
- if (qosInFlightCounter >= 65530) // Includes a small safety margin.
- {
- if (QoSLogPrintedAtId != nextPacketId)
- {
- logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it hasn't seen enough PUBACKs to release places.", client_id.c_str());
- QoSLogPrintedAtId = nextPacketId;
- }
- return;
- }
-
- increasePacketId();
-
- qosInFlightCounter++;
- assert(c); // with requiresRetransmission==false, there must be a client.
count += c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, nextPacketId);
}
}
}
}
-void Session::clearQosMessage(uint16_t packet_id)
+/**
+ * @brief Session::clearQosMessage clears a QOS message from the queue. Note that in QoS 2, that doesn't complete the handshake.
+ * @param packet_id
+ * @param qosHandshakeEnds can be set to true when you know the QoS handshake ends, (like) when PUBREC contains an error.
+ * @return whether the packet_id in question was found.
+ */
+bool Session::clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds)
{
#ifndef NDEBUG
logger->logf(LOG_DEBUG, "Clearing QoS message for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, qosPacketQueue.size());
#endif
+ bool result = false;
+
std::lock_guard locker(qosQueueMutex);
- if (requiresPacketRetransmission())
- qosPacketQueue.erase(packet_id);
+ if (requiresQoSQueueing())
+ result = qosPacketQueue.erase(packet_id);
else
{
- qosInFlightCounter--;
- qosInFlightCounter = std::max(0, qosInFlightCounter); // Should never happen, but in case we receive too many PUBACKs.
+ result = true;
}
-}
+ if (qosHandshakeEnds)
+ {
+ increaseFlowControlQuota();
+ }
+
+ return result;
+}
/**
- * @brief Session::sendPendingQosMessages sends pending publishes and QoS2 control packets.
+ * @brief Session::sendAllPendingQosData sends pending publishes and QoS2 control packets.
* @return the amount of messages/packets published.
*
* [MQTT-4.4.0-1] (about MQTT 3.1.1): "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST
@@ -234,7 +225,7 @@ void Session::clearQosMessage(uint16_t packet_id)
* never know that, because IT will have received the PUBACK from FlashMQ. The QoS system is not between publisher
* and subscriber. Users are required to implement something themselves.
*/
-uint64_t Session::sendPendingQosMessages()
+uint64_t Session::sendAllPendingQosData()
{
uint64_t count = 0;
@@ -254,12 +245,23 @@ uint64_t Session::sendPendingQosMessages()
pos = qosPacketQueue.erase(pos);
continue;
}
- pos++;
+
+ if (flowControlQuota <= 0)
+ {
+ logger->logf(LOG_WARNING, "Dropping QoS message(s) for client '%s', because it exceeds its receive maximum.", client_id.c_str());
+ pos = qosPacketQueue.erase(pos);
+ continue;
+ }
+
+ flowControlQuota--;
MqttPacket p(c->getProtocolVersion(), pub);
- p.setDuplicate();
+ p.setPacketId(queuedPublish.getPacketId());
+ //p.setDuplicate(); // TODO: this is wrong. Until we have a retransmission system, no packets can have the DUP bit set.
count += c->writeMqttPacketAndBlameThisClient(p);
+
+ pos++;
}
for (const uint16_t packet_id : outgoingQoS2MessageIds)
@@ -310,7 +312,7 @@ bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id)
return it != incomingQoS2MessageIds.end();
}
-void Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
+bool Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
{
assert(packet_id > 0);
@@ -320,9 +322,16 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size());
#endif
+ bool result = false;
+
const auto it = incomingQoS2MessageIds.find(packet_id);
if (it != incomingQoS2MessageIds.end())
+ {
incomingQoS2MessageIds.erase(it);
+ result = true;
+ }
+
+ return result;
}
void Session::addOutgoingQoS2MessageId(uint16_t packet_id)
@@ -342,6 +351,8 @@ void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id)
const auto it = outgoingQoS2MessageIds.find(packet_id);
if (it != outgoingQoS2MessageIds.end())
outgoingQoS2MessageIds.erase(it);
+
+ increaseFlowControlQuota();
}
/**
@@ -355,9 +366,10 @@ bool Session::getDestroyOnDisconnect() const
return destroyOnDisconnect;
}
-void Session::setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version)
+void Session::setSessionProperties(uint16_t clientReceiveMax, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version)
{
- this->maxQosMsgPending = maxQosPackets;
+ this->flowControlQuota = clientReceiveMax;
+ this->flowControlCealing = clientReceiveMax;
this->sessionExpiryInterval = sessionExpiryInterval;
if (protocol_version <= ProtocolVersion::Mqtt311 && clean_start)
diff --git a/session.h b/session.h
index f21ef41..d4bd962 100644
--- a/session.h
+++ b/session.h
@@ -45,9 +45,15 @@ class Session
std::set outgoingQoS2MessageIds;
std::mutex qosQueueMutex;
uint16_t nextPacketId = 0;
- uint16_t qosInFlightCounter = 0;
+
+ /**
+ * Even though flow control data is not part of the session state, I'm keeping it here because there are already
+ * mutexes that they can be placed under, saving additional synchronization.
+ */
+ int flowControlCealing = 0xFFFF;
+ int flowControlQuota = 0xFFFF;
+
uint32_t sessionExpiryInterval = 0;
- uint16_t maxQosMsgPending;
uint16_t QoSLogPrintedAtId = 0;
bool destroyOnDisconnect = false;
std::shared_ptr willPublish;
@@ -55,7 +61,9 @@ class Session
std::chrono::time_point removalQueuedAt;
Logger *logger = Logger::getInstance();
- bool requiresPacketRetransmission() const;
+ void increaseFlowControlQuota();
+
+ bool requiresQoSQueueing() const;
void increasePacketId();
Session(const Session &other);
@@ -71,8 +79,8 @@ public:
std::shared_ptr makeSharedClient() const;
void assignActiveConnection(std::shared_ptr &client);
void writePacket(PublishCopyFactory ©Factory, const char max_qos, uint64_t &count);
- void clearQosMessage(uint16_t packet_id);
- uint64_t sendPendingQosMessages();
+ bool clearQosMessage(uint16_t packet_id, bool qosHandshakeEnds);
+ uint64_t sendAllPendingQosData();
bool hasActiveClient() const;
void clearWill();
std::shared_ptr &getWill();
@@ -80,14 +88,14 @@ public:
void addIncomingQoS2MessageId(uint16_t packet_id);
bool incomingQoS2MessageIdInTransit(uint16_t packet_id);
- void removeIncomingQoS2MessageId(u_int16_t packet_id);
+ bool removeIncomingQoS2MessageId(u_int16_t packet_id);
void addOutgoingQoS2MessageId(uint16_t packet_id);
void removeOutgoingQoS2MessageId(u_int16_t packet_id);
bool getDestroyOnDisconnect() const;
- void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version);
+ void setSessionProperties(uint16_t clientReceiveMax, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version);
void setSessionExpiryInterval(uint32_t newVal);
void setQueuedRemovalAt();
uint32_t getSessionExpiryInterval() const;
diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp
index 823613e..87f001f 100644
--- a/sessionsandsubscriptionsdb.cpp
+++ b/sessionsandsubscriptionsdb.cpp
@@ -161,12 +161,10 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
const uint32_t compensatedSessionExpiry = persistence_state_age > originalSessionExpiryInterval ? 0 : originalSessionExpiryInterval - persistence_state_age;
const uint32_t sessionExpiryInterval = std::min(compensatedSessionExpiry, settings->getExpireSessionAfterSeconds());
- const uint16_t maxQosPending = std::min(readUint16(eofFound), settings->maxQosMsgPendingPerClient);
-
// We will set the session expiry interval as it would have had time continued. If a connection picks up session, it will update
// it with a more relevant value.
// The protocol version 5 is just dummy, to get the behavior I want.
- ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5);
+ ses->setSessionProperties(0xFFFF, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5);
const uint16_t hasWill = readUint16(eofFound);
@@ -314,7 +312,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectornextPacketId);
writeUint32(ses->getCurrentSessionExpiryInterval());
- writeUint16(ses->maxQosMsgPending);
const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0;
writeUint16(static_cast(hasWillThatShouldSurviveRestart));
diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp
index 96d70be..5c0e6dd 100644
--- a/subscriptionstore.cpp
+++ b/subscriptionstore.cpp
@@ -213,7 +213,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr
if (registrationData)
{
- registerClientAndKickExistingOne(client, registrationData->clean_start, registrationData->maxQosPackets, registrationData->sessionExpiryInterval);
+ registerClientAndKickExistingOne(client, registrationData->clean_start, registrationData->clientReceiveMax, registrationData->sessionExpiryInterval);
client->clearRegistrationData();
}
else
@@ -224,7 +224,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr
}
// Removes an existing client when it already exists [MQTT-3.1.4-2].
-void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval)
+void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval)
{
RWLockGuard lock_guard(&subscriptionsRwlock);
lock_guard.wrlock();
@@ -261,8 +261,8 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr
session->assignActiveConnection(client);
client->assignSession(session);
- session->setSessionProperties(maxQosPackets, sessionExpiryInterval, clean_start, client->getProtocolVersion());
- uint64_t count = session->sendPendingQosMessages();
+ session->setSessionProperties(clientReceiveMax, sessionExpiryInterval, clean_start, client->getProtocolVersion());
+ uint64_t count = session->sendAllPendingQosData();
client->getThreadData()->incrementSentMessageCount(count);
}
diff --git a/subscriptionstore.h b/subscriptionstore.h
index db46f61..daf3e35 100644
--- a/subscriptionstore.h
+++ b/subscriptionstore.h
@@ -161,7 +161,7 @@ public:
void addSubscription(std::shared_ptr &client, const std::string &topic, const std::vector &subtopics, char qos);
void removeSubscription(std::shared_ptr &client, const std::string &topic);
void registerClientAndKickExistingOne(std::shared_ptr &client);
- void registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
+ void registerClientAndKickExistingOne(std::shared_ptr &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval);
std::shared_ptr lockSession(const std::string &clientid);
void sendQueuedWillMessages();
diff --git a/types.h b/types.h
index 8678be8..193dfc4 100644
--- a/types.h
+++ b/types.h
@@ -132,6 +132,7 @@ enum class ReasonCodes
TopicFilterInvalid = 143,
TopicNameInvalid = 144,
PacketIdentifierInUse = 145,
+ PacketIdentifierNotFound = 146,
ReceiveMaximumExceeded = 147,
TopicAliasInvalid = 148,
PacketTooLarge = 149,