Commit 350c0377579ac357cba3cbae5d02b11dd54e9488

Authored by Hayk Martirosyan
2 parents 475171c9 080720ce

Merge pull request #27 from hmartiro/bdallas-master

Fixes for Multi-Threading (bdallas)
CMakeLists.txt
@@ -2,8 +2,8 @@ cmake_minimum_required(VERSION 2.8.4) @@ -2,8 +2,8 @@ cmake_minimum_required(VERSION 2.8.4)
2 project(redox) 2 project(redox)
3 3
4 set(REDOX_VERSION_MAJOR 0) 4 set(REDOX_VERSION_MAJOR 0)
5 -set(REDOX_VERSION_MINOR 2)  
6 -set(REDOX_VERSION_PATCH 3) 5 +set(REDOX_VERSION_MINOR 3)
  6 +set(REDOX_VERSION_PATCH 0)
7 set(REDOX_VERSION_STRING ${REDOX_VERSION_MAJOR}.${REDOX_VERSION_MINOR}.${REDOX_VERSION_PATCH}) 7 set(REDOX_VERSION_STRING ${REDOX_VERSION_MAJOR}.${REDOX_VERSION_MINOR}.${REDOX_VERSION_PATCH})
8 8
9 option(lib "Build Redox as a dynamic library." ON) 9 option(lib "Build Redox as a dynamic library." ON)
HISTORY.md
1 # Release History 1 # Release History
2 2
  3 +## 0.3 (2015-09-28)
  4 +A bunch of multithreading fixes, thanks to @bdallas.
  5 +
3 ## 0.2 (2015-01-31) 6 ## 0.2 (2015-01-31)
4 * Move to vector of strings as input, to handle arbitrary data better and 7 * Move to vector of strings as input, to handle arbitrary data better and
5 improve speed. 8 improve speed.
include/redox/client.hpp
@@ -309,12 +309,20 @@ private: @@ -309,12 +309,20 @@ private:
309 // Helper function for freeAllCommands to access a specific command map 309 // Helper function for freeAllCommands to access a specific command map
310 template <class ReplyT> long freeAllCommandsOfType(); 310 template <class ReplyT> long freeAllCommandsOfType();
311 311
  312 + // Helper functions to get/set variables with synchronization.
  313 + int getConnectState();
  314 + void setConnectState(int connect_state);
  315 + int getRunning();
  316 + void setRunning(bool running);
  317 + int getExited();
  318 + void setExited(bool exited);
  319 +
312 // ------------------------------------------------ 320 // ------------------------------------------------
313 // Private members 321 // Private members
314 // ------------------------------------------------ 322 // ------------------------------------------------
315 323
316 // Manage connection state 324 // Manage connection state
317 - std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; 325 + int connect_state_ = NOT_YET_CONNECTED;
318 std::mutex connect_lock_; 326 std::mutex connect_lock_;
319 std::condition_variable connect_waiter_; 327 std::condition_variable connect_waiter_;
320 328
@@ -340,14 +348,14 @@ private: @@ -340,14 +348,14 @@ private:
340 std::thread event_loop_thread_; 348 std::thread event_loop_thread_;
341 349
342 // Variable and CV to know when the event loop starts running 350 // Variable and CV to know when the event loop starts running
343 - std::atomic_bool running_ = {false};  
344 - std::mutex running_waiter_lock_; 351 + bool running_ = false;
  352 + std::mutex running_lock_;
345 std::condition_variable running_waiter_; 353 std::condition_variable running_waiter_;
346 354
347 // Variable and CV to know when the event loop stops running 355 // Variable and CV to know when the event loop stops running
348 std::atomic_bool to_exit_ = {false}; // Signal to exit 356 std::atomic_bool to_exit_ = {false}; // Signal to exit
349 - std::atomic_bool exited_ = {false}; // Event thread exited  
350 - std::mutex exit_waiter_lock_; 357 + bool exited_ = false; // Event thread exited
  358 + std::mutex exit_lock_;
351 std::condition_variable exit_waiter_; 359 std::condition_variable exit_waiter_;
352 360
353 // Maps of each Command, fetchable by the unique ID number 361 // Maps of each Command, fetchable by the unique ID number
@@ -393,14 +401,15 @@ template &lt;class ReplyT&gt; @@ -393,14 +401,15 @@ template &lt;class ReplyT&gt;
393 Command<ReplyT> &Redox::createCommand(const std::vector<std::string> &cmd, 401 Command<ReplyT> &Redox::createCommand(const std::vector<std::string> &cmd,
394 const std::function<void(Command<ReplyT> &)> &callback, 402 const std::function<void(Command<ReplyT> &)> &callback,
395 double repeat, double after, bool free_memory) { 403 double repeat, double after, bool free_memory) {
396 -  
397 - if (!running_) {  
398 - throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); 404 + {
  405 + std::unique_lock<std::mutex> ul(running_lock_);
  406 + if (!running_) {
  407 + throw std::runtime_error("[ERROR] Need to connect Redox before running commands!");
  408 + }
399 } 409 }
400 410
401 - commands_created_ += 1;  
402 - auto *c = new Command<ReplyT>(this, commands_created_, cmd, callback, repeat, after, free_memory,  
403 - logger_); 411 + auto *c = new Command<ReplyT>(this, commands_created_.fetch_add(1), cmd,
  412 + callback, repeat, after, free_memory, logger_);
404 413
405 std::lock_guard<std::mutex> lg(queue_guard_); 414 std::lock_guard<std::mutex> lg(queue_guard_);
406 std::lock_guard<std::mutex> lg2(command_map_guard_); 415 std::lock_guard<std::mutex> lg2(command_map_guard_);
include/redox/command.hpp
@@ -133,7 +133,7 @@ private: @@ -133,7 +133,7 @@ private:
133 133
134 // Place to store the reply value and status. 134 // Place to store the reply value and status.
135 ReplyT reply_val_; 135 ReplyT reply_val_;
136 - std::atomic_int reply_status_; 136 + int reply_status_;
137 std::string last_error_; 137 std::string last_error_;
138 138
139 // How many messages sent to server but not received reply 139 // How many messages sent to server but not received reply
include/redox/subscriber.hpp
@@ -163,9 +163,7 @@ private: @@ -163,9 +163,7 @@ private:
163 163
164 // CVs to wait for unsubscriptions 164 // CVs to wait for unsubscriptions
165 std::condition_variable cv_unsub_; 165 std::condition_variable cv_unsub_;
166 - std::mutex cv_unsub_guard_;  
167 std::condition_variable cv_punsub_; 166 std::condition_variable cv_punsub_;
168 - std::mutex cv_punsub_guard_;  
169 167
170 // Pending subscriptions 168 // Pending subscriptions
171 std::atomic_int num_pending_subs_ = {0}; 169 std::atomic_int num_pending_subs_ = {0};
src/client.cpp
@@ -49,11 +49,16 @@ bool Redox::connect(const string &amp;host, const int port, @@ -49,11 +49,16 @@ bool Redox::connect(const string &amp;host, const int port,
49 49
50 // Block until connected and running the event loop, or until 50 // Block until connected and running the event loop, or until
51 // a connection error happens and the event loop exits 51 // a connection error happens and the event loop exits
52 - unique_lock<mutex> ul(running_waiter_lock_);  
53 - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); 52 + {
  53 + unique_lock<mutex> ul(running_lock_);
  54 + running_waiter_.wait(ul, [this] {
  55 + lock_guard<mutex> lg(connect_lock_);
  56 + return running_ || connect_state_ == CONNECT_ERROR;
  57 + });
  58 + }
54 59
55 // Return if succeeded 60 // Return if succeeded
56 - return connect_state_ == CONNECTED; 61 + return getConnectState() == CONNECTED;
57 } 62 }
58 63
59 bool Redox::connectUnix(const string &path, function<void(int)> connection_callback) { 64 bool Redox::connectUnix(const string &path, function<void(int)> connection_callback) {
@@ -74,11 +79,16 @@ bool Redox::connectUnix(const string &amp;path, function&lt;void(int)&gt; connection_callb @@ -74,11 +79,16 @@ bool Redox::connectUnix(const string &amp;path, function&lt;void(int)&gt; connection_callb
74 79
75 // Block until connected and running the event loop, or until 80 // Block until connected and running the event loop, or until
76 // a connection error happens and the event loop exits 81 // a connection error happens and the event loop exits
77 - unique_lock<mutex> ul(running_waiter_lock_);  
78 - running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); 82 + {
  83 + unique_lock<mutex> ul(running_lock_);
  84 + running_waiter_.wait(ul, [this] {
  85 + lock_guard<mutex> lg(connect_lock_);
  86 + return running_ || connect_state_ == CONNECT_ERROR;
  87 + });
  88 + }
79 89
80 // Return if succeeded 90 // Return if succeeded
81 - return connect_state_ == CONNECTED; 91 + return getConnectState() == CONNECTED;
82 } 92 }
83 93
84 void Redox::disconnect() { 94 void Redox::disconnect() {
@@ -93,14 +103,14 @@ void Redox::stop() { @@ -93,14 +103,14 @@ void Redox::stop() {
93 } 103 }
94 104
95 void Redox::wait() { 105 void Redox::wait() {
96 - unique_lock<mutex> ul(exit_waiter_lock_);  
97 - exit_waiter_.wait(ul, [this] { return exited_.load(); }); 106 + unique_lock<mutex> ul(exit_lock_);
  107 + exit_waiter_.wait(ul, [this] { return exited_; });
98 } 108 }
99 109
100 Redox::~Redox() { 110 Redox::~Redox() {
101 111
102 // Bring down the event loop 112 // Bring down the event loop
103 - if (running_ == true) { 113 + if (getRunning()) {
104 stop(); 114 stop();
105 } 115 }
106 116
@@ -112,24 +122,23 @@ Redox::~Redox() { @@ -112,24 +122,23 @@ Redox::~Redox() {
112 } 122 }
113 123
114 void Redox::connectedCallback(const redisAsyncContext *ctx, int status) { 124 void Redox::connectedCallback(const redisAsyncContext *ctx, int status) {
115 -  
116 Redox *rdx = (Redox *)ctx->data; 125 Redox *rdx = (Redox *)ctx->data;
117 126
118 if (status != REDIS_OK) { 127 if (status != REDIS_OK) {
119 rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; 128 rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr;
120 rdx->logger_.fatal() << "Status: " << status; 129 rdx->logger_.fatal() << "Status: " << status;
121 - rdx->connect_state_ = CONNECT_ERROR; 130 + rdx->setConnectState(CONNECT_ERROR);
122 131
123 } else { 132 } else {
  133 + rdx->logger_.info() << "Connected to Redis.";
124 // Disable hiredis automatically freeing reply objects 134 // Disable hiredis automatically freeing reply objects
125 ctx->c.reader->fn->freeObject = [](void *reply) {}; 135 ctx->c.reader->fn->freeObject = [](void *reply) {};
126 - rdx->connect_state_ = CONNECTED;  
127 - rdx->logger_.info() << "Connected to Redis."; 136 + rdx->setConnectState(CONNECTED);
128 } 137 }
129 138
130 - rdx->connect_waiter_.notify_all();  
131 - if (rdx->user_connection_callback_)  
132 - rdx->user_connection_callback_(rdx->connect_state_); 139 + if (rdx->user_connection_callback_) {
  140 + rdx->user_connection_callback_(rdx->getConnectState());
  141 + }
133 } 142 }
134 143
135 void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { 144 void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) {
@@ -138,16 +147,16 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { @@ -138,16 +147,16 @@ void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) {
138 147
139 if (status != REDIS_OK) { 148 if (status != REDIS_OK) {
140 rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; 149 rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr;
141 - rdx->connect_state_ = DISCONNECT_ERROR; 150 + rdx->setConnectState(DISCONNECT_ERROR);
142 } else { 151 } else {
143 rdx->logger_.info() << "Disconnected from Redis as planned."; 152 rdx->logger_.info() << "Disconnected from Redis as planned.";
144 - rdx->connect_state_ = DISCONNECTED; 153 + rdx->setConnectState(DISCONNECTED);
145 } 154 }
146 155
147 rdx->stop(); 156 rdx->stop();
148 - rdx->connect_waiter_.notify_all();  
149 - if (rdx->user_connection_callback_)  
150 - rdx->user_connection_callback_(rdx->connect_state_); 157 + if (rdx->user_connection_callback_) {
  158 + rdx->user_connection_callback_(rdx->getConnectState());
  159 + }
151 } 160 }
152 161
153 bool Redox::initEv() { 162 bool Redox::initEv() {
@@ -155,8 +164,7 @@ bool Redox::initEv() { @@ -155,8 +164,7 @@ bool Redox::initEv() {
155 evloop_ = ev_loop_new(EVFLAG_AUTO); 164 evloop_ = ev_loop_new(EVFLAG_AUTO);
156 if (evloop_ == nullptr) { 165 if (evloop_ == nullptr) {
157 logger_.fatal() << "Could not create a libev event loop."; 166 logger_.fatal() << "Could not create a libev event loop.";
158 - connect_state_ = INIT_ERROR;  
159 - connect_waiter_.notify_all(); 167 + setConnectState(INIT_ERROR);
160 return false; 168 return false;
161 } 169 }
162 ev_set_userdata(evloop_, (void *)this); // Back-reference 170 ev_set_userdata(evloop_, (void *)this); // Back-reference
@@ -169,31 +177,27 @@ bool Redox::initHiredis() { @@ -169,31 +177,27 @@ bool Redox::initHiredis() {
169 177
170 if (ctx_->err) { 178 if (ctx_->err) {
171 logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; 179 logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr;
172 - connect_state_ = INIT_ERROR;  
173 - connect_waiter_.notify_all(); 180 + setConnectState(INIT_ERROR);
174 return false; 181 return false;
175 } 182 }
176 183
177 // Attach event loop to hiredis 184 // Attach event loop to hiredis
178 if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) { 185 if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) {
179 logger_.fatal() << "Could not attach libev event loop to hiredis."; 186 logger_.fatal() << "Could not attach libev event loop to hiredis.";
180 - connect_state_ = INIT_ERROR;  
181 - connect_waiter_.notify_all(); 187 + setConnectState(INIT_ERROR);
182 return false; 188 return false;
183 } 189 }
184 190
185 // Set the callbacks to be invoked on server connection/disconnection 191 // Set the callbacks to be invoked on server connection/disconnection
186 if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { 192 if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) {
187 logger_.fatal() << "Could not attach connect callback to hiredis."; 193 logger_.fatal() << "Could not attach connect callback to hiredis.";
188 - connect_state_ = INIT_ERROR;  
189 - connect_waiter_.notify_all(); 194 + setConnectState(INIT_ERROR);
190 return false; 195 return false;
191 } 196 }
192 197
193 if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { 198 if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) {
194 logger_.fatal() << "Could not attach disconnect callback to hiredis."; 199 logger_.fatal() << "Could not attach disconnect callback to hiredis.";
195 - connect_state_ = INIT_ERROR;  
196 - connect_waiter_.notify_all(); 200 + setConnectState(INIT_ERROR);
197 return false; 201 return false;
198 } 202 }
199 203
@@ -212,6 +216,43 @@ void breakEventLoop(struct ev_loop *loop, ev_async *async, int revents) { @@ -212,6 +216,43 @@ void breakEventLoop(struct ev_loop *loop, ev_async *async, int revents) {
212 ev_break(loop, EVBREAK_ALL); 216 ev_break(loop, EVBREAK_ALL);
213 } 217 }
214 218
  219 +int Redox::getConnectState() {
  220 + lock_guard<mutex> lk(connect_lock_);
  221 + return connect_state_;
  222 +}
  223 +
  224 +void Redox::setConnectState(int connect_state) {
  225 + {
  226 + lock_guard<mutex> lk(connect_lock_);
  227 + connect_state_ = connect_state;
  228 + }
  229 + connect_waiter_.notify_all();
  230 +}
  231 +
  232 +int Redox::getRunning() {
  233 + lock_guard<mutex> lg(running_lock_);
  234 + return running_;
  235 +}
  236 +void Redox::setRunning(bool running) {
  237 + {
  238 + lock_guard<mutex> lg(running_lock_);
  239 + running_ = running;
  240 + }
  241 + running_waiter_.notify_one();
  242 +}
  243 +
  244 +int Redox::getExited() {
  245 + lock_guard<mutex> lg(exit_lock_);
  246 + return exited_;
  247 +}
  248 +void Redox::setExited(bool exited) {
  249 + {
  250 + lock_guard<mutex> lg(exit_lock_);
  251 + exited_ = exited;
  252 + }
  253 + exit_waiter_.notify_one();
  254 +}
  255 +
215 void Redox::runEventLoop() { 256 void Redox::runEventLoop() {
216 257
217 // Events to connect to Redox 258 // Events to connect to Redox
@@ -219,16 +260,17 @@ void Redox::runEventLoop() { @@ -219,16 +260,17 @@ void Redox::runEventLoop() {
219 ev_run(evloop_, EVRUN_NOWAIT); 260 ev_run(evloop_, EVRUN_NOWAIT);
220 261
221 // Block until connected to Redis, or error 262 // Block until connected to Redis, or error
222 - unique_lock<mutex> ul(connect_lock_);  
223 - connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; });  
224 -  
225 - // Handle connection error  
226 - if (connect_state_ != CONNECTED) {  
227 - logger_.warning() << "Did not connect, event loop exiting.";  
228 - exited_ = true;  
229 - running_ = false;  
230 - running_waiter_.notify_one();  
231 - return; 263 + {
  264 + unique_lock<mutex> ul(connect_lock_);
  265 + connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; });
  266 +
  267 + // Handle connection error
  268 + if (connect_state_ != CONNECTED) {
  269 + logger_.warning() << "Did not connect, event loop exiting.";
  270 + setExited(true);
  271 + setRunning(false);
  272 + return;
  273 + }
232 } 274 }
233 275
234 // Set up asynchronous watcher which we signal every 276 // Set up asynchronous watcher which we signal every
@@ -245,8 +287,7 @@ void Redox::runEventLoop() { @@ -245,8 +287,7 @@ void Redox::runEventLoop() {
245 ev_async_init(&watcher_free_, freeQueuedCommands); 287 ev_async_init(&watcher_free_, freeQueuedCommands);
246 ev_async_start(evloop_, &watcher_free_); 288 ev_async_start(evloop_, &watcher_free_);
247 289
248 - running_ = true;  
249 - running_waiter_.notify_one(); 290 + setRunning(true);
250 291
251 // Run the event loop, using NOWAIT if enabled for maximum 292 // Run the event loop, using NOWAIT if enabled for maximum
252 // throughput by avoiding any sleeping 293 // throughput by avoiding any sleeping
@@ -266,23 +307,24 @@ void Redox::runEventLoop() { @@ -266,23 +307,24 @@ void Redox::runEventLoop() {
266 // Wait to receive server replies for clean hiredis disconnect 307 // Wait to receive server replies for clean hiredis disconnect
267 this_thread::sleep_for(chrono::milliseconds(10)); 308 this_thread::sleep_for(chrono::milliseconds(10));
268 ev_run(evloop_, EVRUN_NOWAIT); 309 ev_run(evloop_, EVRUN_NOWAIT);
269 -  
270 - if (connect_state_ == CONNECTED) 310 +
  311 + if (getConnectState() == CONNECTED) {
271 redisAsyncDisconnect(ctx_); 312 redisAsyncDisconnect(ctx_);
  313 + }
272 314
273 // Run once more to disconnect 315 // Run once more to disconnect
274 ev_run(evloop_, EVRUN_NOWAIT); 316 ev_run(evloop_, EVRUN_NOWAIT);
275 317
276 - if (commands_created_ != commands_deleted_) {  
277 - logger_.error() << "All commands were not freed! " << commands_deleted_ << "/"  
278 - << commands_created_; 318 + long created = commands_created_;
  319 + long deleted = commands_deleted_;
  320 + if (created != deleted) {
  321 + logger_.error() << "All commands were not freed! " << deleted << "/"
  322 + << created;
279 } 323 }
280 324
281 - exited_ = true;  
282 - running_ = false;  
283 -  
284 // Let go for block_until_stopped method 325 // Let go for block_until_stopped method
285 - exit_waiter_.notify_one(); 326 + setExited(true);
  327 + setRunning(false);
286 328
287 logger_.info() << "Event thread exited."; 329 logger_.info() << "Event thread exited.";
288 } 330 }
@@ -458,6 +500,7 @@ template &lt;class ReplyT&gt; long Redox::freeAllCommandsOfType() { @@ -458,6 +500,7 @@ template &lt;class ReplyT&gt; long Redox::freeAllCommandsOfType() {
458 500
459 lock_guard<mutex> lg(free_queue_guard_); 501 lock_guard<mutex> lg(free_queue_guard_);
460 lock_guard<mutex> lg2(queue_guard_); 502 lock_guard<mutex> lg2(queue_guard_);
  503 + lock_guard<mutex> lg3(command_map_guard_);
461 504
462 auto &command_map = getCommandMap<ReplyT>(); 505 auto &command_map = getCommandMap<ReplyT>();
463 long len = command_map.size(); 506 long len = command_map.size();
src/command.cpp
@@ -64,7 +64,10 @@ template &lt;class ReplyT&gt; void Command&lt;ReplyT&gt;::processReply(redisReply *r) { @@ -64,7 +64,10 @@ template &lt;class ReplyT&gt; void Command&lt;ReplyT&gt;::processReply(redisReply *r) {
64 64
65 pending_--; 65 pending_--;
66 66
67 - waiting_done_ = true; 67 + {
  68 + unique_lock<mutex> lk(waiter_lock_);
  69 + waiting_done_ = true;
  70 + }
68 waiter_.notify_all(); 71 waiter_.notify_all();
69 72
70 // Always free the reply object for repeating commands 73 // Always free the reply object for repeating commands
src/subscriber.cpp
@@ -54,17 +54,19 @@ void Subscriber::stop() { @@ -54,17 +54,19 @@ void Subscriber::stop() {
54 for (const string &topic : psubscribedTopics()) 54 for (const string &topic : psubscribedTopics())
55 punsubscribe(topic); 55 punsubscribe(topic);
56 56
57 - unique_lock<mutex> ul(cv_unsub_guard_);  
58 - cv_unsub_.wait(ul, [this] {  
59 - lock_guard<mutex> lg(subscribed_topics_guard_);  
60 - return (subscribed_topics_.size() == 0);  
61 - }); 57 + {
  58 + unique_lock<mutex> ul(subscribed_topics_guard_);
  59 + cv_unsub_.wait(ul, [this] {
  60 + return (subscribed_topics_.size() == 0);
  61 + });
  62 + }
62 63
63 - unique_lock<mutex> ul2(cv_punsub_guard_);  
64 - cv_punsub_.wait(ul, [this] {  
65 - lock_guard<mutex> lg(subscribed_topics_guard_);  
66 - return (psubscribed_topics_.size() == 0);  
67 - }); 64 + {
  65 + unique_lock<mutex> ul(psubscribed_topics_guard_);
  66 + cv_punsub_.wait(ul, [this] {
  67 + return (psubscribed_topics_.size() == 0);
  68 + });
  69 + }
68 70
69 for (Command<redisReply *> *c : commands_) 71 for (Command<redisReply *> *c : commands_)
70 c->free(); 72 c->free();
@@ -116,25 +118,27 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, @@ -116,25 +118,27 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic,
116 if ((reply->type == REDIS_REPLY_ARRAY) && 118 if ((reply->type == REDIS_REPLY_ARRAY) &&
117 (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) { 119 (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) {
118 120
119 - lock_guard<mutex> lg(subscribed_topics_guard_);  
120 - lock_guard<mutex> lg2(psubscribed_topics_guard_);  
121 121
122 if (!strncmp(reply->element[0]->str, "sub", 3)) { 122 if (!strncmp(reply->element[0]->str, "sub", 3)) {
  123 + lock_guard<mutex> lg(subscribed_topics_guard_);
123 subscribed_topics_.insert(topic); 124 subscribed_topics_.insert(topic);
124 num_pending_subs_--; 125 num_pending_subs_--;
125 if (sub_callback) 126 if (sub_callback)
126 sub_callback(topic); 127 sub_callback(topic);
127 } else if (!strncmp(reply->element[0]->str, "psub", 4)) { 128 } else if (!strncmp(reply->element[0]->str, "psub", 4)) {
  129 + lock_guard<mutex> lg(psubscribed_topics_guard_);
128 psubscribed_topics_.insert(topic); 130 psubscribed_topics_.insert(topic);
129 num_pending_subs_--; 131 num_pending_subs_--;
130 if (sub_callback) 132 if (sub_callback)
131 sub_callback(topic); 133 sub_callback(topic);
132 } else if (!strncmp(reply->element[0]->str, "uns", 3)) { 134 } else if (!strncmp(reply->element[0]->str, "uns", 3)) {
  135 + lock_guard<mutex> lg(subscribed_topics_guard_);
133 subscribed_topics_.erase(topic); 136 subscribed_topics_.erase(topic);
134 if (unsub_callback) 137 if (unsub_callback)
135 unsub_callback(topic); 138 unsub_callback(topic);
136 cv_unsub_.notify_all(); 139 cv_unsub_.notify_all();
137 } else if (!strncmp(reply->element[0]->str, "puns", 4)) { 140 } else if (!strncmp(reply->element[0]->str, "puns", 4)) {
  141 + lock_guard<mutex> lg(psubscribed_topics_guard_);
138 psubscribed_topics_.erase(topic); 142 psubscribed_topics_.erase(topic);
139 if (unsub_callback) 143 if (unsub_callback)
140 unsub_callback(topic); 144 unsub_callback(topic);
test/test.cpp
@@ -240,6 +240,64 @@ TEST_F(RedoxTest, GetSetSyncError) { @@ -240,6 +240,64 @@ TEST_F(RedoxTest, GetSetSyncError) {
240 rdx.disconnect(); 240 rdx.disconnect();
241 } 241 }
242 242
  243 +TEST_F(RedoxTest, MultithreadedCRUD) {
  244 + connect();
  245 + int create_count(0);
  246 + int delete_count(0);
  247 + int createExcCount(0);
  248 + int deleteExcCount(0);
  249 +
  250 + std::mutex startMutex;
  251 + bool start = false;
  252 + std::condition_variable start_cv;
  253 + const int count = 10000;
  254 +
  255 + std::thread create_thread([&]() {
  256 + {
  257 + std::unique_lock<std::mutex> lock(startMutex);
  258 + start_cv.wait(lock, [&]() { return start; });
  259 + }
  260 + for (int i = 0; i < count; ++i) {
  261 + try {
  262 + rdx.commandSync<string>({"SET", "redox_test:mt", "create"});
  263 + }
  264 + catch (...) {
  265 + createExcCount++;
  266 + }
  267 + create_count++;
  268 + }
  269 + });
  270 +
  271 + std::thread delete_thread([&]() {
  272 + {
  273 + std::unique_lock<std::mutex> lock(startMutex);
  274 + start_cv.wait(lock, [&]() { return start; });
  275 + }
  276 + for (int i = 0; i < count; ++i) {
  277 + try {
  278 + rdx.commandSync<int>({"DEL", "redox_test:mt"});
  279 + }
  280 + catch (...) {
  281 + deleteExcCount++;
  282 + }
  283 + delete_count++;
  284 + }
  285 + });
  286 +
  287 + // Start threads
  288 + {
  289 + std::lock_guard<std::mutex> lock(startMutex);
  290 + start = true;
  291 + }
  292 + start_cv.notify_all();
  293 +
  294 + // Wait for threads to finish
  295 + create_thread.join();
  296 + delete_thread.join();
  297 + EXPECT_EQ(count, create_count);
  298 + EXPECT_EQ(count, delete_count);
  299 +}
  300 +
243 // ------------------------------------------- 301 // -------------------------------------------
244 // End tests 302 // End tests
245 // ------------------------------------------- 303 // -------------------------------------------