diff --git a/client.cpp b/client.cpp index a225f71..b33d3b5 100644 --- a/client.cpp +++ b/client.cpp @@ -157,7 +157,7 @@ void Client::writeText(const std::string &text) setReadyForWriting(true); } -void Client::writeMqttPacket(const MqttPacket &packet) +void Client::writeMqttPacket(const MqttPacket &packet, const char qos) { std::lock_guard locker(writeBufMutex); @@ -170,11 +170,13 @@ void Client::writeMqttPacket(const MqttPacket &packet) // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And // QoS packet are queued and limited elsewhere. - if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) + if (packet.packetType == PacketType::PUBLISH && qos == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) { return; } + writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader()); + if (!packet.containsFixedHeader()) { writebuf.headPtr()[0] = packet.getFirstByte(); @@ -192,11 +194,11 @@ void Client::writeMqttPacket(const MqttPacket &packet) } // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected. -void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet) +void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos) { try { - this->writeMqttPacket(packet); + this->writeMqttPacket(packet, qos); } catch (std::exception &ex) { diff --git a/client.h b/client.h index a730a8e..832b166 100644 --- a/client.h +++ b/client.h @@ -119,8 +119,8 @@ public: void writeText(const std::string &text); void writePingResp(); - void writeMqttPacket(const MqttPacket &packet); - void writeMqttPacketAndBlameThisClient(const MqttPacket &packet); + void writeMqttPacket(const MqttPacket &packet, const char qos = 0); + void writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos); bool writeBufIntoFd(); bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; } diff --git a/session.cpp b/session.cpp index 77b73ca..a4a06fa 100644 --- a/session.cpp +++ b/session.cpp @@ -61,7 +61,7 @@ void Session::writePacket(const MqttPacket &packet, char max_qos) if (!clientDisconnected()) { std::shared_ptr c = makeSharedClient(); - c->writeMqttPacketAndBlameThisClient(packet); + c->writeMqttPacketAndBlameThisClient(packet, qos); } } else if (qos == 1) @@ -85,7 +85,7 @@ void Session::writePacket(const MqttPacket &packet, char max_qos) if (!clientDisconnected()) { std::shared_ptr c = makeSharedClient(); - c->writeMqttPacketAndBlameThisClient(*copyPacket.get()); + c->writeMqttPacketAndBlameThisClient(*copyPacket.get(), qos); copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate. } } @@ -134,7 +134,7 @@ void Session::sendPendingQosMessages() std::lock_guard locker(qosQueueMutex); for (QueuedQosPacket &qosMessage : qosPacketQueue) { - c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get()); + c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get(), qosMessage.packet->getQos()); qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate. } }