Commit d594f67b12c1b42fc71bb8b480506dda53cf3c66

Authored by Wiebe Cazemier
1 parent c986edcf

Fix QoS reduction when one subscriber has lower QoS level

Was caught by refactor testNotMessingUpQosLevels away from QMQTT, which
didn't trip on this bug.
FlashMQTests/tst_maintests.cpp
... ... @@ -1296,63 +1296,67 @@ void MainTests::testDowngradeQoSOnSubscribeQos0to0()
1296 1296 */
1297 1297 void MainTests::testNotMessingUpQosLevels()
1298 1298 {
1299   - const QString topic = "HK7c1MFu6kdT69fWY";
1300   - const QByteArray payload = "M4XK2LZ2Smaazba8RobZOgoe6CENxCll";
1301   -
1302   - TwoClientTestContext testContextSender;
1303   - TwoClientTestContext testContextReceiver1(1);
1304   - TwoClientTestContext testContextReceiver2(2);
1305   - TwoClientTestContext testContextReceiver3(3);
1306   - TwoClientTestContext testContextReceiver4(4);
1307   - TwoClientTestContext testContextReceiver5(5);
1308   -
1309   - testContextReceiver1.connectReceiver();
1310   - testContextReceiver1.subscribeReceiver(topic, 0);
1311   -
1312   - testContextReceiver2.connectReceiver();
1313   - testContextReceiver2.subscribeReceiver(topic, 1);
1314   -
1315   - testContextReceiver3.connectReceiver();
1316   - testContextReceiver3.subscribeReceiver(topic, 2);
1317   -
1318   - testContextReceiver4.connectReceiver();
1319   - testContextReceiver4.subscribeReceiver(topic, 1);
1320   -
1321   - testContextReceiver5.connectReceiver();
1322   - testContextReceiver5.subscribeReceiver(topic, 0);
1323   -
1324   - testContextSender.connectSender();
1325   - testContextSender.publish(topic, payload, 2, false);
1326   -
1327   - testContextReceiver1.waitReceiverReceived(1);
1328   - testContextReceiver2.waitReceiverReceived(1);
1329   - testContextReceiver3.waitReceiverReceived(1);
1330   - testContextReceiver4.waitReceiverReceived(1);
1331   - testContextReceiver5.waitReceiverReceived(1);
1332   -
1333   - QCOMPARE(testContextReceiver1.receivedMessages.count(), 1);
1334   - QCOMPARE(testContextReceiver2.receivedMessages.count(), 1);
1335   - QCOMPARE(testContextReceiver3.receivedMessages.count(), 1);
1336   - QCOMPARE(testContextReceiver4.receivedMessages.count(), 1);
1337   - QCOMPARE(testContextReceiver5.receivedMessages.count(), 1);
1338   -
1339   - QCOMPARE(testContextReceiver1.receivedMessages.first().qos(), 0);
1340   - QCOMPARE(testContextReceiver2.receivedMessages.first().qos(), 1);
1341   - QCOMPARE(testContextReceiver3.receivedMessages.first().qos(), 2);
1342   - QCOMPARE(testContextReceiver4.receivedMessages.first().qos(), 1);
1343   - QCOMPARE(testContextReceiver5.receivedMessages.first().qos(), 0);
1344   -
1345   - QCOMPARE(testContextReceiver1.receivedMessages.first().payload(), payload);
1346   - QCOMPARE(testContextReceiver2.receivedMessages.first().payload(), payload);
1347   - QCOMPARE(testContextReceiver3.receivedMessages.first().payload(), payload);
1348   - QCOMPARE(testContextReceiver4.receivedMessages.first().payload(), payload);
1349   - QCOMPARE(testContextReceiver5.receivedMessages.first().payload(), payload);
1350   -
1351   - QCOMPARE(testContextReceiver1.receivedMessages.first().id(), 0);
1352   - QCOMPARE(testContextReceiver2.receivedMessages.first().id(), 1);
1353   - QCOMPARE(testContextReceiver3.receivedMessages.first().id(), 1);
1354   - QCOMPARE(testContextReceiver4.receivedMessages.first().id(), 1);
1355   - QCOMPARE(testContextReceiver5.receivedMessages.first().id(), 0);
  1299 + const std::string topic = "HK7c1MFu6kdT69fWY";
  1300 + const std::string payload = "M4XK2LZ2Smaazba8RobZOgoe6CENxCll";
  1301 +
  1302 + FlashMQTestClient testContextSender;
  1303 + FlashMQTestClient testContextReceiver1;
  1304 + FlashMQTestClient testContextReceiver2;
  1305 + FlashMQTestClient testContextReceiver3;
  1306 + FlashMQTestClient testContextReceiver4;
  1307 + FlashMQTestClient testContextReceiver5;
  1308 +
  1309 + testContextReceiver1.start();
  1310 + testContextReceiver1.connectClient(ProtocolVersion::Mqtt311);
  1311 + testContextReceiver1.subscribe(topic, 0);
  1312 +
  1313 + testContextReceiver2.start();
  1314 + testContextReceiver2.connectClient(ProtocolVersion::Mqtt311);
  1315 + testContextReceiver2.subscribe(topic, 1);
  1316 +
  1317 + testContextReceiver3.start();
  1318 + testContextReceiver3.connectClient(ProtocolVersion::Mqtt311);
  1319 + testContextReceiver3.subscribe(topic, 2);
  1320 +
  1321 + testContextReceiver4.start();
  1322 + testContextReceiver4.connectClient(ProtocolVersion::Mqtt311);
  1323 + testContextReceiver4.subscribe(topic, 1);
  1324 +
  1325 + testContextReceiver5.start();
  1326 + testContextReceiver5.connectClient(ProtocolVersion::Mqtt311);
  1327 + testContextReceiver5.subscribe(topic, 0);
  1328 +
  1329 + testContextSender.start();
  1330 + testContextSender.connectClient(ProtocolVersion::Mqtt311);
  1331 + testContextSender.publish(topic, payload, 2);
  1332 +
  1333 + testContextReceiver1.waitForMessageCount(1);
  1334 + testContextReceiver2.waitForMessageCount(1);
  1335 + testContextReceiver3.waitForMessageCount(1);
  1336 + testContextReceiver4.waitForMessageCount(1);
  1337 + testContextReceiver5.waitForMessageCount(1);
  1338 +
  1339 + MYCASTCOMPARE(testContextReceiver1.receivedPublishes.size(), 1);
  1340 + MYCASTCOMPARE(testContextReceiver2.receivedPublishes.size(), 1);
  1341 + MYCASTCOMPARE(testContextReceiver3.receivedPublishes.size(), 1);
  1342 + MYCASTCOMPARE(testContextReceiver4.receivedPublishes.size(), 1);
  1343 + MYCASTCOMPARE(testContextReceiver5.receivedPublishes.size(), 1);
  1344 +
  1345 + QCOMPARE(testContextReceiver1.receivedPublishes.front().getQos(), 0);
  1346 + QCOMPARE(testContextReceiver2.receivedPublishes.front().getQos(), 1);
  1347 + QCOMPARE(testContextReceiver3.receivedPublishes.front().getQos(), 2);
  1348 + QCOMPARE(testContextReceiver4.receivedPublishes.front().getQos(), 1);
  1349 + QCOMPARE(testContextReceiver5.receivedPublishes.front().getQos(), 0);
  1350 +
  1351 + QCOMPARE(testContextReceiver1.receivedPublishes.front().getPayloadCopy(), payload);
  1352 + QCOMPARE(testContextReceiver2.receivedPublishes.front().getPayloadCopy(), payload);
  1353 + QCOMPARE(testContextReceiver3.receivedPublishes.front().getPayloadCopy(), payload);
  1354 + QCOMPARE(testContextReceiver4.receivedPublishes.front().getPayloadCopy(), payload);
  1355 + QCOMPARE(testContextReceiver5.receivedPublishes.front().getPayloadCopy(), payload);
  1356 +
  1357 + QCOMPARE(testContextReceiver2.receivedPublishes.front().getPacketId(), 1);
  1358 + QCOMPARE(testContextReceiver3.receivedPublishes.front().getPacketId(), 1);
  1359 + QCOMPARE(testContextReceiver4.receivedPublishes.front().getPacketId(), 1);
1356 1360 }
1357 1361  
1358 1362 void MainTests::testUnSubscribe()
... ...
mqttpacket.cpp
... ... @@ -1257,6 +1257,7 @@ void MqttPacket::handlePublish()
1257 1257  
1258 1258 // Working with a local copy because the subscribing action will modify this->packet_id. See the PublishCopyFactory.
1259 1259 const uint16_t _packet_id = this->packet_id;
  1260 + const char _qos = this->publishData.qos;
1260 1261  
1261 1262 if (publishData.qos == 2 && sender->getSession()->incomingQoS2MessageIdInTransit(_packet_id))
1262 1263 {
... ... @@ -1291,13 +1292,14 @@ void MqttPacket::handlePublish()
1291 1292 }
1292 1293  
1293 1294 #ifndef NDEBUG
1294   - // Protection against using the altered packet id.
  1295 + // Protection against using the altered packet id (because we change the incoming byte array for each subscriber).
1295 1296 this->packet_id = 0;
  1297 + this->publishData.qos = 0;
1296 1298 #endif
1297 1299  
1298   - if (publishData.qos > 0)
  1300 + if (_qos > 0)
1299 1301 {
1300   - const PacketType responseType = publishData.qos == 1 ? PacketType::PUBACK : PacketType::PUBREC;
  1302 + const PacketType responseType = _qos == 1 ? PacketType::PUBACK : PacketType::PUBREC;
1301 1303 PubResponse pubAck(this->protocolVersion, responseType, ackCode, _packet_id);
1302 1304 MqttPacket response(pubAck);
1303 1305 sender->writeMqttPacket(response);
... ...