Commit c21560d97a4a7df9dc0e1135c0082ff49984e6bf

Authored by Hayk Martirosyan
1 parent 78585079

Removing timer_callbacks global, ev_timer allocation

Found out that ev_timer watchers have a void* data that allows
currying information to the libev callbacks! That means I could
stick the Command* in there and completely get rid of the global
timer_callbacks map that was needed before.

Additionally, we can move ev_timer into the Command object instead
of separately allocating it on the heap. Simplifies management.

Implemented a synchronous loop speed test based on the latest
and greatest command_blocking(). Happy to see it is around
30k/s, which is 100% network delays.
CMakeLists.txt
@@ -42,3 +42,6 @@ set(LIB_ALL ${LIB_REDIS}) @@ -42,3 +42,6 @@ set(LIB_ALL ${LIB_REDIS})
42 42
43 add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL}) 43 add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL})
44 target_link_libraries(simple_loop ${LIB_REDIS}) 44 target_link_libraries(simple_loop ${LIB_REDIS})
  45 +
  46 +add_executable(simple_sync_loop examples/simple_sync_loop.cpp ${SRC_ALL})
  47 +target_link_libraries(simple_sync_loop ${LIB_REDIS})
examples/simple_loop.cpp
@@ -17,10 +17,12 @@ int main(int argc, char* argv[]) { @@ -17,10 +17,12 @@ int main(int argc, char* argv[]) {
17 17
18 Redis rdx = {"localhost", 6379}; 18 Redis rdx = {"localhost", 6379};
19 rdx.run(); 19 rdx.run();
20 -  
21 - Command<int>* del_cmd = rdx.command_blocking<int>("DEL simple_loop:count");  
22 - cout << "deleted key, reply: " << del_cmd->reply() << endl;  
23 - del_cmd->free(); 20 +//
  21 +// Command<int>* del_cmd = rdx.command_blocking<int>("DEL simple_loop:count");
  22 +// cout << "deleted key, reply: " << del_cmd->reply() << endl;
  23 +// del_cmd->free();
  24 + if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl;
  25 + else cerr << "Failed to delete simple_loop:count" << endl;
24 26
25 Command<char*>* set_cmd = rdx.command_blocking<char*>("SET simple_loop:count 0"); 27 Command<char*>* set_cmd = rdx.command_blocking<char*>("SET simple_loop:count 0");
26 cout << "set key, reply: " << set_cmd->reply() << endl; 28 cout << "set key, reply: " << set_cmd->reply() << endl;
examples/simple_sync_loop.cpp 0 → 100644
  1 +/**
  2 +* Basic asynchronous calls using redisx.
  3 +*/
  4 +
  5 +#include <iostream>
  6 +#include "../src/redisx.hpp"
  7 +
  8 +using namespace std;
  9 +using namespace redisx;
  10 +
  11 +double time_s() {
  12 + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
  13 + return (double)ms / 1e6;
  14 +}
  15 +
  16 +int main(int argc, char* argv[]) {
  17 +
  18 + Redis rdx = {"localhost", 6379};
  19 + rdx.run();
  20 +
  21 + if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl;
  22 + else cerr << "Failed to delete simple_loop:count" << endl;
  23 +
  24 + string cmd_str = "INCR simple_loop:count";
  25 +
  26 + int count = 50000;
  27 + double t0 = time_s();
  28 +
  29 + cout << "Running \"" << cmd_str << "\" " << count << " times." << endl;
  30 +
  31 + for(int i = 0; i < count; i++) {
  32 + Command<int>* c = rdx.command_blocking<int>(cmd_str);
  33 + if(c->status() != REDIS_OK) cerr << "Bad reply, code: " << c->status() << endl;
  34 + }
  35 +
  36 + cout << "At the end, simple_loop:count = "
  37 + << rdx.command_blocking<string>("GET simple_loop:count")->reply() << endl;
  38 +
  39 + rdx.stop();
  40 +
  41 + double t_elapsed = time_s() - t0;
  42 + double actual_freq = (double)count / t_elapsed;
  43 +
  44 + cout << "Sent " << count << " commands in " << t_elapsed << "s, "
  45 + << "that's " << actual_freq << " commands/s." << endl;
  46 +
  47 + return 0;
  48 +}
src/command.hpp
@@ -74,12 +74,12 @@ private: @@ -74,12 +74,12 @@ private:
74 74
75 std::atomic_bool completed; 75 std::atomic_bool completed;
76 76
77 - ev_timer* timer; 77 + ev_timer timer;
78 std::mutex timer_guard; 78 std::mutex timer_guard;
79 79
80 ev_timer* get_timer() { 80 ev_timer* get_timer() {
81 std::lock_guard<std::mutex> lg(timer_guard); 81 std::lock_guard<std::mutex> lg(timer_guard);
82 - return timer; 82 + return &timer;
83 } 83 }
84 }; 84 };
85 85
src/redisx.cpp
@@ -11,10 +11,8 @@ using namespace std; @@ -11,10 +11,8 @@ using namespace std;
11 11
12 namespace redisx { 12 namespace redisx {
13 13
14 -// Default construct the static map  
15 -std::unordered_map<ev_timer*, void*> Redis::timer_callbacks;  
16 -  
17 // Global mutex to manage waiting for connected state 14 // Global mutex to manage waiting for connected state
  15 +// TODO get rid of this as the only global variable?
18 mutex connected_lock; 16 mutex connected_lock;
19 17
20 /** 18 /**
@@ -119,12 +117,14 @@ bool submit_to_server(Command&lt;ReplyT&gt;* cmd_obj) { @@ -119,12 +117,14 @@ bool submit_to_server(Command&lt;ReplyT&gt;* cmd_obj) {
119 117
120 template<class ReplyT> 118 template<class ReplyT>
121 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { 119 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) {
122 - auto cmd_obj = (Command<ReplyT>*)Redis::timer_callbacks.at(timer);  
123 - if(cmd_obj == NULL) { 120 +
  121 + // Check if canceled
  122 + if(timer->data == NULL) {
124 cerr << "[WARNING] Skipping event, has been canceled." << endl; 123 cerr << "[WARNING] Skipping event, has been canceled." << endl;
125 - Redis::timer_callbacks.erase(timer);  
126 return; 124 return;
127 } 125 }
  126 +
  127 + auto cmd_obj = (Command<ReplyT>*)timer->data;
128 submit_to_server<ReplyT>(cmd_obj); 128 submit_to_server<ReplyT>(cmd_obj);
129 } 129 }
130 130
@@ -141,14 +141,11 @@ bool Redis::process_queued_command(void* cmd_ptr) { @@ -141,14 +141,11 @@ bool Redis::process_queued_command(void* cmd_ptr) {
141 if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) { 141 if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) {
142 submit_to_server<ReplyT>(cmd_obj); 142 submit_to_server<ReplyT>(cmd_obj);
143 } else { 143 } else {
144 - // TODO manage memory somehow  
145 - cmd_obj->timer = new ev_timer();  
146 144
147 - // TODO use cmd_obj->timer->data instead of timer callbacks!!!!! 145 + cmd_obj->timer.data = (void*)cmd_obj;
148 146
149 - timer_callbacks[cmd_obj->timer] = (void*)cmd_obj;  
150 - ev_timer_init(cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat);  
151 - ev_timer_start(EV_DEFAULT_ cmd_obj->timer); 147 + ev_timer_init(&cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat);
  148 + ev_timer_start(EV_DEFAULT_ &cmd_obj->timer);
152 149
153 cmd_obj->timer_guard.unlock(); 150 cmd_obj->timer_guard.unlock();
154 } 151 }
@@ -242,8 +239,11 @@ void Redis::command(const string&amp; cmd) { @@ -242,8 +239,11 @@ void Redis::command(const string&amp; cmd) {
242 command<redisReply*>(cmd, NULL); 239 command<redisReply*>(cmd, NULL);
243 } 240 }
244 241
245 -void Redis::command_blocking(const string& cmd) {  
246 - command_blocking<redisReply*>(cmd); 242 +bool Redis::command_blocking(const string& cmd) {
  243 + Command<redisReply*>* c = command_blocking<redisReply*>(cmd);
  244 + bool succeeded = (c->status() == REDISX_OK);
  245 + c->free();
  246 + return succeeded;
247 } 247 }
248 248
249 } // End namespace redis 249 } // End namespace redis
src/redisx.hpp
@@ -49,12 +49,11 @@ public: @@ -49,12 +49,11 @@ public:
49 template<class ReplyT> 49 template<class ReplyT>
50 bool cancel(Command<ReplyT>* cmd_obj); 50 bool cancel(Command<ReplyT>* cmd_obj);
51 51
52 - void command(const std::string& command);  
53 -  
54 template<class ReplyT> 52 template<class ReplyT>
55 Command<ReplyT>* command_blocking(const std::string& cmd); 53 Command<ReplyT>* command_blocking(const std::string& cmd);
56 54
57 - void command_blocking(const std::string& command); 55 + void command(const std::string& command);
  56 + bool command_blocking(const std::string& command);
58 57
59 long num_commands_processed(); 58 long num_commands_processed();
60 59
@@ -62,10 +61,6 @@ public: @@ -62,10 +61,6 @@ public:
62 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback); 61 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback);
63 // void unsubscribe(std::string channel); 62 // void unsubscribe(std::string channel);
64 63
65 - // Map of ev_timer events to pointers to Command objects  
66 - // Used to get the object back from the timer watcher callback  
67 - static std::unordered_map<ev_timer*, void*> timer_callbacks;  
68 -  
69 private: 64 private:
70 65
71 // Redis server 66 // Redis server
@@ -154,13 +149,12 @@ bool Redis::cancel(Command&lt;ReplyT&gt;* cmd_obj) { @@ -154,13 +149,12 @@ bool Redis::cancel(Command&lt;ReplyT&gt;* cmd_obj) {
154 return false; 149 return false;
155 } 150 }
156 151
157 - timer_callbacks.at(cmd_obj->timer) = NULL; 152 + cmd_obj->timer.data = NULL;
158 153
159 std::lock_guard<std::mutex> lg(cmd_obj->timer_guard); 154 std::lock_guard<std::mutex> lg(cmd_obj->timer_guard);
160 if((cmd_obj->repeat != 0) || (cmd_obj->after != 0)) 155 if((cmd_obj->repeat != 0) || (cmd_obj->after != 0))
161 - ev_timer_stop(EV_DEFAULT_ cmd_obj->timer); 156 + ev_timer_stop(EV_DEFAULT_ &cmd_obj->timer);
162 157
163 - delete cmd_obj->timer;  
164 cmd_obj->completed = true; 158 cmd_obj->completed = true;
165 159
166 return true; 160 return true;
@@ -179,21 +173,22 @@ Command&lt;ReplyT&gt;* Redis::command_blocking(const std::string&amp; cmd) { @@ -179,21 +173,22 @@ Command&lt;ReplyT&gt;* Redis::command_blocking(const std::string&amp; cmd) {
179 173
180 Command<ReplyT>* cmd_obj = command<ReplyT>(cmd, 174 Command<ReplyT>* cmd_obj = command<ReplyT>(cmd,
181 [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { 175 [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) {
182 - std::unique_lock<std::mutex> lk(m); 176 + std::unique_lock<std::mutex> ul(m);
183 val = reply; 177 val = reply;
184 status = REDISX_OK; 178 status = REDISX_OK;
185 - lk.unlock(); 179 + ul.unlock();
186 cv.notify_one(); 180 cv.notify_one();
187 }, 181 },
188 [&status, &m, &cv](const std::string& cmd_str, int error) { 182 [&status, &m, &cv](const std::string& cmd_str, int error) {
189 - std::unique_lock<std::mutex> lk(m); 183 + std::unique_lock<std::mutex> ul(m);
190 status = error; 184 status = error;
191 - lk.unlock(); 185 + ul.unlock();
192 cv.notify_one(); 186 cv.notify_one();
193 }, 187 },
194 0, 0, false // No repeats, don't free memory 188 0, 0, false // No repeats, don't free memory
195 ); 189 );
196 190
  191 + // Wait until a callback is invoked
197 cv.wait(lk, [&status] { return status != REDISX_UNINIT; }); 192 cv.wait(lk, [&status] { return status != REDISX_UNINIT; });
198 193
199 cmd_obj->reply_val = val; 194 cmd_obj->reply_val = val;