From f2f0b866c4035d893f70129fefe2be5fff65cba4 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sun, 25 Sep 2022 11:44:25 +0200 Subject: [PATCH] feat(stress): reworked stress-test to show status, including avg latency --- CMakeLists.txt | 2 +- example/pubstress/CMakeLists.txt | 18 ------------------ example/pubstress/main.cpp | 62 -------------------------------------------------------------- example/pubsub/main.cpp | 1 - example/stress/CMakeLists.txt | 18 ++++++++++++++++++ example/stress/main.cpp | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 106 insertions(+), 82 deletions(-) delete mode 100644 example/pubstress/CMakeLists.txt delete mode 100644 example/pubstress/main.cpp create mode 100644 example/stress/CMakeLists.txt create mode 100644 example/stress/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 53eaf19..77f1590 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,5 +52,5 @@ target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra -Wpedantic -Werror) install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) install(FILES ${CMAKE_BINARY_DIR}/truemqtt.pc DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/pkgconfig) -add_subdirectory(example/pubstress) add_subdirectory(example/pubsub) +add_subdirectory(example/stress) diff --git a/example/pubstress/CMakeLists.txt b/example/pubstress/CMakeLists.txt deleted file mode 100644 index eee0cb3..0000000 --- a/example/pubstress/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -# -# Copyright (c) TrueBrain -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. -# - -cmake_minimum_required(VERSION 3.16) - -project(truemqtt_pubstress) - -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) - -include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../../include) - -add_executable(${PROJECT_NAME} main.cpp) -target_link_libraries(${PROJECT_NAME} truemqtt) diff --git a/example/pubstress/main.cpp b/example/pubstress/main.cpp deleted file mode 100644 index 359944d..0000000 --- a/example/pubstress/main.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) TrueBrain - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#include -#include -#include - -int main() -{ - // Create a connection to the local broker. - TrueMQTT::Client client("localhost", 1883, "test"); - - client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string message) - { std::cout << "Log " << level << ": " << message << std::endl; }); - client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 100); - client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message) - { std::cout << "Error " << error << ": " << message << std::endl; }); - client.setLastWill("test/lastwill", "example pubsub finished", true); - - client.connect(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - int stop = 0; - - // Subscribe to the topic we are going to stress test. - client.subscribe("test/test/test", [&stop](const std::string topic, const std::string payload) {}); - - // Send a lot of packets constantly, while telling us when publishing is failing. - // The expected behaviour is that this goes okay for a while, till the broker - // backs up, after which it starts to fail intermittently. To push the broker - // to its breaking point, it helps to add additional subscriptions by other - // means. - bool is_failing = true; - while (true) - { - if (!client.publish("test/test/test", "Hello World!", false)) - { - if (!is_failing) - { - is_failing = true; - std::cout << "Failed to publish message" << std::endl; - } - } - else - { - if (is_failing) - { - is_failing = false; - std::cout << "Succeeded to publish message" << std::endl; - } - } - } - - // This application never ends, but for good measure, a disconnect. - client.disconnect(); - - return 0; -} diff --git a/example/pubsub/main.cpp b/example/pubsub/main.cpp index 2e5c817..56ce614 100644 --- a/example/pubsub/main.cpp +++ b/example/pubsub/main.cpp @@ -22,7 +22,6 @@ int main() client.setLastWill("test/lastwill", "example pubsub finished", true); client.connect(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); int stop = 0; diff --git a/example/stress/CMakeLists.txt b/example/stress/CMakeLists.txt new file mode 100644 index 0000000..deb83d1 --- /dev/null +++ b/example/stress/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright (c) TrueBrain +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +# + +cmake_minimum_required(VERSION 3.16) + +project(truemqtt_stress) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../../include) + +add_executable(${PROJECT_NAME} main.cpp) +target_link_libraries(${PROJECT_NAME} truemqtt) diff --git a/example/stress/main.cpp b/example/stress/main.cpp new file mode 100644 index 0000000..0f06c41 --- /dev/null +++ b/example/stress/main.cpp @@ -0,0 +1,87 @@ +/* + * Copyright (c) TrueBrain + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include + +int main() +{ + // Create a connection to the local broker. + TrueMQTT::Client client("localhost", 1883, "test"); + + client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string message) + { std::cout << "Log " << level << ": " << message << std::endl; }); + client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 100); + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message) + { std::cout << "Error " << error << ": " << message << std::endl; }); + client.setLastWill("test/lastwill", "example pubsub finished", true); + + client.connect(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + size_t received = 0; + size_t sent = 0; + size_t failed = 0; + int64_t totalLatency = 0; + + // Subscribe to the topic we are going to stress test. + client.subscribe("test/test/test", [&received, &totalLatency](const std::string topic, const std::string payload) + { + // Calculate the latency. + auto now = std::chrono::steady_clock::now(); + auto then = std::chrono::time_point(std::chrono::microseconds(std::stoll(payload))); + auto latency = std::chrono::duration_cast(now - then).count(); + + totalLatency += latency; + received++; }); + + // Send a lot of packets constantly, while telling us when publishing is failing. + // The expected behaviour is that this goes okay for a while, till the broker + // backs up, after which it starts to fail intermittently. To push the broker + // to its breaking point, it helps to add additional subscriptions by other + // means. + bool is_failing = true; + auto start = std::chrono::steady_clock::now(); + while (true) + { + auto now = std::chrono::steady_clock::now(); + auto now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); + + // Publish the current time, so we can check the latency. + if (!client.publish("test/test/test", std::to_string(now_ms), false)) + { + failed++; + } + else + { + sent++; + } + + // Every second, tell how much messages per second we sent, received and failed. + if (now - start > std::chrono::seconds(1)) + { + if (received != 0) + { + std::cout << "Sent: " << sent << "/s - Received: " << received << "/s - Failed: " << failed << "/s - Avg Latency: " << (totalLatency / received) << "us" << std::endl; + } + sent = 0; + received = 0; + failed = 0; + totalLatency = 0; + start = now; + } + + // Don't go too fast, to get a better idea of the latency. + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + + // This application never ends, but for good measure, a disconnect. + client.disconnect(); + + return 0; +} -- libgit2 0.21.4