Commit 91e3e06c9029cef9afe1585d6f4e5d12144acbd4

Authored by Collin Hockey
1 parent 29b48d3d

More locking fixes for client.cpp

Very similar to the fixes for Command and Subscriber, adds locking
around booleans used as flags for condition variables.
include/redox/client.hpp
@@ -341,13 +341,13 @@ private: @@ -341,13 +341,13 @@ private:
341 341
342 // Variable and CV to know when the event loop starts running 342 // Variable and CV to know when the event loop starts running
343 std::atomic_bool running_ = {false}; 343 std::atomic_bool running_ = {false};
344 - std::mutex running_waiter_lock_; 344 + std::mutex running_lock_;
345 std::condition_variable running_waiter_; 345 std::condition_variable running_waiter_;
346 346
347 // Variable and CV to know when the event loop stops running 347 // Variable and CV to know when the event loop stops running
348 std::atomic_bool to_exit_ = {false}; // Signal to exit 348 std::atomic_bool to_exit_ = {false}; // Signal to exit
349 std::atomic_bool exited_ = {false}; // Event thread exited 349 std::atomic_bool exited_ = {false}; // Event thread exited
350 - std::mutex exit_waiter_lock_; 350 + std::mutex exit_lock_;
351 std::condition_variable exit_waiter_; 351 std::condition_variable exit_waiter_;
352 352
353 // Maps of each Command, fetchable by the unique ID number 353 // Maps of each Command, fetchable by the unique ID number
src/client.cpp
@@ -49,11 +49,20 @@ bool Redox::connect(const string &host, const int port, @@ -49,11 +49,20 @@ bool Redox::connect(const string &host, const int port,
49 49
50 // Block until connected and running the event loop, or until 50 // Block until connected and running the event loop, or until
51 // a connection error happens and the event loop exits 51 // a connection error happens and the event loop exits
52 - unique_lock<mutex> ul(running_waiter_lock_);  
53 - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); 52 + {
  53 + unique_lock<mutex> ul(running_lock_);
  54 + running_waiter_.wait(ul, [this]
  55 + {
  56 + unique_lock<mutex>(connect_lock_);
  57 + return running_.load() || connect_state_ == CONNECT_ERROR;
  58 + });
  59 + }
54 60
55 // Return if succeeded 61 // Return if succeeded
56 - return connect_state_ == CONNECTED; 62 + {
  63 + unique_lock<mutex> ul(connect_lock_);
  64 + return connect_state_ == CONNECTED;
  65 + }
57 } 66 }
58 67
59 bool Redox::connectUnix(const string &path, function<void(int)> connection_callback) { 68 bool Redox::connectUnix(const string &path, function<void(int)> connection_callback) {
@@ -74,8 +83,14 @@ bool Redox::connectUnix(const string &amp;path, function&lt;void(int)&gt; connection_callb @@ -74,8 +83,14 @@ bool Redox::connectUnix(const string &amp;path, function&lt;void(int)&gt; connection_callb
74 83
75 // Block until connected and running the event loop, or until 84 // Block until connected and running the event loop, or until
76 // a connection error happens and the event loop exits 85 // a connection error happens and the event loop exits
77 - unique_lock<mutex> ul(running_waiter_lock_);  
78 - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); 86 + {
  87 + unique_lock<mutex> ul(running_lock_);
  88 + running_waiter_.wait(ul, [this]
  89 + {
  90 + unique_lock<mutex> ul(connect_lock_);
  91 + return running_.load() || connect_state_ == CONNECT_ERROR;
  92 + });
  93 + }
79 94
80 // Return if succeeded 95 // Return if succeeded
81 return connect_state_ == CONNECTED; 96 return connect_state_ == CONNECTED;
@@ -93,7 +108,7 @@ void Redox::stop() { @@ -93,7 +108,7 @@ void Redox::stop() {
93 } 108 }
94 109
95 void Redox::wait() { 110 void Redox::wait() {
96 - unique_lock<mutex> ul(exit_waiter_lock_); 111 + unique_lock<mutex> ul(exit_lock_);
97 exit_waiter_.wait(ul, [this] { return exited_.load(); }); 112 exit_waiter_.wait(ul, [this] { return exited_.load(); });
98 } 113 }
99 114
@@ -118,13 +133,15 @@ void Redox::connectedCallback(const redisAsyncContext *ctx, int status) { @@ -118,13 +133,15 @@ void Redox::connectedCallback(const redisAsyncContext *ctx, int status) {
118 if (status != REDIS_OK) { 133 if (status != REDIS_OK) {
119 rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; 134 rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr;
120 rdx->logger_.fatal() << "Status: " << status; 135 rdx->logger_.fatal() << "Status: " << status;
  136 + unique_lock<mutex> lk(rdx->connect_lock_);
121 rdx->connect_state_ = CONNECT_ERROR; 137 rdx->connect_state_ = CONNECT_ERROR;
122 138
123 } else { 139 } else {
  140 + rdx->logger_.info() << "Connected to Redis.";
  141 + unique_lock<mutex> lk(rdx->connect_lock_);
124 // Disable hiredis automatically freeing reply objects 142 // Disable hiredis automatically freeing reply objects
125 ctx->c.reader->fn->freeObject = [](void *reply) {}; 143 ctx->c.reader->fn->freeObject = [](void *reply) {};
126 rdx->connect_state_ = CONNECTED; 144 rdx->connect_state_ = CONNECTED;
127 - rdx->logger_.info() << "Connected to Redis.";  
128 } 145 }
129 146
130 rdx->connect_waiter_.notify_all(); 147 rdx->connect_waiter_.notify_all();
@@ -138,9 +155,11 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { @@ -138,9 +155,11 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) {
138 155
139 if (status != REDIS_OK) { 156 if (status != REDIS_OK) {
140 rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; 157 rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr;
  158 + unique_lock<mutex> lk(rdx->connect_lock_);
141 rdx->connect_state_ = DISCONNECT_ERROR; 159 rdx->connect_state_ = DISCONNECT_ERROR;
142 } else { 160 } else {
143 rdx->logger_.info() << "Disconnected from Redis as planned."; 161 rdx->logger_.info() << "Disconnected from Redis as planned.";
  162 + unique_lock<mutex> lk(rdx->connect_lock_);
144 rdx->connect_state_ = DISCONNECTED; 163 rdx->connect_state_ = DISCONNECTED;
145 } 164 }
146 165
@@ -155,7 +174,10 @@ bool Redox::initEv() { @@ -155,7 +174,10 @@ bool Redox::initEv() {
155 evloop_ = ev_loop_new(EVFLAG_AUTO); 174 evloop_ = ev_loop_new(EVFLAG_AUTO);
156 if (evloop_ == nullptr) { 175 if (evloop_ == nullptr) {
157 logger_.fatal() << "Could not create a libev event loop."; 176 logger_.fatal() << "Could not create a libev event loop.";
158 - connect_state_ = INIT_ERROR; 177 + {
  178 + unique_lock<mutex> lk(connect_lock_);
  179 + connect_state_ = INIT_ERROR;
  180 + }
159 connect_waiter_.notify_all(); 181 connect_waiter_.notify_all();
160 return false; 182 return false;
161 } 183 }
@@ -169,7 +191,10 @@ bool Redox::initHiredis() { @@ -169,7 +191,10 @@ bool Redox::initHiredis() {
169 191
170 if (ctx_->err) { 192 if (ctx_->err) {
171 logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; 193 logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr;
172 - connect_state_ = INIT_ERROR; 194 + {
  195 + unique_lock<mutex> lk(connect_lock_);
  196 + connect_state_ = INIT_ERROR;
  197 + }
173 connect_waiter_.notify_all(); 198 connect_waiter_.notify_all();
174 return false; 199 return false;
175 } 200 }
@@ -177,7 +202,10 @@ bool Redox::initHiredis() { @@ -177,7 +202,10 @@ bool Redox::initHiredis() {
177 // Attach event loop to hiredis 202 // Attach event loop to hiredis
178 if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) { 203 if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) {
179 logger_.fatal() << "Could not attach libev event loop to hiredis."; 204 logger_.fatal() << "Could not attach libev event loop to hiredis.";
180 - connect_state_ = INIT_ERROR; 205 + {
  206 + unique_lock<mutex> lk(connect_lock_);
  207 + connect_state_ = INIT_ERROR;
  208 + }
181 connect_waiter_.notify_all(); 209 connect_waiter_.notify_all();
182 return false; 210 return false;
183 } 211 }
@@ -185,14 +213,20 @@ bool Redox::initHiredis() { @@ -185,14 +213,20 @@ bool Redox::initHiredis() {
185 // Set the callbacks to be invoked on server connection/disconnection 213 // Set the callbacks to be invoked on server connection/disconnection
186 if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { 214 if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) {
187 logger_.fatal() << "Could not attach connect callback to hiredis."; 215 logger_.fatal() << "Could not attach connect callback to hiredis.";
188 - connect_state_ = INIT_ERROR; 216 + {
  217 + unique_lock<mutex> lk(connect_lock_);
  218 + connect_state_ = INIT_ERROR;
  219 + }
189 connect_waiter_.notify_all(); 220 connect_waiter_.notify_all();
190 return false; 221 return false;
191 } 222 }
192 223
193 if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { 224 if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) {
194 logger_.fatal() << "Could not attach disconnect callback to hiredis."; 225 logger_.fatal() << "Could not attach disconnect callback to hiredis.";
195 - connect_state_ = INIT_ERROR; 226 + {
  227 + unique_lock<mutex> lk(connect_lock_);
  228 + connect_state_ = INIT_ERROR;
  229 + }
196 connect_waiter_.notify_all(); 230 connect_waiter_.notify_all();
197 return false; 231 return false;
198 } 232 }
@@ -219,16 +253,26 @@ void Redox::runEventLoop() { @@ -219,16 +253,26 @@ void Redox::runEventLoop() {
219 ev_run(evloop_, EVRUN_NOWAIT); 253 ev_run(evloop_, EVRUN_NOWAIT);
220 254
221 // Block until connected to Redis, or error 255 // Block until connected to Redis, or error
222 - unique_lock<mutex> ul(connect_lock_);  
223 - connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; });  
224 -  
225 - // Handle connection error  
226 - if (connect_state_ != CONNECTED) {  
227 - logger_.warning() << "Did not connect, event loop exiting.";  
228 - exited_ = true;  
229 - running_ = false;  
230 - running_waiter_.notify_one();  
231 - return; 256 + {
  257 + unique_lock<mutex> ul(connect_lock_);
  258 + connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; });
  259 +
  260 + // Handle connection error
  261 + if (connect_state_ != CONNECTED) {
  262 + logger_.warning() << "Did not connect, event loop exiting.";
  263 + {
  264 + unique_lock<mutex> ul(exit_lock_);
  265 + exited_ = true;
  266 + }
  267 +
  268 + {
  269 + unique_lock<mutex> ul(running_lock_);
  270 + running_ = false;
  271 + }
  272 +
  273 + running_waiter_.notify_one();
  274 + return;
  275 + }
232 } 276 }
233 277
234 // Set up asynchronous watcher which we signal every 278 // Set up asynchronous watcher which we signal every
@@ -245,8 +289,11 @@ void Redox::runEventLoop() { @@ -245,8 +289,11 @@ void Redox::runEventLoop() {
245 ev_async_init(&watcher_free_, freeQueuedCommands); 289 ev_async_init(&watcher_free_, freeQueuedCommands);
246 ev_async_start(evloop_, &watcher_free_); 290 ev_async_start(evloop_, &watcher_free_);
247 291
248 - running_ = true;  
249 - running_waiter_.notify_one(); 292 + {
  293 + unique_lock<mutex> ul(running_lock_);
  294 + running_ = true;
  295 + running_waiter_.notify_one();
  296 + }
250 297
251 // Run the event loop, using NOWAIT if enabled for maximum 298 // Run the event loop, using NOWAIT if enabled for maximum
252 // throughput by avoiding any sleeping 299 // throughput by avoiding any sleeping
@@ -278,10 +325,18 @@ void Redox::runEventLoop() { @@ -278,10 +325,18 @@ void Redox::runEventLoop() {
278 << commands_created_; 325 << commands_created_;
279 } 326 }
280 327
281 - exited_ = true;  
282 - running_ = false; 328 + {
  329 + unique_lock<mutex> ul(exit_lock_);
  330 + exited_ = true;
  331 + }
  332 +
  333 + {
  334 + unique_lock<mutex> ul(running_lock_);
  335 + running_ = false;
  336 + }
283 337
284 // Let go for block_until_stopped method 338 // Let go for block_until_stopped method
  339 + running_waiter_.notify_one();
285 exit_waiter_.notify_one(); 340 exit_waiter_.notify_one();
286 341
287 logger_.info() << "Event thread exited."; 342 logger_.info() << "Event thread exited.";
@@ -458,6 +513,7 @@ template &lt;class ReplyT&gt; long Redox::freeAllCommandsOfType() { @@ -458,6 +513,7 @@ template &lt;class ReplyT&gt; long Redox::freeAllCommandsOfType() {
458 513
459 lock_guard<mutex> lg(free_queue_guard_); 514 lock_guard<mutex> lg(free_queue_guard_);
460 lock_guard<mutex> lg2(queue_guard_); 515 lock_guard<mutex> lg2(queue_guard_);
  516 + lock_guard<mutex> lg3(command_map_guard_);
461 517
462 auto &command_map = getCommandMap<ReplyT>(); 518 auto &command_map = getCommandMap<ReplyT>();
463 long len = command_map.size(); 519 long len = command_map.size();