Commit f2f0b866c4035d893f70129fefe2be5fff65cba4

Authored by Patric Stout
1 parent d5d85340

feat(stress): reworked stress-test to show status, including avg latency

CMakeLists.txt
@@ -52,5 +52,5 @@ target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra -Wpedantic -Werror) @@ -52,5 +52,5 @@ target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra -Wpedantic -Werror)
52 install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) 52 install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR})
53 install(FILES ${CMAKE_BINARY_DIR}/truemqtt.pc DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/pkgconfig) 53 install(FILES ${CMAKE_BINARY_DIR}/truemqtt.pc DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/pkgconfig)
54 54
55 -add_subdirectory(example/pubstress)  
56 add_subdirectory(example/pubsub) 55 add_subdirectory(example/pubsub)
  56 +add_subdirectory(example/stress)
example/pubsub/main.cpp
@@ -22,7 +22,6 @@ int main() @@ -22,7 +22,6 @@ int main()
22 client.setLastWill("test/lastwill", "example pubsub finished", true); 22 client.setLastWill("test/lastwill", "example pubsub finished", true);
23 23
24 client.connect(); 24 client.connect();
25 - std::this_thread::sleep_for(std::chrono::milliseconds(100));  
26 25
27 int stop = 0; 26 int stop = 0;
28 27
example/pubstress/CMakeLists.txt renamed to example/stress/CMakeLists.txt
@@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
7 7
8 cmake_minimum_required(VERSION 3.16) 8 cmake_minimum_required(VERSION 3.16)
9 9
10 -project(truemqtt_pubstress) 10 +project(truemqtt_stress)
11 11
12 set(CMAKE_CXX_STANDARD 17) 12 set(CMAKE_CXX_STANDARD 17)
13 set(CMAKE_CXX_STANDARD_REQUIRED ON) 13 set(CMAKE_CXX_STANDARD_REQUIRED ON)
example/pubstress/main.cpp renamed to example/stress/main.cpp
@@ -24,10 +24,21 @@ int main() @@ -24,10 +24,21 @@ int main()
24 client.connect(); 24 client.connect();
25 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 25 std::this_thread::sleep_for(std::chrono::milliseconds(100));
26 26
27 - int stop = 0; 27 + size_t received = 0;
  28 + size_t sent = 0;
  29 + size_t failed = 0;
  30 + int64_t totalLatency = 0;
28 31
29 // Subscribe to the topic we are going to stress test. 32 // Subscribe to the topic we are going to stress test.
30 - client.subscribe("test/test/test", [&stop](const std::string topic, const std::string payload) {}); 33 + client.subscribe("test/test/test", [&received, &totalLatency](const std::string topic, const std::string payload)
  34 + {
  35 + // Calculate the latency.
  36 + auto now = std::chrono::steady_clock::now();
  37 + auto then = std::chrono::time_point<std::chrono::steady_clock>(std::chrono::microseconds(std::stoll(payload)));
  38 + auto latency = std::chrono::duration_cast<std::chrono::microseconds>(now - then).count();
  39 +
  40 + totalLatency += latency;
  41 + received++; });
31 42
32 // Send a lot of packets constantly, while telling us when publishing is failing. 43 // Send a lot of packets constantly, while telling us when publishing is failing.
33 // The expected behaviour is that this goes okay for a while, till the broker 44 // The expected behaviour is that this goes okay for a while, till the broker
@@ -35,24 +46,38 @@ int main() @@ -35,24 +46,38 @@ int main()
35 // to its breaking point, it helps to add additional subscriptions by other 46 // to its breaking point, it helps to add additional subscriptions by other
36 // means. 47 // means.
37 bool is_failing = true; 48 bool is_failing = true;
  49 + auto start = std::chrono::steady_clock::now();
38 while (true) 50 while (true)
39 { 51 {
40 - if (!client.publish("test/test/test", "Hello World!", false)) 52 + auto now = std::chrono::steady_clock::now();
  53 + auto now_ms = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
  54 +
  55 + // Publish the current time, so we can check the latency.
  56 + if (!client.publish("test/test/test", std::to_string(now_ms), false))
41 { 57 {
42 - if (!is_failing)  
43 - {  
44 - is_failing = true;  
45 - std::cout << "Failed to publish message" << std::endl;  
46 - } 58 + failed++;
47 } 59 }
48 else 60 else
49 { 61 {
50 - if (is_failing) 62 + sent++;
  63 + }
  64 +
  65 + // Every second, tell how much messages per second we sent, received and failed.
  66 + if (now - start > std::chrono::seconds(1))
  67 + {
  68 + if (received != 0)
51 { 69 {
52 - is_failing = false;  
53 - std::cout << "Succeeded to publish message" << std::endl; 70 + std::cout << "Sent: " << sent << "/s - Received: " << received << "/s - Failed: " << failed << "/s - Avg Latency: " << (totalLatency / received) << "us" << std::endl;
54 } 71 }
  72 + sent = 0;
  73 + received = 0;
  74 + failed = 0;
  75 + totalLatency = 0;
  76 + start = now;
55 } 77 }
  78 +
  79 + // Don't go too fast, to get a better idea of the latency.
  80 + std::this_thread::sleep_for(std::chrono::microseconds(10));
56 } 81 }
57 82
58 // This application never ends, but for good measure, a disconnect. 83 // This application never ends, but for good measure, a disconnect.