Commit 8b6c686c5c30e221082c8487849cb7dd9c36787b

Authored by Wiebe Cazemier
1 parent f5c677fe

Fix assert error on exceeding write buffer

Also fixes not downgrading QoS on subscribe.
client.cpp
@@ -157,7 +157,7 @@ void Client::writeText(const std::string &text) @@ -157,7 +157,7 @@ void Client::writeText(const std::string &text)
157 setReadyForWriting(true); 157 setReadyForWriting(true);
158 } 158 }
159 159
160 -void Client::writeMqttPacket(const MqttPacket &packet) 160 +void Client::writeMqttPacket(const MqttPacket &packet, const char qos)
161 { 161 {
162 std::lock_guard<std::mutex> locker(writeBufMutex); 162 std::lock_guard<std::mutex> locker(writeBufMutex);
163 163
@@ -170,11 +170,13 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -170,11 +170,13 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet)
170 170
171 // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And 171 // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And
172 // QoS packet are queued and limited elsewhere. 172 // QoS packet are queued and limited elsewhere.
173 - if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) 173 + if (packet.packetType == PacketType::PUBLISH && qos == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace())
174 { 174 {
175 return; 175 return;
176 } 176 }
177 177
  178 + writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader());
  179 +
178 if (!packet.containsFixedHeader()) 180 if (!packet.containsFixedHeader())
179 { 181 {
180 writebuf.headPtr()[0] = packet.getFirstByte(); 182 writebuf.headPtr()[0] = packet.getFirstByte();
@@ -192,11 +194,11 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -192,11 +194,11 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet)
192 } 194 }
193 195
194 // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected. 196 // Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected.
195 -void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet) 197 +void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos)
196 { 198 {
197 try 199 try
198 { 200 {
199 - this->writeMqttPacket(packet); 201 + this->writeMqttPacket(packet, qos);
200 } 202 }
201 catch (std::exception &ex) 203 catch (std::exception &ex)
202 { 204 {
client.h
@@ -119,8 +119,8 @@ public: @@ -119,8 +119,8 @@ public:
119 119
120 void writeText(const std::string &text); 120 void writeText(const std::string &text);
121 void writePingResp(); 121 void writePingResp();
122 - void writeMqttPacket(const MqttPacket &packet);  
123 - void writeMqttPacketAndBlameThisClient(const MqttPacket &packet); 122 + void writeMqttPacket(const MqttPacket &packet, const char qos = 0);
  123 + void writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const char qos);
124 bool writeBufIntoFd(); 124 bool writeBufIntoFd();
125 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; } 125 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; }
126 126
session.cpp
@@ -61,7 +61,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos) @@ -61,7 +61,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos)
61 if (!clientDisconnected()) 61 if (!clientDisconnected())
62 { 62 {
63 std::shared_ptr<Client> c = makeSharedClient(); 63 std::shared_ptr<Client> c = makeSharedClient();
64 - c->writeMqttPacketAndBlameThisClient(packet); 64 + c->writeMqttPacketAndBlameThisClient(packet, qos);
65 } 65 }
66 } 66 }
67 else if (qos == 1) 67 else if (qos == 1)
@@ -85,7 +85,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos) @@ -85,7 +85,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos)
85 if (!clientDisconnected()) 85 if (!clientDisconnected())
86 { 86 {
87 std::shared_ptr<Client> c = makeSharedClient(); 87 std::shared_ptr<Client> c = makeSharedClient();
88 - c->writeMqttPacketAndBlameThisClient(*copyPacket.get()); 88 + c->writeMqttPacketAndBlameThisClient(*copyPacket.get(), qos);
89 copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate. 89 copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
90 } 90 }
91 } 91 }
@@ -134,7 +134,7 @@ void Session::sendPendingQosMessages() @@ -134,7 +134,7 @@ void Session::sendPendingQosMessages()
134 std::lock_guard<std::mutex> locker(qosQueueMutex); 134 std::lock_guard<std::mutex> locker(qosQueueMutex);
135 for (QueuedQosPacket &qosMessage : qosPacketQueue) 135 for (QueuedQosPacket &qosMessage : qosPacketQueue)
136 { 136 {
137 - c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get()); 137 + c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get(), qosMessage.packet->getQos());
138 qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate. 138 qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
139 } 139 }
140 } 140 }