diff --git a/CMakeLists.txt b/CMakeLists.txt index 53eaf19..77f1590 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,5 +52,5 @@ target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra -Wpedantic -Werror) install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) install(FILES ${CMAKE_BINARY_DIR}/truemqtt.pc DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/pkgconfig) -add_subdirectory(example/pubstress) add_subdirectory(example/pubsub) +add_subdirectory(example/stress) diff --git a/example/pubsub/main.cpp b/example/pubsub/main.cpp index 2e5c817..56ce614 100644 --- a/example/pubsub/main.cpp +++ b/example/pubsub/main.cpp @@ -22,7 +22,6 @@ int main() client.setLastWill("test/lastwill", "example pubsub finished", true); client.connect(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); int stop = 0; diff --git a/example/pubstress/CMakeLists.txt b/example/stress/CMakeLists.txt index eee0cb3..deb83d1 100644 --- a/example/pubstress/CMakeLists.txt +++ b/example/stress/CMakeLists.txt @@ -7,7 +7,7 @@ cmake_minimum_required(VERSION 3.16) -project(truemqtt_pubstress) +project(truemqtt_stress) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) diff --git a/example/pubstress/main.cpp b/example/stress/main.cpp index 359944d..0f06c41 100644 --- a/example/pubstress/main.cpp +++ b/example/stress/main.cpp @@ -24,10 +24,21 @@ int main() client.connect(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); - int stop = 0; + size_t received = 0; + size_t sent = 0; + size_t failed = 0; + int64_t totalLatency = 0; // Subscribe to the topic we are going to stress test. - client.subscribe("test/test/test", [&stop](const std::string topic, const std::string payload) {}); + client.subscribe("test/test/test", [&received, &totalLatency](const std::string topic, const std::string payload) + { + // Calculate the latency. + auto now = std::chrono::steady_clock::now(); + auto then = std::chrono::time_point(std::chrono::microseconds(std::stoll(payload))); + auto latency = std::chrono::duration_cast(now - then).count(); + + totalLatency += latency; + received++; }); // Send a lot of packets constantly, while telling us when publishing is failing. // The expected behaviour is that this goes okay for a while, till the broker @@ -35,24 +46,38 @@ int main() // to its breaking point, it helps to add additional subscriptions by other // means. bool is_failing = true; + auto start = std::chrono::steady_clock::now(); while (true) { - if (!client.publish("test/test/test", "Hello World!", false)) + auto now = std::chrono::steady_clock::now(); + auto now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); + + // Publish the current time, so we can check the latency. + if (!client.publish("test/test/test", std::to_string(now_ms), false)) { - if (!is_failing) - { - is_failing = true; - std::cout << "Failed to publish message" << std::endl; - } + failed++; } else { - if (is_failing) + sent++; + } + + // Every second, tell how much messages per second we sent, received and failed. + if (now - start > std::chrono::seconds(1)) + { + if (received != 0) { - is_failing = false; - std::cout << "Succeeded to publish message" << std::endl; + std::cout << "Sent: " << sent << "/s - Received: " << received << "/s - Failed: " << failed << "/s - Avg Latency: " << (totalLatency / received) << "us" << std::endl; } + sent = 0; + received = 0; + failed = 0; + totalLatency = 0; + start = now; } + + // Don't go too fast, to get a better idea of the latency. + std::this_thread::sleep_for(std::chrono::microseconds(10)); } // This application never ends, but for good measure, a disconnect.