Commit b0c70bce04fcf50590a35dbed5799f7619be2b06
1 parent
7e6343a1
Move everything out of constructor to .connect()
This makes more sense logically, and lets the Redox object get default constructed with nothing but the optional logger parameters.
Showing
13 changed files
with
159 additions
and
150 deletions
CMakeLists.txt
| @@ -26,7 +26,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") | @@ -26,7 +26,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") | ||
| 26 | # --------------------------------------------------------- | 26 | # --------------------------------------------------------- |
| 27 | 27 | ||
| 28 | set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) | 28 | set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) |
| 29 | -set(INC_DIR ${CMAKE_SOURCE_DIR}/include/redox) | 29 | +set(INC_DIR ${CMAKE_SOURCE_DIR}/include) |
| 30 | 30 | ||
| 31 | set(SRC_CORE | 31 | set(SRC_CORE |
| 32 | ${SRC_DIR}/client.cpp | 32 | ${SRC_DIR}/client.cpp |
| @@ -34,20 +34,20 @@ set(SRC_CORE | @@ -34,20 +34,20 @@ set(SRC_CORE | ||
| 34 | ${SRC_DIR}/subscriber.cpp) | 34 | ${SRC_DIR}/subscriber.cpp) |
| 35 | 35 | ||
| 36 | set(INC_CORE | 36 | set(INC_CORE |
| 37 | - ${INC_DIR}/client.hpp | ||
| 38 | - ${INC_DIR}/subscriber.hpp | ||
| 39 | - ${INC_DIR}/command.hpp) | 37 | + ${INC_DIR}/redox/client.hpp |
| 38 | + ${INC_DIR}/redox/subscriber.hpp | ||
| 39 | + ${INC_DIR}/redox/command.hpp) | ||
| 40 | 40 | ||
| 41 | set(SRC_UTILS ${SRC_DIR}/utils/logger.cpp) | 41 | set(SRC_UTILS ${SRC_DIR}/utils/logger.cpp) |
| 42 | set(INC_UTILS ${INC_DIR}/utils/logger.hpp) | 42 | set(INC_UTILS ${INC_DIR}/utils/logger.hpp) |
| 43 | 43 | ||
| 44 | -set(INC_WRAPPER ${CMAKE_SOURCE_DIR}/include/redox.hpp) | 44 | +set(INC_WRAPPER ${INC_DIR}/redox.hpp) |
| 45 | 45 | ||
| 46 | set(SRC_ALL ${SRC_CORE} ${SRC_UTILS}) | 46 | set(SRC_ALL ${SRC_CORE} ${SRC_UTILS}) |
| 47 | set(INC_ALL ${INC_CORE} ${INC_UTILS} ${INC_WRAPPER}) | 47 | set(INC_ALL ${INC_CORE} ${INC_UTILS} ${INC_WRAPPER}) |
| 48 | 48 | ||
| 49 | include_directories(${INC_DIR}) | 49 | include_directories(${INC_DIR}) |
| 50 | -#include_directories(${INC_DIR/redox}) | 50 | +include_directories(${INC_DIR}/redox) |
| 51 | 51 | ||
| 52 | # Dependent libraries - you may have to change | 52 | # Dependent libraries - you may have to change |
| 53 | # pthread to whatever C++11 threads depends on | 53 | # pthread to whatever C++11 threads depends on |
README.md
| @@ -18,7 +18,7 @@ details so you can move on to the interesting part of your project. | @@ -18,7 +18,7 @@ details so you can move on to the interesting part of your project. | ||
| 18 | * Automatic pipelining, even for synchronous calls from separate threads | 18 | * Automatic pipelining, even for synchronous calls from separate threads |
| 19 | * Low-level access when needed | 19 | * Low-level access when needed |
| 20 | * Accessible and robust error handling | 20 | * Accessible and robust error handling |
| 21 | - * Logs to any ostream at a user-controllable log level | 21 | + * Logs to any ostream at a user-controlled log level |
| 22 | * Fast - developed for robotics applications | 22 | * Fast - developed for robotics applications |
| 23 | * 100% clean Valgrind reports | 23 | * 100% clean Valgrind reports |
| 24 | 24 | ||
| @@ -55,8 +55,8 @@ Here is the simplest possible redox program: | @@ -55,8 +55,8 @@ Here is the simplest possible redox program: | ||
| 55 | 55 | ||
| 56 | int main(int argc, char* argv[]) { | 56 | int main(int argc, char* argv[]) { |
| 57 | 57 | ||
| 58 | - Redox rdx = {"localhost", 6379}; | ||
| 59 | - if(!rdx.connect()) return 1; | 58 | + Redox rdx; |
| 59 | + if(!rdx.connect("localhost", 6379)) return 1; | ||
| 60 | 60 | ||
| 61 | rdx.set("hello", "world!"); | 61 | rdx.set("hello", "world!"); |
| 62 | cout << "Hello, " << rdx.get("hello") << endl; | 62 | cout << "Hello, " << rdx.get("hello") << endl; |
| @@ -104,8 +104,10 @@ the callback returns. | @@ -104,8 +104,10 @@ the callback returns. | ||
| 104 | 104 | ||
| 105 | Here is a simple example of running `GET hello` asynchronously ten times: | 105 | Here is a simple example of running `GET hello` asynchronously ten times: |
| 106 | 106 | ||
| 107 | - Redox rdx; // Localhost by default | ||
| 108 | - if(!rdx.connect()) return 1; // Block until connected | 107 | + Redox rdx; |
| 108 | + | ||
| 109 | + // Block until connected, localhost by default | ||
| 110 | + if(!rdx.connect()) return 1; | ||
| 109 | 111 | ||
| 110 | auto got_reply = [](Command<string>& c) { | 112 | auto got_reply = [](Command<string>& c) { |
| 111 | if(!c.ok()) return; | 113 | if(!c.ok()) return; |
examples/basic.cpp
| @@ -12,9 +12,8 @@ using redox::Subscriber; | @@ -12,9 +12,8 @@ using redox::Subscriber; | ||
| 12 | 12 | ||
| 13 | int main(int argc, char* argv[]) { | 13 | int main(int argc, char* argv[]) { |
| 14 | 14 | ||
| 15 | - Redox rdx = {"localhost", 6379}; // Initialize Redox | ||
| 16 | - | ||
| 17 | - if(!rdx.connect()) return 1; // Start the event loop | 15 | + Redox rdx; |
| 16 | + if(!rdx.connect("localhost", 6379)) return 1; | ||
| 18 | 17 | ||
| 19 | rdx.del("occupation"); | 18 | rdx.del("occupation"); |
| 20 | 19 |
examples/basic_threaded.cpp
| @@ -11,11 +11,11 @@ using namespace std; | @@ -11,11 +11,11 @@ using namespace std; | ||
| 11 | using redox::Redox; | 11 | using redox::Redox; |
| 12 | using redox::Command; | 12 | using redox::Command; |
| 13 | 13 | ||
| 14 | -redox::Redox rdx = {"localhost", 6379}; | 14 | +redox::Redox rdx; |
| 15 | 15 | ||
| 16 | int main(int argc, char* argv[]) { | 16 | int main(int argc, char* argv[]) { |
| 17 | 17 | ||
| 18 | - if(!rdx.connect()) return 1; | 18 | + if(!rdx.connect("localhost", 6379)) return 1; |
| 19 | 19 | ||
| 20 | thread setter([]() { | 20 | thread setter([]() { |
| 21 | for(int i = 0; i < 5000; i++) { | 21 | for(int i = 0; i < 5000; i++) { |
examples/binary_data.cpp
| @@ -22,8 +22,8 @@ std::string random_string(size_t length) { | @@ -22,8 +22,8 @@ std::string random_string(size_t length) { | ||
| 22 | 22 | ||
| 23 | int main(int argc, char* argv[]) { | 23 | int main(int argc, char* argv[]) { |
| 24 | 24 | ||
| 25 | - redox::Redox rdx = {"localhost", 6379}; // Initialize Redox | ||
| 26 | - if(!rdx.connect()) return 1; // Start the event loop | 25 | + redox::Redox rdx; // Initialize Redox |
| 26 | + if(!rdx.connect("localhost", 6379)) return 1; // Start the event loop | ||
| 27 | 27 | ||
| 28 | rdx.del("binary"); | 28 | rdx.del("binary"); |
| 29 | 29 |
examples/speed_test_async.cpp
| @@ -18,8 +18,8 @@ double time_s() { | @@ -18,8 +18,8 @@ double time_s() { | ||
| 18 | 18 | ||
| 19 | int main(int argc, char* argv[]) { | 19 | int main(int argc, char* argv[]) { |
| 20 | 20 | ||
| 21 | - Redox rdx = {"/var/run/redis/redis.sock", nullptr}; | ||
| 22 | - if(!rdx.connect()) return 1; | 21 | + Redox rdx; |
| 22 | + if(!rdx.connect_unix("/var/run/redis/redis.sock")) return 1; | ||
| 23 | 23 | ||
| 24 | bool status = rdx.commandSync("SET simple_loop:count 0"); | 24 | bool status = rdx.commandSync("SET simple_loop:count 0"); |
| 25 | if(status) { | 25 | if(status) { |
examples/speed_test_async_multi.cpp
| @@ -19,8 +19,8 @@ double time_s() { | @@ -19,8 +19,8 @@ double time_s() { | ||
| 19 | 19 | ||
| 20 | int main(int argc, char* argv[]) { | 20 | int main(int argc, char* argv[]) { |
| 21 | 21 | ||
| 22 | - Redox rdx = {"localhost", 6379}; | ||
| 23 | - if(!rdx.connect()) return 1; | 22 | + Redox rdx; |
| 23 | + if(!rdx.connect("localhost", 6379)) return 1; | ||
| 24 | 24 | ||
| 25 | if(rdx.set("simple_loop:count", "0")) { | 25 | if(rdx.set("simple_loop:count", "0")) { |
| 26 | cout << "Reset the counter to zero." << endl; | 26 | cout << "Reset the counter to zero." << endl; |
examples/speed_test_sync.cpp
| @@ -17,8 +17,8 @@ double time_s() { | @@ -17,8 +17,8 @@ double time_s() { | ||
| 17 | 17 | ||
| 18 | int main(int argc, char* argv[]) { | 18 | int main(int argc, char* argv[]) { |
| 19 | 19 | ||
| 20 | - Redox rdx = {"localhost", 6379}; | ||
| 21 | - if(!rdx.connect()) return 1; | 20 | + Redox rdx; |
| 21 | + if(!rdx.connect("localhost", 6379)) return 1; | ||
| 22 | 22 | ||
| 23 | if(rdx.commandSync("SET simple_loop:count 0")) { | 23 | if(rdx.commandSync("SET simple_loop:count 0")) { |
| 24 | cout << "Reset the counter to zero." << endl; | 24 | cout << "Reset the counter to zero." << endl; |
include/redox/client.hpp
| @@ -45,6 +45,7 @@ namespace redox { | @@ -45,6 +45,7 @@ namespace redox { | ||
| 45 | 45 | ||
| 46 | static const std::string REDIS_DEFAULT_HOST = "localhost"; | 46 | static const std::string REDIS_DEFAULT_HOST = "localhost"; |
| 47 | static const int REDIS_DEFAULT_PORT = 6379; | 47 | static const int REDIS_DEFAULT_PORT = 6379; |
| 48 | +static const std::string REDIS_DEFAULT_PATH = "/var/run/redis/redis.sock"; | ||
| 48 | 49 | ||
| 49 | /** | 50 | /** |
| 50 | * Redox intro here. | 51 | * Redox intro here. |
| @@ -65,22 +66,9 @@ public: | @@ -65,22 +66,9 @@ public: | ||
| 65 | // ------------------------------------------------ | 66 | // ------------------------------------------------ |
| 66 | 67 | ||
| 67 | /** | 68 | /** |
| 68 | - * Initializes everything, connects over TCP to a Redis server. | 69 | + * Constructor. Optionally specify a log stream and a log level. |
| 69 | */ | 70 | */ |
| 70 | Redox( | 71 | Redox( |
| 71 | - const std::string& host = REDIS_DEFAULT_HOST, | ||
| 72 | - const int port = REDIS_DEFAULT_PORT, | ||
| 73 | - std::function<void(int)> connection_callback = nullptr, | ||
| 74 | - std::ostream& log_stream = std::cout, | ||
| 75 | - log::Level log_level = log::Warning | ||
| 76 | - ); | ||
| 77 | - | ||
| 78 | - /** | ||
| 79 | - * Initializes everything, connects over unix sockets to a Redis server. | ||
| 80 | - */ | ||
| 81 | - Redox( | ||
| 82 | - const std::string& path, | ||
| 83 | - std::function<void(int)> connection_callback, | ||
| 84 | std::ostream& log_stream = std::cout, | 72 | std::ostream& log_stream = std::cout, |
| 85 | log::Level log_level = log::Warning | 73 | log::Level log_level = log::Warning |
| 86 | ); | 74 | ); |
| @@ -92,10 +80,21 @@ public: | @@ -92,10 +80,21 @@ public: | ||
| 92 | ~Redox(); | 80 | ~Redox(); |
| 93 | 81 | ||
| 94 | /** | 82 | /** |
| 95 | - * Connects to Redis and starts an event loop in a separate thread. Returns | 83 | + * Connects to Redis over TCP and starts an event loop in a separate thread. Returns |
| 96 | * true once everything is ready, or false on failure. | 84 | * true once everything is ready, or false on failure. |
| 97 | */ | 85 | */ |
| 98 | - bool connect(); | 86 | + bool connect( |
| 87 | + const std::string& host = REDIS_DEFAULT_HOST, | ||
| 88 | + const int port = REDIS_DEFAULT_PORT, | ||
| 89 | + std::function<void(int)> connection_callback = nullptr); | ||
| 90 | + | ||
| 91 | + /** | ||
| 92 | + * Connects to Redis over a unix socket and starts an event loop in a separate | ||
| 93 | + * thread. Returns true once everything is ready, or false on failure. | ||
| 94 | + */ | ||
| 95 | + bool connect_unix( | ||
| 96 | + const std::string& path = REDIS_DEFAULT_PATH, | ||
| 97 | + std::function<void(int)> connection_callback = nullptr); | ||
| 99 | 98 | ||
| 100 | /** | 99 | /** |
| 101 | * Disconnect from Redis, shut down the event loop, then return. A simple | 100 | * Disconnect from Redis, shut down the event loop, then return. A simple |
| @@ -207,12 +206,13 @@ public: | @@ -207,12 +206,13 @@ public: | ||
| 207 | // Hiredis context, left public to allow low-level access | 206 | // Hiredis context, left public to allow low-level access |
| 208 | redisAsyncContext * ctx_; | 207 | redisAsyncContext * ctx_; |
| 209 | 208 | ||
| 209 | + // TODO make these private | ||
| 210 | // Redox server over TCP | 210 | // Redox server over TCP |
| 211 | - const std::string host_; | ||
| 212 | - const int port_; | 211 | + std::string host_; |
| 212 | + int port_; | ||
| 213 | 213 | ||
| 214 | // Redox server over unix | 214 | // Redox server over unix |
| 215 | - const std::string path_; | 215 | + std::string path_; |
| 216 | 216 | ||
| 217 | // Logger | 217 | // Logger |
| 218 | log::Logger logger_; | 218 | log::Logger logger_; |
| @@ -235,8 +235,9 @@ private: | @@ -235,8 +235,9 @@ private: | ||
| 235 | ); | 235 | ); |
| 236 | 236 | ||
| 237 | // Setup code for the constructors | 237 | // Setup code for the constructors |
| 238 | - void init_ev(); | ||
| 239 | - void init_hiredis(); | 238 | + // Return true on success, false on failure |
| 239 | + bool init_ev(); | ||
| 240 | + bool init_hiredis(); | ||
| 240 | 241 | ||
| 241 | // Callbacks invoked on server connection/disconnection | 242 | // Callbacks invoked on server connection/disconnection |
| 242 | static void connectedCallback(const redisAsyncContext* c, int status); | 243 | static void connectedCallback(const redisAsyncContext* c, int status); |
include/redox/subscriber.hpp
| @@ -29,22 +29,9 @@ class Subscriber { | @@ -29,22 +29,9 @@ class Subscriber { | ||
| 29 | public: | 29 | public: |
| 30 | 30 | ||
| 31 | /** | 31 | /** |
| 32 | - * Initializes everything, connects over TCP to a Redis server. | 32 | + * Constructor. Same as Redox. |
| 33 | */ | 33 | */ |
| 34 | Subscriber( | 34 | Subscriber( |
| 35 | - const std::string& host = REDIS_DEFAULT_HOST, | ||
| 36 | - const int port = REDIS_DEFAULT_PORT, | ||
| 37 | - std::function<void(int)> connection_callback = nullptr, | ||
| 38 | - std::ostream& log_stream = std::cout, | ||
| 39 | - log::Level log_level = log::Warning | ||
| 40 | - ); | ||
| 41 | - | ||
| 42 | - /** | ||
| 43 | - * Initializes everything, connects over unix sockets to a Redis server. | ||
| 44 | - */ | ||
| 45 | - Subscriber( | ||
| 46 | - const std::string& path, | ||
| 47 | - std::function<void(int)> connection_callback, | ||
| 48 | std::ostream& log_stream = std::cout, | 35 | std::ostream& log_stream = std::cout, |
| 49 | log::Level log_level = log::Warning | 36 | log::Level log_level = log::Warning |
| 50 | ); | 37 | ); |
| @@ -57,7 +44,21 @@ public: | @@ -57,7 +44,21 @@ public: | ||
| 57 | /** | 44 | /** |
| 58 | * Same as .connect() on a Redox instance. | 45 | * Same as .connect() on a Redox instance. |
| 59 | */ | 46 | */ |
| 60 | - bool connect() { return rdx_.connect(); } | 47 | + bool connect( |
| 48 | + const std::string& host = REDIS_DEFAULT_HOST, | ||
| 49 | + const int port = REDIS_DEFAULT_PORT, | ||
| 50 | + std::function<void(int)> connection_callback = nullptr) { | ||
| 51 | + return rdx_.connect(host, port, connection_callback); | ||
| 52 | + } | ||
| 53 | + | ||
| 54 | + /** | ||
| 55 | + * Same as .connect_unix() on a Redox instance. | ||
| 56 | + */ | ||
| 57 | + bool connect_unix( | ||
| 58 | + const std::string& path = REDIS_DEFAULT_PATH, | ||
| 59 | + std::function<void(int)> connection_callback = nullptr) { | ||
| 60 | + return rdx_.connect_unix(path, connection_callback); | ||
| 61 | + } | ||
| 61 | 62 | ||
| 62 | /** | 63 | /** |
| 63 | * Same as .stop() on a Redox instance. | 64 | * Same as .stop() on a Redox instance. |
src/client.cpp
| @@ -25,6 +25,93 @@ using namespace std; | @@ -25,6 +25,93 @@ using namespace std; | ||
| 25 | 25 | ||
| 26 | namespace redox { | 26 | namespace redox { |
| 27 | 27 | ||
| 28 | +Redox::Redox( | ||
| 29 | + ostream& log_stream, | ||
| 30 | + log::Level log_level | ||
| 31 | +) : logger_(log_stream, log_level) {} | ||
| 32 | + | ||
| 33 | +bool Redox::connect( | ||
| 34 | + const std::string& host, const int port, | ||
| 35 | + std::function<void(int)> connection_callback | ||
| 36 | +) { | ||
| 37 | + | ||
| 38 | + host_ = host; | ||
| 39 | + port_ = port; | ||
| 40 | + user_connection_callback_ = connection_callback; | ||
| 41 | + | ||
| 42 | + if(!init_ev()) return false; | ||
| 43 | + | ||
| 44 | + // Connect over TCP | ||
| 45 | + ctx_ = redisAsyncConnect(host.c_str(), port); | ||
| 46 | + | ||
| 47 | + if(!init_hiredis()) return false; | ||
| 48 | + | ||
| 49 | + event_loop_thread_ = thread([this] { runEventLoop(); }); | ||
| 50 | + | ||
| 51 | + // Block until connected and running the event loop, or until | ||
| 52 | + // a connection error happens and the event loop exits | ||
| 53 | + unique_lock<mutex> ul(running_waiter_lock_); | ||
| 54 | + running_waiter_.wait(ul, [this] { | ||
| 55 | + return running_.load() || connect_state_ == CONNECT_ERROR; | ||
| 56 | + }); | ||
| 57 | + | ||
| 58 | + // Return if succeeded | ||
| 59 | + return connect_state_ == CONNECTED; | ||
| 60 | +} | ||
| 61 | + | ||
| 62 | +bool Redox::connect_unix( | ||
| 63 | + const std::string& path, | ||
| 64 | + std::function<void(int)> connection_callback | ||
| 65 | +) { | ||
| 66 | + | ||
| 67 | + path_ = path; | ||
| 68 | + user_connection_callback_ = connection_callback; | ||
| 69 | + | ||
| 70 | + if(!init_ev()) return false; | ||
| 71 | + | ||
| 72 | + // Connect over unix sockets | ||
| 73 | + ctx_ = redisAsyncConnectUnix(path.c_str()); | ||
| 74 | + | ||
| 75 | + if(!init_hiredis()) return false; | ||
| 76 | + | ||
| 77 | + event_loop_thread_ = thread([this] { runEventLoop(); }); | ||
| 78 | + | ||
| 79 | + // Block until connected and running the event loop, or until | ||
| 80 | + // a connection error happens and the event loop exits | ||
| 81 | + unique_lock<mutex> ul(running_waiter_lock_); | ||
| 82 | + running_waiter_.wait(ul, [this] { | ||
| 83 | + return running_.load() || connect_state_ == CONNECT_ERROR; | ||
| 84 | + }); | ||
| 85 | + | ||
| 86 | + // Return if succeeded | ||
| 87 | + return connect_state_ == CONNECTED; | ||
| 88 | +} | ||
| 89 | + | ||
| 90 | +void Redox::disconnect() { | ||
| 91 | + stop(); | ||
| 92 | + wait(); | ||
| 93 | +} | ||
| 94 | + | ||
| 95 | +void Redox::stop() { | ||
| 96 | + to_exit_ = true; | ||
| 97 | + logger_.debug() << "stop() called, breaking event loop"; | ||
| 98 | + ev_async_send(evloop_, &watcher_stop_); | ||
| 99 | +} | ||
| 100 | + | ||
| 101 | +void Redox::wait() { | ||
| 102 | + unique_lock<mutex> ul(exit_waiter_lock_); | ||
| 103 | + exit_waiter_.wait(ul, [this] { return exited_.load(); }); | ||
| 104 | +} | ||
| 105 | + | ||
| 106 | +Redox::~Redox() { | ||
| 107 | + | ||
| 108 | + // Bring down the event loop | ||
| 109 | + stop(); | ||
| 110 | + | ||
| 111 | + if(event_loop_thread_.joinable()) event_loop_thread_.join(); | ||
| 112 | + ev_loop_destroy(evloop_); | ||
| 113 | +} | ||
| 114 | + | ||
| 28 | void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { | 115 | void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { |
| 29 | 116 | ||
| 30 | Redox* rdx = (Redox*) ctx->data; | 117 | Redox* rdx = (Redox*) ctx->data; |
| @@ -61,13 +148,14 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { | @@ -61,13 +148,14 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { | ||
| 61 | if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); | 148 | if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); |
| 62 | } | 149 | } |
| 63 | 150 | ||
| 64 | -void Redox::init_ev() { | 151 | +bool Redox::init_ev() { |
| 65 | signal(SIGPIPE, SIG_IGN); | 152 | signal(SIGPIPE, SIG_IGN); |
| 66 | evloop_ = ev_loop_new(EVFLAG_AUTO); | 153 | evloop_ = ev_loop_new(EVFLAG_AUTO); |
| 67 | ev_set_userdata(evloop_, (void*)this); // Back-reference | 154 | ev_set_userdata(evloop_, (void*)this); // Back-reference |
| 155 | + return true; | ||
| 68 | } | 156 | } |
| 69 | 157 | ||
| 70 | -void Redox::init_hiredis() { | 158 | +bool Redox::init_hiredis() { |
| 71 | 159 | ||
| 72 | ctx_->data = (void*)this; // Back-reference | 160 | ctx_->data = (void*)this; // Back-reference |
| 73 | 161 | ||
| @@ -75,7 +163,7 @@ void Redox::init_hiredis() { | @@ -75,7 +163,7 @@ void Redox::init_hiredis() { | ||
| 75 | logger_.error() << "Could not create a hiredis context: " << ctx_->errstr; | 163 | logger_.error() << "Could not create a hiredis context: " << ctx_->errstr; |
| 76 | connect_state_ = CONNECT_ERROR; | 164 | connect_state_ = CONNECT_ERROR; |
| 77 | connect_waiter_.notify_all(); | 165 | connect_waiter_.notify_all(); |
| 78 | - return; | 166 | + return false; |
| 79 | } | 167 | } |
| 80 | 168 | ||
| 81 | // Attach event loop to hiredis | 169 | // Attach event loop to hiredis |
| @@ -84,39 +172,8 @@ void Redox::init_hiredis() { | @@ -84,39 +172,8 @@ void Redox::init_hiredis() { | ||
| 84 | // Set the callbacks to be invoked on server connection/disconnection | 172 | // Set the callbacks to be invoked on server connection/disconnection |
| 85 | redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback); | 173 | redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback); |
| 86 | redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback); | 174 | redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback); |
| 87 | -} | ||
| 88 | - | ||
| 89 | -Redox::Redox( | ||
| 90 | - const string& host, const int port, | ||
| 91 | - function<void(int)> connection_callback, | ||
| 92 | - ostream& log_stream, | ||
| 93 | - log::Level log_level | ||
| 94 | -) : host_(host), port_(port), | ||
| 95 | - logger_(log_stream, log_level), | ||
| 96 | - user_connection_callback_(connection_callback) { | ||
| 97 | - | ||
| 98 | - init_ev(); | ||
| 99 | 175 | ||
| 100 | - // Connect over TCP | ||
| 101 | - ctx_ = redisAsyncConnect(host.c_str(), port); | ||
| 102 | - | ||
| 103 | - init_hiredis(); | ||
| 104 | -} | ||
| 105 | - | ||
| 106 | -Redox::Redox( | ||
| 107 | - const string& path, | ||
| 108 | - function<void(int)> connection_callback, | ||
| 109 | - ostream& log_stream, | ||
| 110 | - log::Level log_level | ||
| 111 | -) : host_(), port_(), path_(path), logger_(log_stream, log_level), | ||
| 112 | - user_connection_callback_(connection_callback) { | ||
| 113 | - | ||
| 114 | - init_ev(); | ||
| 115 | - | ||
| 116 | - // Connect over unix sockets | ||
| 117 | - ctx_ = redisAsyncConnectUnix(path.c_str()); | ||
| 118 | - | ||
| 119 | - init_hiredis(); | 176 | + return true; |
| 120 | } | 177 | } |
| 121 | 178 | ||
| 122 | void breakEventLoop(struct ev_loop* loop, ev_async* async, int revents) { | 179 | void breakEventLoop(struct ev_loop* loop, ev_async* async, int revents) { |
| @@ -192,45 +249,6 @@ void Redox::runEventLoop() { | @@ -192,45 +249,6 @@ void Redox::runEventLoop() { | ||
| 192 | logger_.info() << "Event thread exited."; | 249 | logger_.info() << "Event thread exited."; |
| 193 | } | 250 | } |
| 194 | 251 | ||
| 195 | -bool Redox::connect() { | ||
| 196 | - | ||
| 197 | - event_loop_thread_ = thread([this] { runEventLoop(); }); | ||
| 198 | - | ||
| 199 | - // Block until connected and running the event loop, or until | ||
| 200 | - // a connection error happens and the event loop exits | ||
| 201 | - unique_lock<mutex> ul(running_waiter_lock_); | ||
| 202 | - running_waiter_.wait(ul, [this] { | ||
| 203 | - return running_.load() || connect_state_ == CONNECT_ERROR; | ||
| 204 | - }); | ||
| 205 | - | ||
| 206 | - // Return if succeeded | ||
| 207 | - return connect_state_ == CONNECTED; | ||
| 208 | -} | ||
| 209 | - | ||
| 210 | -void Redox::disconnect() { | ||
| 211 | - stop(); | ||
| 212 | - wait(); | ||
| 213 | -} | ||
| 214 | - | ||
| 215 | -void Redox::stop() { | ||
| 216 | - to_exit_ = true; | ||
| 217 | - logger_.debug() << "stop() called, breaking event loop"; | ||
| 218 | - ev_async_send(evloop_, &watcher_stop_); | ||
| 219 | -} | ||
| 220 | - | ||
| 221 | -void Redox::wait() { | ||
| 222 | - unique_lock<mutex> ul(exit_waiter_lock_); | ||
| 223 | - exit_waiter_.wait(ul, [this] { return exited_.load(); }); | ||
| 224 | -} | ||
| 225 | - | ||
| 226 | -Redox::~Redox() { | ||
| 227 | - | ||
| 228 | - // Bring down the event loop | ||
| 229 | - stop(); | ||
| 230 | - | ||
| 231 | - if(event_loop_thread_.joinable()) event_loop_thread_.join(); | ||
| 232 | - ev_loop_destroy(evloop_); | ||
| 233 | -} | ||
| 234 | 252 | ||
| 235 | template<class ReplyT> | 253 | template<class ReplyT> |
| 236 | Command<ReplyT>* Redox::findCommand(long id) { | 254 | Command<ReplyT>* Redox::findCommand(long id) { |
src/subscriber.cpp
| @@ -26,22 +26,10 @@ using namespace std; | @@ -26,22 +26,10 @@ using namespace std; | ||
| 26 | namespace redox { | 26 | namespace redox { |
| 27 | 27 | ||
| 28 | Subscriber::Subscriber( | 28 | Subscriber::Subscriber( |
| 29 | - const std::string& host, const int port, | ||
| 30 | - std::function<void(int)> connection_callback, | ||
| 31 | std::ostream& log_stream, log::Level log_level | 29 | std::ostream& log_stream, log::Level log_level |
| 32 | -) : rdx_(host, port, connection_callback, log_stream, log_level), | ||
| 33 | - logger_(rdx_.logger_) {} | 30 | +) : rdx_(log_stream, log_level), logger_(rdx_.logger_) {} |
| 34 | 31 | ||
| 35 | -Subscriber::Subscriber( | ||
| 36 | - const std::string& path, | ||
| 37 | - std::function<void(int)> connection_callback, | ||
| 38 | - std::ostream& log_stream, log::Level log_level | ||
| 39 | -) : rdx_(path, connection_callback, log_stream, log_level), | ||
| 40 | - logger_(rdx_.logger_) {} | ||
| 41 | - | ||
| 42 | -Subscriber::~Subscriber() { | ||
| 43 | - | ||
| 44 | -} | 32 | +Subscriber::~Subscriber() {} |
| 45 | 33 | ||
| 46 | void Subscriber::disconnect() { | 34 | void Subscriber::disconnect() { |
| 47 | stop(); | 35 | stop(); |
test/test.cpp
| @@ -36,12 +36,12 @@ class RedoxTest : public ::testing::Test { | @@ -36,12 +36,12 @@ class RedoxTest : public ::testing::Test { | ||
| 36 | 36 | ||
| 37 | protected: | 37 | protected: |
| 38 | 38 | ||
| 39 | - Redox rdx = {"localhost", 6379}; | 39 | + Redox rdx; |
| 40 | 40 | ||
| 41 | RedoxTest() { | 41 | RedoxTest() { |
| 42 | 42 | ||
| 43 | // Connect to the server | 43 | // Connect to the server |
| 44 | - rdx.connect(); | 44 | + rdx.connect("localhost", 6379); |
| 45 | 45 | ||
| 46 | // Clear all keys used by the tests here | 46 | // Clear all keys used by the tests here |
| 47 | rdx.command("DEL redox_test:a"); | 47 | rdx.command("DEL redox_test:a"); |