Commit df429ec2f7c57ef986e05d26be87b46578446448

Authored by Hayk Martirosyan
1 parent be5fad76

Rename CommandAsync->Command, cancel functionality

Renamed CommandAsync to Command to make things simpler, and
refactored into its own file.

command() now returns a pointer to the created Command object,
which the client can pass into cancel() to stop any delayed
or repeating calls. This is very important for an asynchronous
client.
README.md
@@ -2,3 +2,6 @@ redisx @@ -2,3 +2,6 @@ redisx
2 ====== 2 ======
3 3
4 Asynchronous, minimalistic, and highly effective C++11 bindings for Redis 4 Asynchronous, minimalistic, and highly effective C++11 bindings for Redis
  5 +Asynchronous, minimalistic, and wicked fast C++11 bindings for Redis
  6 +
  7 +Work in progress, details coming soon.
examples/simple_loop.cpp
@@ -8,8 +8,8 @@ @@ -8,8 +8,8 @@
8 using namespace std; 8 using namespace std;
9 9
10 double time_s() { 10 double time_s() {
11 - unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::milliseconds(1);  
12 - return (double)ms / 1000; 11 + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
  12 + return (double)ms / 1e6;
13 } 13 }
14 14
15 int main(int argc, char* argv[]) { 15 int main(int argc, char* argv[]) {
@@ -20,28 +20,31 @@ int main(int argc, char* argv[]) { @@ -20,28 +20,31 @@ int main(int argc, char* argv[]) {
20 string cmd_str = "SET alaska rules!"; 20 string cmd_str = "SET alaska rules!";
21 21
22 double freq = 10000; // Hz 22 double freq = 10000; // Hz
23 - double t_end = 5;  
24 -  
25 - double dt = 1 / freq;  
26 - double t0 = time_s();  
27 - int count = 0; 23 + double dt = 1 / freq; // s
  24 + double t = 5; // s
28 25
29 cout << "Running \"" << cmd_str << "\" at dt = " << dt 26 cout << "Running \"" << cmd_str << "\" at dt = " << dt
30 - << "s for " << t_end << "s..." << endl; 27 + << "s for " << t << "s..." << endl;
31 28
32 - rdx.command<const string &>( 29 + int count = 0;
  30 + redisx::Command<const string&>* c = rdx.command<const string&>(
33 cmd_str, 31 cmd_str,
34 - [&count, &rdx, t0, t_end](const string &cmd, const string &value) { 32 + [&count](const string &cmd, const string &value) {
35 count++; 33 count++;
36 - if(time_s() - t0 >= t_end) rdx.stop();  
37 }, 34 },
38 dt, 35 dt,
39 dt 36 dt
40 ); 37 );
41 38
42 - rdx.block_until_stopped();  
43 - double actual_freq = (double)count / t_end;  
44 - cout << "Sent " << count << " commands in " << t_end<< "s, " 39 + double t0 = time_s();
  40 + this_thread::sleep_for(chrono::microseconds((int)(t*1e6)));
  41 + rdx.cancel<const string&>(c);
  42 + rdx.stop();
  43 +
  44 + double t_elapsed = time_s() - t0;
  45 + double actual_freq = (double)count / t_elapsed;
  46 +
  47 + cout << "Sent " << count << " commands in " << t_elapsed << "s, "
45 << "that's " << actual_freq << " commands/s." << endl; 48 << "that's " << actual_freq << " commands/s." << endl;
46 49
47 return 0; 50 return 0;
src/command.hpp 0 → 100644
  1 +/**
  2 +* Redis C++11 wrapper.
  3 +*/
  4 +
  5 +#pragma once
  6 +
  7 +#include <iostream>
  8 +#include <string>
  9 +#include <functional>
  10 +
  11 +#include <hiredis/adapters/libev.h>
  12 +#include <hiredis/async.h>
  13 +
  14 +namespace redisx {
  15 +
  16 +template<class ReplyT>
  17 +class Command {
  18 +public:
  19 + Command(
  20 + redisAsyncContext* c,
  21 + const std::string& cmd,
  22 + const std::function<void(const std::string&, ReplyT)>& callback,
  23 + double repeat, double after
  24 + );
  25 +
  26 + redisAsyncContext* c;
  27 + const std::string cmd;
  28 + const std::function<void(const std::string&, ReplyT)> callback;
  29 + double repeat;
  30 + double after;
  31 + bool done;
  32 + ev_timer* timer;
  33 + std::mutex timer_guard;
  34 +
  35 + void invoke(ReplyT reply);
  36 +
  37 + void free_if_done();
  38 + ev_timer* get_timer() {
  39 + std::lock_guard<std::mutex> lg(timer_guard);
  40 + return timer;
  41 + }
  42 +};
  43 +
  44 +template<class ReplyT>
  45 +Command<ReplyT>::Command(
  46 + redisAsyncContext* c,
  47 + const std::string& cmd,
  48 + const std::function<void(const std::string&, ReplyT)>& callback,
  49 + double repeat, double after
  50 +) : c(c), cmd(cmd), callback(callback), repeat(repeat), after(after), done(false) {
  51 + timer_guard.lock();
  52 +}
  53 +
  54 +template<class ReplyT>
  55 +void Command<ReplyT>::invoke(ReplyT reply) {
  56 + if(callback != NULL) callback(cmd, reply);
  57 + if((repeat == 0)) done = true;
  58 +}
  59 +
  60 +template<class ReplyT>
  61 +void Command<ReplyT>::free_if_done() {
  62 + if(done) {
  63 + std::cout << "Deleting Command: " << cmd << std::endl;
  64 + delete this;
  65 + };
  66 +}
  67 +} // End namespace redis
src/redisx.cpp
@@ -13,7 +13,7 @@ namespace redisx { @@ -13,7 +13,7 @@ namespace redisx {
13 // Global mutex to manage waiting for connected state 13 // Global mutex to manage waiting for connected state
14 mutex connected_lock; 14 mutex connected_lock;
15 15
16 -// Map of ev_timer events to pointers to CommandAsync objects 16 +// Map of ev_timer events to pointers to Command objects
17 // Used to get the object back from the timer watcher callback 17 // Used to get the object back from the timer watcher callback
18 unordered_map<ev_timer*, void*> timer_callbacks; 18 unordered_map<ev_timer*, void*> timer_callbacks;
19 19
@@ -99,7 +99,7 @@ void Redis::block_until_stopped() { @@ -99,7 +99,7 @@ void Redis::block_until_stopped() {
99 * true if succeeded, false otherwise. 99 * true if succeeded, false otherwise.
100 */ 100 */
101 template<class ReplyT> 101 template<class ReplyT>
102 -bool submit_to_server(CommandAsync<ReplyT>* cmd_obj) { 102 +bool submit_to_server(Command<ReplyT>* cmd_obj) {
103 if (redisAsyncCommand(cmd_obj->c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { 103 if (redisAsyncCommand(cmd_obj->c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
104 cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; 104 cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl;
105 cmd_obj->free_if_done(); 105 cmd_obj->free_if_done();
@@ -110,7 +110,7 @@ bool submit_to_server(CommandAsync&lt;ReplyT&gt;* cmd_obj) { @@ -110,7 +110,7 @@ bool submit_to_server(CommandAsync&lt;ReplyT&gt;* cmd_obj) {
110 110
111 template<class ReplyT> 111 template<class ReplyT>
112 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { 112 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) {
113 - auto cmd_obj = (CommandAsync<ReplyT>*)timer_callbacks.at(timer); 113 + auto cmd_obj = (Command<ReplyT>*)timer_callbacks.at(timer);
114 submit_to_server<ReplyT>(cmd_obj); 114 submit_to_server<ReplyT>(cmd_obj);
115 } 115 }
116 116
@@ -121,7 +121,7 @@ bool Redis::process_queued_command(void* cmd_ptr) { @@ -121,7 +121,7 @@ bool Redis::process_queued_command(void* cmd_ptr) {
121 121
122 auto it = command_map.find(cmd_ptr); 122 auto it = command_map.find(cmd_ptr);
123 if(it == command_map.end()) return false; 123 if(it == command_map.end()) return false;
124 - CommandAsync<ReplyT>* cmd_obj = it->second; 124 + Command<ReplyT>* cmd_obj = it->second;
125 command_map.erase(cmd_ptr); 125 command_map.erase(cmd_ptr);
126 126
127 if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) { 127 if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) {
@@ -131,11 +131,14 @@ bool Redis::process_queued_command(void* cmd_ptr) { @@ -131,11 +131,14 @@ bool Redis::process_queued_command(void* cmd_ptr) {
131 timer_callbacks[cmd_obj->timer] = (void*)cmd_obj; 131 timer_callbacks[cmd_obj->timer] = (void*)cmd_obj;
132 ev_timer_init(cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat); 132 ev_timer_init(cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat);
133 ev_timer_start(EV_DEFAULT_ cmd_obj->timer); 133 ev_timer_start(EV_DEFAULT_ cmd_obj->timer);
  134 +
  135 + cmd_obj->timer_guard.unlock();
134 } 136 }
135 137
136 return true; 138 return true;
137 } 139 }
138 140
  141 +
139 void Redis::process_queued_commands() { 142 void Redis::process_queued_commands() {
140 143
141 lock_guard<mutex> lg(queue_guard); 144 lock_guard<mutex> lg(queue_guard);
@@ -162,15 +165,15 @@ long Redis::num_commands_processed() { @@ -162,15 +165,15 @@ long Redis::num_commands_processed() {
162 165
163 // ---------------------------- 166 // ----------------------------
164 167
165 -template<> unordered_map<void*, CommandAsync<const redisReply*>*>& Redis::get_command_map() { return commands_redis_reply; } 168 +template<> unordered_map<void*, Command<const redisReply*>*>& Redis::get_command_map() { return commands_redis_reply; }
166 template<> 169 template<>
167 -void invoke_callback(CommandAsync<const redisReply*>* cmd_obj, redisReply* reply) { 170 +void invoke_callback(Command<const redisReply*>* cmd_obj, redisReply* reply) {
168 cmd_obj->invoke(reply); 171 cmd_obj->invoke(reply);
169 } 172 }
170 173
171 -template<> unordered_map<void*, CommandAsync<const string&>*>& Redis::get_command_map() { return commands_string_r; } 174 +template<> unordered_map<void*, Command<const string&>*>& Redis::get_command_map() { return commands_string_r; }
172 template<> 175 template<>
173 -void invoke_callback(CommandAsync<const string&>* cmd_obj, redisReply* reply) { 176 +void invoke_callback(Command<const string&>* cmd_obj, redisReply* reply) {
174 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { 177 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) {
175 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; 178 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl;
176 return; 179 return;
@@ -179,9 +182,9 @@ void invoke_callback(CommandAsync&lt;const string&amp;&gt;* cmd_obj, redisReply* reply) { @@ -179,9 +182,9 @@ void invoke_callback(CommandAsync&lt;const string&amp;&gt;* cmd_obj, redisReply* reply) {
179 cmd_obj->invoke(reply->str); 182 cmd_obj->invoke(reply->str);
180 } 183 }
181 184
182 -template<> unordered_map<void*, CommandAsync<const char*>*>& Redis::get_command_map() { return commands_char_p; } 185 +template<> unordered_map<void*, Command<const char*>*>& Redis::get_command_map() { return commands_char_p; }
183 template<> 186 template<>
184 -void invoke_callback(CommandAsync<const char*>* cmd_obj, redisReply* reply) { 187 +void invoke_callback(Command<const char*>* cmd_obj, redisReply* reply) {
185 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { 188 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) {
186 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; 189 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl;
187 return; 190 return;
@@ -189,9 +192,9 @@ void invoke_callback(CommandAsync&lt;const char*&gt;* cmd_obj, redisReply* reply) { @@ -189,9 +192,9 @@ void invoke_callback(CommandAsync&lt;const char*&gt;* cmd_obj, redisReply* reply) {
189 cmd_obj->invoke(reply->str); 192 cmd_obj->invoke(reply->str);
190 } 193 }
191 194
192 -template<> unordered_map<void*, CommandAsync<int>*>& Redis::get_command_map() { return commands_int; } 195 +template<> unordered_map<void*, Command<int>*>& Redis::get_command_map() { return commands_int; }
193 template<> 196 template<>
194 -void invoke_callback(CommandAsync<int>* cmd_obj, redisReply* reply) { 197 +void invoke_callback(Command<int>* cmd_obj, redisReply* reply) {
195 if(reply->type != REDIS_REPLY_INTEGER) { 198 if(reply->type != REDIS_REPLY_INTEGER) {
196 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; 199 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl;
197 return; 200 return;
@@ -199,9 +202,9 @@ void invoke_callback(CommandAsync&lt;int&gt;* cmd_obj, redisReply* reply) { @@ -199,9 +202,9 @@ void invoke_callback(CommandAsync&lt;int&gt;* cmd_obj, redisReply* reply) {
199 cmd_obj->invoke((int)reply->integer); 202 cmd_obj->invoke((int)reply->integer);
200 } 203 }
201 204
202 -template<> unordered_map<void*, CommandAsync<long long int>*>& Redis::get_command_map() { return commands_long_long_int; } 205 +template<> unordered_map<void*, Command<long long int>*>& Redis::get_command_map() { return commands_long_long_int; }
203 template<> 206 template<>
204 -void invoke_callback(CommandAsync<long long int>* cmd_obj, redisReply* reply) { 207 +void invoke_callback(Command<long long int>* cmd_obj, redisReply* reply) {
205 if(reply->type != REDIS_REPLY_INTEGER) { 208 if(reply->type != REDIS_REPLY_INTEGER) {
206 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; 209 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl;
207 return; 210 return;
src/redisx.hpp
@@ -20,37 +20,9 @@ @@ -20,37 +20,9 @@
20 #include <hiredis/async.h> 20 #include <hiredis/async.h>
21 #include <hiredis/adapters/libev.h> 21 #include <hiredis/adapters/libev.h>
22 22
23 -namespace redisx { 23 +#include "command.hpp"
24 24
25 -template<class ReplyT>  
26 -class CommandAsync {  
27 -public:  
28 - CommandAsync(  
29 - redisAsyncContext* c,  
30 - const std::string& cmd,  
31 - const std::function<void(const std::string&, ReplyT)>& callback,  
32 - double repeat, double after  
33 - ) : c(c), cmd(cmd), callback(callback), repeat(repeat), after(after), done(false) {}  
34 -  
35 - redisAsyncContext* c;  
36 - const std::string cmd;  
37 - const std::function<void(const std::string&, ReplyT)> callback;  
38 - double repeat;  
39 - double after;  
40 - bool done;  
41 - ev_timer* timer;  
42 -  
43 - void invoke(ReplyT reply) {  
44 - if(callback != NULL) callback(cmd, reply);  
45 - if((repeat == 0)) done = true;  
46 - }  
47 - void free_if_done() {  
48 - if(done) {  
49 - std::cout << "Deleting CommandAsync: " << cmd << std::endl;  
50 - delete this;  
51 - };  
52 - }  
53 -}; 25 +namespace redisx {
54 26
55 class Redis { 27 class Redis {
56 28
@@ -65,7 +37,7 @@ public: @@ -65,7 +37,7 @@ public:
65 void block_until_stopped(); 37 void block_until_stopped();
66 38
67 template<class ReplyT> 39 template<class ReplyT>
68 - void command( 40 + Command<ReplyT>* command(
69 const std::string& cmd, 41 const std::string& cmd,
70 const std::function<void(const std::string&, ReplyT)>& callback = NULL, 42 const std::function<void(const std::string&, ReplyT)>& callback = NULL,
71 double repeat = 0.0, 43 double repeat = 0.0,
@@ -74,6 +46,9 @@ public: @@ -74,6 +46,9 @@ public:
74 46
75 void command(const char* command); 47 void command(const char* command);
76 48
  49 + template<class ReplyT>
  50 + bool cancel(Command<ReplyT>* cmd_obj);
  51 +
77 long num_commands_processed(); 52 long num_commands_processed();
78 53
79 // void get(const char* key, std::function<void(const std::string&, const char*)> callback); 54 // void get(const char* key, std::function<void(const std::string&, const char*)> callback);
@@ -105,14 +80,14 @@ private: @@ -105,14 +80,14 @@ private:
105 80
106 std::thread event_loop_thread; 81 std::thread event_loop_thread;
107 82
108 - std::unordered_map<void*, CommandAsync<const redisReply*>*> commands_redis_reply;  
109 - std::unordered_map<void*, CommandAsync<const std::string&>*> commands_string_r;  
110 - std::unordered_map<void*, CommandAsync<const char*>*> commands_char_p;  
111 - std::unordered_map<void*, CommandAsync<int>*> commands_int;  
112 - std::unordered_map<void*, CommandAsync<long long int>*> commands_long_long_int; 83 + std::unordered_map<void*, Command<const redisReply*>*> commands_redis_reply;
  84 + std::unordered_map<void*, Command<const std::string&>*> commands_string_r;
  85 + std::unordered_map<void*, Command<const char*>*> commands_char_p;
  86 + std::unordered_map<void*, Command<int>*> commands_int;
  87 + std::unordered_map<void*, Command<long long int>*> commands_long_long_int;
113 88
114 template<class ReplyT> 89 template<class ReplyT>
115 - std::unordered_map<void*, CommandAsync<ReplyT>*>& get_command_map(); 90 + std::unordered_map<void*, Command<ReplyT>*>& get_command_map();
116 91
117 std::queue<void*> command_queue; 92 std::queue<void*> command_queue;
118 std::mutex queue_guard; 93 std::mutex queue_guard;
@@ -126,7 +101,7 @@ private: @@ -126,7 +101,7 @@ private:
126 101
127 template<class ReplyT> 102 template<class ReplyT>
128 void invoke_callback( 103 void invoke_callback(
129 - CommandAsync<ReplyT>* cmd_obj, 104 + Command<ReplyT>* cmd_obj,
130 redisReply* reply 105 redisReply* reply
131 ); 106 );
132 107
@@ -134,7 +109,7 @@ template&lt;class ReplyT&gt; @@ -134,7 +109,7 @@ template&lt;class ReplyT&gt;
134 void command_callback(redisAsyncContext *c, void *r, void *privdata) { 109 void command_callback(redisAsyncContext *c, void *r, void *privdata) {
135 110
136 redisReply *reply = (redisReply *) r; 111 redisReply *reply = (redisReply *) r;
137 - auto *cmd_obj = (CommandAsync<ReplyT> *) privdata; 112 + auto *cmd_obj = (Command<ReplyT> *) privdata;
138 113
139 if (reply->type == REDIS_REPLY_ERROR) { 114 if (reply->type == REDIS_REPLY_ERROR) {
140 std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl; 115 std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl;
@@ -153,7 +128,7 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) { @@ -153,7 +128,7 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) {
153 } 128 }
154 129
155 template<class ReplyT> 130 template<class ReplyT>
156 -void Redis::command( 131 +Command<ReplyT>* Redis::command(
157 const std::string& cmd, 132 const std::string& cmd,
158 const std::function<void(const std::string&, ReplyT)>& callback, 133 const std::function<void(const std::string&, ReplyT)>& callback,
159 double repeat, 134 double repeat,
@@ -161,9 +136,21 @@ void Redis::command( @@ -161,9 +136,21 @@ void Redis::command(
161 ) { 136 ) {
162 137
163 std::lock_guard<std::mutex> lg(queue_guard); 138 std::lock_guard<std::mutex> lg(queue_guard);
164 - auto* cmd_obj = new CommandAsync<ReplyT>(c, cmd, callback, repeat, after); 139 + auto* cmd_obj = new Command<ReplyT>(c, cmd, callback, repeat, after);
165 get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj; 140 get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj;
166 command_queue.push((void*)cmd_obj); 141 command_queue.push((void*)cmd_obj);
  142 + return cmd_obj;
  143 +}
  144 +
  145 +template<class ReplyT>
  146 +bool Redis::cancel(Command<ReplyT>* cmd_obj) {
  147 +
  148 + // TODO erase from global timer_callbacks
  149 +
  150 + if((cmd_obj->repeat != 0) || (cmd_obj->after != 0))
  151 + ev_timer_stop(EV_DEFAULT_ cmd_obj->get_timer());
  152 +
  153 + return true;
167 } 154 }
168 155
169 } // End namespace redis 156 } // End namespace redis