Commit 8094510fe9c1aef9aee705fcd63c6a6afc16d941

Authored by Wiebe Cazemier
1 parent f8667722

QoS on retained messages

mqttpacket.cpp
@@ -86,6 +86,7 @@ MqttPacket::MqttPacket(const Publish &publish) : @@ -86,6 +86,7 @@ MqttPacket::MqttPacket(const Publish &publish) :
86 } 86 }
87 87
88 packetType = PacketType::PUBLISH; 88 packetType = PacketType::PUBLISH;
  89 + this->qos = publish.qos;
89 first_byte = static_cast<char>(packetType) << 4; 90 first_byte = static_cast<char>(packetType) << 4;
90 first_byte |= (publish.qos << 1); 91 first_byte |= (publish.qos << 1);
91 first_byte |= (static_cast<char>(publish.retain) & 0b00000001); 92 first_byte |= (static_cast<char>(publish.retain) & 0b00000001);
@@ -98,7 +99,10 @@ MqttPacket::MqttPacket(const Publish &amp;publish) : @@ -98,7 +99,10 @@ MqttPacket::MqttPacket(const Publish &amp;publish) :
98 99
99 if (publish.qos) 100 if (publish.qos)
100 { 101 {
101 - throw NotImplementedException("I would write two bytes containing the packet id here, but QoS is not done yet."); 102 + // Reserve the space for the packet id, which will be assigned later.
  103 + packet_id_pos = pos;
  104 + char zero[2];
  105 + writeBytes(zero, 2);
102 } 106 }
103 107
104 writeBytes(publish.payload.c_str(), publish.payload.length()); 108 writeBytes(publish.payload.c_str(), publish.payload.length());
@@ -398,8 +402,8 @@ RemainingLength MqttPacket::getRemainingLength() const @@ -398,8 +402,8 @@ RemainingLength MqttPacket::getRemainingLength() const
398 402
399 void MqttPacket::setPacketId(uint16_t packet_id) 403 void MqttPacket::setPacketId(uint16_t packet_id)
400 { 404 {
401 - // In other words, we assume that this code can only be called on packets of which we have all the bytes, including fixed header.  
402 - assert(fixed_header_length > 0); 405 + assert(fixed_header_length == 0 || first_byte == bites[0]);
  406 + assert(packet_id_pos > 0);
403 assert(packetType == PacketType::PUBLISH); 407 assert(packetType == PacketType::PUBLISH);
404 assert(qos > 0); 408 assert(qos > 0);
405 409
@@ -414,15 +418,17 @@ void MqttPacket::setPacketId(uint16_t packet_id) @@ -414,15 +418,17 @@ void MqttPacket::setPacketId(uint16_t packet_id)
414 // If I read the specs correctly, the DUP flag is merely for show. It doesn't control anything? 418 // If I read the specs correctly, the DUP flag is merely for show. It doesn't control anything?
415 void MqttPacket::setDuplicate() 419 void MqttPacket::setDuplicate()
416 { 420 {
417 - // In other words, we assume that this code can only be called on packets of which we have all the bytes, including fixed header.  
418 - assert(fixed_header_length > 0);  
419 assert(packetType == PacketType::PUBLISH); 421 assert(packetType == PacketType::PUBLISH);
420 assert(qos > 0); 422 assert(qos > 0);
  423 + assert(fixed_header_length == 0 || first_byte == bites[0]);
  424 +
  425 + first_byte |= 0b00001000;
421 426
422 - char byte1 = bites[0];  
423 - byte1 |= 0b00001000;  
424 - pos = 0;  
425 - writeByte(byte1); 427 + if (fixed_header_length > 0)
  428 + {
  429 + pos = 0;
  430 + writeByte(first_byte);
  431 + }
426 } 432 }
427 433
428 size_t MqttPacket::getTotalMemoryFootprint() 434 size_t MqttPacket::getTotalMemoryFootprint()
session.cpp
@@ -24,9 +24,9 @@ void Session::assignActiveConnection(std::shared_ptr&lt;Client&gt; &amp;client) @@ -24,9 +24,9 @@ void Session::assignActiveConnection(std::shared_ptr&lt;Client&gt; &amp;client)
24 this->client_id = client->getClientId(); 24 this->client_id = client->getClientId();
25 } 25 }
26 26
27 -void Session::writePacket(const MqttPacket &packet, char qos_arg) 27 +void Session::writePacket(const MqttPacket &packet, char max_qos)
28 { 28 {
29 - const char qos = std::min<char>(packet.getQos(), qos_arg); 29 + const char qos = std::min<char>(packet.getQos(), max_qos);
30 30
31 if (qos == 0) 31 if (qos == 0)
32 { 32 {
session.h
@@ -36,7 +36,7 @@ public: @@ -36,7 +36,7 @@ public:
36 bool clientDisconnected() const; 36 bool clientDisconnected() const;
37 std::shared_ptr<Client> makeSharedClient() const; 37 std::shared_ptr<Client> makeSharedClient() const;
38 void assignActiveConnection(std::shared_ptr<Client> &client); 38 void assignActiveConnection(std::shared_ptr<Client> &client);
39 - void writePacket(const MqttPacket &packet, char qos_arg); 39 + void writePacket(const MqttPacket &packet, char max_qos);
40 void clearQosMessage(uint16_t packet_id); 40 void clearQosMessage(uint16_t packet_id);
41 void sendPendingQosMessages(); 41 void sendPendingQosMessages();
42 }; 42 };
subscriptionstore.cpp
@@ -65,19 +65,22 @@ void SubscriptionStore::addSubscription(Client_p &amp;client, const std::string &amp;top @@ -65,19 +65,22 @@ void SubscriptionStore::addSubscription(Client_p &amp;client, const std::string &amp;top
65 deepestNode = node.get(); 65 deepestNode = node.get();
66 } 66 }
67 67
  68 + assert(deepestNode);
  69 +
68 if (deepestNode) 70 if (deepestNode)
69 { 71 {
70 auto session_it = sessionsByIdConst.find(client->getClientId()); 72 auto session_it = sessionsByIdConst.find(client->getClientId());
71 if (session_it != sessionsByIdConst.end()) 73 if (session_it != sessionsByIdConst.end())
72 { 74 {
73 - std::weak_ptr<Session> b = session_it->second;  
74 - deepestNode->addSubscriber(session_it->second, qos); 75 + const std::shared_ptr<Session> &ses = session_it->second;
  76 + deepestNode->addSubscriber(ses, qos);
  77 + giveClientRetainedMessages(ses, topic, qos);
75 } 78 }
76 } 79 }
77 80
78 lock_guard.unlock(); 81 lock_guard.unlock();
79 82
80 - giveClientRetainedMessages(client, topic); 83 +
81 } 84 }
82 85
83 // Removes an existing client when it already exists [MQTT-3.1.4-2]. 86 // Removes an existing client when it already exists [MQTT-3.1.4-2].
@@ -176,7 +179,7 @@ void SubscriptionStore::queuePacketAtSubscribers(const std::string &amp;topic, const @@ -176,7 +179,7 @@ void SubscriptionStore::queuePacketAtSubscribers(const std::string &amp;topic, const
176 publishRecursively(subtopics.begin(), subtopics.end(), root, packet); 179 publishRecursively(subtopics.begin(), subtopics.end(), root, packet);
177 } 180 }
178 181
179 -void SubscriptionStore::giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic) 182 +void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos)
180 { 183 {
181 RWLockGuard locker(&retainedMessagesRwlock); 184 RWLockGuard locker(&retainedMessagesRwlock);
182 locker.rdlock(); 185 locker.rdlock();
@@ -188,7 +191,7 @@ void SubscriptionStore::giveClientRetainedMessages(Client_p &amp;client, const std:: @@ -188,7 +191,7 @@ void SubscriptionStore::giveClientRetainedMessages(Client_p &amp;client, const std::
188 const MqttPacket packet(publish); 191 const MqttPacket packet(publish);
189 192
190 if (topicsMatch(subscribe_topic, rm.topic)) 193 if (topicsMatch(subscribe_topic, rm.topic))
191 - client->writeMqttPacket(packet); // TODO: I think this needs to be session, not client, and then I can store it if it's QoS? I need to research how retain+qos works 194 + ses->writePacket(packet, max_qos);
192 } 195 }
193 } 196 }
194 197
subscriptionstore.h
@@ -69,7 +69,7 @@ public: @@ -69,7 +69,7 @@ public:
69 void registerClientAndKickExistingOne(Client_p &client); 69 void registerClientAndKickExistingOne(Client_p &client);
70 70
71 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); 71 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender);
72 - void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic); 72 + void giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos);
73 73
74 void setRetainedMessage(const std::string &topic, const std::string &payload, char qos); 74 void setRetainedMessage(const std::string &topic, const std::string &payload, char qos);
75 }; 75 };