Commit 4fcca5749bbd04aa711628c462ba51a43668b5a4

Authored by Wiebe Cazemier
1 parent 4526f0bd

Fix packet order in subscribing to topic with retained messages

The order packets would (mostly) arrive, is 'retained publish', then
'sub ack'. This is now fixed.

This also stablizes test_retained_changed(). The test was also
refactored to use the new test client.
FlashMQTests/tst_maintests.cpp
@@ -436,27 +436,31 @@ void MainTests::test_retained() @@ -436,27 +436,31 @@ void MainTests::test_retained()
436 436
437 void MainTests::test_retained_changed() 437 void MainTests::test_retained_changed()
438 { 438 {
439 - TwoClientTestContext testContext; 439 + FlashMQTestClient sender;
  440 + sender.start();
  441 + sender.connectClient(ProtocolVersion::Mqtt311);
440 442
441 - QByteArray payload = "We are testing";  
442 - QString topic = "retaintopic"; 443 + const std::string topic = "retaintopic";
443 444
444 - testContext.connectSender();  
445 - testContext.publish(topic, payload, true); 445 + Publish p(topic, "We are testing", 0);
  446 + p.retain = true;
  447 + sender.publish(p);
446 448
447 - payload = "Changed payload"; 449 + p.payload = "Changed payload";
  450 + sender.publish(p);
448 451
449 - testContext.publish(topic, payload, true); 452 + FlashMQTestClient receiver;
  453 + receiver.start();
  454 + receiver.connectClient(ProtocolVersion::Mqtt5);
  455 + receiver.subscribe(topic, 0);
450 456
451 - testContext.connectReceiver();  
452 - testContext.subscribeReceiver(topic);  
453 - testContext.waitReceiverReceived(1); 457 + receiver.waitForMessageCount(1);
454 458
455 - QCOMPARE(testContext.receivedMessages.count(), 1); 459 + MYCASTCOMPARE(receiver.receivedPublishes.size(), 1);
456 460
457 - QMQTT::Message msg = testContext.receivedMessages.first();  
458 - QCOMPARE(msg.payload(), payload);  
459 - QVERIFY(msg.retain()); 461 + MqttPacket &pack = receiver.receivedPublishes.front();
  462 + QCOMPARE(pack.getPayloadCopy(), p.payload);
  463 + QVERIFY(pack.getRetain());
460 } 464 }
461 465
462 void MainTests::test_retained_removed() 466 void MainTests::test_retained_removed()
mqttpacket.cpp
@@ -1011,6 +1011,8 @@ void MqttPacket::handleSubscribe() @@ -1011,6 +1011,8 @@ void MqttPacket::handleSubscribe()
1011 1011
1012 Authentication &authentication = *ThreadGlobals::getAuth(); 1012 Authentication &authentication = *ThreadGlobals::getAuth();
1013 1013
  1014 + std::forward_list<SubscriptionTuple> deferredSubscribes;
  1015 +
1014 std::list<ReasonCodes> subs_reponse_codes; 1016 std::list<ReasonCodes> subs_reponse_codes;
1015 while (remainingAfterPos() > 0) 1017 while (remainingAfterPos() > 0)
1016 { 1018 {
@@ -1031,8 +1033,7 @@ void MqttPacket::handleSubscribe() @@ -1031,8 +1033,7 @@ void MqttPacket::handleSubscribe()
1031 splitTopic(topic, subtopics); 1033 splitTopic(topic, subtopics);
1032 if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false, getUserProperties()) == AuthResult::success) 1034 if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false, getUserProperties()) == AuthResult::success)
1033 { 1035 {
1034 - logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos);  
1035 - MainApp::getMainApp()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos); 1036 + deferredSubscribes.emplace_front(topic, subtopics, qos);
1036 subs_reponse_codes.push_back(static_cast<ReasonCodes>(qos)); 1037 subs_reponse_codes.push_back(static_cast<ReasonCodes>(qos));
1037 } 1038 }
1038 else 1039 else
@@ -1054,6 +1055,13 @@ void MqttPacket::handleSubscribe() @@ -1054,6 +1055,13 @@ void MqttPacket::handleSubscribe()
1054 SubAck subAck(this->protocolVersion, packet_id, subs_reponse_codes); 1055 SubAck subAck(this->protocolVersion, packet_id, subs_reponse_codes);
1055 MqttPacket response(subAck); 1056 MqttPacket response(subAck);
1056 sender->writeMqttPacket(response); 1057 sender->writeMqttPacket(response);
  1058 +
  1059 + // Adding the subscription will also send publishes for retained messages, so that's why we're doing it at the end.
  1060 + for(const SubscriptionTuple &tup : deferredSubscribes)
  1061 + {
  1062 + logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), tup.topic.c_str(), tup.qos);
  1063 + MainApp::getMainApp()->getSubscriptionStore()->addSubscription(sender, tup.topic, tup.subtopics, tup.qos);
  1064 + }
1057 } 1065 }
1058 1066
1059 void MqttPacket::handleUnsubscribe() 1067 void MqttPacket::handleUnsubscribe()
@@ -1804,6 +1812,14 @@ void MqttPacket::readIntoBuf(CirBuf &amp;buf) const @@ -1804,6 +1812,14 @@ void MqttPacket::readIntoBuf(CirBuf &amp;buf) const
1804 buf.write(bites.data(), bites.size()); 1812 buf.write(bites.data(), bites.size());
1805 } 1813 }
1806 1814
  1815 +SubscriptionTuple::SubscriptionTuple(const std::string &topic, const std::vector<std::string> &subtopics, char qos) :
  1816 + topic(topic),
  1817 + subtopics(subtopics),
  1818 + qos(qos)
  1819 +{
  1820 +
  1821 +}
  1822 +
1807 1823
1808 1824
1809 1825
mqttpacket.h
@@ -163,4 +163,13 @@ public: @@ -163,4 +163,13 @@ public:
163 const std::vector<std::pair<std::string, std::string>> *getUserProperties() const; 163 const std::vector<std::pair<std::string, std::string>> *getUserProperties() const;
164 }; 164 };
165 165
  166 +struct SubscriptionTuple
  167 +{
  168 + const std::string topic;
  169 + const std::vector<std::string> subtopics;
  170 + const char qos;
  171 +
  172 + SubscriptionTuple(const std::string &topic, const std::vector<std::string> &subtopics, char qos);
  173 +};
  174 +
166 #endif // MQTTPACKET_H 175 #endif // MQTTPACKET_H