Commit d71f2aabae2c701d728d5e3e5d94f061979864d1

Authored by Hayk Martirosyan
1 parent b4971936

Enhance jitter test example

Better timing statistics, better printing, adding pub sub test modes.
examples/jitter_test.cpp
@@ -3,42 +3,73 @@ @@ -3,42 +3,73 @@
3 */ 3 */
4 4
5 #include <iostream> 5 #include <iostream>
  6 +#include <iomanip>
6 #include <string.h> 7 #include <string.h>
7 #include "redox.hpp" 8 #include "redox.hpp"
8 9
9 using namespace std; 10 using namespace std;
10 using redox::Redox; 11 using redox::Redox;
11 using redox::Command; 12 using redox::Command;
  13 +using redox::Subscriber;
12 14
13 double time_s() { 15 double time_s() {
14 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); 16 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
15 return (double)ms / 1e6; 17 return (double)ms / 1e6;
16 } 18 }
17 19
  20 +/**
  21 +* Prints time statistics on the received reply.
  22 +*
  23 +* t: Time since the program start.
  24 +* dt_callback: Time since the last reply was received.
  25 +* dt_msg: Time of the new message minus time of the last message.
  26 +* age_of_data: Time of message received minus time of message set.
  27 +*/
  28 +void print_time(double t, double dt_callback, double dt_msg, double age_of_data) {
  29 + cout << "t: " << t * 1000
  30 + << std::setiosflags(std::ios::fixed)
  31 + << std::setprecision(3)
  32 + << " | dt callback: " << dt_callback * 1000
  33 + << " | dt msg: " << dt_msg * 1000
  34 + << " | age of data: " << age_of_data * 1000 << endl;
  35 +}
  36 +
18 int main(int argc, char* argv[]) { 37 int main(int argc, char* argv[]) {
19 38
20 - string usage_string = "Usage: " + string(argv[0]) + " --(set-async|get-async|set-sync|get-sync) [freq]"; 39 + string usage_string = "Usage: " + string(argv[0])
  40 + + " --(set-async|get-async|set-sync|get-sync|get-pubsub|set-pubsub) [freq]";
  41 +
21 if(argc != 3) { 42 if(argc != 3) {
22 cerr << usage_string<< endl; 43 cerr << usage_string<< endl;
23 return 1; 44 return 1;
24 } 45 }
25 46
  47 + bool nowait = true;
  48 + std::string host = "localhost";
  49 + int port = 6379;
  50 +
26 Redox rdx; 51 Redox rdx;
27 - if(!rdx.connect("localhost", 6379)) return 1; 52 + if(nowait) rdx.noWait(true);
  53 +
  54 + Subscriber rdx_sub;
  55 + if(nowait) rdx_sub.noWait(true);
28 56
29 double freq = stod(argv[2]); // Hz 57 double freq = stod(argv[2]); // Hz
30 double dt = 1 / freq; // s 58 double dt = 1 / freq; // s
31 int iter = 1000000; 59 int iter = 1000000;
32 atomic_int count(0); 60 atomic_int count(0);
33 61
34 - double t = time_s(); 62 + double t0 = time_s();
  63 + double t = t0;
35 double t_new = t; 64 double t_new = t;
36 65
37 - double t_reply = t;  
38 - double t_reply_new = t; 66 + double t_last_reply = t;
  67 + double t_this_reply = t;
39 68
40 if(!strcmp(argv[1], "--get-async")) { 69 if(!strcmp(argv[1], "--get-async")) {
41 70
  71 + if(!rdx.connect(host, port)) return 1;
  72 +
42 while(count < iter) { 73 while(count < iter) {
43 rdx.command<string>({"GET", "jitter_test:time"}, 74 rdx.command<string>({"GET", "jitter_test:time"},
44 [&](Command<string>& c) { 75 [&](Command<string>& c) {
@@ -46,11 +77,15 @@ int main(int argc, char* argv[]) { @@ -46,11 +77,15 @@ int main(int argc, char* argv[]) {
46 cerr << "Bad reply: " << c.status() << endl; 77 cerr << "Bad reply: " << c.status() << endl;
47 } else { 78 } else {
48 t_new = time_s(); 79 t_new = time_s();
49 - t_reply_new = stod(c.reply());  
50 - cout << "dt real: " << (t_new - t) * 1000  
51 - << ", dt msg: " << (t_reply_new - t_reply) * 1000 << endl; 80 + t_this_reply = stod(c.reply());
  81 + print_time(
  82 + t_new - t0,
  83 + t_new - t,
  84 + t_this_reply - t_last_reply,
  85 + t_new - t_this_reply
  86 + );
52 t = t_new; 87 t = t_new;
53 - t_reply = t_reply_new; 88 + t_last_reply = t_this_reply;
54 } 89 }
55 count++; 90 count++;
56 if (count == iter) rdx.stop(); 91 if (count == iter) rdx.stop();
@@ -62,17 +97,23 @@ int main(int argc, char* argv[]) { @@ -62,17 +97,23 @@ int main(int argc, char* argv[]) {
62 97
63 } else if(!strcmp(argv[1], "--get-async-loop")) { 98 } else if(!strcmp(argv[1], "--get-async-loop")) {
64 99
  100 + if(!rdx.connect(host, port)) return 1;
  101 +
65 rdx.commandLoop<string>({"GET", "jitter_test:time"}, 102 rdx.commandLoop<string>({"GET", "jitter_test:time"},
66 [&](Command<string>& c) { 103 [&](Command<string>& c) {
67 if (!c.ok()) { 104 if (!c.ok()) {
68 cerr << "Bad reply: " << c.status() << endl; 105 cerr << "Bad reply: " << c.status() << endl;
69 } else { 106 } else {
70 t_new = time_s(); 107 t_new = time_s();
71 - t_reply_new = stod(c.reply());  
72 - cout << "dt real: " << (t_new - t) * 1000  
73 - << ", dt msg: " << (t_reply_new - t_reply) * 1000 << endl; 108 + t_this_reply = stod(c.reply());
  109 + print_time(
  110 + t_new - t0,
  111 + t_new - t,
  112 + t_this_reply - t_last_reply,
  113 + t_new - t_this_reply
  114 + );
74 t = t_new; 115 t = t_new;
75 - t_reply = t_reply_new; 116 + t_last_reply = t_this_reply;
76 } 117 }
77 count++; 118 count++;
78 if (count == iter) rdx.stop(); 119 if (count == iter) rdx.stop();
@@ -82,6 +123,8 @@ int main(int argc, char* argv[]) { @@ -82,6 +123,8 @@ int main(int argc, char* argv[]) {
82 123
83 } else if(!strcmp(argv[1], "--set-async")) { 124 } else if(!strcmp(argv[1], "--set-async")) {
84 125
  126 + if(!rdx.connect(host, port)) return 1;
  127 +
85 while (count < iter) { 128 while (count < iter) {
86 rdx.command<string>({"SET", "jitter_test:time", to_string(time_s())}, 129 rdx.command<string>({"SET", "jitter_test:time", to_string(time_s())},
87 [&](Command<string>& c) { 130 [&](Command<string>& c) {
@@ -97,17 +140,23 @@ int main(int argc, char* argv[]) { @@ -97,17 +140,23 @@ int main(int argc, char* argv[]) {
97 140
98 } else if(!strcmp(argv[1], "--get-sync")) { 141 } else if(!strcmp(argv[1], "--get-sync")) {
99 142
  143 + if(!rdx.connect(host, port)) return 1;
  144 +
100 while(count < iter) { 145 while(count < iter) {
101 Command<string>& c = rdx.commandSync<string>({"GET", "jitter_test:time"}); 146 Command<string>& c = rdx.commandSync<string>({"GET", "jitter_test:time"});
102 if(!c.ok()) { 147 if(!c.ok()) {
103 cerr << "Error setting value: " << c.status() << endl; 148 cerr << "Error setting value: " << c.status() << endl;
104 } else { 149 } else {
105 t_new = time_s(); 150 t_new = time_s();
106 - t_reply_new = stod(c.reply());  
107 - cout << "dt real: " << (t_new - t) * 1000  
108 - << ", dt msg: " << (t_reply_new - t_reply) * 1000 << endl; 151 + t_this_reply = stod(c.reply());
  152 + print_time(
  153 + t_new - t0,
  154 + t_new - t,
  155 + t_this_reply - t_last_reply,
  156 + t_new - t_this_reply
  157 + );
109 t = t_new; 158 t = t_new;
110 - t_reply = t_reply_new; 159 + t_last_reply = t_this_reply;
111 } 160 }
112 count++; 161 count++;
113 if(count == iter) rdx.stop(); 162 if(count == iter) rdx.stop();
@@ -117,6 +166,8 @@ int main(int argc, char* argv[]) { @@ -117,6 +166,8 @@ int main(int argc, char* argv[]) {
117 166
118 } else if(!strcmp(argv[1], "--set-sync")) { 167 } else if(!strcmp(argv[1], "--set-sync")) {
119 168
  169 + if(!rdx.connect(host, port)) return 1;
  170 +
120 while(count < iter){ 171 while(count < iter){
121 Command<string>& c = rdx.commandSync<string>({"SET", "jitter_test:time", to_string(time_s())}); 172 Command<string>& c = rdx.commandSync<string>({"SET", "jitter_test:time", to_string(time_s())});
122 if(!c.ok()) { 173 if(!c.ok()) {
@@ -127,11 +178,56 @@ int main(int argc, char* argv[]) { @@ -127,11 +178,56 @@ int main(int argc, char* argv[]) {
127 c.free(); 178 c.free();
128 this_thread::sleep_for(chrono::microseconds((int)((dt) * 1e6))); 179 this_thread::sleep_for(chrono::microseconds((int)((dt) * 1e6)));
129 } 180 }
  181 +
  182 + } else if(!strcmp(argv[1], "--get-pubsub")) {
  183 +
  184 + if(!rdx_sub.connect(host, port)) return 1;
  185 +
  186 + auto got_message = [&](const string& topic, const string& msg) {
  187 +
  188 + t_new = time_s();
  189 + t_this_reply = stod(msg);
  190 + print_time(
  191 + t_new - t0,
  192 + t_new - t,
  193 + t_this_reply - t_last_reply,
  194 + t_new - t_this_reply
  195 + );
  196 + t = t_new;
  197 + t_last_reply = t_this_reply;
  198 +
  199 + count++;
  200 + if (count == iter) rdx.stop();
  201 + };
  202 +
  203 + rdx_sub.subscribe("jitter_test:time", got_message);
  204 +
  205 + } else if(!strcmp(argv[1], "--set-pubsub")) {
  206 +
  207 + if(!rdx.connect(host, port)) return 1;
  208 +
  209 + while (count < iter) {
  210 + double t1 = time_s();
  211 + rdx.command<int>({"PUBLISH", "jitter_test:time", to_string(time_s())},
  212 + [&](Command<int>& c) {
  213 + if (!c.ok()) {
  214 + cerr << "Error setting value: " << c.status() << endl;
  215 + }
  216 + count++;
  217 + if (count == iter) rdx.stop();
  218 + }
  219 + );
  220 + double wait = dt - (time_s() - t1);
  221 + this_thread::sleep_for(chrono::microseconds((int) (wait * 1e6)));
  222 + }
  223 +
130 } else { 224 } else {
131 cerr << usage_string << endl; 225 cerr << usage_string << endl;
132 return 1; 226 return 1;
133 } 227 }
134 228
135 rdx.wait(); 229 rdx.wait();
  230 + rdx_sub.wait();
  231 +
136 return 0; 232 return 0;
137 }; 233 };
examples/speed_test_async.cpp
@@ -21,7 +21,7 @@ int main(int argc, char* argv[]) { @@ -21,7 +21,7 @@ int main(int argc, char* argv[]) {
21 Redox rdx; 21 Redox rdx;
22 rdx.noWait(true); 22 rdx.noWait(true);
23 23
24 - if(!rdx.connectUnix("/var/run/redis/redis.sock")) return 1; 24 + if(!rdx.connect()) return 1;
25 25
26 if(rdx.set("simple_loop:count", "0")) { 26 if(rdx.set("simple_loop:count", "0")) {
27 cout << "Reset the counter to zero." << endl; 27 cout << "Reset the counter to zero." << endl;