diff --git a/include/redox/client.hpp b/include/redox/client.hpp index 9a1ec1d..12a95bc 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -309,12 +309,20 @@ private: // Helper function for freeAllCommands to access a specific command map template long freeAllCommandsOfType(); + // Helper functions to get/set variables with synchronization. + int getConnectState(); + void setConnectState(int connect_state); + int getRunning(); + void setRunning(bool running); + int getExited(); + void setExited(bool exited); + // ------------------------------------------------ // Private members // ------------------------------------------------ // Manage connection state - std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; + int connect_state_ = NOT_YET_CONNECTED; std::mutex connect_lock_; std::condition_variable connect_waiter_; @@ -340,13 +348,13 @@ private: std::thread event_loop_thread_; // Variable and CV to know when the event loop starts running - std::atomic_bool running_ = {false}; + bool running_ = false; std::mutex running_lock_; std::condition_variable running_waiter_; // Variable and CV to know when the event loop stops running std::atomic_bool to_exit_ = {false}; // Signal to exit - std::atomic_bool exited_ = {false}; // Event thread exited + bool exited_ = false; // Event thread exited std::mutex exit_lock_; std::condition_variable exit_waiter_; diff --git a/src/client.cpp b/src/client.cpp index dc39192..154f218 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -51,18 +51,14 @@ bool Redox::connect(const string &host, const int port, // a connection error happens and the event loop exits { unique_lock ul(running_lock_); - running_waiter_.wait(ul, [this] - { - unique_lock(connect_lock_); - return running_.load() || connect_state_ == CONNECT_ERROR; + running_waiter_.wait(ul, [this] { + lock_guard lg(connect_lock_); + return running_ || connect_state_ == CONNECT_ERROR; }); } // Return if succeeded - { - unique_lock ul(connect_lock_); - return connect_state_ == CONNECTED; - } + return getConnectState() == CONNECTED; } bool Redox::connectUnix(const string &path, function connection_callback) { @@ -85,18 +81,14 @@ bool Redox::connectUnix(const string &path, function connection_callb // a connection error happens and the event loop exits { unique_lock ul(running_lock_); - running_waiter_.wait(ul, [this] - { - unique_lock ul(connect_lock_); - return running_.load() || connect_state_ == CONNECT_ERROR; + running_waiter_.wait(ul, [this] { + lock_guard lg(connect_lock_); + return running_ || connect_state_ == CONNECT_ERROR; }); } // Return if succeeded - { - unique_lock ul(connect_lock_); - return connect_state_ == CONNECTED; - } + return getConnectState() == CONNECTED; } void Redox::disconnect() { @@ -112,13 +104,13 @@ void Redox::stop() { void Redox::wait() { unique_lock ul(exit_lock_); - exit_waiter_.wait(ul, [this] { return exited_.load(); }); + exit_waiter_.wait(ul, [this] { return exited_; }); } Redox::~Redox() { // Bring down the event loop - if (running_ == true) { + if (getRunning()) { stop(); } @@ -135,25 +127,18 @@ void Redox::connectedCallback(const redisAsyncContext *ctx, int status) { if (status != REDIS_OK) { rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; rdx->logger_.fatal() << "Status: " << status; - unique_lock lk(rdx->connect_lock_); - rdx->connect_state_ = CONNECT_ERROR; + rdx->setConnectState(CONNECT_ERROR); } else { rdx->logger_.info() << "Connected to Redis."; - unique_lock lk(rdx->connect_lock_); // Disable hiredis automatically freeing reply objects ctx->c.reader->fn->freeObject = [](void *reply) {}; - rdx->connect_state_ = CONNECTED; + rdx->setConnectState(CONNECTED); } - int state; - { - unique_lock lk(rdx->connect_lock_); - state = rdx->connect_state_; + if (rdx->user_connection_callback_) { + rdx->user_connection_callback_(rdx->getConnectState()); } - rdx->connect_waiter_.notify_all(); - if (rdx->user_connection_callback_) - rdx->user_connection_callback_(state); } void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { @@ -162,23 +147,16 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { if (status != REDIS_OK) { rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; - unique_lock lk(rdx->connect_lock_); - rdx->connect_state_ = DISCONNECT_ERROR; + rdx->setConnectState(DISCONNECT_ERROR); } else { rdx->logger_.info() << "Disconnected from Redis as planned."; - unique_lock lk(rdx->connect_lock_); - rdx->connect_state_ = DISCONNECTED; + rdx->setConnectState(DISCONNECTED); } rdx->stop(); - int state; - { - unique_lock lk(rdx->connect_lock_); - state = rdx->connect_state_; + if (rdx->user_connection_callback_) { + rdx->user_connection_callback_(rdx->getConnectState()); } - rdx->connect_waiter_.notify_all(); - if (rdx->user_connection_callback_) - rdx->user_connection_callback_(state); } bool Redox::initEv() { @@ -186,11 +164,7 @@ bool Redox::initEv() { evloop_ = ev_loop_new(EVFLAG_AUTO); if (evloop_ == nullptr) { logger_.fatal() << "Could not create a libev event loop."; - { - unique_lock lk(connect_lock_); - connect_state_ = INIT_ERROR; - } - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } ev_set_userdata(evloop_, (void *)this); // Back-reference @@ -203,43 +177,27 @@ bool Redox::initHiredis() { if (ctx_->err) { logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; - { - unique_lock lk(connect_lock_); - connect_state_ = INIT_ERROR; - } - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } // Attach event loop to hiredis if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) { logger_.fatal() << "Could not attach libev event loop to hiredis."; - { - unique_lock lk(connect_lock_); - connect_state_ = INIT_ERROR; - } - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } // Set the callbacks to be invoked on server connection/disconnection if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach connect callback to hiredis."; - { - unique_lock lk(connect_lock_); - connect_state_ = INIT_ERROR; - } - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach disconnect callback to hiredis."; - { - unique_lock lk(connect_lock_); - connect_state_ = INIT_ERROR; - } - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } @@ -258,6 +216,43 @@ void breakEventLoop(struct ev_loop *loop, ev_async *async, int revents) { ev_break(loop, EVBREAK_ALL); } +int Redox::getConnectState() { + lock_guard lk(connect_lock_); + return connect_state_; +} + +void Redox::setConnectState(int connect_state) { + { + lock_guard lk(connect_lock_); + connect_state_ = connect_state; + } + connect_waiter_.notify_all(); +} + +int Redox::getRunning() { + lock_guard lg(running_lock_); + return running_; +} +void Redox::setRunning(bool running) { + { + lock_guard lg(running_lock_); + running_ = running; + } + running_waiter_.notify_one(); +} + +int Redox::getExited() { + lock_guard lg(exit_lock_); + return exited_; +} +void Redox::setExited(bool exited) { + { + lock_guard lg(exit_lock_); + exited_ = exited; + } + exit_waiter_.notify_one(); +} + void Redox::runEventLoop() { // Events to connect to Redox @@ -272,18 +267,8 @@ void Redox::runEventLoop() { // Handle connection error if (connect_state_ != CONNECTED) { logger_.warning() << "Did not connect, event loop exiting."; - { - unique_lock ul(exit_lock_); - exited_ = true; - } - - { - unique_lock ul(running_lock_); - running_ = false; - } - - running_waiter_.notify_one(); - exit_waiter_.notify_one(); + setExited(true); + setRunning(false); return; } } @@ -302,11 +287,7 @@ void Redox::runEventLoop() { ev_async_init(&watcher_free_, freeQueuedCommands); ev_async_start(evloop_, &watcher_free_); - { - unique_lock ul(running_lock_); - running_ = true; - running_waiter_.notify_one(); - } + setRunning(true); // Run the event loop, using NOWAIT if enabled for maximum // throughput by avoiding any sleeping @@ -326,15 +307,10 @@ void Redox::runEventLoop() { // Wait to receive server replies for clean hiredis disconnect this_thread::sleep_for(chrono::milliseconds(10)); ev_run(evloop_, EVRUN_NOWAIT); - - int state; - { - unique_lock ul(connect_lock_); - state = connect_state_; - } - if (connect_state_ == CONNECTED) + if (getConnectState() == CONNECTED) { redisAsyncDisconnect(ctx_); + } // Run once more to disconnect ev_run(evloop_, EVRUN_NOWAIT); @@ -346,19 +322,9 @@ void Redox::runEventLoop() { << created; } - { - unique_lock ul(exit_lock_); - exited_ = true; - } - - { - unique_lock ul(running_lock_); - running_ = false; - } - // Let go for block_until_stopped method - running_waiter_.notify_one(); - exit_waiter_.notify_one(); + setExited(true); + setRunning(false); logger_.info() << "Event thread exited."; }