Commit a0596b6e1992e1244c567a713eb5f42cae93348d

Authored by Wiebe Cazemier
1 parent 502ffeec

Fix bug in max packet size and big packet handling

The MQTT docs showed a wrong condition for checking malformed packets.

And, we have to grow buffers to MAX_PACKET_SIZE, otherwise we can't
process. I still have some inteligent buffer shrink logic in mind.
FlashMQTests/tst_maintests.cpp
@@ -377,7 +377,7 @@ void MainTests::test_very_big_packet() @@ -377,7 +377,7 @@ void MainTests::test_very_big_packet()
377 testContext.publish(topic, payload); 377 testContext.publish(topic, payload);
378 testContext.waitReceiverReceived(); 378 testContext.waitReceiverReceived();
379 379
380 - QVERIFY2(testContext.receivedMessages.count() == 1, "There must be one message in the received list"); 380 + QCOMPARE(testContext.receivedMessages.count(), 1);
381 381
382 QMQTT::Message msg = testContext.receivedMessages.first(); 382 QMQTT::Message msg = testContext.receivedMessages.first();
383 QCOMPARE(msg.payload(), payload); 383 QCOMPARE(msg.payload(), payload);
FlashMQTests/twoclienttestcontext.cpp
@@ -3,12 +3,17 @@ @@ -3,12 +3,17 @@
3 #include <QEventLoop> 3 #include <QEventLoop>
4 #include <QTimer> 4 #include <QTimer>
5 5
  6 +// TODO: port to QMqttClient that newer Qts now have?
  7 +
6 TwoClientTestContext::TwoClientTestContext(QObject *parent) : QObject(parent) 8 TwoClientTestContext::TwoClientTestContext(QObject *parent) : QObject(parent)
7 { 9 {
8 QHostInfo targetHostInfo = QHostInfo::fromName("localhost"); 10 QHostInfo targetHostInfo = QHostInfo::fromName("localhost");
9 QHostAddress targetHost(targetHostInfo.addresses().first()); 11 QHostAddress targetHost(targetHostInfo.addresses().first());
10 sender.reset(new QMQTT::Client(targetHost)); 12 sender.reset(new QMQTT::Client(targetHost));
11 receiver.reset(new QMQTT::Client(targetHost)); 13 receiver.reset(new QMQTT::Client(targetHost));
  14 +
  15 + connect(sender.data(), &QMQTT::Client::error, this, &TwoClientTestContext::onClientError);
  16 + connect(receiver.data(), &QMQTT::Client::error, this, &TwoClientTestContext::onClientError);
12 } 17 }
13 18
14 void TwoClientTestContext::publish(const QString &topic, const QByteArray &payload, bool retain) 19 void TwoClientTestContext::publish(const QString &topic, const QByteArray &payload, bool retain)
@@ -64,6 +69,33 @@ void TwoClientTestContext::waitReceiverReceived() @@ -64,6 +69,33 @@ void TwoClientTestContext::waitReceiverReceived()
64 waiter.exec(); 69 waiter.exec();
65 } 70 }
66 71
  72 +void TwoClientTestContext::onClientError(const QMQTT::ClientError error)
  73 +{
  74 + const QMQTT::Client *_sender = sender.data();
  75 +
  76 + // TODO: arg, doesn't qmqtt have a better way for this?
  77 + QString errStr = QString("unknown error");
  78 + if (error == QMQTT::SocketConnectionRefusedError)
  79 + errStr = "Connection refused";
  80 + if (error == QMQTT::SocketRemoteHostClosedError)
  81 + errStr = "Remote host closed";
  82 + if (error == QMQTT::SocketHostNotFoundError)
  83 + errStr = "Remote host not found";
  84 + if (error == QMQTT::MqttBadUserNameOrPasswordError)
  85 + errStr = "MQTT bad user or password";
  86 + if (error == QMQTT::MqttNotAuthorizedError)
  87 + errStr = "MQTT not authorized";
  88 + if (error == QMQTT::SocketResourceError)
  89 + errStr = "Socket resource error. Is your OS limiting you? Ulimit, etc?";
  90 + if (error == QMQTT::SocketSslInternalError)
  91 + errStr = "Socket SSL internal error.";
  92 + if (error == QMQTT::SocketTimeoutError)
  93 + errStr = "Socket timeout";
  94 +
  95 + QString msg = QString("Client %1 error code: %2 (%3). Initiated delayed reconnect.\n").arg(_sender->clientId()).arg(error).arg(errStr);
  96 + throw new std::runtime_error(msg.toStdString());
  97 +}
  98 +
67 void TwoClientTestContext::onReceiverReceived(const QMQTT::Message &message) 99 void TwoClientTestContext::onReceiverReceived(const QMQTT::Message &message)
68 { 100 {
69 receivedMessages.append(message); 101 receivedMessages.append(message);
FlashMQTests/twoclienttestcontext.h
@@ -23,6 +23,7 @@ public: @@ -23,6 +23,7 @@ public:
23 void disconnectReceiver(); 23 void disconnectReceiver();
24 void subscribeReceiver(const QString &topic); 24 void subscribeReceiver(const QString &topic);
25 void waitReceiverReceived(); 25 void waitReceiverReceived();
  26 + void onClientError(const QMQTT::ClientError error);
26 27
27 QList<QMQTT::Message> receivedMessages; 28 QList<QMQTT::Message> receivedMessages;
28 29
client.cpp
@@ -57,7 +57,7 @@ bool Client::readFdIntoBuffer() @@ -57,7 +57,7 @@ bool Client::readFdIntoBuffer()
57 // Make sure we either always have enough space for a next call of this method, or stop reading the fd. 57 // Make sure we either always have enough space for a next call of this method, or stop reading the fd.
58 if (readbuf.freeSpace() == 0) 58 if (readbuf.freeSpace() == 0)
59 { 59 {
60 - if (readbuf.getSize() * 2 < CLIENT_MAX_BUFFER_SIZE) 60 + if (readbuf.getSize() * 2 < MAX_PACKET_SIZE)
61 { 61 {
62 readbuf.doubleSize(); 62 readbuf.doubleSize();
63 } 63 }
@@ -81,13 +81,19 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -81,13 +81,19 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet)
81 { 81 {
82 std::lock_guard<std::mutex> locker(writeBufMutex); 82 std::lock_guard<std::mutex> locker(writeBufMutex);
83 83
84 - while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) 84 + // Grow as far as we can. We have to make room for one MQTT packet.
  85 + while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace() && writebuf.getSize() < MAX_PACKET_SIZE)
85 { 86 {
86 - if (packet.packetType == PacketType::PUBLISH && writebuf.getSize() >= CLIENT_MAX_BUFFER_SIZE)  
87 - return;  
88 writebuf.doubleSize(); 87 writebuf.doubleSize();
89 } 88 }
90 89
  90 + // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings.
  91 + // TODO: when QoS is implemented, different filtering may be required.
  92 + if (packet.packetType == PacketType::PUBLISH && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace())
  93 + {
  94 + return;
  95 + }
  96 +
91 if (!packet.containsFixedHeader()) 97 if (!packet.containsFixedHeader())
92 { 98 {
93 writebuf.headPtr()[0] = packet.getFirstByte(); 99 writebuf.headPtr()[0] = packet.getFirstByte();
@@ -257,8 +263,8 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -257,8 +263,8 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
257 encodedByte = readbuf.peakAhead(remaining_length_i++); 263 encodedByte = readbuf.peakAhead(remaining_length_i++);
258 packet_length += (encodedByte & 127) * multiplier; 264 packet_length += (encodedByte & 127) * multiplier;
259 multiplier *= 128; 265 multiplier *= 128;
260 - if (multiplier > 128*128*128)  
261 - return false; 266 + if (multiplier > 128*128*128*128)
  267 + throw ProtocolError("Malformed Remaining Length.");
262 } 268 }
263 while ((encodedByte & 128) != 0); 269 while ((encodedByte & 128) != 0);
264 packet_length += fixed_header_length; 270 packet_length += fixed_header_length;
client.h
@@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
16 16
17 17
18 #define CLIENT_BUFFER_SIZE 1024 // Must be power of 2 18 #define CLIENT_BUFFER_SIZE 1024 // Must be power of 2
19 -#define CLIENT_MAX_BUFFER_SIZE 65536 19 +#define MAX_PACKET_SIZE 268435461 // 256 MB + 5
20 #define MQTT_HEADER_LENGH 2 20 #define MQTT_HEADER_LENGH 2
21 21
22 class Client 22 class Client