Commit 4a16d48b7ec1a14d06eb1afe31a6d97192d5df27

Authored by Wiebe Cazemier
1 parent abe7fa16

Fix QoS reduction of queued messages

Also add a test to test all scenarios.

This also fixes session clean_start, which was never done since the
MQTT5 refactor to clean_start (vs clean session).
FlashMQTests/tst_maintests.cpp
@@ -133,6 +133,8 @@ private slots: @@ -133,6 +133,8 @@ private slots:
133 133
134 void testReceivingRetainedMessageWithQoS(); 134 void testReceivingRetainedMessageWithQoS();
135 135
  136 + void testQosDowngradeOnOfflineClients();
  137 +
136 }; 138 };
137 139
138 MainTests::MainTests() 140 MainTests::MainTests()
@@ -1750,6 +1752,66 @@ void MainTests::testReceivingRetainedMessageWithQoS() @@ -1750,6 +1752,66 @@ void MainTests::testReceivingRetainedMessageWithQoS()
1750 MYCASTCOMPARE(9, testCount); 1752 MYCASTCOMPARE(9, testCount);
1751 } 1753 }
1752 1754
  1755 +void MainTests::testQosDowngradeOnOfflineClients()
  1756 +{
  1757 + int testCount = 0;
  1758 +
  1759 + std::vector<std::string> subscribePaths {"topic1/FOOBAR", "+/+", "#"};
  1760 +
  1761 + for (char sendQos = 1; sendQos < 3; sendQos++)
  1762 + {
  1763 + for (char subscribeQos = 1; subscribeQos < 3; subscribeQos++)
  1764 + {
  1765 + for (const std::string &subscribePath : subscribePaths)
  1766 + {
  1767 + testCount++;
  1768 +
  1769 + // First start with clean_start to reset the session.
  1770 + std::unique_ptr<FlashMQTestClient> receiver = std::make_unique<FlashMQTestClient>();
  1771 + receiver->start();
  1772 + receiver->connectClient(ProtocolVersion::Mqtt5, true, 600, [](Connect &connect) {
  1773 + connect.clientid = "TheReceiver";
  1774 + });
  1775 + receiver->subscribe(subscribePath, subscribeQos);
  1776 + receiver->disconnect(ReasonCodes::Success);
  1777 + receiver.reset();
  1778 +
  1779 + const std::string payload = "We are testing";
  1780 +
  1781 + FlashMQTestClient sender;
  1782 + sender.start();
  1783 + sender.connectClient(ProtocolVersion::Mqtt311);
  1784 +
  1785 + Publish p1("topic1/FOOBAR", payload, sendQos);
  1786 +
  1787 + for (int i = 0; i < 10; i++)
  1788 + {
  1789 + sender.publish(p1);
  1790 + }
  1791 +
  1792 + // Now we connect again, and we should now pick up the existing session.
  1793 + receiver = std::make_unique<FlashMQTestClient>();
  1794 + receiver->start();
  1795 + receiver->connectClient(ProtocolVersion::Mqtt5, false, 600, [](Connect &connect) {
  1796 + connect.clientid = "TheReceiver";
  1797 + });
  1798 +
  1799 + receiver->waitForMessageCount(10);
  1800 +
  1801 + const char expQos = std::min<char>(sendQos, subscribeQos);
  1802 +
  1803 + MYCASTCOMPARE(receiver->receivedPublishes.size(), 10);
  1804 +
  1805 + QVERIFY(std::all_of(receiver->receivedPublishes.begin(), receiver->receivedPublishes.end(), [&](MqttPacket &pack) { return pack.getQos() == expQos;}));
  1806 + QVERIFY(std::all_of(receiver->receivedPublishes.begin(), receiver->receivedPublishes.end(), [&](MqttPacket &pack) { return pack.getTopic() == "topic1/FOOBAR";}));
  1807 + QVERIFY(std::all_of(receiver->receivedPublishes.begin(), receiver->receivedPublishes.end(), [&](MqttPacket &pack) { return pack.getPayloadCopy() == payload;}));
  1808 + }
  1809 + }
  1810 + }
  1811 +
  1812 + MYCASTCOMPARE(12, testCount);
  1813 +}
  1814 +
1753 int main(int argc, char *argv[]) 1815 int main(int argc, char *argv[])
1754 { 1816 {
1755 QCoreApplication app(argc, argv); 1817 QCoreApplication app(argc, argv);
publishcopyfactory.cpp
@@ -95,22 +95,36 @@ bool PublishCopyFactory::getRetain() const @@ -95,22 +95,36 @@ bool PublishCopyFactory::getRetain() const
95 return publish->retain; 95 return publish->retain;
96 } 96 }
97 97
98 -Publish PublishCopyFactory::getNewPublish() const 98 +/**
  99 + * @brief PublishCopyFactory::getNewPublish gets a new publish object from an existing packet or publish.
  100 + * @param new_max_qos
  101 + * @return
  102 + *
  103 + * It being a public function, the idea is that it's only needed for creating publish objects for storing QoS messages for off-line
  104 + * clients. For on-line clients, you're always making a packet (with getOptimumPacket()).
  105 + */
  106 +Publish PublishCopyFactory::getNewPublish(char new_max_qos) const
99 { 107 {
  108 + // (At time of writing) we only need to construct new publishes for QoS (because we're storing QoS publishes for offline clients). If
  109 + // you're doing it elsewhere, it's a bug.
  110 + assert(orgQos > 0);
  111 + assert(new_max_qos > 0);
  112 +
  113 + const char actualQos = getEffectiveQos(new_max_qos);
  114 +
100 if (packet) 115 if (packet)
101 { 116 {
102 assert(packet->getQos() > 0); 117 assert(packet->getQos() > 0);
103 - assert(orgQos > 0); // We only need to construct new publishes for QoS. If you're doing it elsewhere, it's a bug.  
104 118
105 Publish p(packet->getPublishData()); 119 Publish p(packet->getPublishData());
106 - p.qos = orgQos; 120 + p.qos = actualQos;
107 return p; 121 return p;
108 } 122 }
109 123
110 assert(publish->qos > 0); // Same check as above, but then for Publish objects. 124 assert(publish->qos > 0); // Same check as above, but then for Publish objects.
111 125
112 Publish p(*publish); 126 Publish p(*publish);
113 - p.qos = orgQos; 127 + p.qos = actualQos;
114 return p; 128 return p;
115 } 129 }
116 130
publishcopyfactory.h
@@ -34,7 +34,7 @@ public: @@ -34,7 +34,7 @@ public:
34 const std::string &getTopic() const; 34 const std::string &getTopic() const;
35 const std::vector<std::string> &getSubtopics(); 35 const std::vector<std::string> &getSubtopics();
36 bool getRetain() const; 36 bool getRetain() const;
37 - Publish getNewPublish() const; 37 + Publish getNewPublish(char new_max_qos) const;
38 std::shared_ptr<Client> getSender(); 38 std::shared_ptr<Client> getSender();
39 const std::vector<std::pair<std::string, std::string>> *getUserProperties() const; 39 const std::vector<std::pair<std::string, std::string>> *getUserProperties() const;
40 40
qospacketqueue.cpp
@@ -76,7 +76,7 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory &amp;copyFactory, uint16_t id, @@ -76,7 +76,7 @@ void QoSPublishQueue::queuePublish(PublishCopyFactory &amp;copyFactory, uint16_t id,
76 assert(new_max_qos > 0); 76 assert(new_max_qos > 0);
77 assert(id > 0); 77 assert(id > 0);
78 78
79 - Publish pub = copyFactory.getNewPublish(); 79 + Publish pub = copyFactory.getNewPublish(new_max_qos);
80 queue.emplace_back(std::move(pub), id); 80 queue.emplace_back(std::move(pub), id);
81 qosQueueBytes += queue.back().getApproximateMemoryFootprint(); 81 qosQueueBytes += queue.back().getApproximateMemoryFootprint();
82 } 82 }
subscriptionstore.cpp
@@ -253,7 +253,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -253,7 +253,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
253 } 253 }
254 } 254 }
255 255
256 - if (!session || session->getDestroyOnDisconnect()) 256 + if (!session || session->getDestroyOnDisconnect() || clean_start)
257 { 257 {
258 session = std::make_shared<Session>(); 258 session = std::make_shared<Session>();
259 259