diff --git a/include/redox/client.hpp b/include/redox/client.hpp index 05c6969..364dc35 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -341,13 +341,13 @@ private: // Variable and CV to know when the event loop starts running std::atomic_bool running_ = {false}; - std::mutex running_waiter_lock_; + std::mutex running_lock_; std::condition_variable running_waiter_; // Variable and CV to know when the event loop stops running std::atomic_bool to_exit_ = {false}; // Signal to exit std::atomic_bool exited_ = {false}; // Event thread exited - std::mutex exit_waiter_lock_; + std::mutex exit_lock_; std::condition_variable exit_waiter_; // Maps of each Command, fetchable by the unique ID number diff --git a/src/client.cpp b/src/client.cpp index 7d652d0..2865f78 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -49,11 +49,20 @@ bool Redox::connect(const string &host, const int port, // Block until connected and running the event loop, or until // a connection error happens and the event loop exits - unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); + { + unique_lock ul(running_lock_); + running_waiter_.wait(ul, [this] + { + unique_lock(connect_lock_); + return running_.load() || connect_state_ == CONNECT_ERROR; + }); + } // Return if succeeded - return connect_state_ == CONNECTED; + { + unique_lock ul(connect_lock_); + return connect_state_ == CONNECTED; + } } bool Redox::connectUnix(const string &path, function connection_callback) { @@ -74,8 +83,14 @@ bool Redox::connectUnix(const string &path, function connection_callb // Block until connected and running the event loop, or until // a connection error happens and the event loop exits - unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); + { + unique_lock ul(running_lock_); + running_waiter_.wait(ul, [this] + { + unique_lock ul(connect_lock_); + return running_.load() || connect_state_ == CONNECT_ERROR; + }); + } // Return if succeeded return connect_state_ == CONNECTED; @@ -93,7 +108,7 @@ void Redox::stop() { } void Redox::wait() { - unique_lock ul(exit_waiter_lock_); + unique_lock ul(exit_lock_); exit_waiter_.wait(ul, [this] { return exited_.load(); }); } @@ -118,13 +133,15 @@ void Redox::connectedCallback(const redisAsyncContext *ctx, int status) { if (status != REDIS_OK) { rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; rdx->logger_.fatal() << "Status: " << status; + unique_lock lk(rdx->connect_lock_); rdx->connect_state_ = CONNECT_ERROR; } else { + rdx->logger_.info() << "Connected to Redis."; + unique_lock lk(rdx->connect_lock_); // Disable hiredis automatically freeing reply objects ctx->c.reader->fn->freeObject = [](void *reply) {}; rdx->connect_state_ = CONNECTED; - rdx->logger_.info() << "Connected to Redis."; } rdx->connect_waiter_.notify_all(); @@ -138,9 +155,11 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { if (status != REDIS_OK) { rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; + unique_lock lk(rdx->connect_lock_); rdx->connect_state_ = DISCONNECT_ERROR; } else { rdx->logger_.info() << "Disconnected from Redis as planned."; + unique_lock lk(rdx->connect_lock_); rdx->connect_state_ = DISCONNECTED; } @@ -155,7 +174,10 @@ bool Redox::initEv() { evloop_ = ev_loop_new(EVFLAG_AUTO); if (evloop_ == nullptr) { logger_.fatal() << "Could not create a libev event loop."; - connect_state_ = INIT_ERROR; + { + unique_lock lk(connect_lock_); + connect_state_ = INIT_ERROR; + } connect_waiter_.notify_all(); return false; } @@ -169,7 +191,10 @@ bool Redox::initHiredis() { if (ctx_->err) { logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; - connect_state_ = INIT_ERROR; + { + unique_lock lk(connect_lock_); + connect_state_ = INIT_ERROR; + } connect_waiter_.notify_all(); return false; } @@ -177,7 +202,10 @@ bool Redox::initHiredis() { // Attach event loop to hiredis if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) { logger_.fatal() << "Could not attach libev event loop to hiredis."; - connect_state_ = INIT_ERROR; + { + unique_lock lk(connect_lock_); + connect_state_ = INIT_ERROR; + } connect_waiter_.notify_all(); return false; } @@ -185,14 +213,20 @@ bool Redox::initHiredis() { // Set the callbacks to be invoked on server connection/disconnection if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach connect callback to hiredis."; - connect_state_ = INIT_ERROR; + { + unique_lock lk(connect_lock_); + connect_state_ = INIT_ERROR; + } connect_waiter_.notify_all(); return false; } if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach disconnect callback to hiredis."; - connect_state_ = INIT_ERROR; + { + unique_lock lk(connect_lock_); + connect_state_ = INIT_ERROR; + } connect_waiter_.notify_all(); return false; } @@ -219,16 +253,26 @@ void Redox::runEventLoop() { ev_run(evloop_, EVRUN_NOWAIT); // Block until connected to Redis, or error - unique_lock ul(connect_lock_); - connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); - - // Handle connection error - if (connect_state_ != CONNECTED) { - logger_.warning() << "Did not connect, event loop exiting."; - exited_ = true; - running_ = false; - running_waiter_.notify_one(); - return; + { + unique_lock ul(connect_lock_); + connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); + + // Handle connection error + if (connect_state_ != CONNECTED) { + logger_.warning() << "Did not connect, event loop exiting."; + { + unique_lock ul(exit_lock_); + exited_ = true; + } + + { + unique_lock ul(running_lock_); + running_ = false; + } + + running_waiter_.notify_one(); + return; + } } // Set up asynchronous watcher which we signal every @@ -245,8 +289,11 @@ void Redox::runEventLoop() { ev_async_init(&watcher_free_, freeQueuedCommands); ev_async_start(evloop_, &watcher_free_); - running_ = true; - running_waiter_.notify_one(); + { + unique_lock ul(running_lock_); + running_ = true; + running_waiter_.notify_one(); + } // Run the event loop, using NOWAIT if enabled for maximum // throughput by avoiding any sleeping @@ -278,10 +325,18 @@ void Redox::runEventLoop() { << commands_created_; } - exited_ = true; - running_ = false; + { + unique_lock ul(exit_lock_); + exited_ = true; + } + + { + unique_lock ul(running_lock_); + running_ = false; + } // Let go for block_until_stopped method + running_waiter_.notify_one(); exit_waiter_.notify_one(); logger_.info() << "Event thread exited."; @@ -458,6 +513,7 @@ template long Redox::freeAllCommandsOfType() { lock_guard lg(free_queue_guard_); lock_guard lg2(queue_guard_); + lock_guard lg3(command_map_guard_); auto &command_map = getCommandMap(); long len = command_map.size();