diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c785c6..c49c6da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,8 +2,8 @@ cmake_minimum_required(VERSION 2.8.4) project(redox) set(REDOX_VERSION_MAJOR 0) -set(REDOX_VERSION_MINOR 2) -set(REDOX_VERSION_PATCH 3) +set(REDOX_VERSION_MINOR 3) +set(REDOX_VERSION_PATCH 0) set(REDOX_VERSION_STRING ${REDOX_VERSION_MAJOR}.${REDOX_VERSION_MINOR}.${REDOX_VERSION_PATCH}) option(lib "Build Redox as a dynamic library." ON) diff --git a/HISTORY.md b/HISTORY.md index e85239c..84f80b7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Release History +## 0.3 (2015-09-28) +A bunch of multithreading fixes, thanks to @bdallas. + ## 0.2 (2015-01-31) * Move to vector of strings as input, to handle arbitrary data better and improve speed. diff --git a/include/redox/client.hpp b/include/redox/client.hpp index 05c6969..12a95bc 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -309,12 +309,20 @@ private: // Helper function for freeAllCommands to access a specific command map template long freeAllCommandsOfType(); + // Helper functions to get/set variables with synchronization. + int getConnectState(); + void setConnectState(int connect_state); + int getRunning(); + void setRunning(bool running); + int getExited(); + void setExited(bool exited); + // ------------------------------------------------ // Private members // ------------------------------------------------ // Manage connection state - std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; + int connect_state_ = NOT_YET_CONNECTED; std::mutex connect_lock_; std::condition_variable connect_waiter_; @@ -340,14 +348,14 @@ private: std::thread event_loop_thread_; // Variable and CV to know when the event loop starts running - std::atomic_bool running_ = {false}; - std::mutex running_waiter_lock_; + bool running_ = false; + std::mutex running_lock_; std::condition_variable running_waiter_; // Variable and CV to know when the event loop stops running std::atomic_bool to_exit_ = {false}; // Signal to exit - std::atomic_bool exited_ = {false}; // Event thread exited - std::mutex exit_waiter_lock_; + bool exited_ = false; // Event thread exited + std::mutex exit_lock_; std::condition_variable exit_waiter_; // Maps of each Command, fetchable by the unique ID number @@ -393,14 +401,15 @@ template Command &Redox::createCommand(const std::vector &cmd, const std::function &)> &callback, double repeat, double after, bool free_memory) { - - if (!running_) { - throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); + { + std::unique_lock ul(running_lock_); + if (!running_) { + throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); + } } - commands_created_ += 1; - auto *c = new Command(this, commands_created_, cmd, callback, repeat, after, free_memory, - logger_); + auto *c = new Command(this, commands_created_.fetch_add(1), cmd, + callback, repeat, after, free_memory, logger_); std::lock_guard lg(queue_guard_); std::lock_guard lg2(command_map_guard_); diff --git a/include/redox/command.hpp b/include/redox/command.hpp index be7f30a..69b44c4 100644 --- a/include/redox/command.hpp +++ b/include/redox/command.hpp @@ -133,7 +133,7 @@ private: // Place to store the reply value and status. ReplyT reply_val_; - std::atomic_int reply_status_; + int reply_status_; std::string last_error_; // How many messages sent to server but not received reply diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp index 9635ec7..63a89f4 100644 --- a/include/redox/subscriber.hpp +++ b/include/redox/subscriber.hpp @@ -163,9 +163,7 @@ private: // CVs to wait for unsubscriptions std::condition_variable cv_unsub_; - std::mutex cv_unsub_guard_; std::condition_variable cv_punsub_; - std::mutex cv_punsub_guard_; // Pending subscriptions std::atomic_int num_pending_subs_ = {0}; diff --git a/src/client.cpp b/src/client.cpp index 7d652d0..154f218 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -49,11 +49,16 @@ bool Redox::connect(const string &host, const int port, // Block until connected and running the event loop, or until // a connection error happens and the event loop exits - unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); + { + unique_lock ul(running_lock_); + running_waiter_.wait(ul, [this] { + lock_guard lg(connect_lock_); + return running_ || connect_state_ == CONNECT_ERROR; + }); + } // Return if succeeded - return connect_state_ == CONNECTED; + return getConnectState() == CONNECTED; } bool Redox::connectUnix(const string &path, function connection_callback) { @@ -74,11 +79,16 @@ bool Redox::connectUnix(const string &path, function connection_callb // Block until connected and running the event loop, or until // a connection error happens and the event loop exits - unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); + { + unique_lock ul(running_lock_); + running_waiter_.wait(ul, [this] { + lock_guard lg(connect_lock_); + return running_ || connect_state_ == CONNECT_ERROR; + }); + } // Return if succeeded - return connect_state_ == CONNECTED; + return getConnectState() == CONNECTED; } void Redox::disconnect() { @@ -93,14 +103,14 @@ void Redox::stop() { } void Redox::wait() { - unique_lock ul(exit_waiter_lock_); - exit_waiter_.wait(ul, [this] { return exited_.load(); }); + unique_lock ul(exit_lock_); + exit_waiter_.wait(ul, [this] { return exited_; }); } Redox::~Redox() { // Bring down the event loop - if (running_ == true) { + if (getRunning()) { stop(); } @@ -112,24 +122,23 @@ Redox::~Redox() { } void Redox::connectedCallback(const redisAsyncContext *ctx, int status) { - Redox *rdx = (Redox *)ctx->data; if (status != REDIS_OK) { rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; rdx->logger_.fatal() << "Status: " << status; - rdx->connect_state_ = CONNECT_ERROR; + rdx->setConnectState(CONNECT_ERROR); } else { + rdx->logger_.info() << "Connected to Redis."; // Disable hiredis automatically freeing reply objects ctx->c.reader->fn->freeObject = [](void *reply) {}; - rdx->connect_state_ = CONNECTED; - rdx->logger_.info() << "Connected to Redis."; + rdx->setConnectState(CONNECTED); } - rdx->connect_waiter_.notify_all(); - if (rdx->user_connection_callback_) - rdx->user_connection_callback_(rdx->connect_state_); + if (rdx->user_connection_callback_) { + rdx->user_connection_callback_(rdx->getConnectState()); + } } void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { @@ -138,16 +147,16 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { if (status != REDIS_OK) { rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; - rdx->connect_state_ = DISCONNECT_ERROR; + rdx->setConnectState(DISCONNECT_ERROR); } else { rdx->logger_.info() << "Disconnected from Redis as planned."; - rdx->connect_state_ = DISCONNECTED; + rdx->setConnectState(DISCONNECTED); } rdx->stop(); - rdx->connect_waiter_.notify_all(); - if (rdx->user_connection_callback_) - rdx->user_connection_callback_(rdx->connect_state_); + if (rdx->user_connection_callback_) { + rdx->user_connection_callback_(rdx->getConnectState()); + } } bool Redox::initEv() { @@ -155,8 +164,7 @@ bool Redox::initEv() { evloop_ = ev_loop_new(EVFLAG_AUTO); if (evloop_ == nullptr) { logger_.fatal() << "Could not create a libev event loop."; - connect_state_ = INIT_ERROR; - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } ev_set_userdata(evloop_, (void *)this); // Back-reference @@ -169,31 +177,27 @@ bool Redox::initHiredis() { if (ctx_->err) { logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; - connect_state_ = INIT_ERROR; - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } // Attach event loop to hiredis if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) { logger_.fatal() << "Could not attach libev event loop to hiredis."; - connect_state_ = INIT_ERROR; - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } // Set the callbacks to be invoked on server connection/disconnection if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach connect callback to hiredis."; - connect_state_ = INIT_ERROR; - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach disconnect callback to hiredis."; - connect_state_ = INIT_ERROR; - connect_waiter_.notify_all(); + setConnectState(INIT_ERROR); return false; } @@ -212,6 +216,43 @@ void breakEventLoop(struct ev_loop *loop, ev_async *async, int revents) { ev_break(loop, EVBREAK_ALL); } +int Redox::getConnectState() { + lock_guard lk(connect_lock_); + return connect_state_; +} + +void Redox::setConnectState(int connect_state) { + { + lock_guard lk(connect_lock_); + connect_state_ = connect_state; + } + connect_waiter_.notify_all(); +} + +int Redox::getRunning() { + lock_guard lg(running_lock_); + return running_; +} +void Redox::setRunning(bool running) { + { + lock_guard lg(running_lock_); + running_ = running; + } + running_waiter_.notify_one(); +} + +int Redox::getExited() { + lock_guard lg(exit_lock_); + return exited_; +} +void Redox::setExited(bool exited) { + { + lock_guard lg(exit_lock_); + exited_ = exited; + } + exit_waiter_.notify_one(); +} + void Redox::runEventLoop() { // Events to connect to Redox @@ -219,16 +260,17 @@ void Redox::runEventLoop() { ev_run(evloop_, EVRUN_NOWAIT); // Block until connected to Redis, or error - unique_lock ul(connect_lock_); - connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); - - // Handle connection error - if (connect_state_ != CONNECTED) { - logger_.warning() << "Did not connect, event loop exiting."; - exited_ = true; - running_ = false; - running_waiter_.notify_one(); - return; + { + unique_lock ul(connect_lock_); + connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); + + // Handle connection error + if (connect_state_ != CONNECTED) { + logger_.warning() << "Did not connect, event loop exiting."; + setExited(true); + setRunning(false); + return; + } } // Set up asynchronous watcher which we signal every @@ -245,8 +287,7 @@ void Redox::runEventLoop() { ev_async_init(&watcher_free_, freeQueuedCommands); ev_async_start(evloop_, &watcher_free_); - running_ = true; - running_waiter_.notify_one(); + setRunning(true); // Run the event loop, using NOWAIT if enabled for maximum // throughput by avoiding any sleeping @@ -266,23 +307,24 @@ void Redox::runEventLoop() { // Wait to receive server replies for clean hiredis disconnect this_thread::sleep_for(chrono::milliseconds(10)); ev_run(evloop_, EVRUN_NOWAIT); - - if (connect_state_ == CONNECTED) + + if (getConnectState() == CONNECTED) { redisAsyncDisconnect(ctx_); + } // Run once more to disconnect ev_run(evloop_, EVRUN_NOWAIT); - if (commands_created_ != commands_deleted_) { - logger_.error() << "All commands were not freed! " << commands_deleted_ << "/" - << commands_created_; + long created = commands_created_; + long deleted = commands_deleted_; + if (created != deleted) { + logger_.error() << "All commands were not freed! " << deleted << "/" + << created; } - exited_ = true; - running_ = false; - // Let go for block_until_stopped method - exit_waiter_.notify_one(); + setExited(true); + setRunning(false); logger_.info() << "Event thread exited."; } @@ -458,6 +500,7 @@ template long Redox::freeAllCommandsOfType() { lock_guard lg(free_queue_guard_); lock_guard lg2(queue_guard_); + lock_guard lg3(command_map_guard_); auto &command_map = getCommandMap(); long len = command_map.size(); diff --git a/src/command.cpp b/src/command.cpp index d7d39a1..95907b8 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -64,7 +64,10 @@ template void Command::processReply(redisReply *r) { pending_--; - waiting_done_ = true; + { + unique_lock lk(waiter_lock_); + waiting_done_ = true; + } waiter_.notify_all(); // Always free the reply object for repeating commands diff --git a/src/subscriber.cpp b/src/subscriber.cpp index c650d58..47815d6 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -54,17 +54,19 @@ void Subscriber::stop() { for (const string &topic : psubscribedTopics()) punsubscribe(topic); - unique_lock ul(cv_unsub_guard_); - cv_unsub_.wait(ul, [this] { - lock_guard lg(subscribed_topics_guard_); - return (subscribed_topics_.size() == 0); - }); + { + unique_lock ul(subscribed_topics_guard_); + cv_unsub_.wait(ul, [this] { + return (subscribed_topics_.size() == 0); + }); + } - unique_lock ul2(cv_punsub_guard_); - cv_punsub_.wait(ul, [this] { - lock_guard lg(subscribed_topics_guard_); - return (psubscribed_topics_.size() == 0); - }); + { + unique_lock ul(psubscribed_topics_guard_); + cv_punsub_.wait(ul, [this] { + return (psubscribed_topics_.size() == 0); + }); + } for (Command *c : commands_) c->free(); @@ -116,25 +118,27 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, if ((reply->type == REDIS_REPLY_ARRAY) && (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) { - lock_guard lg(subscribed_topics_guard_); - lock_guard lg2(psubscribed_topics_guard_); if (!strncmp(reply->element[0]->str, "sub", 3)) { + lock_guard lg(subscribed_topics_guard_); subscribed_topics_.insert(topic); num_pending_subs_--; if (sub_callback) sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "psub", 4)) { + lock_guard lg(psubscribed_topics_guard_); psubscribed_topics_.insert(topic); num_pending_subs_--; if (sub_callback) sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "uns", 3)) { + lock_guard lg(subscribed_topics_guard_); subscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); cv_unsub_.notify_all(); } else if (!strncmp(reply->element[0]->str, "puns", 4)) { + lock_guard lg(psubscribed_topics_guard_); psubscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); diff --git a/test/test.cpp b/test/test.cpp index c457641..cbafca5 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -240,6 +240,64 @@ TEST_F(RedoxTest, GetSetSyncError) { rdx.disconnect(); } +TEST_F(RedoxTest, MultithreadedCRUD) { + connect(); + int create_count(0); + int delete_count(0); + int createExcCount(0); + int deleteExcCount(0); + + std::mutex startMutex; + bool start = false; + std::condition_variable start_cv; + const int count = 10000; + + std::thread create_thread([&]() { + { + std::unique_lock lock(startMutex); + start_cv.wait(lock, [&]() { return start; }); + } + for (int i = 0; i < count; ++i) { + try { + rdx.commandSync({"SET", "redox_test:mt", "create"}); + } + catch (...) { + createExcCount++; + } + create_count++; + } + }); + + std::thread delete_thread([&]() { + { + std::unique_lock lock(startMutex); + start_cv.wait(lock, [&]() { return start; }); + } + for (int i = 0; i < count; ++i) { + try { + rdx.commandSync({"DEL", "redox_test:mt"}); + } + catch (...) { + deleteExcCount++; + } + delete_count++; + } + }); + + // Start threads + { + std::lock_guard lock(startMutex); + start = true; + } + start_cv.notify_all(); + + // Wait for threads to finish + create_thread.join(); + delete_thread.join(); + EXPECT_EQ(count, create_count); + EXPECT_EQ(count, delete_count); +} + // ------------------------------------------- // End tests // -------------------------------------------