diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp index 9635ec7..63a89f4 100644 --- a/include/redox/subscriber.hpp +++ b/include/redox/subscriber.hpp @@ -163,9 +163,7 @@ private: // CVs to wait for unsubscriptions std::condition_variable cv_unsub_; - std::mutex cv_unsub_guard_; std::condition_variable cv_punsub_; - std::mutex cv_punsub_guard_; // Pending subscriptions std::atomic_int num_pending_subs_ = {0}; diff --git a/src/subscriber.cpp b/src/subscriber.cpp index c650d58..47815d6 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -54,17 +54,19 @@ void Subscriber::stop() { for (const string &topic : psubscribedTopics()) punsubscribe(topic); - unique_lock ul(cv_unsub_guard_); - cv_unsub_.wait(ul, [this] { - lock_guard lg(subscribed_topics_guard_); - return (subscribed_topics_.size() == 0); - }); + { + unique_lock ul(subscribed_topics_guard_); + cv_unsub_.wait(ul, [this] { + return (subscribed_topics_.size() == 0); + }); + } - unique_lock ul2(cv_punsub_guard_); - cv_punsub_.wait(ul, [this] { - lock_guard lg(subscribed_topics_guard_); - return (psubscribed_topics_.size() == 0); - }); + { + unique_lock ul(psubscribed_topics_guard_); + cv_punsub_.wait(ul, [this] { + return (psubscribed_topics_.size() == 0); + }); + } for (Command *c : commands_) c->free(); @@ -116,25 +118,27 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, if ((reply->type == REDIS_REPLY_ARRAY) && (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) { - lock_guard lg(subscribed_topics_guard_); - lock_guard lg2(psubscribed_topics_guard_); if (!strncmp(reply->element[0]->str, "sub", 3)) { + lock_guard lg(subscribed_topics_guard_); subscribed_topics_.insert(topic); num_pending_subs_--; if (sub_callback) sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "psub", 4)) { + lock_guard lg(psubscribed_topics_guard_); psubscribed_topics_.insert(topic); num_pending_subs_--; if (sub_callback) sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "uns", 3)) { + lock_guard lg(subscribed_topics_guard_); subscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); cv_unsub_.notify_all(); } else if (!strncmp(reply->element[0]->str, "puns", 4)) { + lock_guard lg(psubscribed_topics_guard_); psubscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic);