Commit 2f207093573a7257c44ffdbfbe70f3e596de254c

Authored by Hayk Martirosyan
1 parent c75df219

Fixed two major issues, the event loop and memory management

The event loop (now libev) now runs in a separate detached thread,
which is abstracted away from the user, who only calls a nonblocking
.start() method. This thread loops continously, alternating telling
Redis about asynchronous commands ready to send, and running one
iteration of the event loop, where all pending events are taken
care of. This greatly simplifies the user code.

Additionally, some clever tricker is implemented now to handle
memory management well with the templated command types. The
difficulty comes from the fact that we pass redis a void pointer
only, and must retreive everything from that (so, we can't use
shared_ptr for the whole thing). Thus, we use maps of void pointers
to their templated data structure pointers, where the memory address
of the pointer serves as the key.
CMakeLists.txt
1 1 cmake_minimum_required(VERSION 2.8.4)
2 2 project(redisx)
3 3  
4   -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall")
  4 +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
5 5 # set(CMAKE_VERBOSE_MAKEFILE ON)
6 6  
7 7 # ---------------------------------------------------------
... ... @@ -20,7 +20,7 @@ set(SRC_ALL ${SRC_CORE})
20 20 # Libraries
21 21 # ---------------------------------------------------------
22 22  
23   -set(LIB_REDIS hiredis event)
  23 +set(LIB_REDIS hiredis ev pthread)
24 24 set(LIB_ALL ${LIB_REDIS})
25 25  
26 26 # ---------------------------------------------------------
... ...
examples/basic.cpp
... ... @@ -19,6 +19,7 @@ unsigned long time_ms() {
19 19 int main(int argc, char* argv[]) {
20 20  
21 21 redisx::Redis r = {REDIS_HOST, REDIS_PORT};
  22 + r.start();
22 23  
23 24 r.command<const string&>("GET blah", [](const string& cmd, const string& value) {
24 25 cout << "[COMMAND] " << cmd << ": " << value << endl;
... ... @@ -28,48 +29,50 @@ int main(int argc, char* argv[]) {
28 29 cout << "[COMMAND] " << cmd << ": " << value << endl;
29 30 });
30 31  
31   - r.command<const redisReply*>("LPUSH yahoo 1 2 3 4 f w", [](const string& cmd, const redisReply* reply) {
32   - cout << "[COMMAND] " << cmd << ": " << reply->integer << endl;
33   - });
  32 +// r.command<const redisReply*>("LPUSH yahoo 1 2 3 4 f w", [](const string& cmd, const redisReply* reply) {
  33 +// cout << "[COMMAND] " << cmd << ": " << reply->integer << endl;
  34 +// });
  35 +
34 36  
35 37 r.get("blahqwefwqefef", [](const string& cmd, const char* value) {
36 38 cout << "[GET] blah: " << value << endl;
37 39 });
  40 +//
  41 +// r.set("name", "lolfewef");
  42 +//
  43 +// r.command("SET blah wefoijewfojiwef");
  44 +//
  45 +// r.del("name");
  46 +// r.del("wefoipjweojiqw", [](const string& cmd, long long int num_deleted) {
  47 +// cout << "num deleted: " << num_deleted << endl;
  48 +// });
38 49  
39   - r.set("name", "lolfewef");
  50 +// r.command_loop("LPUSH count 1", 0, 1000);
40 51  
41   - r.command("SET blah wefoijewfojiwef");
  52 + unsigned long t0 = time_ms();
  53 + unsigned long t1 = t0;
42 54  
43   - r.del("name");
44   - r.del("wefoipjweojiqw", [](const string& cmd, long long int num_deleted) {
45   - cout << "num deleted: " << num_deleted << endl;
46   - });
  55 + int len = 1000000;
  56 + int count = 0;
47 57  
48   - r.command_loop("LPUSH count 1", 0, 1);
49   - r.command_loop("LPUSH count2 1", 0, 1);
  58 + for(int i = 0; i < len; i++) {
  59 + r.command<int>("lpush test 1", [&t0, &t1, &count, len](const string& cmd, int reply) {
50 60  
51   -// unsigned long t0 = time_ms();
52   -// unsigned long t1 = t0;
53   -//
54   -// int len = 1000000;
55   -// int count = 0;
56   -//
57   -// for(int i = 0; i < len; i++) {
58   -// r.command<const string&>("set blah wefoiwef", [&t0, &t1, &count, len](const string& cmd, const string& reply) {
59   -//
60   -// count++;
61   -// if(count == len) {
62   -// cout << cmd << ": " << reply << endl;
63   -// cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl;
64   -// cout << "Time to receive all: " << time_ms() - t1 << "ms" << endl;
65   -// cout << "Total time: " << time_ms() - t0 << "ms" << endl;
66   -// }
67   -// });
68   -// }
69   -// t1 = time_ms();
70   -
71   - thread loop([&r] { r.start(); });
72   - loop.join();
  61 + count++;
  62 + if(count == len) {
  63 + cout << cmd << ": " << reply << endl;
  64 +
  65 + unsigned long t2 = time_ms();
  66 + cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl;
  67 + cout << "Time to receive all: " << t2 - t1 << "ms" << endl;
  68 + cout << "Total time: " << t2 - t0 << "ms" << endl;
  69 + }
  70 + });
  71 + }
  72 + t1 = time_ms();
  73 +
  74 + //r.start();
  75 + while(true) {}
73 76  
74 77 return 0;
75 78 };
... ...
src/redisx.cpp
... ... @@ -5,7 +5,9 @@
5 5 #include <signal.h>
6 6 #include <iostream>
7 7 #include <thread>
8   -#include <hiredis/adapters/libevent.h>
  8 +#include <hiredis/adapters/libev.h>
  9 +#include <ev.h>
  10 +#include <event2/thread.h>
9 11 #include <vector>
10 12 #include <string.h>
11 13 #include "redisx.hpp"
... ... @@ -14,12 +16,15 @@ using namespace std;
14 16  
15 17 namespace redisx {
16 18  
  19 +mutex connected_lock;
  20 +
17 21 void connected(const redisAsyncContext *c, int status) {
18 22 if (status != REDIS_OK) {
19 23 printf("Error: %s\n", c->errstr);
20 24 return;
21 25 }
22 26 printf("Connected...\n");
  27 + connected_lock.unlock();
23 28 }
24 29  
25 30 void disconnected(const redisAsyncContext *c, int status) {
... ... @@ -28,12 +33,20 @@ void disconnected(const redisAsyncContext *c, int status) {
28 33 return;
29 34 }
30 35 printf("Disconnected...\n");
  36 + connected_lock.lock();
31 37 }
32 38  
33 39 Redis::Redis(const string& host, const int port) : host(host), port(port), io_ops(0) {
34 40  
  41 +// evthread_use_pthreads();
  42 +// evthread_enable_lock_debuging();
  43 +// event_enable_debug_mode();
  44 +
  45 + lock_guard<mutex> lg(evlock);
  46 + connected_lock.lock();
  47 +
35 48 signal(SIGPIPE, SIG_IGN);
36   - base = event_base_new();
  49 +// base = event_base_new();
37 50  
38 51 c = redisAsyncConnect(host.c_str(), port);
39 52 if (c->err) {
... ... @@ -41,7 +54,8 @@ Redis::Redis(const string&amp; host, const int port) : host(host), port(port), io_op
41 54 return;
42 55 }
43 56  
44   - redisLibeventAttach(c, base);
  57 + redisLibevAttach(EV_DEFAULT_ c);
  58 +// redisLibeventAttach(c, base);
45 59 redisAsyncSetConnectCallback(c, connected);
46 60 redisAsyncSetDisconnectCallback(c, disconnected);
47 61 }
... ... @@ -51,57 +65,78 @@ Redis::~Redis() {
51 65 }
52 66  
53 67 void Redis::start() {
54   - event_base_dispatch(base);
  68 +
  69 + event_loop_thread = thread([this] {
  70 + ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  71 + connected_lock.lock();
  72 +
  73 + while (true) {
  74 + process_queued_commands();
  75 + ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  76 + }
  77 + });
  78 + event_loop_thread.detach();
55 79 }
56 80  
  81 +template<class ReplyT>
  82 +bool Redis::process_queued_command(void* cmd_ptr) {
57 83  
58   -// ----------------------------
  84 + auto& command_map = get_command_map<ReplyT>();
59 85  
60   -void Redis::command(const char* cmd) {
61   - int status = redisAsyncCommand(c, NULL, NULL, cmd);
62   - if (status != REDIS_OK) {
63   - cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl;
64   - return;
  86 + auto it = command_map.find(cmd_ptr);
  87 + if(it == command_map.end()) return false;
  88 + CommandAsync<ReplyT>* cmd_obj = it->second;
  89 + command_map.erase(cmd_ptr);
  90 +
  91 + if (redisAsyncCommand(c, command_callback<ReplyT>, cmd_ptr, cmd_obj->cmd.c_str()) != REDIS_OK) {
  92 + cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl;
  93 + delete cmd_obj;
65 94 }
  95 +
  96 + return true;
66 97 }
67 98  
68   -// ----------------------------
  99 +void Redis::process_queued_commands() {
  100 + lock_guard<mutex> lg(evlock);
  101 +
  102 + while(!command_queue.empty()) {
69 103  
70   -void e_callback(evutil_socket_t fd, short what, void *arg) {
  104 + void* cmd_ptr = command_queue.front();
  105 + if(process_queued_command<const redisReply*>(cmd_ptr)) {}
  106 + else if(process_queued_command<const string&>(cmd_ptr)) {}
  107 + else if(process_queued_command<const char*>(cmd_ptr)) {}
  108 + else if(process_queued_command<int>(cmd_ptr)) {}
  109 + else if(process_queued_command<long long int>(cmd_ptr)) {}
  110 + else throw runtime_error("[FATAL] Command pointer not found in any queue!");
71 111  
72   - const char* cmd = "LPUSH count 1";
  112 + command_queue.pop();
  113 + }
  114 +}
73 115  
74   - redisAsyncContext* c = (redisAsyncContext*)arg;
  116 +// ----------------------------
  117 +
  118 +// TODO update
  119 +void Redis::command(const char* cmd) {
75 120  
  121 + evlock.lock();
76 122 int status = redisAsyncCommand(c, NULL, NULL, cmd);
  123 + evlock.unlock();
  124 +
77 125 if (status != REDIS_OK) {
78 126 cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl;
79 127 return;
80 128 }
81 129 }
82 130  
83   -struct event* Redis::command_loop(const char* cmd, long interval_s, long interval_us) {
84   -
85   - struct event* e;
86   - if(interval_s == 0 && interval_us == 0) {
87   - e = event_new(base, -1, EV_PERSIST, e_callback, c);
88   - event_add(e, NULL);
89   - } else {
90   - struct timeval e_time = {interval_s, interval_us};
91   - e = event_new(base, -1, EV_TIMEOUT | EV_PERSIST, e_callback, c);
92   - event_add(e, &e_time);
93   - }
94   -
95   - return e;
96   -}
97   -
98 131 // ----------------------------
99 132  
  133 +template<> unordered_map<void*, CommandAsync<const redisReply*>*>& Redis::get_command_map() { return commands_redis_reply; }
100 134 template<>
101 135 void invoke_callback(const CommandAsync<const redisReply*>* cmd_obj, redisReply* reply) {
102 136 cmd_obj->invoke(reply);
103 137 }
104 138  
  139 +template<> unordered_map<void*, CommandAsync<const string&>*>& Redis::get_command_map() { return commands_string_r; }
105 140 template<>
106 141 void invoke_callback(const CommandAsync<const string&>* cmd_obj, redisReply* reply) {
107 142 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) {
... ... @@ -112,6 +147,7 @@ void invoke_callback(const CommandAsync&lt;const string&amp;&gt;* cmd_obj, redisReply* rep
112 147 cmd_obj->invoke(reply->str);
113 148 }
114 149  
  150 +template<> unordered_map<void*, CommandAsync<const char*>*>& Redis::get_command_map() { return commands_char_p; }
115 151 template<>
116 152 void invoke_callback(const CommandAsync<const char*>* cmd_obj, redisReply* reply) {
117 153 if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) {
... ... @@ -121,6 +157,7 @@ void invoke_callback(const CommandAsync&lt;const char*&gt;* cmd_obj, redisReply* reply
121 157 cmd_obj->invoke(reply->str);
122 158 }
123 159  
  160 +template<> unordered_map<void*, CommandAsync<int>*>& Redis::get_command_map() { return commands_int; }
124 161 template<>
125 162 void invoke_callback(const CommandAsync<int>* cmd_obj, redisReply* reply) {
126 163 if(reply->type != REDIS_REPLY_INTEGER) {
... ... @@ -130,6 +167,7 @@ void invoke_callback(const CommandAsync&lt;int&gt;* cmd_obj, redisReply* reply) {
130 167 cmd_obj->invoke((int)reply->integer);
131 168 }
132 169  
  170 +template<> unordered_map<void*, CommandAsync<long long int>*>& Redis::get_command_map() { return commands_long_long_int; }
133 171 template<>
134 172 void invoke_callback(const CommandAsync<long long int>* cmd_obj, redisReply* reply) {
135 173 if(reply->type != REDIS_REPLY_INTEGER) {
... ... @@ -154,7 +192,7 @@ void Redis::set(const char* key, const char* value) {
154 192 });
155 193 }
156 194  
157   -void Redis::set(const char* key, const char* value, std::function<void(const string&, const char*)> callback) {
  195 +void Redis::set(const char* key, const char* value, function<void(const string&, const char*)> callback) {
158 196 string cmd = string("SET ") + key + " " + value;
159 197 command<const char*>(cmd.c_str(), callback);
160 198 }
... ... @@ -167,7 +205,7 @@ void Redis::del(const char* key) {
167 205 });
168 206 }
169 207  
170   -void Redis::del(const char* key, std::function<void(const string&, long long int)> callback) {
  208 +void Redis::del(const char* key, function<void(const string&, long long int)> callback) {
171 209 string cmd = string("DEL ") + key;
172 210 command<long long int>(cmd.c_str(), callback);
173 211 }
... ...
src/redisx.hpp
... ... @@ -9,9 +9,32 @@
9 9 #include <iostream>
10 10 #include <hiredis/hiredis.h>
11 11 #include <hiredis/async.h>
  12 +#include <mutex>
  13 +#include <queue>
  14 +#include <set>
  15 +#include <unordered_map>
12 16  
13 17 namespace redisx {
14 18  
  19 +class CommandAsyncGeneric {
  20 +public:
  21 + redisCallbackFn* fn;
  22 + void* privdata;
  23 + const char* cmd;
  24 + CommandAsyncGeneric(redisCallbackFn* fn, void* privdata, const char* cmd)
  25 + : fn(fn), privdata(privdata), cmd(cmd) {}
  26 +};
  27 +
  28 +template<class ReplyT>
  29 +class CommandAsync {
  30 +public:
  31 + CommandAsync(const std::string& cmd, const std::function<void(const std::string&, ReplyT)>& callback)
  32 + : cmd(cmd), callback(callback) {}
  33 + const std::string cmd;
  34 + const std::function<void(const std::string&, ReplyT)> callback;
  35 + void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); }
  36 +};
  37 +
15 38 class Redis {
16 39  
17 40 public:
... ... @@ -29,7 +52,7 @@ public:
29 52  
30 53 void command(const char* command);
31 54  
32   - struct event* command_loop(const char* command, long interval_s, long interval_us);
  55 +// struct event* command_loop(const char* command, long interval_s, long interval_us);
33 56  
34 57 void get(const char* key, std::function<void(const std::string&, const char*)> callback);
35 58  
... ... @@ -54,18 +77,29 @@ private:
54 77  
55 78 struct event_base *base;
56 79 redisAsyncContext *c;
57   -};
58 80  
59   -template<class ReplyT>
60   -class CommandAsync {
61   -public:
62   - CommandAsync(const std::string& cmd, const std::function<void(const std::string&, ReplyT)>& callback)
63   - : cmd(cmd), callback(callback) {}
64   - const std::string cmd;
65   - const std::function<void(const std::string&, ReplyT)> callback;
66   - void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); }
  81 + std::mutex evlock;
  82 +
  83 + std::thread event_loop_thread;
  84 +
  85 + template<class ReplyT>
  86 + std::unordered_map<void*, CommandAsync<ReplyT>*>& get_command_map();
  87 +
  88 + std::unordered_map<void*, CommandAsync<const redisReply*>*> commands_redis_reply;
  89 + std::unordered_map<void*, CommandAsync<const std::string&>*> commands_string_r;
  90 + std::unordered_map<void*, CommandAsync<const char*>*> commands_char_p;
  91 + std::unordered_map<void*, CommandAsync<int>*> commands_int;
  92 + std::unordered_map<void*, CommandAsync<long long int>*> commands_long_long_int;
  93 +
  94 + std::queue<void*> command_queue;
  95 + void process_queued_commands();
  96 +
  97 + template<class ReplyT>
  98 + bool process_queued_command(void* cmd_ptr);
67 99 };
68 100  
  101 +// ---------------------------
  102 +
69 103 template<class ReplyT>
70 104 void invoke_callback(
71 105 const CommandAsync<ReplyT>* cmd_obj,
... ... @@ -97,15 +131,10 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) {
97 131 template<class ReplyT>
98 132 void Redis::command(const std::string& cmd, const std::function<void(const std::string&, ReplyT)>& callback) {
99 133  
100   - auto *cmd_obj = new CommandAsync<ReplyT>(cmd, callback);
101   -
102   - int status = redisAsyncCommand(c, command_callback<ReplyT>, (void*)cmd_obj, cmd.c_str());
103   - if (status != REDIS_OK) {
104   - std::cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << std::endl;
105   - delete cmd_obj;
106   - return;
107   - }
  134 + std::lock_guard<std::mutex> lg(evlock);
  135 + auto* cmd_obj = new CommandAsync<ReplyT>(cmd, callback);
  136 + get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj;
  137 + command_queue.push((void*)cmd_obj);
108 138 }
109 139  
110   -
111 140 } // End namespace redis
... ...