Commit 9abc01fb5047b8e2f8b45c913b94008deef662e9

Authored by Hayk Martirosyan
1 parent 589eb3fc

Use cmd.block() in command_blocking

Also make cmd.reply() copy the value and use a mutex to make it thread
safe. No noticable speed hits, probably thanks to RVO.
examples/speed_test_async.cpp
... ... @@ -57,7 +57,6 @@ int main(int argc, char* argv[]) {
57 57  
58 58 long final_count = stol(rdx.get("simple_loop:count"));
59 59  
60   -
61 60 double t_elapsed = time_s() - t0;
62 61 double actual_freq = (double)count / t_elapsed;
63 62  
... ...
src/command.cpp
... ... @@ -26,14 +26,10 @@ Command<ReplyT>::Command(
26 26 }
27 27  
28 28 template<class ReplyT>
29   -Command<ReplyT>& Command<ReplyT>::block() {
30   - std::unique_lock<std::mutex> lk(blocker_lock_);
31   - blocker_.wait(lk, [this]() {
32   - logger_.info() << "checking blocker: " << blocking_done_;
33   - return blocking_done_.load(); });
34   - logger_.info() << "returning from block";
35   - blocking_done_ = {false};
36   - return *this;
  29 +void Command<ReplyT>::wait() {
  30 + std::unique_lock<std::mutex> lk(waiter_lock_);
  31 + waiter_.wait(lk, [this]() { return waiting_done_.load(); });
  32 + waiting_done_ = {false};
37 33 }
38 34  
39 35 template<class ReplyT>
... ... @@ -44,16 +40,14 @@ void Command&lt;ReplyT&gt;::processReply(redisReply* r) {
44 40 reply_obj_ = r;
45 41 parseReplyObject();
46 42 invoke();
47   -// logger_.info() << "reply status " << reply_status_;
  43 +
48 44 pending_--;
49 45  
50   - blocking_done_ = true;
51   -// logger_.info() << "notifying blocker";
52   - blocker_.notify_all();
  46 + waiting_done_ = true;
  47 + waiter_.notify_all();
53 48  
54 49 // Allow free() method to free memory
55 50 if (!free_memory_) {
56   -// logger.trace() << "Command memory not being freed, free_memory = " << free_memory;
57 51 free_guard_.unlock();
58 52 return;
59 53 }
... ... @@ -105,15 +99,19 @@ void Command&lt;ReplyT&gt;::freeReply() {
105 99 template<class ReplyT>
106 100 void Command<ReplyT>::freeCommand(Command<ReplyT>* c) {
107 101 c->rdx_->template remove_active_command<ReplyT>(c->id_);
108   -// logger.debug() << "Deleted Command " << c->id << " at " << c;
109 102 delete c;
110 103 }
111 104  
112   -
  105 +/**
  106 +* Create a copy of the reply and return it. Use a guard
  107 +* to make sure we don't return a reply while it is being
  108 +* modified.
  109 +*/
113 110 template<class ReplyT>
114   -const ReplyT& Command<ReplyT>::reply() const {
  111 +ReplyT Command<ReplyT>::reply() {
  112 + std::lock_guard<std::mutex> lg(free_guard_);
115 113 if (!ok()) {
116   - logger_.warning() << cmd_ << ": Accessing value of reply with status != OK.";
  114 + logger_.warning() << cmd_ << ": Accessing reply value while status != OK.";
117 115 }
118 116 return reply_val_;
119 117 }
... ...
src/command.hpp
... ... @@ -57,7 +57,7 @@ public:
57 57 * to block(). If it is the first call, then returns once the callback
58 58 * is invoked for the first time.
59 59 */
60   - Command<ReplyT>& block();
  60 + void wait();
61 61  
62 62 /**
63 63 * Returns true if the command has been canceled.
... ... @@ -77,7 +77,7 @@ public:
77 77 /**
78 78 * Returns the reply value, if the reply was successful (ok() == true).
79 79 */
80   - const ReplyT& reply() const;
  80 + ReplyT reply();
81 81  
82 82 const std::string& cmd() const { return cmd_; };
83 83  
... ... @@ -130,9 +130,8 @@ private:
130 130 const std::function<void(Command<ReplyT>&)> callback_;
131 131  
132 132 // Place to store the reply value and status.
133   - // ONLY for blocking commands
134 133 ReplyT reply_val_;
135   - int reply_status_;
  134 + std::atomic_int reply_status_;
136 135  
137 136 // How many messages sent to server but not received reply
138 137 std::atomic_int pending_ = {0};
... ... @@ -148,13 +147,19 @@ private:
148 147 std::mutex free_guard_;
149 148  
150 149 // For synchronous use
151   - std::condition_variable blocker_;
152   - std::mutex blocker_lock_;
153   - std::atomic_bool blocking_done_ = {false};
  150 + std::condition_variable waiter_;
  151 + std::mutex waiter_lock_;
  152 + std::atomic_bool waiting_done_ = {false};
154 153  
155 154 // Passed on from Redox class
156 155 log::Logger& logger_;
157 156  
  157 + // Explicitly delete copy constructor and assignment operator,
  158 + // Command objects should never be copied because they hold
  159 + // state with a network resource.
  160 + Command(const Command&) = delete;
  161 + Command& operator=(const Command&) = delete;
  162 +
158 163 friend class Redox;
159 164 };
160 165  
... ...
src/redox.hpp
... ... @@ -449,21 +449,8 @@ Command&lt;ReplyT&gt;&amp; Redox::command_looping(
449 449  
450 450 template<class ReplyT>
451 451 Command<ReplyT>& Redox::command_blocking(const std::string& cmd) {
452   -
453   - std::condition_variable cv;
454   - std::mutex m;
455   - std::unique_lock<std::mutex> lk(m);
456   - std::atomic_bool done = {false};
457   -
458   - Command<ReplyT>& c = createCommand<ReplyT>(cmd,
459   - [&cv, &done](Command<ReplyT>& cmd_obj) {
460   - done = true;
461   - cv.notify_one();
462   - },
463   - 0, 0, false // No repeats, don't free memory
464   - );
465   -
466   - cv.wait(lk, [&done]() { return done.load(); });
  452 + auto& c = createCommand<ReplyT>(cmd, nullptr, 0, 0, false);
  453 + c.wait();
467 454 return c;
468 455 }
469 456  
... ...