Commit 6b57d0251f0272ca7bdbbaac9f71b2814fbe261e

Authored by Collin Hockey
1 parent 475171c9

Fix condition variable usage in subscriber

The lock used with the condition variable should always be the same one
guarding changes to the condition itself, in these cases, the lock
guarding changes to the subscribe and psubscibe data structures.

This commit also limits the number of locks held by the subscribeBase
function at any one time.
include/redox/subscriber.hpp
@@ -163,9 +163,7 @@ private: @@ -163,9 +163,7 @@ private:
163 163
164 // CVs to wait for unsubscriptions 164 // CVs to wait for unsubscriptions
165 std::condition_variable cv_unsub_; 165 std::condition_variable cv_unsub_;
166 - std::mutex cv_unsub_guard_;  
167 std::condition_variable cv_punsub_; 166 std::condition_variable cv_punsub_;
168 - std::mutex cv_punsub_guard_;  
169 167
170 // Pending subscriptions 168 // Pending subscriptions
171 std::atomic_int num_pending_subs_ = {0}; 169 std::atomic_int num_pending_subs_ = {0};
src/subscriber.cpp
@@ -54,17 +54,19 @@ void Subscriber::stop() { @@ -54,17 +54,19 @@ void Subscriber::stop() {
54 for (const string &topic : psubscribedTopics()) 54 for (const string &topic : psubscribedTopics())
55 punsubscribe(topic); 55 punsubscribe(topic);
56 56
57 - unique_lock<mutex> ul(cv_unsub_guard_);  
58 - cv_unsub_.wait(ul, [this] {  
59 - lock_guard<mutex> lg(subscribed_topics_guard_);  
60 - return (subscribed_topics_.size() == 0);  
61 - }); 57 + {
  58 + unique_lock<mutex> ul(subscribed_topics_guard_);
  59 + cv_unsub_.wait(ul, [this] {
  60 + return (subscribed_topics_.size() == 0);
  61 + });
  62 + }
62 63
63 - unique_lock<mutex> ul2(cv_punsub_guard_);  
64 - cv_punsub_.wait(ul, [this] {  
65 - lock_guard<mutex> lg(subscribed_topics_guard_);  
66 - return (psubscribed_topics_.size() == 0);  
67 - }); 64 + {
  65 + unique_lock<mutex> ul(psubscribed_topics_guard_);
  66 + cv_punsub_.wait(ul, [this] {
  67 + return (psubscribed_topics_.size() == 0);
  68 + });
  69 + }
68 70
69 for (Command<redisReply *> *c : commands_) 71 for (Command<redisReply *> *c : commands_)
70 c->free(); 72 c->free();
@@ -116,25 +118,27 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, @@ -116,25 +118,27 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic,
116 if ((reply->type == REDIS_REPLY_ARRAY) && 118 if ((reply->type == REDIS_REPLY_ARRAY) &&
117 (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) { 119 (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) {
118 120
119 - lock_guard<mutex> lg(subscribed_topics_guard_);  
120 - lock_guard<mutex> lg2(psubscribed_topics_guard_);  
121 121
122 if (!strncmp(reply->element[0]->str, "sub", 3)) { 122 if (!strncmp(reply->element[0]->str, "sub", 3)) {
  123 + lock_guard<mutex> lg(subscribed_topics_guard_);
123 subscribed_topics_.insert(topic); 124 subscribed_topics_.insert(topic);
124 num_pending_subs_--; 125 num_pending_subs_--;
125 if (sub_callback) 126 if (sub_callback)
126 sub_callback(topic); 127 sub_callback(topic);
127 } else if (!strncmp(reply->element[0]->str, "psub", 4)) { 128 } else if (!strncmp(reply->element[0]->str, "psub", 4)) {
  129 + lock_guard<mutex> lg(psubscribed_topics_guard_);
128 psubscribed_topics_.insert(topic); 130 psubscribed_topics_.insert(topic);
129 num_pending_subs_--; 131 num_pending_subs_--;
130 if (sub_callback) 132 if (sub_callback)
131 sub_callback(topic); 133 sub_callback(topic);
132 } else if (!strncmp(reply->element[0]->str, "uns", 3)) { 134 } else if (!strncmp(reply->element[0]->str, "uns", 3)) {
  135 + lock_guard<mutex> lg(subscribed_topics_guard_);
133 subscribed_topics_.erase(topic); 136 subscribed_topics_.erase(topic);
134 if (unsub_callback) 137 if (unsub_callback)
135 unsub_callback(topic); 138 unsub_callback(topic);
136 cv_unsub_.notify_all(); 139 cv_unsub_.notify_all();
137 } else if (!strncmp(reply->element[0]->str, "puns", 4)) { 140 } else if (!strncmp(reply->element[0]->str, "puns", 4)) {
  141 + lock_guard<mutex> lg(psubscribed_topics_guard_);
138 psubscribed_topics_.erase(topic); 142 psubscribed_topics_.erase(topic);
139 if (unsub_callback) 143 if (unsub_callback)
140 unsub_callback(topic); 144 unsub_callback(topic);