/* * Copyright (c) TrueBrain * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #include #include #include #include #include TEST_CASE("regression", "[regression]") { // last_message / logger_mutex needs to outlive client1, as it is captured in a lambda. std::string last_message = {}; std::mutex logger_mutex; TrueMQTT::Client client1("localhost", 1883, "client-1"); TrueMQTT::Client client2("localhost", 1883, "client-2"); client1.setLogger( TrueMQTT::Client::LogLevel::INFO, [&last_message, &logger_mutex](TrueMQTT::Client::LogLevel level, const std::string_view message) { std::scoped_lock lock(logger_mutex); last_message = message; }); client2.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10); uint8_t connected = 0x0; client1.setStateChangeCallback( [&connected](TrueMQTT::Client::State state) { if (state == TrueMQTT::Client::State::CONNECTED) { connected |= 0x1; } }); client2.setStateChangeCallback( [&connected](TrueMQTT::Client::State state) { if (state == TrueMQTT::Client::State::CONNECTED) { connected |= 0x2; } }); // TC: Cannot publish/subscribe/unsubscribe when disconnected { CHECK(client1.publish("regression/topic1", "Hello World!", false) == false); CHECK(last_message == "Cannot publish when disconnected"); client1.subscribe("regression/topic1", [](const std::string_view topic, const std::string_view message) { /* empty */ }); CHECK(last_message == "Cannot subscribe when disconnected"); client1.unsubscribe("regression/topic1"); CHECK(last_message == "Cannot unsubscribe when disconnected"); } uint8_t received = 0; uint8_t received2 = 0; // TC: Connect the subscribing client first and instantly make a subscription; subscription should be created after connect. { client1.connect(); CHECK(last_message == "Connecting to localhost:1883"); client1.subscribe( "regression/topic1", [&received](const std::string_view topic, const std::string_view payload) { CHECK(std::string(topic) == "regression/topic1"); received++; }); auto start = std::chrono::steady_clock::now(); while ((connected & 0x1) == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::seconds(5)) { FAIL("Timeout waiting for client to connect"); } } // Give some time for the subscription to actually be made. std::this_thread::sleep_for(std::chrono::milliseconds(50)); } // TC: Connect the publishing client next and instantly publish a message; message should be published after connect. { client2.connect(); CHECK(client2.publish("regression/topic1", "Hello World!", false) == true); auto start = std::chrono::steady_clock::now(); while ((connected & 0x2) == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::seconds(1)) { FAIL("Timeout waiting for client to connect"); } } } // TC: Publish before connect happened and check message arrives after connection is made { // Wait for the pending publish to arrive. auto start = std::chrono::steady_clock::now(); while (received != 1) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::seconds(1)) { FAIL("Timeout waiting for message to arrive"); } } received = 0; } // TC: Cannot connect when already connected { client1.connect(); CHECK(last_message == "Can't connect when already connecting / connected"); } // TC: Publish to delayed subscription must work { CHECK(client2.publish("regression/topic1", "Hello World!", false) == true); CHECK(client2.publish("regression/topic1", "Hello World!", false) == true); CHECK(client2.publish("regression/topic1", "Hello World!", false) == true); CHECK(client2.publish("regression/topic1", "Hello World!", false) == true); auto start = std::chrono::steady_clock::now(); while (received != 4) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::seconds(1)) { FAIL("Timeout waiting for message to arrive"); } } received = 0; } // TC: Subscribe to new topic and publish on it { client1.subscribe( "regression/topic2", [&received2](const std::string_view topic, const std::string_view payload) { CHECK(std::string(topic) == "regression/topic2"); received2++; }); // Give some time for the subscription to actually be made. std::this_thread::sleep_for(std::chrono::milliseconds(50)); CHECK(client2.publish("regression/topic2", "Hello World!", false) == true); // Wait for the pending publish to arrive. auto start = std::chrono::steady_clock::now(); while (received2 != 1) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::seconds(1)) { FAIL("Timeout waiting for message to arrive"); } } received2 = 0; } // TC: Unsubscribe and check a publish to the topic no longer arrives { client1.unsubscribe("regression/topic1"); client1.unsubscribe("regression/topic2"); CHECK(client2.publish("regression/topic1", "Hello World!", false) == true); CHECK(client2.publish("regression/topic2", "Hello World!", false) == true); std::this_thread::sleep_for(std::chrono::milliseconds(50)); REQUIRE(received == 0); REQUIRE(received2 == 0); } // TC: Check wildcard subscriptions { client1.subscribe( "regression/topic3/+", [&received](const std::string_view topic, const std::string_view payload) { CHECK(std::string(topic) == "regression/topic3/1"); received++; }); client1.subscribe( "regression/#", [&received2](const std::string_view topic, const std::string_view payload) { if (received2 == 0) { CHECK(std::string(topic) == "regression/topic3/1"); } else { CHECK(std::string(topic) == "regression/topic4/2"); } received2++; }); std::this_thread::sleep_for(std::chrono::milliseconds(50)); CHECK(client2.publish("regression/topic3/1", "Hello World!", false) == true); CHECK(client2.publish("regression/topic4/2", "Hello World!", false) == true); auto start = std::chrono::steady_clock::now(); while (received != 1 || received2 != 2) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::seconds(1)) { FAIL("Timeout waiting for message to arrive"); } } received = 0; received2 = 0; } // TC: Cannot unsubscribe from topic not subscribed to { client1.unsubscribe("regression/topic1"); CHECK(last_message == "Cannot unsubscribe from topic 'regression/topic1' because we are not subscribed to it"); } // TC: Validate disconnect is seen { client1.disconnect(); CHECK(last_message == "Disconnecting from broker"); client2.disconnect(); } // TC: Cannot disconnect when not connected { client1.disconnect(); CHECK(last_message == "Can't disconnect when already disconnected"); } // TC: Connect to a non-existing broker { TrueMQTT::Client client3("localhost", 1884, "client-3"); client3.setLogger( TrueMQTT::Client::LogLevel::INFO, [&last_message, &logger_mutex](TrueMQTT::Client::LogLevel level, const std::string_view message) { std::scoped_lock lock(logger_mutex); last_message = message; }); client3.setStateChangeCallback( [&connected](TrueMQTT::Client::State state) { if (state == TrueMQTT::Client::State::CONNECTED) { connected |= 0x4; } }); client3.connect(); CHECK(last_message == "Connecting to localhost:1884"); bool failed = false; auto start = std::chrono::steady_clock::now(); while ((connected & 0x4) == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (std::chrono::steady_clock::now() - start > std::chrono::milliseconds(100)) { failed = true; break; } } REQUIRE(failed == true); client3.disconnect(); CHECK(last_message == "Disconnecting from broker"); } }