Commit a8a81ee77720903528da0bdeda85bd6f39c0182b

Authored by Hayk Martirosyan
1 parent b5d5ea9a

Improved connect/disconnect management

Consolidated user callbacks into one, improved some logic.
CMakeLists.txt
... ... @@ -32,11 +32,11 @@ set(LIB_ALL ${LIB_REDIS})
32 32 add_executable(basic examples/basic.cpp ${SRC_ALL})
33 33 target_link_libraries(basic ${LIB_REDIS})
34 34  
35   -#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
36   -#target_link_libraries(basic_threaded ${LIB_REDIS})
  35 +add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
  36 +target_link_libraries(basic_threaded ${LIB_REDIS})
37 37  
38   -#add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
39   -#target_link_libraries(lpush_benchmark ${LIB_REDIS})
  38 +add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
  39 +target_link_libraries(lpush_benchmark ${LIB_REDIS})
40 40  
41 41 add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL})
42 42 target_link_libraries(speed_test_async ${LIB_REDIS})
... ... @@ -44,14 +44,14 @@ target_link_libraries(speed_test_async ${LIB_REDIS})
44 44 add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL})
45 45 target_link_libraries(speed_test_sync ${LIB_REDIS})
46 46  
47   -#add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL})
48   -#target_link_libraries(speed_test_async_multi ${LIB_REDIS})
  47 +add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL})
  48 +target_link_libraries(speed_test_async_multi ${LIB_REDIS})
49 49  
50   -#add_executable(data_types examples/data_types.cpp ${SRC_ALL})
51   -#target_link_libraries(data_types ${LIB_REDIS})
  50 +add_executable(data_types examples/data_types.cpp ${SRC_ALL})
  51 +target_link_libraries(data_types ${LIB_REDIS})
52 52  
53   -#add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL})
54   -#target_link_libraries(string_v_char ${LIB_REDIS})
  53 +add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL})
  54 +target_link_libraries(string_v_char ${LIB_REDIS})
55 55  
56 56 add_executable(multi_client examples/multi-client.cpp ${SRC_ALL})
57 57 target_link_libraries(multi_client ${LIB_REDIS})
... ...
examples/basic.cpp
... ... @@ -9,7 +9,7 @@ using namespace std;
9 9  
10 10 int main(int argc, char* argv[]) {
11 11  
12   - redox::Redox rdx = {"localhost", 6379}; // Initialize Redox
  12 + redox::Redox rdx = {"localhost", 6379, [](int state) { cout << "Connection state: " << state << endl; }}; // Initialize Redox
13 13 if(!rdx.start()) return 1; // Start the event loop
14 14  
15 15 rdx.del("occupation");
... ...
examples/basic_threaded.cpp
... ... @@ -13,7 +13,7 @@ redox::Redox rdx = {&quot;localhost&quot;, 6379};
13 13  
14 14 int main(int argc, char* argv[]) {
15 15  
16   - rdx.start();
  16 + if(!rdx.start()) return 1;
17 17  
18 18 thread setter([]() {
19 19 for(int i = 0; i < 5000; i++) {
... ...
examples/data_types.cpp
... ... @@ -10,7 +10,7 @@ using namespace std;
10 10 int main(int argc, char* argv[]) {
11 11  
12 12 redox::Redox rdx; // Initialize Redox (default host/port)
13   - rdx.start(); // Start the event loop
  13 + if(!rdx.start()) return 1; // Start the event loop
14 14  
15 15 rdx.del("mylist");
16 16  
... ...
examples/lpush_benchmark.cpp
... ... @@ -15,7 +15,7 @@ double time_s() {
15 15 int main(int argc, char* argv[]) {
16 16  
17 17 redox::Redox rdx;
18   - rdx.start();
  18 + if(!rdx.start()) return 1;
19 19  
20 20 rdx.del("test");
21 21  
... ...
examples/speed_test_async.cpp
... ... @@ -18,7 +18,7 @@ double time_s() {
18 18 int main(int argc, char* argv[]) {
19 19  
20 20 Redox rdx = {"localhost", 6379};
21   - rdx.start();
  21 + if(!rdx.start()) return 1;
22 22  
23 23 if(rdx.command_blocking("SET simple_loop:count 0")) {
24 24 cout << "Reset the counter to zero." << endl;
... ...
examples/speed_test_async_multi.cpp
... ... @@ -19,7 +19,7 @@ double time_s() {
19 19 int main(int argc, char* argv[]) {
20 20  
21 21 Redox rdx = {"localhost", 6379};
22   - rdx.start();
  22 + if(!rdx.start()) return 1;
23 23  
24 24 if(rdx.set("simple_loop:count", "0")) {
25 25 cout << "Reset the counter to zero." << endl;
... ...
examples/speed_test_sync.cpp
... ... @@ -18,7 +18,7 @@ double time_s() {
18 18 int main(int argc, char* argv[]) {
19 19  
20 20 Redox rdx = {"localhost", 6379};
21   - rdx.start();
  21 + if(!rdx.start()) return 1;
22 22  
23 23 if(rdx.command_blocking("SET simple_loop:count 0")) {
24 24 cout << "Reset the counter to zero." << endl;
... ...
examples/string_vs_charp.cpp
... ... @@ -18,7 +18,7 @@ double time_s() {
18 18 int main(int argc, char* argv[]) {
19 19  
20 20 Redox rdx;
21   - rdx.start();
  21 + if(!rdx.start()) return 1;
22 22  
23 23 rdx.del("stringtest");
24 24 rdx.set("stringtest", "value");
... ...
src/redox.cpp
... ... @@ -16,19 +16,16 @@ void Redox::connected_callback(const redisAsyncContext *ctx, int status) {
16 16 if (status != REDIS_OK) {
17 17 cerr << "[ERROR] Connecting to Redis: " << ctx->errstr << endl;
18 18 rdx->connect_state = REDOX_CONNECT_ERROR;
19   - rdx->connect_waiter.notify_all();
20   - return;
21   - }
22 19  
23   - // Disable hiredis automatically freeing reply objects
24   - ctx->c.reader->fn->freeObject = [](void* reply) {};
  20 + } else {
  21 + // Disable hiredis automatically freeing reply objects
  22 + ctx->c.reader->fn->freeObject = [](void *reply) {};
  23 + rdx->connect_state = REDOX_CONNECTED;
  24 + cout << "[INFO] Connected to Redis." << endl;
  25 + }
25 26  
26   - rdx->connect_state = REDOX_CONNECTED;
27 27 rdx->connect_waiter.notify_all();
28   -
29   - cout << "[INFO] Connected to Redis." << endl;
30   -
31   - if(rdx->user_connect_callback) rdx->user_connect_callback();
  28 + if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state);
32 29 }
33 30  
34 31 void Redox::disconnected_callback(const redisAsyncContext *ctx, int status) {
... ... @@ -45,37 +42,36 @@ void Redox::disconnected_callback(const redisAsyncContext *ctx, int status) {
45 42  
46 43 rdx->stop_signal();
47 44 rdx->connect_waiter.notify_all();
48   -
49   - if(rdx->user_disconnect_callback) rdx->user_disconnect_callback();
  45 + if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state);
50 46 }
51 47  
52 48 Redox::Redox(
53 49 const string& host, const int port,
54   - std::function<void(void)> connected,
55   - std::function<void(void)> disconnected
56   -) : host(host), port(port), user_connect_callback(connected), user_disconnect_callback(disconnected) {
  50 + std::function<void(int)> connection_callback
  51 +) : host(host), port(port), user_connection_callback(connection_callback) {
57 52  
58   - // Required by libev
  53 + // libev setup
59 54 signal(SIGPIPE, SIG_IGN);
  55 + evloop = ev_loop_new(EVFLAG_AUTO);
  56 + ev_set_userdata(evloop, (void*)this); // Back-reference
60 57  
61 58 // Create a redisAsyncContext
62 59 ctx = redisAsyncConnect(host.c_str(), port);
  60 + ctx->data = (void*)this; // Back-reference
  61 +
63 62 if (ctx->err) {
64   - printf("Error: %s\n", ctx->errstr);
  63 + cout << "[ERROR] Could not create a hiredis context: " << ctx->errstr << endl;
  64 + connect_state = REDOX_CONNECT_ERROR;
  65 + connect_waiter.notify_all();
65 66 return;
66 67 }
67 68  
68   - // Create a new event loop and attach it to hiredis
69   - evloop = ev_loop_new(EVFLAG_AUTO);
  69 + // Attach event loop to hiredis
70 70 redisLibevAttach(evloop, ctx);
71 71  
72 72 // Set the callbacks to be invoked on server connection/disconnection
73 73 redisAsyncSetConnectCallback(ctx, Redox::connected_callback);
74 74 redisAsyncSetDisconnectCallback(ctx, Redox::disconnected_callback);
75   -
76   - // Set back references to this Redox object (for use in callbacks)
77   - ev_set_userdata(evloop, (void*)this);
78   - ctx->data = (void*)this;
79 75 }
80 76  
81 77 void Redox::run_event_loop() {
... ...
src/redox.hpp
... ... @@ -46,8 +46,7 @@ public:
46 46 Redox(
47 47 const std::string& host = REDIS_DEFAULT_HOST,
48 48 const int port = REDIS_DEFAULT_PORT,
49   - std::function<void(void)> connected = nullptr,
50   - std::function<void(void)> disconnected = nullptr
  49 + std::function<void(int)> connection_callback = nullptr
51 50 );
52 51 ~Redox();
53 52  
... ... @@ -184,8 +183,7 @@ private:
184 183 std::condition_variable connect_waiter;
185 184  
186 185 // User connect/disconnect callbacks
187   - std::function<void(void)> user_connect_callback;
188   - std::function<void(void)> user_disconnect_callback;
  186 + std::function<void(int)> user_connection_callback;
189 187  
190 188 // Dynamically allocated libev event loop
191 189 struct ev_loop* evloop;
... ...