diff --git a/mqttpacket.cpp b/mqttpacket.cpp index e3eb51c..184170b 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -86,6 +86,7 @@ MqttPacket::MqttPacket(const Publish &publish) : } packetType = PacketType::PUBLISH; + this->qos = publish.qos; first_byte = static_cast(packetType) << 4; first_byte |= (publish.qos << 1); first_byte |= (static_cast(publish.retain) & 0b00000001); @@ -98,7 +99,10 @@ MqttPacket::MqttPacket(const Publish &publish) : if (publish.qos) { - throw NotImplementedException("I would write two bytes containing the packet id here, but QoS is not done yet."); + // Reserve the space for the packet id, which will be assigned later. + packet_id_pos = pos; + char zero[2]; + writeBytes(zero, 2); } writeBytes(publish.payload.c_str(), publish.payload.length()); @@ -398,8 +402,8 @@ RemainingLength MqttPacket::getRemainingLength() const void MqttPacket::setPacketId(uint16_t packet_id) { - // In other words, we assume that this code can only be called on packets of which we have all the bytes, including fixed header. - assert(fixed_header_length > 0); + assert(fixed_header_length == 0 || first_byte == bites[0]); + assert(packet_id_pos > 0); assert(packetType == PacketType::PUBLISH); assert(qos > 0); @@ -414,15 +418,17 @@ void MqttPacket::setPacketId(uint16_t packet_id) // If I read the specs correctly, the DUP flag is merely for show. It doesn't control anything? void MqttPacket::setDuplicate() { - // In other words, we assume that this code can only be called on packets of which we have all the bytes, including fixed header. - assert(fixed_header_length > 0); assert(packetType == PacketType::PUBLISH); assert(qos > 0); + assert(fixed_header_length == 0 || first_byte == bites[0]); + + first_byte |= 0b00001000; - char byte1 = bites[0]; - byte1 |= 0b00001000; - pos = 0; - writeByte(byte1); + if (fixed_header_length > 0) + { + pos = 0; + writeByte(first_byte); + } } size_t MqttPacket::getTotalMemoryFootprint() diff --git a/session.cpp b/session.cpp index 00a2a76..b9a02b8 100644 --- a/session.cpp +++ b/session.cpp @@ -24,9 +24,9 @@ void Session::assignActiveConnection(std::shared_ptr &client) this->client_id = client->getClientId(); } -void Session::writePacket(const MqttPacket &packet, char qos_arg) +void Session::writePacket(const MqttPacket &packet, char max_qos) { - const char qos = std::min(packet.getQos(), qos_arg); + const char qos = std::min(packet.getQos(), max_qos); if (qos == 0) { diff --git a/session.h b/session.h index 258da12..1aa4b76 100644 --- a/session.h +++ b/session.h @@ -36,7 +36,7 @@ public: bool clientDisconnected() const; std::shared_ptr makeSharedClient() const; void assignActiveConnection(std::shared_ptr &client); - void writePacket(const MqttPacket &packet, char qos_arg); + void writePacket(const MqttPacket &packet, char max_qos); void clearQosMessage(uint16_t packet_id); void sendPendingQosMessages(); }; diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index a4ddb55..1829dcd 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -65,19 +65,22 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top deepestNode = node.get(); } + assert(deepestNode); + if (deepestNode) { auto session_it = sessionsByIdConst.find(client->getClientId()); if (session_it != sessionsByIdConst.end()) { - std::weak_ptr b = session_it->second; - deepestNode->addSubscriber(session_it->second, qos); + const std::shared_ptr &ses = session_it->second; + deepestNode->addSubscriber(ses, qos); + giveClientRetainedMessages(ses, topic, qos); } } lock_guard.unlock(); - giveClientRetainedMessages(client, topic); + } // Removes an existing client when it already exists [MQTT-3.1.4-2]. @@ -176,7 +179,7 @@ void SubscriptionStore::queuePacketAtSubscribers(const std::string &topic, const publishRecursively(subtopics.begin(), subtopics.end(), root, packet); } -void SubscriptionStore::giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic) +void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr &ses, const std::string &subscribe_topic, char max_qos) { RWLockGuard locker(&retainedMessagesRwlock); locker.rdlock(); @@ -188,7 +191,7 @@ void SubscriptionStore::giveClientRetainedMessages(Client_p &client, const std:: const MqttPacket packet(publish); if (topicsMatch(subscribe_topic, rm.topic)) - client->writeMqttPacket(packet); // TODO: I think this needs to be session, not client, and then I can store it if it's QoS? I need to research how retain+qos works + ses->writePacket(packet, max_qos); } } diff --git a/subscriptionstore.h b/subscriptionstore.h index d1c009d..4d62022 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -69,7 +69,7 @@ public: void registerClientAndKickExistingOne(Client_p &client); void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); - void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic); + void giveClientRetainedMessages(const std::shared_ptr &ses, const std::string &subscribe_topic, char max_qos); void setRetainedMessage(const std::string &topic, const std::string &payload, char qos); };