Commit 87ff8a3ebb58630143841b2927cd28beca4d1b3f

Authored by Hayk Martirosyan
1 parent f6e82b58

Improved structure, stop handling, cleanups

Improved examples, added a threaded getter/setter example
and cleaned up the other. Generally refactored code in the
core files and cleaned up the details to make the slickest
possible API. Implemented a to_exit variable as a stop
condition for the event thread.
CMakeLists.txt
1 1 cmake_minimum_required(VERSION 2.8.4)
2 2 project(redisx)
3 3  
4   -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")#-g -fno-omit-frame-pointer")
  4 +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
  5 +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
5 6 # set(CMAKE_VERBOSE_MAKEFILE ON)
6 7  
7 8 # ---------------------------------------------------------
... ... @@ -27,9 +28,14 @@ set(LIB_ALL ${LIB_REDIS})
27 28 # Examples
28 29 # ---------------------------------------------------------
29 30  
30   -add_executable(basic examples/basic.cpp ${SRC_ALL})
31   -target_link_libraries(basic ${LIB_REDIS})
  31 +#add_executable(basic examples/basic.cpp ${SRC_ALL})
  32 +#target_link_libraries(basic ${LIB_REDIS})
32 33  
  34 +#add_executable(progressive examples/progressive.cpp ${SRC_ALL})
  35 +#target_link_libraries(progressive ${LIB_REDIS})
  36 +
  37 +add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
  38 +target_link_libraries(basic_threaded ${LIB_REDIS})
33 39  
34 40 add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
35 41 target_link_libraries(lpush_benchmark ${LIB_REDIS})
... ...
examples/basic_threaded.cpp 0 → 100644
  1 +/**
  2 +* Basic asynchronous calls using redisx.
  3 +*/
  4 +
  5 +#include <iostream>
  6 +#include <thread>
  7 +#include <chrono>
  8 +#include "../src/redisx.hpp"
  9 +
  10 +using namespace std;
  11 +
  12 +redisx::Redis rdx = {"localhost", 6379};
  13 +
  14 +int main(int argc, char* argv[]) {
  15 +
  16 + rdx.run();
  17 +
  18 + thread setter([]() {
  19 + while(true) {
  20 + rdx.command("INCR counter");
  21 + this_thread::sleep_for(chrono::milliseconds(1));
  22 + }
  23 + });
  24 +
  25 + thread getter([]() {
  26 + while(true) {
  27 + rdx.command<const string &>(
  28 + "GET counter",
  29 + [](const string& cmd, const string& value) {
  30 + cout << cmd << ": " << value << endl;
  31 + }
  32 + );
  33 + this_thread::sleep_for(chrono::milliseconds(1000));
  34 + }
  35 + });
  36 +
  37 + setter.join();
  38 + getter.join();
  39 +
  40 + return 0;
  41 +};
... ...
examples/lpush_benchmark.cpp
... ... @@ -3,13 +3,11 @@
3 3 */
4 4  
5 5 #include <iostream>
  6 +#include <mutex>
6 7 #include "../src/redisx.hpp"
7 8  
8 9 using namespace std;
9 10  
10   -static const string REDIS_HOST = "localhost";
11   -static const int REDIS_PORT = 6379;
12   -
13 11 unsigned long time_ms() {
14 12 return chrono::system_clock::now().time_since_epoch()
15 13 /chrono::milliseconds(1);
... ... @@ -17,45 +15,21 @@ unsigned long time_ms() {
17 15  
18 16 int main(int argc, char* argv[]) {
19 17  
20   - redisx::Redis r = {REDIS_HOST, REDIS_PORT};
21   - r.run();
22   -
23   - r.command<const string&>("GET blah", [](const string& cmd, const string& value) {
24   - cout << "[COMMAND] " << cmd << ": " << value << endl;
25   - });
26   -
27   - r.command<const char*>("GET blah", [](const string& cmd, const char* value) {
28   - cout << "[COMMAND] " << cmd << ": " << value << endl;
29   - });
30   -
31   -// r.command<const redisReply*>("LPUSH yahoo 1 2 3 4 f w", [](const string& cmd, const redisReply* reply) {
32   -// cout << "[COMMAND] " << cmd << ": " << reply->integer << endl;
33   -// });
  18 + redisx::Redis rdx = {"localhost", 6379};
  19 + rdx.run();
34 20  
35   -
36   - r.get("blahqwefwqefef", [](const string& cmd, const char* value) {
37   - cout << "[GET] blah: " << value << endl;
38   - });
39   -//
40   -// r.set("name", "lolfewef");
41   -//
42   -// r.command("SET blah wefoijewfojiwef");
43   -//
44   -// r.del("name");
45   -// r.del("wefoipjweojiqw", [](const string& cmd, long long int num_deleted) {
46   -// cout << "num deleted: " << num_deleted << endl;
47   -// });
48   -
49   -// r.command_loop("LPUSH count 1", 0, 1000);
  21 + rdx.command("DEL test");
50 22  
51 23 unsigned long t0 = time_ms();
52 24 unsigned long t1 = t0;
53 25  
54   - int len = 10000000;
  26 + int len = 1000000;
55 27 int count = 0;
  28 + mutex task_lock;
56 29  
  30 + task_lock.lock();
57 31 for(int i = 1; i <= len; i++) {
58   - r.command<int>("lpush test 1", [&t0, &t1, &count, len](const string& cmd, int reply) {
  32 + rdx.command<int>("lpush test 1", [&t0, &t1, &count, len, &task_lock](const string& cmd, int reply) {
59 33  
60 34 count++;
61 35 if(count == len) {
... ... @@ -65,13 +39,15 @@ int main(int argc, char* argv[]) {
65 39 cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl;
66 40 cout << "Time to receive all: " << t2 - t1 << "ms" << endl;
67 41 cout << "Total time: " << t2 - t0 << "ms" << endl;
  42 +
  43 + task_lock.unlock();
68 44 }
69 45 });
70 46 }
71 47 t1 = time_ms();
72 48  
73   - //r.start();
74   - while(true) {}
  49 + task_lock.lock();
  50 + rdx.stop();
75 51  
76 52 return 0;
77 53 };
... ...
examples/progressive.cpp 0 → 100644
  1 +/**
  2 +* Basic asynchronous calls using redisx.
  3 +*/
  4 +
  5 +#include <iostream>
  6 +#include "../src/redisx.hpp"
  7 +
  8 +using namespace std;
  9 +
  10 +redisx::Redis rdx = {"localhost", 6379};
  11 +
  12 +void print_key(const string& key) {
  13 + rdx.command<const string&>("GET " + key, [key](const string& cmd, const string& value) {
  14 + cout << "[GET] " << key << ": \"" << value << '\"' << endl;
  15 + });
  16 +}
  17 +
  18 +void set_key(const string& key, const string& value) {
  19 + string cmd_str = "SET " + key + " " + value;
  20 + rdx.command<const string&>(cmd_str, [key, value](const string& cmd, const string& reply) {
  21 + cout << "[SET] " << key << ": \"" << value << '\"' << endl;
  22 + });
  23 +}
  24 +
  25 +int main(int argc, char* argv[]) {
  26 +
  27 + set_key("name", "Bob");
  28 + print_key("name");
  29 + set_key("name", "Steve");
  30 + print_key("name");
  31 +
  32 + rdx.run_blocking();
  33 + return 0;
  34 +};
... ...
src/redisx.cpp
... ... @@ -9,7 +9,6 @@
9 9 #include <ev.h>
10 10 #include <event2/thread.h>
11 11 #include <vector>
12   -#include <string.h>
13 12 #include "redisx.hpp"
14 13  
15 14 using namespace std;
... ... @@ -36,17 +35,13 @@ void disconnected(const redisAsyncContext *c, int status) {
36 35 connected_lock.lock();
37 36 }
38 37  
39   -Redis::Redis(const string& host, const int port) : host(host), port(port), io_ops(0) {
  38 +Redis::Redis(const string& host, const int port)
  39 + : host(host), port(port), io_ops(0), to_exit(false) {
40 40  
41   -// evthread_use_pthreads();
42   -// evthread_enable_lock_debuging();
43   -// event_enable_debug_mode();
44   -
45   - lock_guard<mutex> lg(evlock);
  41 + lock_guard<mutex> lg(queue_guard);
46 42 connected_lock.lock();
47 43  
48 44 signal(SIGPIPE, SIG_IGN);
49   -// base = event_base_new();
50 45  
51 46 c = redisAsyncConnect(host.c_str(), port);
52 47 if (c->err) {
... ... @@ -55,38 +50,49 @@ Redis::Redis(const string&amp; host, const int port) : host(host), port(port), io_op
55 50 }
56 51  
57 52 redisLibevAttach(EV_DEFAULT_ c);
58   -// redisLibeventAttach(c, base);
59 53 redisAsyncSetConnectCallback(c, connected);
60 54 redisAsyncSetDisconnectCallback(c, disconnected);
61 55 }
62 56  
63 57 Redis::~Redis() {
64 58 redisAsyncDisconnect(c);
  59 + stop();
65 60 }
66 61  
67   -void Redis::run() {
  62 +void Redis::run_blocking() {
68 63  
69   - event_loop_thread = thread([this] {
  64 + // Events to connect to Redis
  65 + ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  66 + lock_guard<mutex> lg(connected_lock);
  67 +
  68 + // Continuously create events and handle them
  69 + while (!to_exit) {
  70 + process_queued_commands();
70 71 ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
71   - connected_lock.lock();
  72 + }
72 73  
73   - while (true) {
74   - process_queued_commands();
75   - ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
76   - }
77   - });
78   - event_loop_thread.detach();
  74 + // Handle exit events
  75 + ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
79 76 }
80 77  
81   -void Redis::run_blocking() {
  78 +void Redis::run() {
82 79  
83   - ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
84   - connected_lock.lock();
  80 + event_loop_thread = thread([this] { run_blocking(); });
  81 + event_loop_thread.detach();
  82 +}
85 83  
86   - while (true) {
87   - process_queued_commands();
88   - ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  84 +void Redis::stop() {
  85 + to_exit = true;
  86 +}
  87 +
  88 +template<class ReplyT>
  89 +bool Redis::submit_to_server(const CommandAsync<ReplyT>* cmd_obj) {
  90 + if (redisAsyncCommand(c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
  91 + cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl;
  92 + delete cmd_obj;
  93 + return false;
89 94 }
  95 + return true;
90 96 }
91 97  
92 98 template<class ReplyT>
... ... @@ -99,16 +105,14 @@ bool Redis::process_queued_command(void* cmd_ptr) {
99 105 CommandAsync<ReplyT>* cmd_obj = it->second;
100 106 command_map.erase(cmd_ptr);
101 107  
102   - if (redisAsyncCommand(c, command_callback<ReplyT>, cmd_ptr, cmd_obj->cmd.c_str()) != REDIS_OK) {
103   - cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl;
104   - delete cmd_obj;
105   - }
  108 + submit_to_server<ReplyT>(cmd_obj);
106 109  
107 110 return true;
108 111 }
109 112  
110 113 void Redis::process_queued_commands() {
111   - lock_guard<mutex> lg(evlock);
  114 +
  115 + lock_guard<mutex> lg(queue_guard);
112 116  
113 117 while(!command_queue.empty()) {
114 118  
... ... @@ -126,21 +130,6 @@ void Redis::process_queued_commands() {
126 130  
127 131 // ----------------------------
128 132  
129   -// TODO update
130   -void Redis::command(const char* cmd) {
131   -
132   - evlock.lock();
133   - int status = redisAsyncCommand(c, NULL, NULL, cmd);
134   - evlock.unlock();
135   -
136   - if (status != REDIS_OK) {
137   - cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl;
138   - return;
139   - }
140   -}
141   -
142   -// ----------------------------
143   -
144 133 template<> unordered_map<void*, CommandAsync<const redisReply*>*>& Redis::get_command_map() { return commands_redis_reply; }
145 134 template<>
146 135 void invoke_callback(const CommandAsync<const redisReply*>* cmd_obj, redisReply* reply) {
... ... @@ -189,36 +178,42 @@ void invoke_callback(const CommandAsync&lt;long long int&gt;* cmd_obj, redisReply* rep
189 178 }
190 179  
191 180 // ----------------------------
  181 +// Helpers
  182 +// ----------------------------
192 183  
193   -void Redis::get(const char* key, function<void(const string&, const char*)> callback) {
194   - string cmd = string("GET ") + key;
195   - command<const char*>(cmd.c_str(), callback);
196   -}
197   -
198   -void Redis::set(const char* key, const char* value) {
199   - string cmd = string("SET ") + key + " " + value;
200   - command<const char*>(cmd.c_str(), [](const string& command, const char* reply) {
201   - if(strcmp(reply, "OK"))
202   - cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl;
203   - });
204   -}
205   -
206   -void Redis::set(const char* key, const char* value, function<void(const string&, const char*)> callback) {
207   - string cmd = string("SET ") + key + " " + value;
208   - command<const char*>(cmd.c_str(), callback);
209   -}
210   -
211   -void Redis::del(const char* key) {
212   - string cmd = string("DEL ") + key;
213   - command<long long int>(cmd.c_str(), [](const string& command, long long int num_deleted) {
214   - if(num_deleted != 1)
215   - cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl;
216   - });
217   -}
218   -
219   -void Redis::del(const char* key, function<void(const string&, long long int)> callback) {
220   - string cmd = string("DEL ") + key;
221   - command<long long int>(cmd.c_str(), callback);
222   -}
  184 +void Redis::command(const char* cmd) {
  185 + command<const redisReply*>(cmd, NULL);
  186 +}
  187 +
  188 +//void Redis::get(const char* key, function<void(const string&, const char*)> callback) {
  189 +// string cmd = string("GET ") + key;
  190 +// command<const char*>(cmd.c_str(), callback);
  191 +//}
  192 +//
  193 +//void Redis::set(const char* key, const char* value) {
  194 +// string cmd = string("SET ") + key + " " + value;
  195 +// command<const char*>(cmd.c_str(), [](const string& command, const char* reply) {
  196 +// if(strcmp(reply, "OK"))
  197 +// cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl;
  198 +// });
  199 +//}
  200 +//
  201 +//void Redis::set(const char* key, const char* value, function<void(const string&, const char*)> callback) {
  202 +// string cmd = string("SET ") + key + " " + value;
  203 +// command<const char*>(cmd.c_str(), callback);
  204 +//}
  205 +//
  206 +//void Redis::del(const char* key) {
  207 +// string cmd = string("DEL ") + key;
  208 +// command<long long int>(cmd.c_str(), [](const string& command, long long int num_deleted) {
  209 +// if(num_deleted != 1)
  210 +// cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl;
  211 +// });
  212 +//}
  213 +//
  214 +//void Redis::del(const char* key, function<void(const string&, long long int)> callback) {
  215 +// string cmd = string("DEL ") + key;
  216 +// command<long long int>(cmd.c_str(), callback);
  217 +//}
223 218  
224 219 } // End namespace redis
... ...
src/redisx.hpp
... ... @@ -4,35 +4,36 @@
4 4  
5 5 #pragma once
6 6  
7   -#include <functional>
8   -#include <string>
9 7 #include <iostream>
10   -#include <hiredis/hiredis.h>
11   -#include <hiredis/async.h>
  8 +#include <functional>
  9 +
  10 +#include <thread>
12 11 #include <mutex>
  12 +#include <atomic>
  13 +
  14 +#include <string>
13 15 #include <queue>
14   -#include <set>
15 16 #include <unordered_map>
16   -#include <thread>
17 17  
18   -namespace redisx {
  18 +#include <hiredis/hiredis.h>
  19 +#include <hiredis/async.h>
19 20  
20   -class CommandAsyncGeneric {
21   -public:
22   - redisCallbackFn* fn;
23   - void* privdata;
24   - const char* cmd;
25   - CommandAsyncGeneric(redisCallbackFn* fn, void* privdata, const char* cmd)
26   - : fn(fn), privdata(privdata), cmd(cmd) {}
27   -};
  21 +namespace redisx {
28 22  
29 23 template<class ReplyT>
30 24 class CommandAsync {
31 25 public:
32   - CommandAsync(const std::string& cmd, const std::function<void(const std::string&, ReplyT)>& callback)
33   - : cmd(cmd), callback(callback) {}
  26 + CommandAsync(
  27 + const std::string& cmd,
  28 + const std::function<void(const std::string&, ReplyT)>& callback,
  29 + double repeat, double after
  30 + ) : cmd(cmd), callback(callback), repeat(repeat), after(after) {}
  31 +
34 32 const std::string cmd;
35 33 const std::function<void(const std::string&, ReplyT)> callback;
  34 + double repeat;
  35 + double after;
  36 +
36 37 void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); }
37 38 };
38 39  
... ... @@ -45,24 +46,27 @@ public:
45 46  
46 47 void run();
47 48 void run_blocking();
  49 + void stop();
48 50  
49 51 template<class ReplyT>
50 52 void command(
51 53 const std::string& cmd,
52   - const std::function<void(const std::string&, ReplyT)>& callback
  54 + const std::function<void(const std::string&, ReplyT)>& callback = NULL,
  55 + double repeat = 0.0,
  56 + double after = 0.0
53 57 );
54 58  
55 59 void command(const char* command);
56 60  
57 61 // struct event* command_loop(const char* command, long interval_s, long interval_us);
58 62  
59   - void get(const char* key, std::function<void(const std::string&, const char*)> callback);
60   -
61   - void set(const char* key, const char* value);
62   - void set(const char* key, const char* value, std::function<void(const std::string&, const char*)> callback);
63   -
64   - void del(const char* key);
65   - void del(const char* key, std::function<void(const std::string&, long long int)> callback);
  63 +// void get(const char* key, std::function<void(const std::string&, const char*)> callback);
  64 +//
  65 +// void set(const char* key, const char* value);
  66 +// void set(const char* key, const char* value, std::function<void(const std::string&, const char*)> callback);
  67 +//
  68 +// void del(const char* key);
  69 +// void del(const char* key, std::function<void(const std::string&, long long int)> callback);
66 70  
67 71 // void publish(std::string channel, std::string msg);
68 72 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback);
... ... @@ -77,27 +81,34 @@ private:
77 81 // Number of IOs performed
78 82 long io_ops;
79 83  
80   - struct event_base *base;
81 84 redisAsyncContext *c;
82 85  
83   - std::mutex evlock;
  86 + std::atomic_bool to_exit;
84 87  
85 88 std::thread event_loop_thread;
86 89  
87   - template<class ReplyT>
88   - std::unordered_map<void*, CommandAsync<ReplyT>*>& get_command_map();
89   -
90 90 std::unordered_map<void*, CommandAsync<const redisReply*>*> commands_redis_reply;
91 91 std::unordered_map<void*, CommandAsync<const std::string&>*> commands_string_r;
92 92 std::unordered_map<void*, CommandAsync<const char*>*> commands_char_p;
93 93 std::unordered_map<void*, CommandAsync<int>*> commands_int;
94 94 std::unordered_map<void*, CommandAsync<long long int>*> commands_long_long_int;
95 95  
  96 + template<class ReplyT>
  97 + std::unordered_map<void*, CommandAsync<ReplyT>*>& get_command_map();
  98 +
96 99 std::queue<void*> command_queue;
  100 + std::mutex queue_guard;
97 101 void process_queued_commands();
98 102  
99 103 template<class ReplyT>
100 104 bool process_queued_command(void* cmd_ptr);
  105 +
  106 + /**
  107 + * Submit an asynchronous command to the Redis server. Return
  108 + * true if succeeded, false otherwise.
  109 + */
  110 + template<class ReplyT>
  111 + bool submit_to_server(const CommandAsync<ReplyT>* cmd_obj);
101 112 };
102 113  
103 114 // ---------------------------
... ... @@ -131,12 +142,18 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) {
131 142 }
132 143  
133 144 template<class ReplyT>
134   -void Redis::command(const std::string& cmd, const std::function<void(const std::string&, ReplyT)>& callback) {
135   -
136   - std::lock_guard<std::mutex> lg(evlock);
137   - auto* cmd_obj = new CommandAsync<ReplyT>(cmd, callback);
  145 +void Redis::command(
  146 + const std::string& cmd,
  147 + const std::function<void(const std::string&, ReplyT)>& callback,
  148 + double repeat,
  149 + double after
  150 +) {
  151 +
  152 + std::lock_guard<std::mutex> lg(queue_guard);
  153 + auto* cmd_obj = new CommandAsync<ReplyT>(cmd, callback, repeat, after);
138 154 get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj;
139 155 command_queue.push((void*)cmd_obj);
140 156 }
141 157  
  158 +
142 159 } // End namespace redis
... ...