From 27b65bef85f713382f55dab4e30148438ac6ab4a Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Wed, 29 Jun 2022 20:09:58 +0200 Subject: [PATCH] Add tests for will --- FlashMQTests/tst_maintests.cpp | 151 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ flashmqtestclient.cpp | 31 +++++++++++++++++++++++++++---- flashmqtestclient.h | 8 ++++++-- mainapp.cpp | 6 +++++- mqtt5properties.cpp | 5 +++++ mqtt5properties.h | 1 + mqttpacket.cpp | 38 ++++++++++++++++++++++++++++++++++---- types.cpp | 28 +++++++++++++++++++++++++--- types.h | 4 ++++ 9 files changed, 258 insertions(+), 14 deletions(-) diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index 5c40916..208ced9 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -122,6 +122,12 @@ private slots: void testBasicsWithFlashMQTestClient(); + void testMqtt3will(); + void testMqtt3NoWillOnDisconnect(); + void testMqtt5NoWillOnDisconnect(); + void testMqtt5DelayedWill(); + void testMqtt5DelayedWillAlwaysOnSessionEnd(); + }; MainTests::MainTests() @@ -1424,6 +1430,151 @@ void MainTests::testBasicsWithFlashMQTestClient() } +void MainTests::testMqtt3will() +{ + std::unique_ptr sender = std::make_unique(); + sender->start(); + std::shared_ptr will = std::make_shared(); + will->topic = "my/will"; + will->payload = "mypayload"; + sender->setWill(will); + sender->connectClient(ProtocolVersion::Mqtt311); + + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt311); + receiver.subscribe("my/will", 0); + + sender.reset(); + + receiver.waitForMessageCount(1); + + MqttPacket pubPack = receiver.receivedPublishes.front(); + pubPack.parsePublishData(); + + QCOMPARE(pubPack.getPublishData().topic, "my/will"); + QCOMPARE(pubPack.getPublishData().payload, "mypayload"); + QCOMPARE(pubPack.getPublishData().qos, 0); +} + +void MainTests::testMqtt3NoWillOnDisconnect() +{ + std::unique_ptr sender = std::make_unique(); + sender->start(); + std::shared_ptr will = std::make_shared(); + will->topic = "my/will/testMqtt3NoWillOnDisconnect"; + will->payload = "mypayload"; + sender->setWill(will); + sender->connectClient(ProtocolVersion::Mqtt311); + + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt311); + receiver.subscribe("my/will/testMqtt3NoWillOnDisconnect", 0); + + receiver.clearReceivedLists(); + + sender->disconnect(ReasonCodes::Success); + sender.reset(); + + usleep(250000); + + QVERIFY(receiver.receivedPackets.empty()); +} + +void MainTests::testMqtt5NoWillOnDisconnect() +{ + std::unique_ptr sender = std::make_unique(); + sender->start(); + std::shared_ptr will = std::make_shared(); + will->topic = "my/will/testMqtt5NoWillOnDisconnect"; + will->payload = "mypayload"; + sender->setWill(will); + sender->connectClient(ProtocolVersion::Mqtt5); + + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt5); + receiver.subscribe("my/will/testMqtt3NoWillOnDisconnect", 0); + + receiver.clearReceivedLists(); + + sender->disconnect(ReasonCodes::Success); + sender.reset(); + + usleep(250000); + + QVERIFY(receiver.receivedPackets.empty()); +} + +void MainTests::testMqtt5DelayedWill() +{ + std::unique_ptr sender = std::make_unique(); + sender->start(); + std::shared_ptr will = std::make_shared(); + will->topic = "my/will/testMqtt5DelayedWill"; + will->payload = "mypayload"; + will->constructPropertyBuilder(); + will->propertyBuilder->writeWillDelay(2); + sender->setWill(will); + sender->connectClient(ProtocolVersion::Mqtt5, true, 60); + + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt5, true, 60); + receiver.subscribe("my/will/testMqtt5DelayedWill", 0); + + receiver.clearReceivedLists(); + + sender.reset(); + + usleep(250000); + QVERIFY(receiver.receivedPackets.empty()); + + receiver.waitForMessageCount(1, 3); + + MqttPacket pubPack = receiver.receivedPublishes.front(); + pubPack.parsePublishData(); + + QCOMPARE(pubPack.getPublishData().topic, "my/will/testMqtt5DelayedWill"); + QCOMPARE(pubPack.getPublishData().payload, "mypayload"); + QCOMPARE(pubPack.getPublishData().qos, 0); +} + +void MainTests::testMqtt5DelayedWillAlwaysOnSessionEnd() +{ + std::unique_ptr sender = std::make_unique(); + sender->start(); + std::shared_ptr will = std::make_shared(); + will->topic = "my/will/testMqtt5DelayedWillAlwaysOnSessionEnd"; + will->payload = "mypayload"; + will->constructPropertyBuilder(); + will->propertyBuilder->writeWillDelay(120); // This long delay should not matter, because the session expires after 2s. + sender->setWill(will); + sender->connectClient(ProtocolVersion::Mqtt5, true, 2); + + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt5, true, 60); + receiver.subscribe("my/will/testMqtt5DelayedWillAlwaysOnSessionEnd", 0); + + receiver.clearReceivedLists(); + + sender.reset(); + + usleep(1000000); + QVERIFY(receiver.receivedPackets.empty()); + + receiver.waitForMessageCount(1, 2); + + MqttPacket pubPack = receiver.receivedPublishes.front(); + pubPack.parsePublishData(); + + QCOMPARE(pubPack.getPublishData().topic, "my/will/testMqtt5DelayedWillAlwaysOnSessionEnd"); + QCOMPARE(pubPack.getPublishData().payload, "mypayload"); + QCOMPARE(pubPack.getPublishData().qos, 0); +} + int main(int argc, char *argv[]) { diff --git a/flashmqtestclient.cpp b/flashmqtestclient.cpp index 2efc450..9045182 100644 --- a/flashmqtestclient.cpp +++ b/flashmqtestclient.cpp @@ -32,10 +32,12 @@ FlashMQTestClient::~FlashMQTestClient() waitForQuit(); } -void FlashMQTestClient::waitForCondition(std::function f) +void FlashMQTestClient::waitForCondition(std::function f, int timeout) { + const int loopCount = (timeout * 1000) / 10; + int n = 0; - while(n++ < 100) + while(n++ < loopCount) { usleep(10000); @@ -60,6 +62,18 @@ void FlashMQTestClient::clearReceivedLists() receivedPublishes.clear(); } +void FlashMQTestClient::setWill(std::shared_ptr &will) +{ + this->will = will; +} + +void FlashMQTestClient::disconnect(ReasonCodes reason) +{ + client->setReadyForDisconnect(); + Disconnect d(this->client->getProtocolVersion(), reason); + client->writeMqttPacket(d); +} + void FlashMQTestClient::start() { testServerWorkerThreadData->start(&do_thread_work); @@ -67,6 +81,11 @@ void FlashMQTestClient::start() void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion) { + connectClient(protocolVersion, true, 0); +} + +void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval) +{ int sockfd = check(socket(AF_INET, SOCK_STREAM, 0)); struct sockaddr_in servaddr; @@ -131,6 +150,10 @@ void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion) }; Connect connect(protocolVersion, client->getClientId()); + connect.will = this->will; + connect.clean_start = clean_start; + connect.constructPropertyBuilder(); + connect.propertyBuilder->writeSessionExpiry(session_expiry_interval); MqttPacket connectPack(connect); this->client->writeMqttPacketAndBlameThisClient(connectPack); @@ -227,9 +250,9 @@ void FlashMQTestClient::waitForConnack() }); } -void FlashMQTestClient::waitForMessageCount(const size_t count) +void FlashMQTestClient::waitForMessageCount(const size_t count, int timeout) { waitForCondition([&]() { return this->receivedPublishes.size() >= count; - }); + }, timeout); } diff --git a/flashmqtestclient.h b/flashmqtestclient.h index 8fe4f1c..1bc046d 100644 --- a/flashmqtestclient.h +++ b/flashmqtestclient.h @@ -14,6 +14,7 @@ class FlashMQTestClient std::shared_ptr settings; std::shared_ptr testServerWorkerThreadData; std::shared_ptr client; + std::shared_ptr will; std::shared_ptr dummyThreadData; @@ -21,7 +22,7 @@ class FlashMQTestClient static int clientCount; - void waitForCondition(std::function f); + void waitForCondition(std::function f, int timeout = 1); public: @@ -33,13 +34,16 @@ public: void start(); void connectClient(ProtocolVersion protocolVersion); + void connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval); void subscribe(const std::string topic, char qos); void publish(const std::string &topic, const std::string &payload, char qos); void clearReceivedLists(); + void setWill(std::shared_ptr &will); + void disconnect(ReasonCodes reason); void waitForQuit(); void waitForConnack(); - void waitForMessageCount(const size_t count); + void waitForMessageCount(const size_t count, int timeout = 1); }; #endif // FLASHMQTESTCLIENT_H diff --git a/mainapp.cpp b/mainapp.cpp index d80f93e..ab3ad98 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -65,7 +65,11 @@ MainApp::MainApp(const std::string &configFilePath) : auto f = std::bind(&MainApp::queueCleanup, this); //const uint64_t derrivedSessionCheckInterval = std::max((settings->expireSessionsAfterSeconds)*1000*2, 600000); //const uint64_t sessionCheckInterval = std::min(derrivedSessionCheckInterval, 86400000); - timer.addCallback(f, 10000, "session expiration"); + uint64_t interval = 10000; +#ifdef TESTING + interval = 1000; +#endif + timer.addCallback(f, interval, "session expiration"); } auto fKeepAlive = std::bind(&MainApp::queueKeepAliveCheckAtAllThreads, this); diff --git a/mqtt5properties.cpp b/mqtt5properties.cpp index 7d31ea0..edaab93 100644 --- a/mqtt5properties.cpp +++ b/mqtt5properties.cpp @@ -145,6 +145,11 @@ void Mqtt5PropertyBuilder::writeAuthenticationData(const std::string &data) writeStr(Mqtt5Properties::AuthenticationData, data); } +void Mqtt5PropertyBuilder::writeWillDelay(uint32_t delay) +{ + writeUint32(Mqtt5Properties::WillDelayInterval, delay, genericBytes); +} + void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr>> &userProperties) { assert(!this->userProperties); diff --git a/mqtt5properties.h b/mqtt5properties.h index 689c3cc..1c0b5f8 100644 --- a/mqtt5properties.h +++ b/mqtt5properties.h @@ -48,6 +48,7 @@ public: void writeTopicAlias(const uint16_t id); void writeAuthenticationMethod(const std::string &method); void writeAuthenticationData(const std::string &data); + void writeWillDelay(uint32_t delay); void setNewUserProperties(const std::shared_ptr>> &userProperties); }; diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 84f8b7d..3eb9139 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -204,16 +204,25 @@ MqttPacket::MqttPacket(const PubResponse &pubAck) : } } +/** + * @brief Constructor to create a disconnect packet. In normal server mode, only MQTT5 is supposed to do that (MQTT3 has no concept of server-initiated + * disconnect packet). But, we also use it in the test client. + * @param disconnect + */ MqttPacket::MqttPacket(const Disconnect &disconnect) : bites(disconnect.getLengthWithoutFixedHeader()) { - this->protocolVersion = ProtocolVersion::Mqtt5; + this->protocolVersion = disconnect.protocolVersion; packetType = PacketType::DISCONNECT; first_byte = static_cast(packetType) << 4; - writeByte(static_cast(disconnect.reasonCode)); - writeProperties(disconnect.propertyBuilder); + if (this->protocolVersion >= ProtocolVersion::Mqtt5) + { + writeByte(static_cast(disconnect.reasonCode)); + writeProperties(disconnect.propertyBuilder); + } + calculateRemainingLength(); } @@ -244,7 +253,17 @@ MqttPacket::MqttPacket(const Connect &connect) : writeString(magicString); writeByte(static_cast(protocolVersion)); - writeByte(2); // flags; The only bit set is 'clean session'. + + uint8_t flags = connect.clean_start << 1; + + if (connect.will) + { + flags |= 4; + flags |= (connect.will->qos << 3); + flags |= (connect.will->retain << 5); + } + + writeByte(flags); // Keep-alive writeUint16(60); @@ -256,6 +275,17 @@ MqttPacket::MqttPacket(const Connect &connect) : writeString(connect.clientid); + if (connect.will) + { + if (connect.protocolVersion >= ProtocolVersion::Mqtt5) + { + writeProperties(connect.will->propertyBuilder); + } + + writeString(connect.will->topic); + writeString(connect.will->payload); + } + calculateRemainingLength(); } diff --git a/types.cpp b/types.cpp index 86f23b6..d17ffaa 100644 --- a/types.cpp +++ b/types.cpp @@ -312,15 +312,17 @@ size_t UnsubAck::getLengthWithoutFixedHeader() const } Disconnect::Disconnect(const ProtocolVersion protVersion, ReasonCodes reason_code) : + protocolVersion(protVersion), reasonCode(reason_code) { - assert(protVersion >= ProtocolVersion::Mqtt5); - } size_t Disconnect::getLengthWithoutFixedHeader() const { + if (this->protocolVersion < ProtocolVersion::Mqtt5) + return 0; + size_t result = 1; const size_t proplen = propertyBuilder ? propertyBuilder->getLength() : 1; result += proplen; @@ -364,8 +366,20 @@ size_t Connect::getLengthWithoutFixedHeader() const const size_t proplen = propertyBuilder ? propertyBuilder->getLength() : 1; result += proplen; } - return result; + if (will) + { + if (this->protocolVersion >= ProtocolVersion::Mqtt5) + { + const size_t proplen = will->propertyBuilder ? will->propertyBuilder->getLength() : 1; + result += proplen; + } + + result += will->topic.length() + 2; + result += will->payload.length() + 2; + } + + return result; } std::string Connect::getMagicString() const @@ -376,6 +390,14 @@ std::string Connect::getMagicString() const return "MQTT"; } +void Connect::constructPropertyBuilder() +{ + if (this->propertyBuilder) + return; + + this->propertyBuilder = std::make_shared(); +} + Subscribe::Subscribe(const ProtocolVersion protocolVersion, uint16_t packetId, const std::string &topic, char qos) : protocolVersion(protocolVersion), packetId(packetId), diff --git a/types.h b/types.h index 2fc79ce..c421a96 100644 --- a/types.h +++ b/types.h @@ -262,6 +262,7 @@ public: class Disconnect { public: + ProtocolVersion protocolVersion; ReasonCodes reasonCode; std::shared_ptr propertyBuilder; Disconnect(const ProtocolVersion protVersion, ReasonCodes reason_code); @@ -280,14 +281,17 @@ public: struct Connect { const ProtocolVersion protocolVersion; + bool clean_start = true; std::string clientid; std::string username; std::string password; + std::shared_ptr will; std::shared_ptr propertyBuilder; Connect(ProtocolVersion protocolVersion, const std::string &clientid); size_t getLengthWithoutFixedHeader() const; std::string getMagicString() const; + void constructPropertyBuilder(); }; /** -- libgit2 0.21.4