Commit 3e1fa843e601bb81affee2032e3561f4ddc1f70e

Authored by Hayk Martirosyan
1 parent 2e3a71b8

Cleanup of synchronization fixes

* Use lock_guard instead of unique_lock when possible
* Make a few types non-atomic since they are already guarded by a mutex
* Create setVariable helpers that lock and notify
include/redox/client.hpp
... ... @@ -309,12 +309,20 @@ private:
309 309 // Helper function for freeAllCommands to access a specific command map
310 310 template <class ReplyT> long freeAllCommandsOfType();
311 311  
  312 + // Helper functions to get/set variables with synchronization.
  313 + int getConnectState();
  314 + void setConnectState(int connect_state);
  315 + int getRunning();
  316 + void setRunning(bool running);
  317 + int getExited();
  318 + void setExited(bool exited);
  319 +
312 320 // ------------------------------------------------
313 321 // Private members
314 322 // ------------------------------------------------
315 323  
316 324 // Manage connection state
317   - std::atomic_int connect_state_ = {NOT_YET_CONNECTED};
  325 + int connect_state_ = NOT_YET_CONNECTED;
318 326 std::mutex connect_lock_;
319 327 std::condition_variable connect_waiter_;
320 328  
... ... @@ -340,13 +348,13 @@ private:
340 348 std::thread event_loop_thread_;
341 349  
342 350 // Variable and CV to know when the event loop starts running
343   - std::atomic_bool running_ = {false};
  351 + bool running_ = false;
344 352 std::mutex running_lock_;
345 353 std::condition_variable running_waiter_;
346 354  
347 355 // Variable and CV to know when the event loop stops running
348 356 std::atomic_bool to_exit_ = {false}; // Signal to exit
349   - std::atomic_bool exited_ = {false}; // Event thread exited
  357 + bool exited_ = false; // Event thread exited
350 358 std::mutex exit_lock_;
351 359 std::condition_variable exit_waiter_;
352 360  
... ...
src/client.cpp
... ... @@ -51,18 +51,14 @@ bool Redox::connect(const string &amp;host, const int port,
51 51 // a connection error happens and the event loop exits
52 52 {
53 53 unique_lock<mutex> ul(running_lock_);
54   - running_waiter_.wait(ul, [this]
55   - {
56   - unique_lock<mutex>(connect_lock_);
57   - return running_.load() || connect_state_ == CONNECT_ERROR;
  54 + running_waiter_.wait(ul, [this] {
  55 + lock_guard<mutex> lg(connect_lock_);
  56 + return running_ || connect_state_ == CONNECT_ERROR;
58 57 });
59 58 }
60 59  
61 60 // Return if succeeded
62   - {
63   - unique_lock<mutex> ul(connect_lock_);
64   - return connect_state_ == CONNECTED;
65   - }
  61 + return getConnectState() == CONNECTED;
66 62 }
67 63  
68 64 bool Redox::connectUnix(const string &path, function<void(int)> connection_callback) {
... ... @@ -85,18 +81,14 @@ bool Redox::connectUnix(const string &amp;path, function&lt;void(int)&gt; connection_callb
85 81 // a connection error happens and the event loop exits
86 82 {
87 83 unique_lock<mutex> ul(running_lock_);
88   - running_waiter_.wait(ul, [this]
89   - {
90   - unique_lock<mutex> ul(connect_lock_);
91   - return running_.load() || connect_state_ == CONNECT_ERROR;
  84 + running_waiter_.wait(ul, [this] {
  85 + lock_guard<mutex> lg(connect_lock_);
  86 + return running_ || connect_state_ == CONNECT_ERROR;
92 87 });
93 88 }
94 89  
95 90 // Return if succeeded
96   - {
97   - unique_lock<mutex> ul(connect_lock_);
98   - return connect_state_ == CONNECTED;
99   - }
  91 + return getConnectState() == CONNECTED;
100 92 }
101 93  
102 94 void Redox::disconnect() {
... ... @@ -112,13 +104,13 @@ void Redox::stop() {
112 104  
113 105 void Redox::wait() {
114 106 unique_lock<mutex> ul(exit_lock_);
115   - exit_waiter_.wait(ul, [this] { return exited_.load(); });
  107 + exit_waiter_.wait(ul, [this] { return exited_; });
116 108 }
117 109  
118 110 Redox::~Redox() {
119 111  
120 112 // Bring down the event loop
121   - if (running_ == true) {
  113 + if (getRunning()) {
122 114 stop();
123 115 }
124 116  
... ... @@ -135,25 +127,18 @@ void Redox::connectedCallback(const redisAsyncContext *ctx, int status) {
135 127 if (status != REDIS_OK) {
136 128 rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr;
137 129 rdx->logger_.fatal() << "Status: " << status;
138   - unique_lock<mutex> lk(rdx->connect_lock_);
139   - rdx->connect_state_ = CONNECT_ERROR;
  130 + rdx->setConnectState(CONNECT_ERROR);
140 131  
141 132 } else {
142 133 rdx->logger_.info() << "Connected to Redis.";
143   - unique_lock<mutex> lk(rdx->connect_lock_);
144 134 // Disable hiredis automatically freeing reply objects
145 135 ctx->c.reader->fn->freeObject = [](void *reply) {};
146   - rdx->connect_state_ = CONNECTED;
  136 + rdx->setConnectState(CONNECTED);
147 137 }
148 138  
149   - int state;
150   - {
151   - unique_lock<mutex> lk(rdx->connect_lock_);
152   - state = rdx->connect_state_;
  139 + if (rdx->user_connection_callback_) {
  140 + rdx->user_connection_callback_(rdx->getConnectState());
153 141 }
154   - rdx->connect_waiter_.notify_all();
155   - if (rdx->user_connection_callback_)
156   - rdx->user_connection_callback_(state);
157 142 }
158 143  
159 144 void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) {
... ... @@ -162,23 +147,16 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) {
162 147  
163 148 if (status != REDIS_OK) {
164 149 rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr;
165   - unique_lock<mutex> lk(rdx->connect_lock_);
166   - rdx->connect_state_ = DISCONNECT_ERROR;
  150 + rdx->setConnectState(DISCONNECT_ERROR);
167 151 } else {
168 152 rdx->logger_.info() << "Disconnected from Redis as planned.";
169   - unique_lock<mutex> lk(rdx->connect_lock_);
170   - rdx->connect_state_ = DISCONNECTED;
  153 + rdx->setConnectState(DISCONNECTED);
171 154 }
172 155  
173 156 rdx->stop();
174   - int state;
175   - {
176   - unique_lock<mutex> lk(rdx->connect_lock_);
177   - state = rdx->connect_state_;
  157 + if (rdx->user_connection_callback_) {
  158 + rdx->user_connection_callback_(rdx->getConnectState());
178 159 }
179   - rdx->connect_waiter_.notify_all();
180   - if (rdx->user_connection_callback_)
181   - rdx->user_connection_callback_(state);
182 160 }
183 161  
184 162 bool Redox::initEv() {
... ... @@ -186,11 +164,7 @@ bool Redox::initEv() {
186 164 evloop_ = ev_loop_new(EVFLAG_AUTO);
187 165 if (evloop_ == nullptr) {
188 166 logger_.fatal() << "Could not create a libev event loop.";
189   - {
190   - unique_lock<mutex> lk(connect_lock_);
191   - connect_state_ = INIT_ERROR;
192   - }
193   - connect_waiter_.notify_all();
  167 + setConnectState(INIT_ERROR);
194 168 return false;
195 169 }
196 170 ev_set_userdata(evloop_, (void *)this); // Back-reference
... ... @@ -203,43 +177,27 @@ bool Redox::initHiredis() {
203 177  
204 178 if (ctx_->err) {
205 179 logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr;
206   - {
207   - unique_lock<mutex> lk(connect_lock_);
208   - connect_state_ = INIT_ERROR;
209   - }
210   - connect_waiter_.notify_all();
  180 + setConnectState(INIT_ERROR);
211 181 return false;
212 182 }
213 183  
214 184 // Attach event loop to hiredis
215 185 if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) {
216 186 logger_.fatal() << "Could not attach libev event loop to hiredis.";
217   - {
218   - unique_lock<mutex> lk(connect_lock_);
219   - connect_state_ = INIT_ERROR;
220   - }
221   - connect_waiter_.notify_all();
  187 + setConnectState(INIT_ERROR);
222 188 return false;
223 189 }
224 190  
225 191 // Set the callbacks to be invoked on server connection/disconnection
226 192 if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) {
227 193 logger_.fatal() << "Could not attach connect callback to hiredis.";
228   - {
229   - unique_lock<mutex> lk(connect_lock_);
230   - connect_state_ = INIT_ERROR;
231   - }
232   - connect_waiter_.notify_all();
  194 + setConnectState(INIT_ERROR);
233 195 return false;
234 196 }
235 197  
236 198 if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) {
237 199 logger_.fatal() << "Could not attach disconnect callback to hiredis.";
238   - {
239   - unique_lock<mutex> lk(connect_lock_);
240   - connect_state_ = INIT_ERROR;
241   - }
242   - connect_waiter_.notify_all();
  200 + setConnectState(INIT_ERROR);
243 201 return false;
244 202 }
245 203  
... ... @@ -258,6 +216,43 @@ void breakEventLoop(struct ev_loop *loop, ev_async *async, int revents) {
258 216 ev_break(loop, EVBREAK_ALL);
259 217 }
260 218  
  219 +int Redox::getConnectState() {
  220 + lock_guard<mutex> lk(connect_lock_);
  221 + return connect_state_;
  222 +}
  223 +
  224 +void Redox::setConnectState(int connect_state) {
  225 + {
  226 + lock_guard<mutex> lk(connect_lock_);
  227 + connect_state_ = connect_state;
  228 + }
  229 + connect_waiter_.notify_all();
  230 +}
  231 +
  232 +int Redox::getRunning() {
  233 + lock_guard<mutex> lg(running_lock_);
  234 + return running_;
  235 +}
  236 +void Redox::setRunning(bool running) {
  237 + {
  238 + lock_guard<mutex> lg(running_lock_);
  239 + running_ = running;
  240 + }
  241 + running_waiter_.notify_one();
  242 +}
  243 +
  244 +int Redox::getExited() {
  245 + lock_guard<mutex> lg(exit_lock_);
  246 + return exited_;
  247 +}
  248 +void Redox::setExited(bool exited) {
  249 + {
  250 + lock_guard<mutex> lg(exit_lock_);
  251 + exited_ = exited;
  252 + }
  253 + exit_waiter_.notify_one();
  254 +}
  255 +
261 256 void Redox::runEventLoop() {
262 257  
263 258 // Events to connect to Redox
... ... @@ -272,18 +267,8 @@ void Redox::runEventLoop() {
272 267 // Handle connection error
273 268 if (connect_state_ != CONNECTED) {
274 269 logger_.warning() << "Did not connect, event loop exiting.";
275   - {
276   - unique_lock<mutex> ul(exit_lock_);
277   - exited_ = true;
278   - }
279   -
280   - {
281   - unique_lock<mutex> ul(running_lock_);
282   - running_ = false;
283   - }
284   -
285   - running_waiter_.notify_one();
286   - exit_waiter_.notify_one();
  270 + setExited(true);
  271 + setRunning(false);
287 272 return;
288 273 }
289 274 }
... ... @@ -302,11 +287,7 @@ void Redox::runEventLoop() {
302 287 ev_async_init(&watcher_free_, freeQueuedCommands);
303 288 ev_async_start(evloop_, &watcher_free_);
304 289  
305   - {
306   - unique_lock<mutex> ul(running_lock_);
307   - running_ = true;
308   - running_waiter_.notify_one();
309   - }
  290 + setRunning(true);
310 291  
311 292 // Run the event loop, using NOWAIT if enabled for maximum
312 293 // throughput by avoiding any sleeping
... ... @@ -326,15 +307,10 @@ void Redox::runEventLoop() {
326 307 // Wait to receive server replies for clean hiredis disconnect
327 308 this_thread::sleep_for(chrono::milliseconds(10));
328 309 ev_run(evloop_, EVRUN_NOWAIT);
329   -
330   - int state;
331   - {
332   - unique_lock<mutex> ul(connect_lock_);
333   - state = connect_state_;
334   - }
335 310  
336   - if (connect_state_ == CONNECTED)
  311 + if (getConnectState() == CONNECTED) {
337 312 redisAsyncDisconnect(ctx_);
  313 + }
338 314  
339 315 // Run once more to disconnect
340 316 ev_run(evloop_, EVRUN_NOWAIT);
... ... @@ -346,19 +322,9 @@ void Redox::runEventLoop() {
346 322 << created;
347 323 }
348 324  
349   - {
350   - unique_lock<mutex> ul(exit_lock_);
351   - exited_ = true;
352   - }
353   -
354   - {
355   - unique_lock<mutex> ul(running_lock_);
356   - running_ = false;
357   - }
358   -
359 325 // Let go for block_until_stopped method
360   - running_waiter_.notify_one();
361   - exit_waiter_.notify_one();
  326 + setExited(true);
  327 + setRunning(false);
362 328  
363 329 logger_.info() << "Event thread exited.";
364 330 }
... ...