Commit 502ffeec24b3df63887919aa3534348e5fe17288
1 parent
0262ae00
Fix write buffer bug: wasn't increased enough
There still is a bug: writing a very big packet. I wrote the test case already.
Showing
5 changed files
with
60 additions
and
11 deletions
FlashMQTests/tst_maintests.cpp
| @@ -32,6 +32,9 @@ private slots: | @@ -32,6 +32,9 @@ private slots: | ||
| 32 | void test_retained_changed(); | 32 | void test_retained_changed(); |
| 33 | void test_retained_removed(); | 33 | void test_retained_removed(); |
| 34 | 34 | ||
| 35 | + void test_packet_bigger_than_one_doubling(); | ||
| 36 | + void test_very_big_packet(); | ||
| 37 | + | ||
| 35 | }; | 38 | }; |
| 36 | 39 | ||
| 37 | MainTests::MainTests() | 40 | MainTests::MainTests() |
| @@ -267,8 +270,8 @@ void MainTests::test_retained() | @@ -267,8 +270,8 @@ void MainTests::test_retained() | ||
| 267 | QString topic = "retaintopic"; | 270 | QString topic = "retaintopic"; |
| 268 | 271 | ||
| 269 | testContext.connectSender(); | 272 | testContext.connectSender(); |
| 270 | - testContext.publishRetained(topic, payload); | ||
| 271 | - testContext.publishRetained("dummy2", "Nobody sees this"); | 273 | + testContext.publish(topic, payload, true); |
| 274 | + testContext.publish("dummy2", "Nobody sees this", true); | ||
| 272 | 275 | ||
| 273 | testContext.connectReceiver(); | 276 | testContext.connectReceiver(); |
| 274 | testContext.subscribeReceiver("dummy"); | 277 | testContext.subscribeReceiver("dummy"); |
| @@ -283,7 +286,7 @@ void MainTests::test_retained() | @@ -283,7 +286,7 @@ void MainTests::test_retained() | ||
| 283 | 286 | ||
| 284 | testContext.receivedMessages.clear(); | 287 | testContext.receivedMessages.clear(); |
| 285 | 288 | ||
| 286 | - testContext.publishRetained(topic, payload); | 289 | + testContext.publish(topic, payload, true); |
| 287 | testContext.waitReceiverReceived(); | 290 | testContext.waitReceiverReceived(); |
| 288 | 291 | ||
| 289 | QVERIFY2(testContext.receivedMessages.count() == 1, "There must be one message in the received list"); | 292 | QVERIFY2(testContext.receivedMessages.count() == 1, "There must be one message in the received list"); |
| @@ -300,11 +303,11 @@ void MainTests::test_retained_changed() | @@ -300,11 +303,11 @@ void MainTests::test_retained_changed() | ||
| 300 | QString topic = "retaintopic"; | 303 | QString topic = "retaintopic"; |
| 301 | 304 | ||
| 302 | testContext.connectSender(); | 305 | testContext.connectSender(); |
| 303 | - testContext.publishRetained(topic, payload); | 306 | + testContext.publish(topic, payload, true); |
| 304 | 307 | ||
| 305 | payload = "Changed payload"; | 308 | payload = "Changed payload"; |
| 306 | 309 | ||
| 307 | - testContext.publishRetained(topic, payload); | 310 | + testContext.publish(topic, payload, true); |
| 308 | 311 | ||
| 309 | testContext.connectReceiver(); | 312 | testContext.connectReceiver(); |
| 310 | testContext.subscribeReceiver(topic); | 313 | testContext.subscribeReceiver(topic); |
| @@ -325,11 +328,11 @@ void MainTests::test_retained_removed() | @@ -325,11 +328,11 @@ void MainTests::test_retained_removed() | ||
| 325 | QString topic = "retaintopic"; | 328 | QString topic = "retaintopic"; |
| 326 | 329 | ||
| 327 | testContext.connectSender(); | 330 | testContext.connectSender(); |
| 328 | - testContext.publishRetained(topic, payload); | 331 | + testContext.publish(topic, payload, true); |
| 329 | 332 | ||
| 330 | payload = ""; | 333 | payload = ""; |
| 331 | 334 | ||
| 332 | - testContext.publishRetained(topic, payload); | 335 | + testContext.publish(topic, payload, true); |
| 333 | 336 | ||
| 334 | testContext.connectReceiver(); | 337 | testContext.connectReceiver(); |
| 335 | testContext.subscribeReceiver(topic); | 338 | testContext.subscribeReceiver(topic); |
| @@ -338,6 +341,49 @@ void MainTests::test_retained_removed() | @@ -338,6 +341,49 @@ void MainTests::test_retained_removed() | ||
| 338 | QVERIFY2(testContext.receivedMessages.empty(), "We erased the retained message. We shouldn't have received any."); | 341 | QVERIFY2(testContext.receivedMessages.empty(), "We erased the retained message. We shouldn't have received any."); |
| 339 | } | 342 | } |
| 340 | 343 | ||
| 344 | +void MainTests::test_packet_bigger_than_one_doubling() | ||
| 345 | +{ | ||
| 346 | + TwoClientTestContext testContext; | ||
| 347 | + | ||
| 348 | + QByteArray payload(8000, 3); | ||
| 349 | + QString topic = "hugepacket"; | ||
| 350 | + | ||
| 351 | + testContext.connectSender(); | ||
| 352 | + testContext.connectReceiver(); | ||
| 353 | + testContext.subscribeReceiver(topic); | ||
| 354 | + | ||
| 355 | + testContext.publish(topic, payload); | ||
| 356 | + testContext.waitReceiverReceived(); | ||
| 357 | + | ||
| 358 | + QVERIFY2(testContext.receivedMessages.count() == 1, "There must be one message in the received list"); | ||
| 359 | + | ||
| 360 | + QMQTT::Message msg = testContext.receivedMessages.first(); | ||
| 361 | + QCOMPARE(msg.payload(), payload); | ||
| 362 | + QVERIFY(!msg.retain()); | ||
| 363 | +} | ||
| 364 | + | ||
| 365 | +// This tests our write buffer, and that it's emptied during writing already. | ||
| 366 | +void MainTests::test_very_big_packet() | ||
| 367 | +{ | ||
| 368 | + TwoClientTestContext testContext; | ||
| 369 | + | ||
| 370 | + QByteArray payload(10*1024*1024, 3); | ||
| 371 | + QString topic = "hugepacket"; | ||
| 372 | + | ||
| 373 | + testContext.connectSender(); | ||
| 374 | + testContext.connectReceiver(); | ||
| 375 | + testContext.subscribeReceiver(topic); | ||
| 376 | + | ||
| 377 | + testContext.publish(topic, payload); | ||
| 378 | + testContext.waitReceiverReceived(); | ||
| 379 | + | ||
| 380 | + QVERIFY2(testContext.receivedMessages.count() == 1, "There must be one message in the received list"); | ||
| 381 | + | ||
| 382 | + QMQTT::Message msg = testContext.receivedMessages.first(); | ||
| 383 | + QCOMPARE(msg.payload(), payload); | ||
| 384 | + QVERIFY(!msg.retain()); | ||
| 385 | +} | ||
| 386 | + | ||
| 341 | QTEST_GUILESS_MAIN(MainTests) | 387 | QTEST_GUILESS_MAIN(MainTests) |
| 342 | 388 | ||
| 343 | #include "tst_maintests.moc" | 389 | #include "tst_maintests.moc" |
FlashMQTests/twoclienttestcontext.cpp
| @@ -11,11 +11,11 @@ TwoClientTestContext::TwoClientTestContext(QObject *parent) : QObject(parent) | @@ -11,11 +11,11 @@ TwoClientTestContext::TwoClientTestContext(QObject *parent) : QObject(parent) | ||
| 11 | receiver.reset(new QMQTT::Client(targetHost)); | 11 | receiver.reset(new QMQTT::Client(targetHost)); |
| 12 | } | 12 | } |
| 13 | 13 | ||
| 14 | -void TwoClientTestContext::publishRetained(const QString &topic, const QByteArray &payload) | 14 | +void TwoClientTestContext::publish(const QString &topic, const QByteArray &payload, bool retain) |
| 15 | { | 15 | { |
| 16 | QMQTT::Message msg; | 16 | QMQTT::Message msg; |
| 17 | msg.setTopic(topic); | 17 | msg.setTopic(topic); |
| 18 | - msg.setRetain(true); | 18 | + msg.setRetain(retain); |
| 19 | msg.setQos(0); | 19 | msg.setQos(0); |
| 20 | msg.setPayload(payload); | 20 | msg.setPayload(payload); |
| 21 | sender->publish(msg); | 21 | sender->publish(msg); |
FlashMQTests/twoclienttestcontext.h
| @@ -17,7 +17,7 @@ private slots: | @@ -17,7 +17,7 @@ private slots: | ||
| 17 | 17 | ||
| 18 | public: | 18 | public: |
| 19 | explicit TwoClientTestContext(QObject *parent = nullptr); | 19 | explicit TwoClientTestContext(QObject *parent = nullptr); |
| 20 | - void publishRetained(const QString &topic, const QByteArray &payload); | 20 | + void publish(const QString &topic, const QByteArray &payload, bool retain = false); |
| 21 | void connectSender(); | 21 | void connectSender(); |
| 22 | void connectReceiver(); | 22 | void connectReceiver(); |
| 23 | void disconnectReceiver(); | 23 | void disconnectReceiver(); |
client.cpp
| @@ -81,7 +81,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | @@ -81,7 +81,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | ||
| 81 | { | 81 | { |
| 82 | std::lock_guard<std::mutex> locker(writeBufMutex); | 82 | std::lock_guard<std::mutex> locker(writeBufMutex); |
| 83 | 83 | ||
| 84 | - if (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) | 84 | + while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) |
| 85 | { | 85 | { |
| 86 | if (packet.packetType == PacketType::PUBLISH && writebuf.getSize() >= CLIENT_MAX_BUFFER_SIZE) | 86 | if (packet.packetType == PacketType::PUBLISH && writebuf.getSize() >= CLIENT_MAX_BUFFER_SIZE) |
| 87 | return; | 87 | return; |
| @@ -99,6 +99,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | @@ -99,6 +99,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | ||
| 99 | while (len_left > 0) | 99 | while (len_left > 0) |
| 100 | { | 100 | { |
| 101 | const size_t len = std::min<int>(len_left, writebuf.maxWriteSize()); | 101 | const size_t len = std::min<int>(len_left, writebuf.maxWriteSize()); |
| 102 | + assert(len > 0); | ||
| 102 | std::memcpy(writebuf.headPtr(), &r.bytes[src_i], len); | 103 | std::memcpy(writebuf.headPtr(), &r.bytes[src_i], len); |
| 103 | writebuf.advanceHead(len); | 104 | writebuf.advanceHead(len); |
| 104 | src_i += len; | 105 | src_i += len; |
| @@ -113,6 +114,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | @@ -113,6 +114,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | ||
| 113 | while (len_left > 0) | 114 | while (len_left > 0) |
| 114 | { | 115 | { |
| 115 | const size_t len = std::min<int>(len_left, writebuf.maxWriteSize()); | 116 | const size_t len = std::min<int>(len_left, writebuf.maxWriteSize()); |
| 117 | + assert(len > 0); | ||
| 116 | std::memcpy(writebuf.headPtr(), &packet.getBites()[src_i], len); | 118 | std::memcpy(writebuf.headPtr(), &packet.getBites()[src_i], len); |
| 117 | writebuf.advanceHead(len); | 119 | writebuf.advanceHead(len); |
| 118 | src_i += len; | 120 | src_i += len; |
mqttpacket.cpp
| @@ -22,6 +22,7 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt | @@ -22,6 +22,7 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt | ||
| 22 | while (_packet_len > 0) | 22 | while (_packet_len > 0) |
| 23 | { | 23 | { |
| 24 | int readlen = std::min<int>(buf.maxReadSize(), _packet_len); | 24 | int readlen = std::min<int>(buf.maxReadSize(), _packet_len); |
| 25 | + assert(readlen > 0); | ||
| 25 | std::memcpy(&bites[i], buf.tailPtr(), readlen); | 26 | std::memcpy(&bites[i], buf.tailPtr(), readlen); |
| 26 | buf.advanceTail(readlen); | 27 | buf.advanceTail(readlen); |
| 27 | i += readlen; | 28 | i += readlen; |