Commit b5d5ea9ae393eeb17e452d3bbe8b31f280f8b7bc

Authored by Hayk Martirosyan
1 parent 2e36078f

Connection state management implemented

Created an atomic_int connect_state that keeps track of not yet
connected, connected, disconnected, and errors. Used to implement good
error behavior when the server is down and you start() a Redox instance,
or when the server goes down in the middle of running commands. Now,
nothing should hang, but should return errors (or throw exceptions) when
the server goes down.

Also added hooks for a user connect/disconnect callback in the
constructor, which is helpful for client programs.

Added an example with three Redox clients in one thread.
CMakeLists.txt
1 1 cmake_minimum_required(VERSION 2.8.4)
2 2 project(redox)
3 3  
4   -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
5   -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
  4 +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
  5 +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
6 6 # set(CMAKE_VERBOSE_MAKEFILE ON)
7 7  
8 8 # ---------------------------------------------------------
... ... @@ -52,3 +52,6 @@ target_link_libraries(speed_test_sync ${LIB_REDIS})
52 52  
53 53 #add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL})
54 54 #target_link_libraries(string_v_char ${LIB_REDIS})
  55 +
  56 +add_executable(multi_client examples/multi-client.cpp ${SRC_ALL})
  57 +target_link_libraries(multi_client ${LIB_REDIS})
... ...
README.md
... ... @@ -28,6 +28,22 @@ During these tests, Redox communicated with a local Redis server over TCP.
28 28 * One repeating asynchronous command (`speed_test_async`): **195,159 commands/s**
29 29 * One blocking command in a loop (`speed_test_sync`): **23,609 commands/s**
30 30  
  31 +## Build from source
  32 +Instructions provided are for Ubuntu, but Redox is fully platform-independent.
  33 +
  34 +Get the build environment:
  35 +
  36 + sudo apt-get install git cmake build-essential
  37 +
  38 +Get the dependencies:
  39 +
  40 + sudo apt-get install libhiredis-dev libev-dev
  41 +
  42 +Build Redox and examples using CMake (a helper script is provided):
  43 +
  44 + cd redox
  45 + ./make.sh
  46 +
31 47 ## Tutorial
32 48 Coming soon. For now, look at the example programs located in `examples/`, and the snippets
33 49 posted below.
... ...
examples/basic.cpp
... ... @@ -10,7 +10,7 @@ using namespace std;
10 10 int main(int argc, char* argv[]) {
11 11  
12 12 redox::Redox rdx = {"localhost", 6379}; // Initialize Redox
13   - rdx.start(); // Start the event loop
  13 + if(!rdx.start()) return 1; // Start the event loop
14 14  
15 15 rdx.del("occupation");
16 16  
... ...
examples/multi-client.cpp 0 → 100644
  1 +/**
  2 +* Redox example with multiple clients.
  3 +*/
  4 +
  5 +#include <iostream>
  6 +#include "../src/redox.hpp"
  7 +
  8 +using namespace std;
  9 +
  10 +int main(int argc, char* argv[]) {
  11 +
  12 + redox::Redox rdx1, rdx2, rdx3;
  13 +
  14 + if(!rdx1.start() || !rdx2.start() || !rdx3.start()) return 1;
  15 +
  16 + rdx1.del("occupation");
  17 +
  18 + if(!rdx2.set("occupation", "carpenter")) // Set a key, check if succeeded
  19 + cerr << "Failed to set key!" << endl;
  20 +
  21 + cout << "key = occupation, value = \"" << rdx3.get("occupation") << "\"" << endl;
  22 +
  23 + rdx1.stop();
  24 + rdx2.stop();
  25 + rdx3.stop();
  26 +}
... ...
src/redox.cpp
... ... @@ -9,40 +9,51 @@ using namespace std;
9 9  
10 10 namespace redox {
11 11  
12   -void Redox::connected(const redisAsyncContext *ctx, int status) {
  12 +void Redox::connected_callback(const redisAsyncContext *ctx, int status) {
  13 +
  14 + Redox* rdx = (Redox*) ctx->data;
13 15  
14 16 if (status != REDIS_OK) {
15 17 cerr << "[ERROR] Connecting to Redis: " << ctx->errstr << endl;
  18 + rdx->connect_state = REDOX_CONNECT_ERROR;
  19 + rdx->connect_waiter.notify_all();
16 20 return;
17 21 }
18 22  
19 23 // Disable hiredis automatically freeing reply objects
20 24 ctx->c.reader->fn->freeObject = [](void* reply) {};
21 25  
22   - Redox* rdx = (Redox*) ctx->data;
23   - rdx->connected_lock.unlock();
  26 + rdx->connect_state = REDOX_CONNECTED;
  27 + rdx->connect_waiter.notify_all();
24 28  
25 29 cout << "[INFO] Connected to Redis." << endl;
  30 +
  31 + if(rdx->user_connect_callback) rdx->user_connect_callback();
26 32 }
27 33  
28   -void Redox::disconnected(const redisAsyncContext *ctx, int status) {
  34 +void Redox::disconnected_callback(const redisAsyncContext *ctx, int status) {
  35 +
  36 + Redox* rdx = (Redox*) ctx->data;
29 37  
30 38 if (status != REDIS_OK) {
31 39 cerr << "[ERROR] Disconnecting from Redis: " << ctx->errstr << endl;
32   - return;
  40 + rdx->connect_state = REDOX_DISCONNECT_ERROR;
  41 + } else {
  42 + cout << "[INFO] Disconnected from Redis as planned." << endl;
  43 + rdx->connect_state = REDOX_DISCONNECTED;
33 44 }
34 45  
35   - // Re-enable hiredis automatically freeing reply objects
36   - ctx->c.reader->fn->freeObject = freeReplyObject;
  46 + rdx->stop_signal();
  47 + rdx->connect_waiter.notify_all();
37 48  
38   - Redox* rdx = (Redox*) ctx->data;
39   - rdx->connected_lock.unlock();
40   -
41   - cout << "[INFO] Disconnected from Redis." << endl;
  49 + if(rdx->user_disconnect_callback) rdx->user_disconnect_callback();
42 50 }
43 51  
44   -Redox::Redox(const string& host, const int port)
45   - : host(host), port(port) {
  52 +Redox::Redox(
  53 + const string& host, const int port,
  54 + std::function<void(void)> connected,
  55 + std::function<void(void)> disconnected
  56 +) : host(host), port(port), user_connect_callback(connected), user_disconnect_callback(disconnected) {
46 57  
47 58 // Required by libev
48 59 signal(SIGPIPE, SIG_IGN);
... ... @@ -59,30 +70,12 @@ Redox::Redox(const string&amp; host, const int port)
59 70 redisLibevAttach(evloop, ctx);
60 71  
61 72 // Set the callbacks to be invoked on server connection/disconnection
62   - redisAsyncSetConnectCallback(ctx, Redox::connected);
63   - redisAsyncSetDisconnectCallback(ctx, Redox::disconnected);
  73 + redisAsyncSetConnectCallback(ctx, Redox::connected_callback);
  74 + redisAsyncSetDisconnectCallback(ctx, Redox::disconnected_callback);
64 75  
65 76 // Set back references to this Redox object (for use in callbacks)
66 77 ev_set_userdata(evloop, (void*)this);
67 78 ctx->data = (void*)this;
68   -
69   - // Lock this mutex until the connected callback is invoked
70   - connected_lock.lock();
71   -}
72   -
73   -Redox::~Redox() {
74   -
75   - redisAsyncDisconnect(ctx);
76   -
77   - stop();
78   -
79   - if(event_loop_thread.joinable())
80   - event_loop_thread.join();
81   -
82   - ev_loop_destroy(evloop);
83   -
84   - std::cout << "[INFO] Redox created " << commands_created
85   - << " Commands and freed " << commands_deleted << "." << std::endl;
86 79 }
87 80  
88 81 void Redox::run_event_loop() {
... ... @@ -90,9 +83,16 @@ void Redox::run_event_loop() {
90 83 // Events to connect to Redox
91 84 ev_run(evloop, EVRUN_NOWAIT);
92 85  
93   - // Block until connected to Redis
94   - connected_lock.lock();
95   - connected_lock.unlock();
  86 + // Block until connected to Redis, or error
  87 + unique_lock<mutex> ul(connect_lock);
  88 + connect_waiter.wait(ul, [this] { return connect_state != REDOX_NOT_YET_CONNECTED; });
  89 +
  90 + // Handle connection error
  91 + if(connect_state != REDOX_CONNECTED) {
  92 + cout << "[INFO] Did not connect, event loop exiting." << endl;
  93 + running_waiter.notify_one();
  94 + return;
  95 + }
96 96  
97 97 // Set up asynchronous watcher which we signal every
98 98 // time we add a command
... ... @@ -128,13 +128,19 @@ void Redox::run_event_loop() {
128 128 cout << "[INFO] Event thread exited." << endl;
129 129 }
130 130  
131   -void Redox::start() {
  131 +bool Redox::start() {
132 132  
133 133 event_loop_thread = thread([this] { run_event_loop(); });
134 134  
135   - // Block until connected and running the event loop
  135 + // Block until connected and running the event loop, or until
  136 + // a connection error happens and the event loop exits
136 137 unique_lock<mutex> ul(running_waiter_lock);
137   - running_waiter.wait(ul, [this] { return running.load(); });
  138 + running_waiter.wait(ul, [this] {
  139 + return running.load() || connect_state == REDOX_CONNECT_ERROR;
  140 + });
  141 +
  142 + // Return if succeeded
  143 + return connect_state == REDOX_CONNECTED;
138 144 }
139 145  
140 146 void Redox::stop_signal() {
... ... @@ -152,6 +158,27 @@ void Redox::stop() {
152 158 block();
153 159 }
154 160  
  161 +void Redox::disconnect() {
  162 + stop_signal();
  163 + if(connect_state == REDOX_CONNECTED) {
  164 + redisAsyncDisconnect(ctx);
  165 + block();
  166 + }
  167 +}
  168 +
  169 +Redox::~Redox() {
  170 +
  171 + disconnect();
  172 +
  173 + if(event_loop_thread.joinable())
  174 + event_loop_thread.join();
  175 +
  176 + ev_loop_destroy(evloop);
  177 +
  178 + std::cout << "[INFO] Redox created " << commands_created
  179 + << " Commands and freed " << commands_deleted << "." << std::endl;
  180 +}
  181 +
155 182 template<class ReplyT>
156 183 Command<ReplyT>* Redox::find_command(long id) {
157 184  
... ...
src/redox.hpp
... ... @@ -29,6 +29,13 @@ namespace redox {
29 29 static const std::string REDIS_DEFAULT_HOST = "localhost";
30 30 static const int REDIS_DEFAULT_PORT = 6379;
31 31  
  32 +// Connection status
  33 +static const int REDOX_NOT_YET_CONNECTED = 0;
  34 +static const int REDOX_CONNECTED = 1;
  35 +static const int REDOX_DISCONNECTED = 2;
  36 +static const int REDOX_CONNECT_ERROR = 3;
  37 +static const int REDOX_DISCONNECT_ERROR = 4;
  38 +
32 39 class Redox {
33 40  
34 41 public:
... ... @@ -36,14 +43,19 @@ public:
36 43 /**
37 44 * Initialize everything, connect over TCP to a Redis server.
38 45 */
39   - Redox(const std::string& host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT);
  46 + Redox(
  47 + const std::string& host = REDIS_DEFAULT_HOST,
  48 + const int port = REDIS_DEFAULT_PORT,
  49 + std::function<void(void)> connected = nullptr,
  50 + std::function<void(void)> disconnected = nullptr
  51 + );
40 52 ~Redox();
41 53  
42 54 /**
43 55 * Connect to Redis and start the event loop in a separate thread. Returns
44   - * once everything is ready to go.
  56 + * true if and when everything is ready to go, or false on failure.
45 57 */
46   - void start();
  58 + bool start();
47 59  
48 60 /**
49 61 * Signal the event loop to stop processing commands and shut down.
... ... @@ -107,6 +119,12 @@ public:
107 119 // Hiredis context, left public to allow low-level access
108 120 redisAsyncContext *ctx;
109 121  
  122 + /**
  123 + * If connected, disconnect from the Redis server. Usually not necessary to invoke
  124 + * manually, as it is called in the destructor.
  125 + */
  126 + void disconnect();
  127 +
110 128 // ------------------------------------------------
111 129 // Wrapper methods for convenience only
112 130 // ------------------------------------------------
... ... @@ -160,8 +178,14 @@ private:
160 178 std::string host;
161 179 int port;
162 180  
163   - // Block run() until redis is connected
164   - std::mutex connected_lock;
  181 + // Manage connection state
  182 + std::atomic_int connect_state = {REDOX_NOT_YET_CONNECTED};
  183 + std::mutex connect_lock;
  184 + std::condition_variable connect_waiter;
  185 +
  186 + // User connect/disconnect callbacks
  187 + std::function<void(void)> user_connect_callback;
  188 + std::function<void(void)> user_disconnect_callback;
165 189  
166 190 // Dynamically allocated libev event loop
167 191 struct ev_loop* evloop;
... ... @@ -218,8 +242,8 @@ private:
218 242 void run_event_loop();
219 243  
220 244 // Callbacks invoked on server connection/disconnection
221   - static void connected(const redisAsyncContext *c, int status);
222   - static void disconnected(const redisAsyncContext *c, int status);
  245 + static void connected_callback(const redisAsyncContext *c, int status);
  246 + static void disconnected_callback(const redisAsyncContext *c, int status);
223 247  
224 248 template<class ReplyT>
225 249 static void command_callback(redisAsyncContext *ctx, void *r, void *privdata);
... ...