From b0c70bce04fcf50590a35dbed5799f7619be2b06 Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Fri, 30 Jan 2015 01:47:39 -0800 Subject: [PATCH] Move everything out of constructor to .connect() --- CMakeLists.txt | 12 ++++++------ README.md | 12 +++++++----- examples/basic.cpp | 5 ++--- examples/basic_threaded.cpp | 4 ++-- examples/binary_data.cpp | 4 ++-- examples/speed_test_async.cpp | 4 ++-- examples/speed_test_async_multi.cpp | 4 ++-- examples/speed_test_sync.cpp | 4 ++-- include/redox/client.hpp | 43 ++++++++++++++++++++++--------------------- include/redox/subscriber.hpp | 31 ++++++++++++++++--------------- src/client.cpp | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------- src/subscriber.cpp | 16 ++-------------- test/test.cpp | 4 ++-- 13 files changed, 159 insertions(+), 150 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fd23be..29dffef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,7 +26,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") # --------------------------------------------------------- set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) -set(INC_DIR ${CMAKE_SOURCE_DIR}/include/redox) +set(INC_DIR ${CMAKE_SOURCE_DIR}/include) set(SRC_CORE ${SRC_DIR}/client.cpp @@ -34,20 +34,20 @@ set(SRC_CORE ${SRC_DIR}/subscriber.cpp) set(INC_CORE - ${INC_DIR}/client.hpp - ${INC_DIR}/subscriber.hpp - ${INC_DIR}/command.hpp) + ${INC_DIR}/redox/client.hpp + ${INC_DIR}/redox/subscriber.hpp + ${INC_DIR}/redox/command.hpp) set(SRC_UTILS ${SRC_DIR}/utils/logger.cpp) set(INC_UTILS ${INC_DIR}/utils/logger.hpp) -set(INC_WRAPPER ${CMAKE_SOURCE_DIR}/include/redox.hpp) +set(INC_WRAPPER ${INC_DIR}/redox.hpp) set(SRC_ALL ${SRC_CORE} ${SRC_UTILS}) set(INC_ALL ${INC_CORE} ${INC_UTILS} ${INC_WRAPPER}) include_directories(${INC_DIR}) -#include_directories(${INC_DIR/redox}) +include_directories(${INC_DIR}/redox) # Dependent libraries - you may have to change # pthread to whatever C++11 threads depends on diff --git a/README.md b/README.md index 77f540d..eaec05e 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ details so you can move on to the interesting part of your project. * Automatic pipelining, even for synchronous calls from separate threads * Low-level access when needed * Accessible and robust error handling - * Logs to any ostream at a user-controllable log level + * Logs to any ostream at a user-controlled log level * Fast - developed for robotics applications * 100% clean Valgrind reports @@ -55,8 +55,8 @@ Here is the simplest possible redox program: int main(int argc, char* argv[]) { - Redox rdx = {"localhost", 6379}; - if(!rdx.connect()) return 1; + Redox rdx; + if(!rdx.connect("localhost", 6379)) return 1; rdx.set("hello", "world!"); cout << "Hello, " << rdx.get("hello") << endl; @@ -104,8 +104,10 @@ the callback returns. Here is a simple example of running `GET hello` asynchronously ten times: - Redox rdx; // Localhost by default - if(!rdx.connect()) return 1; // Block until connected + Redox rdx; + + // Block until connected, localhost by default + if(!rdx.connect()) return 1; auto got_reply = [](Command& c) { if(!c.ok()) return; diff --git a/examples/basic.cpp b/examples/basic.cpp index 0a8e8bb..d0eb9e3 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -12,9 +12,8 @@ using redox::Subscriber; int main(int argc, char* argv[]) { - Redox rdx = {"localhost", 6379}; // Initialize Redox - - if(!rdx.connect()) return 1; // Start the event loop + Redox rdx; + if(!rdx.connect("localhost", 6379)) return 1; rdx.del("occupation"); diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index 8b246f7..b217196 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -11,11 +11,11 @@ using namespace std; using redox::Redox; using redox::Command; -redox::Redox rdx = {"localhost", 6379}; +redox::Redox rdx; int main(int argc, char* argv[]) { - if(!rdx.connect()) return 1; + if(!rdx.connect("localhost", 6379)) return 1; thread setter([]() { for(int i = 0; i < 5000; i++) { diff --git a/examples/binary_data.cpp b/examples/binary_data.cpp index 757e62a..313f416 100644 --- a/examples/binary_data.cpp +++ b/examples/binary_data.cpp @@ -22,8 +22,8 @@ std::string random_string(size_t length) { int main(int argc, char* argv[]) { - redox::Redox rdx = {"localhost", 6379}; // Initialize Redox - if(!rdx.connect()) return 1; // Start the event loop + redox::Redox rdx; // Initialize Redox + if(!rdx.connect("localhost", 6379)) return 1; // Start the event loop rdx.del("binary"); diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index 101fb08..315ddb0 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -18,8 +18,8 @@ double time_s() { int main(int argc, char* argv[]) { - Redox rdx = {"/var/run/redis/redis.sock", nullptr}; - if(!rdx.connect()) return 1; + Redox rdx; + if(!rdx.connect_unix("/var/run/redis/redis.sock")) return 1; bool status = rdx.commandSync("SET simple_loop:count 0"); if(status) { diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index e37cf5e..a5243e0 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -19,8 +19,8 @@ double time_s() { int main(int argc, char* argv[]) { - Redox rdx = {"localhost", 6379}; - if(!rdx.connect()) return 1; + Redox rdx; + if(!rdx.connect("localhost", 6379)) return 1; if(rdx.set("simple_loop:count", "0")) { cout << "Reset the counter to zero." << endl; diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp index 69f1761..6215563 100644 --- a/examples/speed_test_sync.cpp +++ b/examples/speed_test_sync.cpp @@ -17,8 +17,8 @@ double time_s() { int main(int argc, char* argv[]) { - Redox rdx = {"localhost", 6379}; - if(!rdx.connect()) return 1; + Redox rdx; + if(!rdx.connect("localhost", 6379)) return 1; if(rdx.commandSync("SET simple_loop:count 0")) { cout << "Reset the counter to zero." << endl; diff --git a/include/redox/client.hpp b/include/redox/client.hpp index d5f7570..c9a6af7 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -45,6 +45,7 @@ namespace redox { static const std::string REDIS_DEFAULT_HOST = "localhost"; static const int REDIS_DEFAULT_PORT = 6379; +static const std::string REDIS_DEFAULT_PATH = "/var/run/redis/redis.sock"; /** * Redox intro here. @@ -65,22 +66,9 @@ public: // ------------------------------------------------ /** - * Initializes everything, connects over TCP to a Redis server. + * Constructor. Optionally specify a log stream and a log level. */ Redox( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Warning - ); - - /** - * Initializes everything, connects over unix sockets to a Redis server. - */ - Redox( - const std::string& path, - std::function connection_callback, std::ostream& log_stream = std::cout, log::Level log_level = log::Warning ); @@ -92,10 +80,21 @@ public: ~Redox(); /** - * Connects to Redis and starts an event loop in a separate thread. Returns + * Connects to Redis over TCP and starts an event loop in a separate thread. Returns * true once everything is ready, or false on failure. */ - bool connect(); + bool connect( + const std::string& host = REDIS_DEFAULT_HOST, + const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr); + + /** + * Connects to Redis over a unix socket and starts an event loop in a separate + * thread. Returns true once everything is ready, or false on failure. + */ + bool connect_unix( + const std::string& path = REDIS_DEFAULT_PATH, + std::function connection_callback = nullptr); /** * Disconnect from Redis, shut down the event loop, then return. A simple @@ -207,12 +206,13 @@ public: // Hiredis context, left public to allow low-level access redisAsyncContext * ctx_; + // TODO make these private // Redox server over TCP - const std::string host_; - const int port_; + std::string host_; + int port_; // Redox server over unix - const std::string path_; + std::string path_; // Logger log::Logger logger_; @@ -235,8 +235,9 @@ private: ); // Setup code for the constructors - void init_ev(); - void init_hiredis(); + // Return true on success, false on failure + bool init_ev(); + bool init_hiredis(); // Callbacks invoked on server connection/disconnection static void connectedCallback(const redisAsyncContext* c, int status); diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp index bbb444b..2b1b373 100644 --- a/include/redox/subscriber.hpp +++ b/include/redox/subscriber.hpp @@ -29,22 +29,9 @@ class Subscriber { public: /** - * Initializes everything, connects over TCP to a Redis server. + * Constructor. Same as Redox. */ Subscriber( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Warning - ); - - /** - * Initializes everything, connects over unix sockets to a Redis server. - */ - Subscriber( - const std::string& path, - std::function connection_callback, std::ostream& log_stream = std::cout, log::Level log_level = log::Warning ); @@ -57,7 +44,21 @@ public: /** * Same as .connect() on a Redox instance. */ - bool connect() { return rdx_.connect(); } + bool connect( + const std::string& host = REDIS_DEFAULT_HOST, + const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr) { + return rdx_.connect(host, port, connection_callback); + } + + /** + * Same as .connect_unix() on a Redox instance. + */ + bool connect_unix( + const std::string& path = REDIS_DEFAULT_PATH, + std::function connection_callback = nullptr) { + return rdx_.connect_unix(path, connection_callback); + } /** * Same as .stop() on a Redox instance. diff --git a/src/client.cpp b/src/client.cpp index d804459..f7f8153 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -25,6 +25,93 @@ using namespace std; namespace redox { +Redox::Redox( + ostream& log_stream, + log::Level log_level +) : logger_(log_stream, log_level) {} + +bool Redox::connect( + const std::string& host, const int port, + std::function connection_callback +) { + + host_ = host; + port_ = port; + user_connection_callback_ = connection_callback; + + if(!init_ev()) return false; + + // Connect over TCP + ctx_ = redisAsyncConnect(host.c_str(), port); + + if(!init_hiredis()) return false; + + event_loop_thread_ = thread([this] { runEventLoop(); }); + + // Block until connected and running the event loop, or until + // a connection error happens and the event loop exits + unique_lock ul(running_waiter_lock_); + running_waiter_.wait(ul, [this] { + return running_.load() || connect_state_ == CONNECT_ERROR; + }); + + // Return if succeeded + return connect_state_ == CONNECTED; +} + +bool Redox::connect_unix( + const std::string& path, + std::function connection_callback +) { + + path_ = path; + user_connection_callback_ = connection_callback; + + if(!init_ev()) return false; + + // Connect over unix sockets + ctx_ = redisAsyncConnectUnix(path.c_str()); + + if(!init_hiredis()) return false; + + event_loop_thread_ = thread([this] { runEventLoop(); }); + + // Block until connected and running the event loop, or until + // a connection error happens and the event loop exits + unique_lock ul(running_waiter_lock_); + running_waiter_.wait(ul, [this] { + return running_.load() || connect_state_ == CONNECT_ERROR; + }); + + // Return if succeeded + return connect_state_ == CONNECTED; +} + +void Redox::disconnect() { + stop(); + wait(); +} + +void Redox::stop() { + to_exit_ = true; + logger_.debug() << "stop() called, breaking event loop"; + ev_async_send(evloop_, &watcher_stop_); +} + +void Redox::wait() { + unique_lock ul(exit_waiter_lock_); + exit_waiter_.wait(ul, [this] { return exited_.load(); }); +} + +Redox::~Redox() { + + // Bring down the event loop + stop(); + + if(event_loop_thread_.joinable()) event_loop_thread_.join(); + ev_loop_destroy(evloop_); +} + void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { Redox* rdx = (Redox*) ctx->data; @@ -61,13 +148,14 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } -void Redox::init_ev() { +bool Redox::init_ev() { signal(SIGPIPE, SIG_IGN); evloop_ = ev_loop_new(EVFLAG_AUTO); ev_set_userdata(evloop_, (void*)this); // Back-reference + return true; } -void Redox::init_hiredis() { +bool Redox::init_hiredis() { ctx_->data = (void*)this; // Back-reference @@ -75,7 +163,7 @@ void Redox::init_hiredis() { logger_.error() << "Could not create a hiredis context: " << ctx_->errstr; connect_state_ = CONNECT_ERROR; connect_waiter_.notify_all(); - return; + return false; } // Attach event loop to hiredis @@ -84,39 +172,8 @@ void Redox::init_hiredis() { // Set the callbacks to be invoked on server connection/disconnection redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback); redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback); -} - -Redox::Redox( - const string& host, const int port, - function connection_callback, - ostream& log_stream, - log::Level log_level -) : host_(host), port_(port), - logger_(log_stream, log_level), - user_connection_callback_(connection_callback) { - - init_ev(); - // Connect over TCP - ctx_ = redisAsyncConnect(host.c_str(), port); - - init_hiredis(); -} - -Redox::Redox( - const string& path, - function connection_callback, - ostream& log_stream, - log::Level log_level -) : host_(), port_(), path_(path), logger_(log_stream, log_level), - user_connection_callback_(connection_callback) { - - init_ev(); - - // Connect over unix sockets - ctx_ = redisAsyncConnectUnix(path.c_str()); - - init_hiredis(); + return true; } void breakEventLoop(struct ev_loop* loop, ev_async* async, int revents) { @@ -192,45 +249,6 @@ void Redox::runEventLoop() { logger_.info() << "Event thread exited."; } -bool Redox::connect() { - - event_loop_thread_ = thread([this] { runEventLoop(); }); - - // Block until connected and running the event loop, or until - // a connection error happens and the event loop exits - unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { - return running_.load() || connect_state_ == CONNECT_ERROR; - }); - - // Return if succeeded - return connect_state_ == CONNECTED; -} - -void Redox::disconnect() { - stop(); - wait(); -} - -void Redox::stop() { - to_exit_ = true; - logger_.debug() << "stop() called, breaking event loop"; - ev_async_send(evloop_, &watcher_stop_); -} - -void Redox::wait() { - unique_lock ul(exit_waiter_lock_); - exit_waiter_.wait(ul, [this] { return exited_.load(); }); -} - -Redox::~Redox() { - - // Bring down the event loop - stop(); - - if(event_loop_thread_.joinable()) event_loop_thread_.join(); - ev_loop_destroy(evloop_); -} template Command* Redox::findCommand(long id) { diff --git a/src/subscriber.cpp b/src/subscriber.cpp index 571bf69..5deb0de 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -26,22 +26,10 @@ using namespace std; namespace redox { Subscriber::Subscriber( - const std::string& host, const int port, - std::function connection_callback, std::ostream& log_stream, log::Level log_level -) : rdx_(host, port, connection_callback, log_stream, log_level), - logger_(rdx_.logger_) {} +) : rdx_(log_stream, log_level), logger_(rdx_.logger_) {} -Subscriber::Subscriber( - const std::string& path, - std::function connection_callback, - std::ostream& log_stream, log::Level log_level -) : rdx_(path, connection_callback, log_stream, log_level), - logger_(rdx_.logger_) {} - -Subscriber::~Subscriber() { - -} +Subscriber::~Subscriber() {} void Subscriber::disconnect() { stop(); diff --git a/test/test.cpp b/test/test.cpp index ef93268..46dadd7 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -36,12 +36,12 @@ class RedoxTest : public ::testing::Test { protected: - Redox rdx = {"localhost", 6379}; + Redox rdx; RedoxTest() { // Connect to the server - rdx.connect(); + rdx.connect("localhost", 6379); // Clear all keys used by the tests here rdx.command("DEL redox_test:a"); -- libgit2 0.21.4