diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index 9de6e92..c17a66a 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -57,7 +57,6 @@ int main(int argc, char* argv[]) { long final_count = stol(rdx.get("simple_loop:count")); - double t_elapsed = time_s() - t0; double actual_freq = (double)count / t_elapsed; diff --git a/src/command.cpp b/src/command.cpp index 3736b5a..5363bb1 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -26,14 +26,10 @@ Command::Command( } template -Command& Command::block() { - std::unique_lock lk(blocker_lock_); - blocker_.wait(lk, [this]() { - logger_.info() << "checking blocker: " << blocking_done_; - return blocking_done_.load(); }); - logger_.info() << "returning from block"; - blocking_done_ = {false}; - return *this; +void Command::wait() { + std::unique_lock lk(waiter_lock_); + waiter_.wait(lk, [this]() { return waiting_done_.load(); }); + waiting_done_ = {false}; } template @@ -44,16 +40,14 @@ void Command::processReply(redisReply* r) { reply_obj_ = r; parseReplyObject(); invoke(); -// logger_.info() << "reply status " << reply_status_; + pending_--; - blocking_done_ = true; -// logger_.info() << "notifying blocker"; - blocker_.notify_all(); + waiting_done_ = true; + waiter_.notify_all(); // Allow free() method to free memory if (!free_memory_) { -// logger.trace() << "Command memory not being freed, free_memory = " << free_memory; free_guard_.unlock(); return; } @@ -105,15 +99,19 @@ void Command::freeReply() { template void Command::freeCommand(Command* c) { c->rdx_->template remove_active_command(c->id_); -// logger.debug() << "Deleted Command " << c->id << " at " << c; delete c; } - +/** +* Create a copy of the reply and return it. Use a guard +* to make sure we don't return a reply while it is being +* modified. +*/ template -const ReplyT& Command::reply() const { +ReplyT Command::reply() { + std::lock_guard lg(free_guard_); if (!ok()) { - logger_.warning() << cmd_ << ": Accessing value of reply with status != OK."; + logger_.warning() << cmd_ << ": Accessing reply value while status != OK."; } return reply_val_; } diff --git a/src/command.hpp b/src/command.hpp index 321f507..105a773 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -57,7 +57,7 @@ public: * to block(). If it is the first call, then returns once the callback * is invoked for the first time. */ - Command& block(); + void wait(); /** * Returns true if the command has been canceled. @@ -77,7 +77,7 @@ public: /** * Returns the reply value, if the reply was successful (ok() == true). */ - const ReplyT& reply() const; + ReplyT reply(); const std::string& cmd() const { return cmd_; }; @@ -130,9 +130,8 @@ private: const std::function&)> callback_; // Place to store the reply value and status. - // ONLY for blocking commands ReplyT reply_val_; - int reply_status_; + std::atomic_int reply_status_; // How many messages sent to server but not received reply std::atomic_int pending_ = {0}; @@ -148,13 +147,19 @@ private: std::mutex free_guard_; // For synchronous use - std::condition_variable blocker_; - std::mutex blocker_lock_; - std::atomic_bool blocking_done_ = {false}; + std::condition_variable waiter_; + std::mutex waiter_lock_; + std::atomic_bool waiting_done_ = {false}; // Passed on from Redox class log::Logger& logger_; + // Explicitly delete copy constructor and assignment operator, + // Command objects should never be copied because they hold + // state with a network resource. + Command(const Command&) = delete; + Command& operator=(const Command&) = delete; + friend class Redox; }; diff --git a/src/redox.hpp b/src/redox.hpp index de96811..16fc8a4 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -449,21 +449,8 @@ Command& Redox::command_looping( template Command& Redox::command_blocking(const std::string& cmd) { - - std::condition_variable cv; - std::mutex m; - std::unique_lock lk(m); - std::atomic_bool done = {false}; - - Command& c = createCommand(cmd, - [&cv, &done](Command& cmd_obj) { - done = true; - cv.notify_one(); - }, - 0, 0, false // No repeats, don't free memory - ); - - cv.wait(lk, [&done]() { return done.load(); }); + auto& c = createCommand(cmd, nullptr, 0, 0, false); + c.wait(); return c; }