Commit f86677226ddd5c69be73ded03b89c43cc86a28a1

Authored by Wiebe Cazemier
1 parent 6141bbcc

QoS message queue is a list, for easy ordering

Showing 2 changed files with 37 additions and 15 deletions
session.cpp
... ... @@ -47,7 +47,10 @@ void Session::writePacket(const MqttPacket &packet, char qos_arg)
47 47 }
48 48 const uint16_t pid = nextPacketId++;
49 49 copyPacket->setPacketId(pid);
50   - qosPacketQueue[pid] = copyPacket;
  50 + QueuedQosPacket p;
  51 + p.packet = copyPacket;
  52 + p.id = pid;
  53 + qosPacketQueue.push_back(p);
51 54 qosQueueBytes += copyPacket->getTotalMemoryFootprint();
52 55 locker.unlock();
53 56  
... ... @@ -60,18 +63,31 @@ void Session::writePacket(const MqttPacket &packet, char qos_arg)
60 63 }
61 64 }
62 65  
  66 +// Normatively, this while loop will break on the first element, because all messages are sent out in order and
  67 +// should be acked in order.
63 68 void Session::clearQosMessage(uint16_t packet_id)
64 69 {
65 70 std::lock_guard<std::mutex> locker(qosQueueMutex);
66   - auto it = qosPacketQueue.find(packet_id);
67   - if (it != qosPacketQueue.end())
  71 +
  72 + auto it = qosPacketQueue.begin();
  73 + auto end = qosPacketQueue.end();
  74 + while (it != end)
68 75 {
69   - std::shared_ptr<MqttPacket> packet = it->second;
70   - qosPacketQueue.erase(it);
71   - qosQueueBytes -= packet->getTotalMemoryFootprint();
72   - assert(qosQueueBytes >= 0);
73   - if (qosQueueBytes < 0) // Should not happen, but correcting a hypothetical bug is fine for this purpose.
74   - qosQueueBytes = 0;
  76 + QueuedQosPacket &p = *it;
  77 + if (p.id == packet_id)
  78 + {
  79 + size_t mem = p.packet->getTotalMemoryFootprint();
  80 + qosQueueBytes -= mem;
  81 + assert(qosQueueBytes >= 0);
  82 + if (qosQueueBytes < 0) // Should not happen, but correcting a hypothetical bug is fine for this purpose.
  83 + qosQueueBytes = 0;
  84 +
  85 + qosPacketQueue.erase(it);
  86 +
  87 + break;
  88 + }
  89 +
  90 + it++;
75 91 }
76 92 }
77 93  
... ... @@ -87,10 +103,10 @@ void Session::sendPendingQosMessages()
87 103 {
88 104 Client_p c = makeSharedClient();
89 105 std::lock_guard<std::mutex> locker(qosQueueMutex);
90   - for (auto &qosMessage : qosPacketQueue) // TODO: wrong: the order must be maintained. Combine the fix with that vector idea
  106 + for (QueuedQosPacket &qosMessage : qosPacketQueue)
91 107 {
92   - c->writeMqttPacketAndBlameThisClient(*qosMessage.second.get());
93   - qosMessage.second->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
  108 + c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get());
  109 + qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
94 110 }
95 111 }
96 112 }
... ...
session.h
... ... @@ -2,21 +2,27 @@
2 2 #define SESSION_H
3 3  
4 4 #include <memory>
5   -#include <unordered_map>
  5 +#include <list>
6 6 #include <mutex>
7 7  
8 8 #include "forward_declarations.h"
9 9 #include "logger.h"
10 10  
11   -// TODO make settings
  11 +// TODO make settings. But, num of packets can't exceed 65536, because the counter is 16 bit.
12 12 #define MAX_QOS_MSG_PENDING_PER_CLIENT 32
13 13 #define MAX_QOS_BYTES_PENDING_PER_CLIENT 4096
14 14  
  15 +struct QueuedQosPacket
  16 +{
  17 + uint16_t id;
  18 + std::shared_ptr<MqttPacket> packet;
  19 +};
  20 +
15 21 class Session
16 22 {
17 23 std::weak_ptr<Client> client;
18 24 std::string client_id;
19   - std::unordered_map<uint16_t, std::shared_ptr<MqttPacket>> qosPacketQueue; // TODO: because the max queue length should remain low-ish, perhaps a vector is better here.
  25 + std::list<QueuedQosPacket> qosPacketQueue; // Using list because it's easiest to maintain order [MQTT-4.6.0-6]
20 26 std::mutex qosQueueMutex;
21 27 uint16_t nextPacketId = 0;
22 28 ssize_t qosQueueBytes = 0;
... ...