Commit 0759ed08a9852e15e1cc317519d6e0ef33e4ba21
1 parent
a7ba40e8
Convert testUnSubscribe to native test client
Also fix sending an UNSUBACK instead of SUBACK on unsubscribe.
Showing
7 changed files
with
113 additions
and
29 deletions
FlashMQTests/tst_maintests.cpp
| @@ -1420,53 +1420,57 @@ void MainTests::testNotMessingUpQosLevels() | @@ -1420,53 +1420,57 @@ void MainTests::testNotMessingUpQosLevels() | ||
| 1420 | 1420 | ||
| 1421 | void MainTests::testUnSubscribe() | 1421 | void MainTests::testUnSubscribe() |
| 1422 | { | 1422 | { |
| 1423 | - TwoClientTestContext testContext; | 1423 | + FlashMQTestClient sender; |
| 1424 | + FlashMQTestClient receiver; | ||
| 1425 | + | ||
| 1426 | + sender.start(); | ||
| 1427 | + sender.connectClient(ProtocolVersion::Mqtt311); | ||
| 1424 | 1428 | ||
| 1425 | - testContext.connectSender(); | ||
| 1426 | - testContext.connectReceiver(); | 1429 | + receiver.start(); |
| 1430 | + receiver.connectClient(ProtocolVersion::Mqtt311); | ||
| 1427 | 1431 | ||
| 1428 | - testContext.subscribeReceiver("Rebecca/Bunch", 2); | ||
| 1429 | - testContext.subscribeReceiver("Josh/Chan", 1); | ||
| 1430 | - testContext.subscribeReceiver("White/Josh", 1); | 1432 | + receiver.subscribe("Rebecca/Bunch", 2); |
| 1433 | + receiver.subscribe("Josh/Chan", 1); | ||
| 1434 | + receiver.subscribe("White/Josh", 1); | ||
| 1431 | 1435 | ||
| 1432 | - testContext.publish("Rebecca/Bunch", "Bunch here", 2); | ||
| 1433 | - testContext.publish("White/Josh", "Anteater", 2); | ||
| 1434 | - testContext.publish("Josh/Chan", "Human flip-flop", 2); | 1436 | + sender.publish("Rebecca/Bunch", "Bunch here", 2); |
| 1437 | + sender.publish("White/Josh", "Anteater", 2); | ||
| 1438 | + sender.publish("Josh/Chan", "Human flip-flop", 2); | ||
| 1435 | 1439 | ||
| 1436 | - testContext.waitReceiverReceived(3); | 1440 | + receiver.waitForMessageCount(3); |
| 1437 | 1441 | ||
| 1438 | - QVERIFY(std::any_of(testContext.receivedMessages.begin(), testContext.receivedMessages.end(), [](const QMQTT::Message &msg) { | ||
| 1439 | - return msg.payload() == "Bunch here" && msg.topic() == "Rebecca/Bunch"; | 1442 | + QVERIFY(std::any_of(receiver.receivedPublishes.begin(), receiver.receivedPublishes.end(), [](const MqttPacket &pack) { |
| 1443 | + return pack.getPayloadCopy() == "Bunch here" && pack.getTopic() == "Rebecca/Bunch"; | ||
| 1440 | })); | 1444 | })); |
| 1441 | 1445 | ||
| 1442 | - QVERIFY(std::any_of(testContext.receivedMessages.begin(), testContext.receivedMessages.end(), [](const QMQTT::Message &msg) { | ||
| 1443 | - return msg.payload() == "Anteater" && msg.topic() == "White/Josh"; | 1446 | + QVERIFY(std::any_of(receiver.receivedPublishes.begin(), receiver.receivedPublishes.end(), [](const MqttPacket &pack) { |
| 1447 | + return pack.getPayloadCopy() == "Anteater" && pack.getTopic() == "White/Josh"; | ||
| 1444 | })); | 1448 | })); |
| 1445 | 1449 | ||
| 1446 | - QVERIFY(std::any_of(testContext.receivedMessages.begin(), testContext.receivedMessages.end(), [](const QMQTT::Message &msg) { | ||
| 1447 | - return msg.payload() == "Human flip-flop" && msg.topic() == "Josh/Chan"; | 1450 | + QVERIFY(std::any_of(receiver.receivedPublishes.begin(), receiver.receivedPublishes.end(), [](const MqttPacket &pack) { |
| 1451 | + return pack.getPayloadCopy() == "Human flip-flop" && pack.getTopic() == "Josh/Chan"; | ||
| 1448 | })); | 1452 | })); |
| 1449 | 1453 | ||
| 1450 | - QCOMPARE(testContext.receivedMessages.count(), 3); | 1454 | + MYCASTCOMPARE(receiver.receivedPublishes.size(), 3); |
| 1451 | 1455 | ||
| 1452 | - testContext.receivedMessages.clear(); | 1456 | + receiver.clearReceivedLists(); |
| 1453 | 1457 | ||
| 1454 | - testContext.unsubscribeReceiver("Josh/Chan"); | 1458 | + receiver.unsubscribe("Josh/Chan"); |
| 1455 | 1459 | ||
| 1456 | - testContext.publish("Rebecca/Bunch", "Bunch here", 2); | ||
| 1457 | - testContext.publish("White/Josh", "Anteater", 2); | ||
| 1458 | - testContext.publish("Josh/Chan", "Human flip-flop", 2); | 1460 | + sender.publish("Rebecca/Bunch", "Bunch here", 2); |
| 1461 | + sender.publish("White/Josh", "Anteater", 2); | ||
| 1462 | + sender.publish("Josh/Chan", "Human flip-flop", 2); | ||
| 1459 | 1463 | ||
| 1460 | - testContext.waitReceiverReceived(2); | 1464 | + receiver.waitForMessageCount(2); |
| 1461 | 1465 | ||
| 1462 | - QCOMPARE(testContext.receivedMessages.count(), 2); | 1466 | + MYCASTCOMPARE(receiver.receivedPublishes.size(), 2); |
| 1463 | 1467 | ||
| 1464 | - QVERIFY(std::any_of(testContext.receivedMessages.begin(), testContext.receivedMessages.end(), [](const QMQTT::Message &msg) { | ||
| 1465 | - return msg.payload() == "Bunch here" && msg.topic() == "Rebecca/Bunch"; | 1468 | + QVERIFY(std::any_of(receiver.receivedPublishes.begin(), receiver.receivedPublishes.end(), [](const MqttPacket &pack) { |
| 1469 | + return pack.getPayloadCopy() == "Bunch here" && pack.getTopic() == "Rebecca/Bunch"; | ||
| 1466 | })); | 1470 | })); |
| 1467 | 1471 | ||
| 1468 | - QVERIFY(std::any_of(testContext.receivedMessages.begin(), testContext.receivedMessages.end(), [](const QMQTT::Message &msg) { | ||
| 1469 | - return msg.payload() == "Anteater" && msg.topic() == "White/Josh"; | 1472 | + QVERIFY(std::any_of(receiver.receivedPublishes.begin(), receiver.receivedPublishes.end(), [](const MqttPacket &pack) { |
| 1473 | + return pack.getPayloadCopy() == "Anteater" && pack.getTopic() == "White/Josh"; | ||
| 1470 | })); | 1474 | })); |
| 1471 | } | 1475 | } |
| 1472 | 1476 |
flashmqtestclient.cpp
| @@ -195,6 +195,23 @@ void FlashMQTestClient::subscribe(const std::string topic, char qos) | @@ -195,6 +195,23 @@ void FlashMQTestClient::subscribe(const std::string topic, char qos) | ||
| 195 | } | 195 | } |
| 196 | } | 196 | } |
| 197 | 197 | ||
| 198 | +void FlashMQTestClient::unsubscribe(const std::string &topic) | ||
| 199 | +{ | ||
| 200 | + clearReceivedLists(); | ||
| 201 | + | ||
| 202 | + const uint16_t packet_id = 66; | ||
| 203 | + | ||
| 204 | + Unsubscribe unsub(client->getProtocolVersion(), packet_id, topic); | ||
| 205 | + MqttPacket unsubPack(unsub); | ||
| 206 | + client->writeMqttPacketAndBlameThisClient(unsubPack); | ||
| 207 | + | ||
| 208 | + waitForCondition([&]() { | ||
| 209 | + return !this->receivedPackets.empty() && this->receivedPackets.front().packetType == PacketType::UNSUBACK; | ||
| 210 | + }); | ||
| 211 | + | ||
| 212 | + // TODO: parse the UNSUBACK and check reason codes. | ||
| 213 | +} | ||
| 214 | + | ||
| 198 | void FlashMQTestClient::publish(Publish &pub) | 215 | void FlashMQTestClient::publish(Publish &pub) |
| 199 | { | 216 | { |
| 200 | clearReceivedLists(); | 217 | clearReceivedLists(); |
flashmqtestclient.h
| @@ -37,6 +37,7 @@ public: | @@ -37,6 +37,7 @@ public: | ||
| 37 | void connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval); | 37 | void connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval); |
| 38 | void connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval, std::function<void(Connect&)> manipulateConnect); | 38 | void connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval, std::function<void(Connect&)> manipulateConnect); |
| 39 | void subscribe(const std::string topic, char qos); | 39 | void subscribe(const std::string topic, char qos); |
| 40 | + void unsubscribe(const std::string &topic); | ||
| 40 | void publish(const std::string &topic, const std::string &payload, char qos); | 41 | void publish(const std::string &topic, const std::string &payload, char qos); |
| 41 | void publish(Publish &pub); | 42 | void publish(Publish &pub); |
| 42 | void clearReceivedLists(); | 43 | void clearReceivedLists(); |
mqttpacket.cpp
| @@ -93,7 +93,7 @@ MqttPacket::MqttPacket(const SubAck &subAck) : | @@ -93,7 +93,7 @@ MqttPacket::MqttPacket(const SubAck &subAck) : | ||
| 93 | MqttPacket::MqttPacket(const UnsubAck &unsubAck) : | 93 | MqttPacket::MqttPacket(const UnsubAck &unsubAck) : |
| 94 | bites(unsubAck.getLengthWithoutFixedHeader()) | 94 | bites(unsubAck.getLengthWithoutFixedHeader()) |
| 95 | { | 95 | { |
| 96 | - packetType = PacketType::SUBACK; | 96 | + packetType = PacketType::UNSUBACK; |
| 97 | first_byte = static_cast<char>(packetType) << 4; | 97 | first_byte = static_cast<char>(packetType) << 4; |
| 98 | writeUint16(unsubAck.packet_id); | 98 | writeUint16(unsubAck.packet_id); |
| 99 | 99 | ||
| @@ -311,6 +311,29 @@ MqttPacket::MqttPacket(const Subscribe &subscribe) : | @@ -311,6 +311,29 @@ MqttPacket::MqttPacket(const Subscribe &subscribe) : | ||
| 311 | calculateRemainingLength(); | 311 | calculateRemainingLength(); |
| 312 | } | 312 | } |
| 313 | 313 | ||
| 314 | +MqttPacket::MqttPacket(const Unsubscribe &unsubscribe) : | ||
| 315 | + bites(unsubscribe.getLengthWithoutFixedHeader()), | ||
| 316 | + packetType(PacketType::UNSUBSCRIBE) | ||
| 317 | +{ | ||
| 318 | +#ifndef TESTING | ||
| 319 | + throw NotImplementedException("Code is only for testing."); | ||
| 320 | +#endif | ||
| 321 | + | ||
| 322 | + first_byte = static_cast<char>(packetType) << 4; | ||
| 323 | + first_byte |= 2; // required reserved bit | ||
| 324 | + | ||
| 325 | + writeUint16(unsubscribe.packetId); | ||
| 326 | + | ||
| 327 | + if (unsubscribe.protocolVersion >= ProtocolVersion::Mqtt5) | ||
| 328 | + { | ||
| 329 | + writeProperties(unsubscribe.propertyBuilder); | ||
| 330 | + } | ||
| 331 | + | ||
| 332 | + writeString(unsubscribe.topic); | ||
| 333 | + | ||
| 334 | + calculateRemainingLength(); | ||
| 335 | +} | ||
| 336 | + | ||
| 314 | void MqttPacket::bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender) | 337 | void MqttPacket::bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender) |
| 315 | { | 338 | { |
| 316 | while (buf.usedBytes() >= MQTT_HEADER_LENGH) | 339 | while (buf.usedBytes() >= MQTT_HEADER_LENGH) |
mqttpacket.h
| @@ -116,6 +116,7 @@ public: | @@ -116,6 +116,7 @@ public: | ||
| 116 | MqttPacket(const Auth &auth); | 116 | MqttPacket(const Auth &auth); |
| 117 | MqttPacket(const Connect &connect); | 117 | MqttPacket(const Connect &connect); |
| 118 | MqttPacket(const Subscribe &subscribe); | 118 | MqttPacket(const Subscribe &subscribe); |
| 119 | + MqttPacket(const Unsubscribe &unsubscribe); | ||
| 119 | 120 | ||
| 120 | static void bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender); | 121 | static void bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender); |
| 121 | 122 |
types.cpp
| @@ -439,3 +439,25 @@ size_t Subscribe::getLengthWithoutFixedHeader() const | @@ -439,3 +439,25 @@ size_t Subscribe::getLengthWithoutFixedHeader() const | ||
| 439 | 439 | ||
| 440 | return result; | 440 | return result; |
| 441 | } | 441 | } |
| 442 | + | ||
| 443 | +Unsubscribe::Unsubscribe(const ProtocolVersion protocolVersion, uint16_t packetId, const std::string &topic) : | ||
| 444 | + protocolVersion(protocolVersion), | ||
| 445 | + packetId(packetId), | ||
| 446 | + topic(topic) | ||
| 447 | +{ | ||
| 448 | + | ||
| 449 | +} | ||
| 450 | + | ||
| 451 | +size_t Unsubscribe::getLengthWithoutFixedHeader() const | ||
| 452 | +{ | ||
| 453 | + size_t result = topic.size() + 2; | ||
| 454 | + result += 2; // packet id | ||
| 455 | + | ||
| 456 | + if (this->protocolVersion >= ProtocolVersion::Mqtt5) | ||
| 457 | + { | ||
| 458 | + const size_t proplen = propertyBuilder ? propertyBuilder->getLength() : 1; | ||
| 459 | + result += proplen; | ||
| 460 | + } | ||
| 461 | + | ||
| 462 | + return result; | ||
| 463 | +} |
types.h
| @@ -315,4 +315,20 @@ struct Subscribe | @@ -315,4 +315,20 @@ struct Subscribe | ||
| 315 | size_t getLengthWithoutFixedHeader() const; | 315 | size_t getLengthWithoutFixedHeader() const; |
| 316 | }; | 316 | }; |
| 317 | 317 | ||
| 318 | +/** | ||
| 319 | + * @brief The Unsubscribe struct can be used to construct a mqtt packet of type 'unsubscribe'. | ||
| 320 | + * | ||
| 321 | + * It's rudimentary. Offically you can unsubscribe to multiple topics at once, but I have no need for that. | ||
| 322 | + */ | ||
| 323 | +struct Unsubscribe | ||
| 324 | +{ | ||
| 325 | + const ProtocolVersion protocolVersion; | ||
| 326 | + uint16_t packetId; | ||
| 327 | + std::string topic; | ||
| 328 | + std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; | ||
| 329 | + | ||
| 330 | + Unsubscribe(const ProtocolVersion protocolVersion, uint16_t packetId, const std::string &topic); | ||
| 331 | + size_t getLengthWithoutFixedHeader() const; | ||
| 332 | +}; | ||
| 333 | + | ||
| 318 | #endif // TYPES_H | 334 | #endif // TYPES_H |