Commit be5fad765d2dc312acf46f2c10dfe30c66733b2e

Authored by Hayk Martirosyan
1 parent 2833f2e8

Working loop command implementation

Uses libev timer watchers to queue commands on a regular
interval. Very fast. Still need to handle deactivation of
timers from the user side.
CMakeLists.txt
1 1 cmake_minimum_required(VERSION 2.8.4)
2 2 project(redisx)
3 3  
4   -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
5   -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
  4 +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
  5 +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
6 6 # set(CMAKE_VERBOSE_MAKEFILE ON)
7 7  
8 8 # ---------------------------------------------------------
... ... @@ -34,8 +34,11 @@ set(LIB_ALL ${LIB_REDIS})
34 34 #add_executable(progressive examples/progressive.cpp ${SRC_ALL})
35 35 #target_link_libraries(progressive ${LIB_REDIS})
36 36  
37   -add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
38   -target_link_libraries(basic_threaded ${LIB_REDIS})
  37 +#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
  38 +#target_link_libraries(basic_threaded ${LIB_REDIS})
39 39  
40   -add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
41   -target_link_libraries(lpush_benchmark ${LIB_REDIS})
  40 +#add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
  41 +#target_link_libraries(lpush_benchmark ${LIB_REDIS})
  42 +
  43 +add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL})
  44 +target_link_libraries(simple_loop ${LIB_REDIS})
... ...
examples/simple_loop.cpp 0 → 100644
  1 +/**
  2 +* Basic asynchronous calls using redisx.
  3 +*/
  4 +
  5 +#include <iostream>
  6 +#include "../src/redisx.hpp"
  7 +
  8 +using namespace std;
  9 +
  10 +double time_s() {
  11 + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::milliseconds(1);
  12 + return (double)ms / 1000;
  13 +}
  14 +
  15 +int main(int argc, char* argv[]) {
  16 +
  17 + redisx::Redis rdx = {"localhost", 6379};
  18 + rdx.run();
  19 +
  20 + string cmd_str = "SET alaska rules!";
  21 +
  22 + double freq = 10000; // Hz
  23 + double t_end = 5;
  24 +
  25 + double dt = 1 / freq;
  26 + double t0 = time_s();
  27 + int count = 0;
  28 +
  29 + cout << "Running \"" << cmd_str << "\" at dt = " << dt
  30 + << "s for " << t_end << "s..." << endl;
  31 +
  32 + rdx.command<const string &>(
  33 + cmd_str,
  34 + [&count, &rdx, t0, t_end](const string &cmd, const string &value) {
  35 + count++;
  36 + if(time_s() - t0 >= t_end) rdx.stop();
  37 + },
  38 + dt,
  39 + dt
  40 + );
  41 +
  42 + rdx.block_until_stopped();
  43 + double actual_freq = (double)count / t_end;
  44 + cout << "Sent " << count << " commands in " << t_end<< "s, "
  45 + << "that's " << actual_freq << " commands/s." << endl;
  46 +
  47 + return 0;
  48 +}
... ...
src/redisx.cpp
... ... @@ -3,20 +3,20 @@
3 3 */
4 4  
5 5 #include <signal.h>
6   -#include <iostream>
7   -#include <thread>
8   -#include <hiredis/adapters/libev.h>
9   -#include <ev.h>
10   -#include <event2/thread.h>
11   -#include <vector>
  6 +//#include <event2/thread.h>
12 7 #include "redisx.hpp"
13 8  
14 9 using namespace std;
15 10  
16 11 namespace redisx {
17 12  
  13 +// Global mutex to manage waiting for connected state
18 14 mutex connected_lock;
19 15  
  16 +// Map of ev_timer events to pointers to CommandAsync objects
  17 +// Used to get the object back from the timer watcher callback
  18 +unordered_map<ev_timer*, void*> timer_callbacks;
  19 +
20 20 void connected(const redisAsyncContext *c, int status) {
21 21 if (status != REDIS_OK) {
22 22 cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl;
... ... @@ -94,17 +94,27 @@ void Redis::block_until_stopped() {
94 94 exit_waiter.wait(ul, [this]() { return to_exit.load(); });
95 95 }
96 96  
  97 +/**
  98 +* Submit an asynchronous command to the Redis server. Return
  99 +* true if succeeded, false otherwise.
  100 +*/
97 101 template<class ReplyT>
98   -bool Redis::submit_to_server(const CommandAsync<ReplyT>* cmd_obj) {
99   - if (redisAsyncCommand(c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
100   - cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl;
101   - delete cmd_obj;
  102 +bool submit_to_server(CommandAsync<ReplyT>* cmd_obj) {
  103 + if (redisAsyncCommand(cmd_obj->c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
  104 + cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl;
  105 + cmd_obj->free_if_done();
102 106 return false;
103 107 }
104 108 return true;
105 109 }
106 110  
107 111 template<class ReplyT>
  112 +void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) {
  113 + auto cmd_obj = (CommandAsync<ReplyT>*)timer_callbacks.at(timer);
  114 + submit_to_server<ReplyT>(cmd_obj);
  115 +}
  116 +
  117 +template<class ReplyT>
108 118 bool Redis::process_queued_command(void* cmd_ptr) {
109 119  
110 120 auto& command_map = get_command_map<ReplyT>();
... ... @@ -114,7 +124,14 @@ bool Redis::process_queued_command(void* cmd_ptr) {
114 124 CommandAsync<ReplyT>* cmd_obj = it->second;
115 125 command_map.erase(cmd_ptr);
116 126  
117   - submit_to_server<ReplyT>(cmd_obj);
  127 + if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) {
  128 + submit_to_server<ReplyT>(cmd_obj);
  129 + } else {
  130 + cmd_obj->timer = new ev_timer();
  131 + timer_callbacks[cmd_obj->timer] = (void*)cmd_obj;
  132 + ev_timer_init(cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat);
  133 + ev_timer_start(EV_DEFAULT_ cmd_obj->timer);
  134 + }
118 135  
119 136 return true;
120 137 }
... ... @@ -147,13 +164,13 @@ long Redis::num_commands_processed() {
147 164  
148 165 template<> unordered_map<void*, CommandAsync<const redisReply*>*>& Redis::get_command_map() { return commands_redis_reply; }
149 166 template<>
150   -void invoke_callback(const CommandAsync<const redisReply*>* cmd_obj, redisReply* reply) {
  167 +void invoke_callback(CommandAsync<const redisReply*>* cmd_obj, redisReply* reply) {
151 168 cmd_obj->invoke(reply);
152 169 }
153 170  
154 171 template<> unordered_map<void*, CommandAsync<const string&>*>& Redis::get_command_map() { return commands_string_r; }
155 172 template<>
156   -void invoke_callback(const CommandAsync<const string&>* cmd_obj, redisReply* reply) {
  173 +void invoke_callback(CommandAsync<const string&>* cmd_obj, redisReply* reply) {
157 174 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) {
158 175 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl;
159 176 return;
... ... @@ -164,7 +181,7 @@ void invoke_callback(const CommandAsync&lt;const string&amp;&gt;* cmd_obj, redisReply* rep
164 181  
165 182 template<> unordered_map<void*, CommandAsync<const char*>*>& Redis::get_command_map() { return commands_char_p; }
166 183 template<>
167   -void invoke_callback(const CommandAsync<const char*>* cmd_obj, redisReply* reply) {
  184 +void invoke_callback(CommandAsync<const char*>* cmd_obj, redisReply* reply) {
168 185 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) {
169 186 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl;
170 187 return;
... ... @@ -174,7 +191,7 @@ void invoke_callback(const CommandAsync&lt;const char*&gt;* cmd_obj, redisReply* reply
174 191  
175 192 template<> unordered_map<void*, CommandAsync<int>*>& Redis::get_command_map() { return commands_int; }
176 193 template<>
177   -void invoke_callback(const CommandAsync<int>* cmd_obj, redisReply* reply) {
  194 +void invoke_callback(CommandAsync<int>* cmd_obj, redisReply* reply) {
178 195 if(reply->type != REDIS_REPLY_INTEGER) {
179 196 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl;
180 197 return;
... ... @@ -184,7 +201,7 @@ void invoke_callback(const CommandAsync&lt;int&gt;* cmd_obj, redisReply* reply) {
184 201  
185 202 template<> unordered_map<void*, CommandAsync<long long int>*>& Redis::get_command_map() { return commands_long_long_int; }
186 203 template<>
187   -void invoke_callback(const CommandAsync<long long int>* cmd_obj, redisReply* reply) {
  204 +void invoke_callback(CommandAsync<long long int>* cmd_obj, redisReply* reply) {
188 205 if(reply->type != REDIS_REPLY_INTEGER) {
189 206 cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl;
190 207 return;
... ...
src/redisx.hpp
... ... @@ -18,7 +18,7 @@
18 18  
19 19 #include <hiredis/hiredis.h>
20 20 #include <hiredis/async.h>
21   -#include <condition_variable>
  21 +#include <hiredis/adapters/libev.h>
22 22  
23 23 namespace redisx {
24 24  
... ... @@ -26,17 +26,30 @@ template&lt;class ReplyT&gt;
26 26 class CommandAsync {
27 27 public:
28 28 CommandAsync(
  29 + redisAsyncContext* c,
29 30 const std::string& cmd,
30 31 const std::function<void(const std::string&, ReplyT)>& callback,
31 32 double repeat, double after
32   - ) : cmd(cmd), callback(callback), repeat(repeat), after(after) {}
  33 + ) : c(c), cmd(cmd), callback(callback), repeat(repeat), after(after), done(false) {}
33 34  
  35 + redisAsyncContext* c;
34 36 const std::string cmd;
35 37 const std::function<void(const std::string&, ReplyT)> callback;
36 38 double repeat;
37 39 double after;
  40 + bool done;
  41 + ev_timer* timer;
38 42  
39   - void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); }
  43 + void invoke(ReplyT reply) {
  44 + if(callback != NULL) callback(cmd, reply);
  45 + if((repeat == 0)) done = true;
  46 + }
  47 + void free_if_done() {
  48 + if(done) {
  49 + std::cout << "Deleting CommandAsync: " << cmd << std::endl;
  50 + delete this;
  51 + };
  52 + }
40 53 };
41 54  
42 55 class Redis {
... ... @@ -63,8 +76,6 @@ public:
63 76  
64 77 long num_commands_processed();
65 78  
66   -// struct event* command_loop(const char* command, long interval_s, long interval_us);
67   -
68 79 // void get(const char* key, std::function<void(const std::string&, const char*)> callback);
69 80 //
70 81 // void set(const char* key, const char* value);
... ... @@ -109,20 +120,13 @@ private:
109 120  
110 121 template<class ReplyT>
111 122 bool process_queued_command(void* cmd_ptr);
112   -
113   - /**
114   - * Submit an asynchronous command to the Redis server. Return
115   - * true if succeeded, false otherwise.
116   - */
117   - template<class ReplyT>
118   - bool submit_to_server(const CommandAsync<ReplyT>* cmd_obj);
119 123 };
120 124  
121 125 // ---------------------------
122 126  
123 127 template<class ReplyT>
124 128 void invoke_callback(
125   - const CommandAsync<ReplyT>* cmd_obj,
  129 + CommandAsync<ReplyT>* cmd_obj,
126 130 redisReply* reply
127 131 );
128 132  
... ... @@ -134,18 +138,18 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) {
134 138  
135 139 if (reply->type == REDIS_REPLY_ERROR) {
136 140 std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl;
137   - delete cmd_obj;
  141 + cmd_obj->free_if_done();
138 142 return;
139 143 }
140 144  
141 145 if(reply->type == REDIS_REPLY_NIL) {
142 146 std::cerr << "[WARNING] " << cmd_obj->cmd << ": Nil reply." << std::endl;
143   - delete cmd_obj;
  147 + cmd_obj->free_if_done();
144 148 return; // cmd_obj->invoke(NULL);
145 149 }
146 150  
147 151 invoke_callback<ReplyT>(cmd_obj, reply);
148   - delete cmd_obj;
  152 + cmd_obj->free_if_done();
149 153 }
150 154  
151 155 template<class ReplyT>
... ... @@ -157,10 +161,9 @@ void Redis::command(
157 161 ) {
158 162  
159 163 std::lock_guard<std::mutex> lg(queue_guard);
160   - auto* cmd_obj = new CommandAsync<ReplyT>(cmd, callback, repeat, after);
  164 + auto* cmd_obj = new CommandAsync<ReplyT>(c, cmd, callback, repeat, after);
161 165 get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj;
162 166 command_queue.push((void*)cmd_obj);
163 167 }
164 168  
165   -
166 169 } // End namespace redis
... ...