diff --git a/session.cpp b/session.cpp index 702794c..00a2a76 100644 --- a/session.cpp +++ b/session.cpp @@ -47,7 +47,10 @@ void Session::writePacket(const MqttPacket &packet, char qos_arg) } const uint16_t pid = nextPacketId++; copyPacket->setPacketId(pid); - qosPacketQueue[pid] = copyPacket; + QueuedQosPacket p; + p.packet = copyPacket; + p.id = pid; + qosPacketQueue.push_back(p); qosQueueBytes += copyPacket->getTotalMemoryFootprint(); locker.unlock(); @@ -60,18 +63,31 @@ void Session::writePacket(const MqttPacket &packet, char qos_arg) } } +// Normatively, this while loop will break on the first element, because all messages are sent out in order and +// should be acked in order. void Session::clearQosMessage(uint16_t packet_id) { std::lock_guard locker(qosQueueMutex); - auto it = qosPacketQueue.find(packet_id); - if (it != qosPacketQueue.end()) + + auto it = qosPacketQueue.begin(); + auto end = qosPacketQueue.end(); + while (it != end) { - std::shared_ptr packet = it->second; - qosPacketQueue.erase(it); - qosQueueBytes -= packet->getTotalMemoryFootprint(); - assert(qosQueueBytes >= 0); - if (qosQueueBytes < 0) // Should not happen, but correcting a hypothetical bug is fine for this purpose. - qosQueueBytes = 0; + QueuedQosPacket &p = *it; + if (p.id == packet_id) + { + size_t mem = p.packet->getTotalMemoryFootprint(); + qosQueueBytes -= mem; + assert(qosQueueBytes >= 0); + if (qosQueueBytes < 0) // Should not happen, but correcting a hypothetical bug is fine for this purpose. + qosQueueBytes = 0; + + qosPacketQueue.erase(it); + + break; + } + + it++; } } @@ -87,10 +103,10 @@ void Session::sendPendingQosMessages() { Client_p c = makeSharedClient(); std::lock_guard locker(qosQueueMutex); - for (auto &qosMessage : qosPacketQueue) // TODO: wrong: the order must be maintained. Combine the fix with that vector idea + for (QueuedQosPacket &qosMessage : qosPacketQueue) { - c->writeMqttPacketAndBlameThisClient(*qosMessage.second.get()); - qosMessage.second->setDuplicate(); // Any dealings with this packet from here will be a duplicate. + c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get()); + qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate. } } } diff --git a/session.h b/session.h index ac16714..258da12 100644 --- a/session.h +++ b/session.h @@ -2,21 +2,27 @@ #define SESSION_H #include -#include +#include #include #include "forward_declarations.h" #include "logger.h" -// TODO make settings +// TODO make settings. But, num of packets can't exceed 65536, because the counter is 16 bit. #define MAX_QOS_MSG_PENDING_PER_CLIENT 32 #define MAX_QOS_BYTES_PENDING_PER_CLIENT 4096 +struct QueuedQosPacket +{ + uint16_t id; + std::shared_ptr packet; +}; + class Session { std::weak_ptr client; std::string client_id; - std::unordered_map> qosPacketQueue; // TODO: because the max queue length should remain low-ish, perhaps a vector is better here. + std::list qosPacketQueue; // Using list because it's easiest to maintain order [MQTT-4.6.0-6] std::mutex qosQueueMutex; uint16_t nextPacketId = 0; ssize_t qosQueueBytes = 0;