Commit d6beae7cece810a52a4ec8e12de17447c5a33339

Authored by Hayk Martirosyan
1 parent ca1d3171

Make .disconnect() a combination of .wait() and .stop()

More intuitive if disconnect() blocks like connect(). Most clients will
want to use these two methods. For more fine grained control,
disconnect() is just a combo of stop() and wait().
examples/basic.cpp
... ... @@ -22,5 +22,6 @@ int main(int argc, char* argv[]) {
22 22  
23 23 cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl;
24 24  
  25 + rdx.disconnect();
25 26 return 0;
26 27 }
... ...
examples/basic_threaded.cpp
... ... @@ -42,6 +42,5 @@ int main(int argc, char* argv[]) {
42 42 getter.join();
43 43  
44 44 rdx.disconnect();
45   - rdx.wait();
46 45 return 0;
47 46 };
... ...
examples/binary_data.cpp
... ... @@ -42,5 +42,6 @@ int main(int argc, char* argv[]) {
42 42 else cerr << "Failed to get key! Status: " << c2.status() << endl;
43 43 c2.free();
44 44  
  45 + rdx.disconnect();
45 46 return 0;
46 47 }
... ...
examples/data_types.cpp
... ... @@ -46,7 +46,7 @@ int main(int argc, char* argv[]) {
46 46 for (const string& s : c.reply()) cout << s << " ";
47 47 cout << endl;
48 48 }
49   - rdx.disconnect();
  49 + rdx.stop();
50 50 }
51 51 );
52 52  
... ...
examples/lpush_benchmark.cpp
... ... @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) {
43 43 cout << "Total time: " << t2 - t0 << "s" << endl;
44 44 cout << "Result: " << (double)len / (t2-t0) << " commands/s" << endl;
45 45  
46   - rdx.disconnect();
  46 + rdx.stop();
47 47 }
48 48 });
49 49 }
... ...
examples/multi-client.cpp
... ... @@ -22,5 +22,9 @@ int main(int argc, char* argv[]) {
22 22  
23 23 cout << "key = occupation, value = \"" << rdx3.get("occupation") << "\"" << endl;
24 24  
  25 + rdx1.disconnect();
  26 + rdx2.disconnect();
  27 + rdx3.disconnect();
  28 +
25 29 return 0;
26 30 }
... ...
examples/pub_sub.cpp
... ... @@ -55,8 +55,6 @@ int main(int argc, char *argv[]) {
55 55 this_thread::sleep_for(chrono::milliseconds(10));
56 56  
57 57 subscriber.disconnect();
58   - subscriber.wait();
59 58 publisher.disconnect();
60   - publisher.wait();
61 59 return 0;
62 60 }
... ...
examples/speed_test_async.cpp
... ... @@ -66,6 +66,5 @@ int main(int argc, char* argv[]) {
66 66 cout << "Final value of counter: " << final_count << endl;
67 67  
68 68 rdx.disconnect();
69   - rdx.wait();
70 69 return 0;
71 70 }
... ...
examples/speed_test_async_multi.cpp
... ... @@ -71,6 +71,5 @@ int main(int argc, char* argv[]) {
71 71 cout << "Final value of counter: " << final_count << endl;
72 72  
73 73 rdx.disconnect();
74   - rdx.wait();
75 74 return 0;
76 75 }
... ...
examples/speed_test_sync.cpp
... ... @@ -53,5 +53,6 @@ int main(int argc, char* argv[]) {
53 53  
54 54 cout << "Final value of counter: " << final_count << endl;
55 55  
  56 + rdx.disconnect();
56 57 return 0;
57 58 }
... ...
src/client.cpp
... ... @@ -56,7 +56,7 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) {
56 56 rdx->connect_state_ = DISCONNECTED;
57 57 }
58 58  
59   - rdx->disconnect();
  59 + rdx->stop();
60 60 rdx->connect_waiter_.notify_all();
61 61 if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_);
62 62 }
... ... @@ -208,8 +208,13 @@ bool Redox::connect() {
208 208 }
209 209  
210 210 void Redox::disconnect() {
  211 + stop();
  212 + wait();
  213 +}
  214 +
  215 +void Redox::stop() {
211 216 to_exit_ = true;
212   - logger_.debug() << "disconnect() called, breaking event loop";
  217 + logger_.debug() << "stop() called, breaking event loop";
213 218 ev_async_send(evloop_, &watcher_stop_);
214 219 }
215 220  
... ... @@ -221,7 +226,7 @@ void Redox::wait() {
221 226 Redox::~Redox() {
222 227  
223 228 // Bring down the event loop
224   - disconnect();
  229 + stop();
225 230  
226 231 if(event_loop_thread_.joinable()) event_loop_thread_.join();
227 232 ev_loop_destroy(evloop_);
... ...
src/client.hpp
... ... @@ -98,11 +98,17 @@ public:
98 98 bool connect();
99 99  
100 100 /**
101   - * Signal the event loop thread to disconnect from Redis and shut down.
  101 + * Disconnect from Redis, shut down the event loop, then return. A simple
  102 + * combination of .stop() and .wait().
102 103 */
103 104 void disconnect();
104 105  
105 106 /**
  107 + * Signal the event loop thread to disconnect from Redis and shut down.
  108 + */
  109 + void stop();
  110 +
  111 + /**
106 112 * Blocks until the event loop exits and disconnection is complete, then returns.
107 113 * Usually no need to call manually as it is handled in the destructor.
108 114 */
... ...
src/subscriber.cpp
... ... @@ -43,6 +43,15 @@ Subscriber::~Subscriber() {
43 43  
44 44 }
45 45  
  46 +void Subscriber::disconnect() {
  47 + stop();
  48 + wait();
  49 +}
  50 +
  51 +void Subscriber::wait() {
  52 + rdx_.wait();
  53 +}
  54 +
46 55 // This is a fairly awkward way of shutting down, where
47 56 // we pause to wait for subscriptions to happen, and then
48 57 // unsubscribe from everything and wait for that to finish.
... ... @@ -50,7 +59,7 @@ Subscriber::~Subscriber() {
50 59 // a segfault in freeReplyObject() under redisAsyncDisconnect()
51 60 // if we don't do this first.
52 61 // TODO look at hiredis, ask them what causes the error
53   -void Subscriber::disconnect() {
  62 +void Subscriber::stop() {
54 63  
55 64 this_thread::sleep_for(chrono::milliseconds(1000));
56 65  
... ... @@ -63,11 +72,8 @@ void Subscriber::disconnect() {
63 72 unique_lock<mutex> ul(cv_unsub_guard_);
64 73 cv_unsub_.wait(ul, [this] {
65 74 std::lock_guard<std::mutex> lg(subscribed_topics_guard_);
66   - cout << "sub topic count : " << subscribed_topics_.size() << endl;
67   - for(const string& topic : subscribed_topics_)
68   - cout << "topic remaining: " << topic << endl;
69   -
70   - return (subscribed_topics_.size() == 0); });
  75 + return (subscribed_topics_.size() == 0);
  76 + });
71 77  
72 78 unique_lock<mutex> ul2(cv_punsub_guard_);
73 79 cv_punsub_.wait(ul, [this] {
... ... @@ -78,7 +84,7 @@ void Subscriber::disconnect() {
78 84 for(Command<redisReply*>* c : commands_)
79 85 c->free();
80 86  
81   - rdx_.disconnect();
  87 + rdx_.stop();
82 88 }
83 89  
84 90 // For debugging only
... ... @@ -132,12 +138,10 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic,
132 138 num_pending_subs_--;
133 139 if (sub_callback) sub_callback(topic);
134 140 } else if (!strncmp(reply->element[0]->str, "uns", 3)) {
135   - cout << "unsub from topic " << topic << endl;
136 141 subscribed_topics_.erase(topic);
137 142 if (unsub_callback) unsub_callback(topic);
138 143 cv_unsub_.notify_all();
139 144 } else if (!strncmp(reply->element[0]->str, "puns", 4)) {
140   - cout << "punsub from topic " << topic << endl;
141 145 psubscribed_topics_.erase(topic);
142 146 if (unsub_callback) unsub_callback(topic);
143 147 cv_punsub_.notify_all();
... ... @@ -199,15 +203,12 @@ void Subscriber::psubscribe(const string topic,
199 203 void Subscriber::unsubscribeBase(const string cmd_name, const string topic,
200 204 function<void(const string&, int)> err_callback
201 205 ) {
202   - cout << "running " << cmd_name << " for " << topic << endl;
203 206 rdx_.command<redisReply*>(cmd_name + " " + topic,
204 207 [topic, err_callback](Command<redisReply*>& c) {
205 208 if(!c.ok()) {
206 209 if (err_callback) err_callback(topic, c.status());
207 210 return;
208 211 }
209   -
210   - cout << "got unsub reply - " << c.cmd() << ": " << c.reply() << endl;
211 212 }
212 213 );
213 214 }
... ...
src/subscriber.hpp
... ... @@ -60,6 +60,11 @@ public:
60 60 bool connect() { return rdx_.connect(); }
61 61  
62 62 /**
  63 + * Same as .stop() on a Redox instance.
  64 + */
  65 + void stop();
  66 +
  67 + /**
63 68 * Same as .disconnect() on a Redox instance.
64 69 */
65 70 void disconnect();
... ... @@ -67,7 +72,7 @@ public:
67 72 /**
68 73 * Same as .wait() on a Redox instance.
69 74 */
70   - void wait() { rdx_.wait(); }
  75 + void wait();
71 76  
72 77 /**
73 78 * Subscribe to a topic.
... ...
test/test.cpp
... ... @@ -100,7 +100,6 @@ protected:
100 100 unique_lock<mutex> ul(cmd_waiter_lock);
101 101 cmd_waiter.wait(ul, [this] { return (cmd_count == 0); });
102 102 rdx.disconnect();
103   - rdx.wait();
104 103 };
105 104  
106 105 template<class ReplyT>
... ... @@ -178,12 +177,14 @@ TEST_F(RedoxTest, Loop) {
178 177 TEST_F(RedoxTest, GetSetSync) {
179 178 print_and_check_sync<string>(rdx.commandSync<string>("SET redox_test:a apple"), "OK");
180 179 print_and_check_sync<string>(rdx.commandSync<string>("GET redox_test:a"), "apple");
  180 + rdx.disconnect();
181 181 }
182 182  
183 183 TEST_F(RedoxTest, DeleteSync) {
184 184 print_and_check_sync<string>(rdx.commandSync<string>("SET redox_test:a apple"), "OK");
185 185 print_and_check_sync(rdx.commandSync<int>("DEL redox_test:a"), 1);
186 186 check_sync(rdx.commandSync<nullptr_t>("GET redox_test:a"), nullptr);
  187 + rdx.disconnect();
187 188 }
188 189  
189 190 TEST_F(RedoxTest, IncrSync) {
... ... @@ -192,6 +193,7 @@ TEST_F(RedoxTest, IncrSync) {
192 193 check_sync(rdx.commandSync<int>("INCR redox_test:a"), i+1);
193 194 }
194 195 print_and_check_sync(rdx.commandSync<string>("GET redox_test:a"), to_string(count));
  196 + rdx.disconnect();
195 197 }
196 198  
197 199 // -------------------------------------------
... ...