Commit 27b65bef85f713382f55dab4e30148438ac6ab4a

Authored by Wiebe Cazemier
1 parent 56d9c510

Add tests for will

FlashMQTests/tst_maintests.cpp
... ... @@ -122,6 +122,12 @@ private slots:
122 122  
123 123 void testBasicsWithFlashMQTestClient();
124 124  
  125 + void testMqtt3will();
  126 + void testMqtt3NoWillOnDisconnect();
  127 + void testMqtt5NoWillOnDisconnect();
  128 + void testMqtt5DelayedWill();
  129 + void testMqtt5DelayedWillAlwaysOnSessionEnd();
  130 +
125 131 };
126 132  
127 133 MainTests::MainTests()
... ... @@ -1424,6 +1430,151 @@ void MainTests::testBasicsWithFlashMQTestClient()
1424 1430  
1425 1431 }
1426 1432  
  1433 +void MainTests::testMqtt3will()
  1434 +{
  1435 + std::unique_ptr<FlashMQTestClient> sender = std::make_unique<FlashMQTestClient>();
  1436 + sender->start();
  1437 + std::shared_ptr<WillPublish> will = std::make_shared<WillPublish>();
  1438 + will->topic = "my/will";
  1439 + will->payload = "mypayload";
  1440 + sender->setWill(will);
  1441 + sender->connectClient(ProtocolVersion::Mqtt311);
  1442 +
  1443 + FlashMQTestClient receiver;
  1444 + receiver.start();
  1445 + receiver.connectClient(ProtocolVersion::Mqtt311);
  1446 + receiver.subscribe("my/will", 0);
  1447 +
  1448 + sender.reset();
  1449 +
  1450 + receiver.waitForMessageCount(1);
  1451 +
  1452 + MqttPacket pubPack = receiver.receivedPublishes.front();
  1453 + pubPack.parsePublishData();
  1454 +
  1455 + QCOMPARE(pubPack.getPublishData().topic, "my/will");
  1456 + QCOMPARE(pubPack.getPublishData().payload, "mypayload");
  1457 + QCOMPARE(pubPack.getPublishData().qos, 0);
  1458 +}
  1459 +
  1460 +void MainTests::testMqtt3NoWillOnDisconnect()
  1461 +{
  1462 + std::unique_ptr<FlashMQTestClient> sender = std::make_unique<FlashMQTestClient>();
  1463 + sender->start();
  1464 + std::shared_ptr<WillPublish> will = std::make_shared<WillPublish>();
  1465 + will->topic = "my/will/testMqtt3NoWillOnDisconnect";
  1466 + will->payload = "mypayload";
  1467 + sender->setWill(will);
  1468 + sender->connectClient(ProtocolVersion::Mqtt311);
  1469 +
  1470 + FlashMQTestClient receiver;
  1471 + receiver.start();
  1472 + receiver.connectClient(ProtocolVersion::Mqtt311);
  1473 + receiver.subscribe("my/will/testMqtt3NoWillOnDisconnect", 0);
  1474 +
  1475 + receiver.clearReceivedLists();
  1476 +
  1477 + sender->disconnect(ReasonCodes::Success);
  1478 + sender.reset();
  1479 +
  1480 + usleep(250000);
  1481 +
  1482 + QVERIFY(receiver.receivedPackets.empty());
  1483 +}
  1484 +
  1485 +void MainTests::testMqtt5NoWillOnDisconnect()
  1486 +{
  1487 + std::unique_ptr<FlashMQTestClient> sender = std::make_unique<FlashMQTestClient>();
  1488 + sender->start();
  1489 + std::shared_ptr<WillPublish> will = std::make_shared<WillPublish>();
  1490 + will->topic = "my/will/testMqtt5NoWillOnDisconnect";
  1491 + will->payload = "mypayload";
  1492 + sender->setWill(will);
  1493 + sender->connectClient(ProtocolVersion::Mqtt5);
  1494 +
  1495 + FlashMQTestClient receiver;
  1496 + receiver.start();
  1497 + receiver.connectClient(ProtocolVersion::Mqtt5);
  1498 + receiver.subscribe("my/will/testMqtt3NoWillOnDisconnect", 0);
  1499 +
  1500 + receiver.clearReceivedLists();
  1501 +
  1502 + sender->disconnect(ReasonCodes::Success);
  1503 + sender.reset();
  1504 +
  1505 + usleep(250000);
  1506 +
  1507 + QVERIFY(receiver.receivedPackets.empty());
  1508 +}
  1509 +
  1510 +void MainTests::testMqtt5DelayedWill()
  1511 +{
  1512 + std::unique_ptr<FlashMQTestClient> sender = std::make_unique<FlashMQTestClient>();
  1513 + sender->start();
  1514 + std::shared_ptr<WillPublish> will = std::make_shared<WillPublish>();
  1515 + will->topic = "my/will/testMqtt5DelayedWill";
  1516 + will->payload = "mypayload";
  1517 + will->constructPropertyBuilder();
  1518 + will->propertyBuilder->writeWillDelay(2);
  1519 + sender->setWill(will);
  1520 + sender->connectClient(ProtocolVersion::Mqtt5, true, 60);
  1521 +
  1522 + FlashMQTestClient receiver;
  1523 + receiver.start();
  1524 + receiver.connectClient(ProtocolVersion::Mqtt5, true, 60);
  1525 + receiver.subscribe("my/will/testMqtt5DelayedWill", 0);
  1526 +
  1527 + receiver.clearReceivedLists();
  1528 +
  1529 + sender.reset();
  1530 +
  1531 + usleep(250000);
  1532 + QVERIFY(receiver.receivedPackets.empty());
  1533 +
  1534 + receiver.waitForMessageCount(1, 3);
  1535 +
  1536 + MqttPacket pubPack = receiver.receivedPublishes.front();
  1537 + pubPack.parsePublishData();
  1538 +
  1539 + QCOMPARE(pubPack.getPublishData().topic, "my/will/testMqtt5DelayedWill");
  1540 + QCOMPARE(pubPack.getPublishData().payload, "mypayload");
  1541 + QCOMPARE(pubPack.getPublishData().qos, 0);
  1542 +}
  1543 +
  1544 +void MainTests::testMqtt5DelayedWillAlwaysOnSessionEnd()
  1545 +{
  1546 + std::unique_ptr<FlashMQTestClient> sender = std::make_unique<FlashMQTestClient>();
  1547 + sender->start();
  1548 + std::shared_ptr<WillPublish> will = std::make_shared<WillPublish>();
  1549 + will->topic = "my/will/testMqtt5DelayedWillAlwaysOnSessionEnd";
  1550 + will->payload = "mypayload";
  1551 + will->constructPropertyBuilder();
  1552 + will->propertyBuilder->writeWillDelay(120); // This long delay should not matter, because the session expires after 2s.
  1553 + sender->setWill(will);
  1554 + sender->connectClient(ProtocolVersion::Mqtt5, true, 2);
  1555 +
  1556 + FlashMQTestClient receiver;
  1557 + receiver.start();
  1558 + receiver.connectClient(ProtocolVersion::Mqtt5, true, 60);
  1559 + receiver.subscribe("my/will/testMqtt5DelayedWillAlwaysOnSessionEnd", 0);
  1560 +
  1561 + receiver.clearReceivedLists();
  1562 +
  1563 + sender.reset();
  1564 +
  1565 + usleep(1000000);
  1566 + QVERIFY(receiver.receivedPackets.empty());
  1567 +
  1568 + receiver.waitForMessageCount(1, 2);
  1569 +
  1570 + MqttPacket pubPack = receiver.receivedPublishes.front();
  1571 + pubPack.parsePublishData();
  1572 +
  1573 + QCOMPARE(pubPack.getPublishData().topic, "my/will/testMqtt5DelayedWillAlwaysOnSessionEnd");
  1574 + QCOMPARE(pubPack.getPublishData().payload, "mypayload");
  1575 + QCOMPARE(pubPack.getPublishData().qos, 0);
  1576 +}
  1577 +
1427 1578  
1428 1579 int main(int argc, char *argv[])
1429 1580 {
... ...
flashmqtestclient.cpp
... ... @@ -32,10 +32,12 @@ FlashMQTestClient::~FlashMQTestClient()
32 32 waitForQuit();
33 33 }
34 34  
35   -void FlashMQTestClient::waitForCondition(std::function<bool()> f)
  35 +void FlashMQTestClient::waitForCondition(std::function<bool()> f, int timeout)
36 36 {
  37 + const int loopCount = (timeout * 1000) / 10;
  38 +
37 39 int n = 0;
38   - while(n++ < 100)
  40 + while(n++ < loopCount)
39 41 {
40 42 usleep(10000);
41 43  
... ... @@ -60,6 +62,18 @@ void FlashMQTestClient::clearReceivedLists()
60 62 receivedPublishes.clear();
61 63 }
62 64  
  65 +void FlashMQTestClient::setWill(std::shared_ptr<WillPublish> &will)
  66 +{
  67 + this->will = will;
  68 +}
  69 +
  70 +void FlashMQTestClient::disconnect(ReasonCodes reason)
  71 +{
  72 + client->setReadyForDisconnect();
  73 + Disconnect d(this->client->getProtocolVersion(), reason);
  74 + client->writeMqttPacket(d);
  75 +}
  76 +
63 77 void FlashMQTestClient::start()
64 78 {
65 79 testServerWorkerThreadData->start(&do_thread_work);
... ... @@ -67,6 +81,11 @@ void FlashMQTestClient::start()
67 81  
68 82 void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion)
69 83 {
  84 + connectClient(protocolVersion, true, 0);
  85 +}
  86 +
  87 +void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval)
  88 +{
70 89 int sockfd = check<std::runtime_error>(socket(AF_INET, SOCK_STREAM, 0));
71 90  
72 91 struct sockaddr_in servaddr;
... ... @@ -131,6 +150,10 @@ void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion)
131 150 };
132 151  
133 152 Connect connect(protocolVersion, client->getClientId());
  153 + connect.will = this->will;
  154 + connect.clean_start = clean_start;
  155 + connect.constructPropertyBuilder();
  156 + connect.propertyBuilder->writeSessionExpiry(session_expiry_interval);
134 157 MqttPacket connectPack(connect);
135 158 this->client->writeMqttPacketAndBlameThisClient(connectPack);
136 159  
... ... @@ -227,9 +250,9 @@ void FlashMQTestClient::waitForConnack()
227 250 });
228 251 }
229 252  
230   -void FlashMQTestClient::waitForMessageCount(const size_t count)
  253 +void FlashMQTestClient::waitForMessageCount(const size_t count, int timeout)
231 254 {
232 255 waitForCondition([&]() {
233 256 return this->receivedPublishes.size() >= count;
234   - });
  257 + }, timeout);
235 258 }
... ...
flashmqtestclient.h
... ... @@ -14,6 +14,7 @@ class FlashMQTestClient
14 14 std::shared_ptr<Settings> settings;
15 15 std::shared_ptr<ThreadData> testServerWorkerThreadData;
16 16 std::shared_ptr<Client> client;
  17 + std::shared_ptr<WillPublish> will;
17 18  
18 19 std::shared_ptr<ThreadData> dummyThreadData;
19 20  
... ... @@ -21,7 +22,7 @@ class FlashMQTestClient
21 22  
22 23 static int clientCount;
23 24  
24   - void waitForCondition(std::function<bool()> f);
  25 + void waitForCondition(std::function<bool()> f, int timeout = 1);
25 26  
26 27  
27 28 public:
... ... @@ -33,13 +34,16 @@ public:
33 34  
34 35 void start();
35 36 void connectClient(ProtocolVersion protocolVersion);
  37 + void connectClient(ProtocolVersion protocolVersion, bool clean_start, uint32_t session_expiry_interval);
36 38 void subscribe(const std::string topic, char qos);
37 39 void publish(const std::string &topic, const std::string &payload, char qos);
38 40 void clearReceivedLists();
  41 + void setWill(std::shared_ptr<WillPublish> &will);
  42 + void disconnect(ReasonCodes reason);
39 43  
40 44 void waitForQuit();
41 45 void waitForConnack();
42   - void waitForMessageCount(const size_t count);
  46 + void waitForMessageCount(const size_t count, int timeout = 1);
43 47 };
44 48  
45 49 #endif // FLASHMQTESTCLIENT_H
... ...
mainapp.cpp
... ... @@ -65,7 +65,11 @@ MainApp::MainApp(const std::string &amp;configFilePath) :
65 65 auto f = std::bind(&MainApp::queueCleanup, this);
66 66 //const uint64_t derrivedSessionCheckInterval = std::max<uint64_t>((settings->expireSessionsAfterSeconds)*1000*2, 600000);
67 67 //const uint64_t sessionCheckInterval = std::min<uint64_t>(derrivedSessionCheckInterval, 86400000);
68   - timer.addCallback(f, 10000, "session expiration");
  68 + uint64_t interval = 10000;
  69 +#ifdef TESTING
  70 + interval = 1000;
  71 +#endif
  72 + timer.addCallback(f, interval, "session expiration");
69 73 }
70 74  
71 75 auto fKeepAlive = std::bind(&MainApp::queueKeepAliveCheckAtAllThreads, this);
... ...
mqtt5properties.cpp
... ... @@ -145,6 +145,11 @@ void Mqtt5PropertyBuilder::writeAuthenticationData(const std::string &amp;data)
145 145 writeStr(Mqtt5Properties::AuthenticationData, data);
146 146 }
147 147  
  148 +void Mqtt5PropertyBuilder::writeWillDelay(uint32_t delay)
  149 +{
  150 + writeUint32(Mqtt5Properties::WillDelayInterval, delay, genericBytes);
  151 +}
  152 +
148 153 void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties)
149 154 {
150 155 assert(!this->userProperties);
... ...
mqtt5properties.h
... ... @@ -48,6 +48,7 @@ public:
48 48 void writeTopicAlias(const uint16_t id);
49 49 void writeAuthenticationMethod(const std::string &method);
50 50 void writeAuthenticationData(const std::string &data);
  51 + void writeWillDelay(uint32_t delay);
51 52 void setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties);
52 53 };
53 54  
... ...
mqttpacket.cpp
... ... @@ -204,16 +204,25 @@ MqttPacket::MqttPacket(const PubResponse &amp;pubAck) :
204 204 }
205 205 }
206 206  
  207 +/**
  208 + * @brief Constructor to create a disconnect packet. In normal server mode, only MQTT5 is supposed to do that (MQTT3 has no concept of server-initiated
  209 + * disconnect packet). But, we also use it in the test client.
  210 + * @param disconnect
  211 + */
207 212 MqttPacket::MqttPacket(const Disconnect &disconnect) :
208 213 bites(disconnect.getLengthWithoutFixedHeader())
209 214 {
210   - this->protocolVersion = ProtocolVersion::Mqtt5;
  215 + this->protocolVersion = disconnect.protocolVersion;
211 216  
212 217 packetType = PacketType::DISCONNECT;
213 218 first_byte = static_cast<char>(packetType) << 4;
214 219  
215   - writeByte(static_cast<uint8_t>(disconnect.reasonCode));
216   - writeProperties(disconnect.propertyBuilder);
  220 + if (this->protocolVersion >= ProtocolVersion::Mqtt5)
  221 + {
  222 + writeByte(static_cast<uint8_t>(disconnect.reasonCode));
  223 + writeProperties(disconnect.propertyBuilder);
  224 + }
  225 +
217 226 calculateRemainingLength();
218 227 }
219 228  
... ... @@ -244,7 +253,17 @@ MqttPacket::MqttPacket(const Connect &amp;connect) :
244 253 writeString(magicString);
245 254  
246 255 writeByte(static_cast<char>(protocolVersion));
247   - writeByte(2); // flags; The only bit set is 'clean session'.
  256 +
  257 + uint8_t flags = connect.clean_start << 1;
  258 +
  259 + if (connect.will)
  260 + {
  261 + flags |= 4;
  262 + flags |= (connect.will->qos << 3);
  263 + flags |= (connect.will->retain << 5);
  264 + }
  265 +
  266 + writeByte(flags);
248 267  
249 268 // Keep-alive
250 269 writeUint16(60);
... ... @@ -256,6 +275,17 @@ MqttPacket::MqttPacket(const Connect &amp;connect) :
256 275  
257 276 writeString(connect.clientid);
258 277  
  278 + if (connect.will)
  279 + {
  280 + if (connect.protocolVersion >= ProtocolVersion::Mqtt5)
  281 + {
  282 + writeProperties(connect.will->propertyBuilder);
  283 + }
  284 +
  285 + writeString(connect.will->topic);
  286 + writeString(connect.will->payload);
  287 + }
  288 +
259 289 calculateRemainingLength();
260 290 }
261 291  
... ...
types.cpp
... ... @@ -312,15 +312,17 @@ size_t UnsubAck::getLengthWithoutFixedHeader() const
312 312 }
313 313  
314 314 Disconnect::Disconnect(const ProtocolVersion protVersion, ReasonCodes reason_code) :
  315 + protocolVersion(protVersion),
315 316 reasonCode(reason_code)
316 317 {
317   - assert(protVersion >= ProtocolVersion::Mqtt5);
318   -
319 318  
320 319 }
321 320  
322 321 size_t Disconnect::getLengthWithoutFixedHeader() const
323 322 {
  323 + if (this->protocolVersion < ProtocolVersion::Mqtt5)
  324 + return 0;
  325 +
324 326 size_t result = 1;
325 327 const size_t proplen = propertyBuilder ? propertyBuilder->getLength() : 1;
326 328 result += proplen;
... ... @@ -364,8 +366,20 @@ size_t Connect::getLengthWithoutFixedHeader() const
364 366 const size_t proplen = propertyBuilder ? propertyBuilder->getLength() : 1;
365 367 result += proplen;
366 368 }
367   - return result;
368 369  
  370 + if (will)
  371 + {
  372 + if (this->protocolVersion >= ProtocolVersion::Mqtt5)
  373 + {
  374 + const size_t proplen = will->propertyBuilder ? will->propertyBuilder->getLength() : 1;
  375 + result += proplen;
  376 + }
  377 +
  378 + result += will->topic.length() + 2;
  379 + result += will->payload.length() + 2;
  380 + }
  381 +
  382 + return result;
369 383 }
370 384  
371 385 std::string Connect::getMagicString() const
... ... @@ -376,6 +390,14 @@ std::string Connect::getMagicString() const
376 390 return "MQTT";
377 391 }
378 392  
  393 +void Connect::constructPropertyBuilder()
  394 +{
  395 + if (this->propertyBuilder)
  396 + return;
  397 +
  398 + this->propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
  399 +}
  400 +
379 401 Subscribe::Subscribe(const ProtocolVersion protocolVersion, uint16_t packetId, const std::string &topic, char qos) :
380 402 protocolVersion(protocolVersion),
381 403 packetId(packetId),
... ...
... ... @@ -262,6 +262,7 @@ public:
262 262 class Disconnect
263 263 {
264 264 public:
  265 + ProtocolVersion protocolVersion;
265 266 ReasonCodes reasonCode;
266 267 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder;
267 268 Disconnect(const ProtocolVersion protVersion, ReasonCodes reason_code);
... ... @@ -280,14 +281,17 @@ public:
280 281 struct Connect
281 282 {
282 283 const ProtocolVersion protocolVersion;
  284 + bool clean_start = true;
283 285 std::string clientid;
284 286 std::string username;
285 287 std::string password;
  288 + std::shared_ptr<WillPublish> will;
286 289 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder;
287 290  
288 291 Connect(ProtocolVersion protocolVersion, const std::string &clientid);
289 292 size_t getLengthWithoutFixedHeader() const;
290 293 std::string getMagicString() const;
  294 + void constructPropertyBuilder();
291 295 };
292 296  
293 297 /**
... ...