From d6beae7cece810a52a4ec8e12de17447c5a33339 Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Tue, 27 Jan 2015 02:07:03 -0800 Subject: [PATCH] Make .disconnect() a combination of .wait() and .stop() --- examples/basic.cpp | 1 + examples/basic_threaded.cpp | 1 - examples/binary_data.cpp | 1 + examples/data_types.cpp | 2 +- examples/lpush_benchmark.cpp | 2 +- examples/multi-client.cpp | 4 ++++ examples/pub_sub.cpp | 2 -- examples/speed_test_async.cpp | 1 - examples/speed_test_async_multi.cpp | 1 - examples/speed_test_sync.cpp | 1 + src/client.cpp | 11 ++++++++--- src/client.hpp | 8 +++++++- src/subscriber.cpp | 25 +++++++++++++------------ src/subscriber.hpp | 7 ++++++- test/test.cpp | 4 +++- 15 files changed, 46 insertions(+), 25 deletions(-) diff --git a/examples/basic.cpp b/examples/basic.cpp index 90155ba..a7be0e2 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -22,5 +22,6 @@ int main(int argc, char* argv[]) { cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl; + rdx.disconnect(); return 0; } diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index 11f0acd..8b246f7 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -42,6 +42,5 @@ int main(int argc, char* argv[]) { getter.join(); rdx.disconnect(); - rdx.wait(); return 0; }; diff --git a/examples/binary_data.cpp b/examples/binary_data.cpp index 484b20f..757e62a 100644 --- a/examples/binary_data.cpp +++ b/examples/binary_data.cpp @@ -42,5 +42,6 @@ int main(int argc, char* argv[]) { else cerr << "Failed to get key! Status: " << c2.status() << endl; c2.free(); + rdx.disconnect(); return 0; } diff --git a/examples/data_types.cpp b/examples/data_types.cpp index 00afc51..2d857b9 100644 --- a/examples/data_types.cpp +++ b/examples/data_types.cpp @@ -46,7 +46,7 @@ int main(int argc, char* argv[]) { for (const string& s : c.reply()) cout << s << " "; cout << endl; } - rdx.disconnect(); + rdx.stop(); } ); diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index dba9bfa..82b38ad 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) { cout << "Total time: " << t2 - t0 << "s" << endl; cout << "Result: " << (double)len / (t2-t0) << " commands/s" << endl; - rdx.disconnect(); + rdx.stop(); } }); } diff --git a/examples/multi-client.cpp b/examples/multi-client.cpp index 98d5001..2aeea7e 100644 --- a/examples/multi-client.cpp +++ b/examples/multi-client.cpp @@ -22,5 +22,9 @@ int main(int argc, char* argv[]) { cout << "key = occupation, value = \"" << rdx3.get("occupation") << "\"" << endl; + rdx1.disconnect(); + rdx2.disconnect(); + rdx3.disconnect(); + return 0; } diff --git a/examples/pub_sub.cpp b/examples/pub_sub.cpp index 8a369b3..af8a7c1 100644 --- a/examples/pub_sub.cpp +++ b/examples/pub_sub.cpp @@ -55,8 +55,6 @@ int main(int argc, char *argv[]) { this_thread::sleep_for(chrono::milliseconds(10)); subscriber.disconnect(); - subscriber.wait(); publisher.disconnect(); - publisher.wait(); return 0; } diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index 05970be..101fb08 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -66,6 +66,5 @@ int main(int argc, char* argv[]) { cout << "Final value of counter: " << final_count << endl; rdx.disconnect(); - rdx.wait(); return 0; } diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index ea25061..e37cf5e 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -71,6 +71,5 @@ int main(int argc, char* argv[]) { cout << "Final value of counter: " << final_count << endl; rdx.disconnect(); - rdx.wait(); return 0; } diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp index 5cfe11d..69f1761 100644 --- a/examples/speed_test_sync.cpp +++ b/examples/speed_test_sync.cpp @@ -53,5 +53,6 @@ int main(int argc, char* argv[]) { cout << "Final value of counter: " << final_count << endl; + rdx.disconnect(); return 0; } diff --git a/src/client.cpp b/src/client.cpp index 4bc3da9..3364b4e 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -56,7 +56,7 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { rdx->connect_state_ = DISCONNECTED; } - rdx->disconnect(); + rdx->stop(); rdx->connect_waiter_.notify_all(); if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } @@ -208,8 +208,13 @@ bool Redox::connect() { } void Redox::disconnect() { + stop(); + wait(); +} + +void Redox::stop() { to_exit_ = true; - logger_.debug() << "disconnect() called, breaking event loop"; + logger_.debug() << "stop() called, breaking event loop"; ev_async_send(evloop_, &watcher_stop_); } @@ -221,7 +226,7 @@ void Redox::wait() { Redox::~Redox() { // Bring down the event loop - disconnect(); + stop(); if(event_loop_thread_.joinable()) event_loop_thread_.join(); ev_loop_destroy(evloop_); diff --git a/src/client.hpp b/src/client.hpp index 5b43037..c00c48c 100644 --- a/src/client.hpp +++ b/src/client.hpp @@ -98,11 +98,17 @@ public: bool connect(); /** - * Signal the event loop thread to disconnect from Redis and shut down. + * Disconnect from Redis, shut down the event loop, then return. A simple + * combination of .stop() and .wait(). */ void disconnect(); /** + * Signal the event loop thread to disconnect from Redis and shut down. + */ + void stop(); + + /** * Blocks until the event loop exits and disconnection is complete, then returns. * Usually no need to call manually as it is handled in the destructor. */ diff --git a/src/subscriber.cpp b/src/subscriber.cpp index 9de0f11..571bf69 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -43,6 +43,15 @@ Subscriber::~Subscriber() { } +void Subscriber::disconnect() { + stop(); + wait(); +} + +void Subscriber::wait() { + rdx_.wait(); +} + // This is a fairly awkward way of shutting down, where // we pause to wait for subscriptions to happen, and then // unsubscribe from everything and wait for that to finish. @@ -50,7 +59,7 @@ Subscriber::~Subscriber() { // a segfault in freeReplyObject() under redisAsyncDisconnect() // if we don't do this first. // TODO look at hiredis, ask them what causes the error -void Subscriber::disconnect() { +void Subscriber::stop() { this_thread::sleep_for(chrono::milliseconds(1000)); @@ -63,11 +72,8 @@ void Subscriber::disconnect() { unique_lock ul(cv_unsub_guard_); cv_unsub_.wait(ul, [this] { std::lock_guard lg(subscribed_topics_guard_); - cout << "sub topic count : " << subscribed_topics_.size() << endl; - for(const string& topic : subscribed_topics_) - cout << "topic remaining: " << topic << endl; - - return (subscribed_topics_.size() == 0); }); + return (subscribed_topics_.size() == 0); + }); unique_lock ul2(cv_punsub_guard_); cv_punsub_.wait(ul, [this] { @@ -78,7 +84,7 @@ void Subscriber::disconnect() { for(Command* c : commands_) c->free(); - rdx_.disconnect(); + rdx_.stop(); } // For debugging only @@ -132,12 +138,10 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, num_pending_subs_--; if (sub_callback) sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "uns", 3)) { - cout << "unsub from topic " << topic << endl; subscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); cv_unsub_.notify_all(); } else if (!strncmp(reply->element[0]->str, "puns", 4)) { - cout << "punsub from topic " << topic << endl; psubscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); cv_punsub_.notify_all(); @@ -199,15 +203,12 @@ void Subscriber::psubscribe(const string topic, void Subscriber::unsubscribeBase(const string cmd_name, const string topic, function err_callback ) { - cout << "running " << cmd_name << " for " << topic << endl; rdx_.command(cmd_name + " " + topic, [topic, err_callback](Command& c) { if(!c.ok()) { if (err_callback) err_callback(topic, c.status()); return; } - - cout << "got unsub reply - " << c.cmd() << ": " << c.reply() << endl; } ); } diff --git a/src/subscriber.hpp b/src/subscriber.hpp index c2e455c..4eb3dbc 100644 --- a/src/subscriber.hpp +++ b/src/subscriber.hpp @@ -60,6 +60,11 @@ public: bool connect() { return rdx_.connect(); } /** + * Same as .stop() on a Redox instance. + */ + void stop(); + + /** * Same as .disconnect() on a Redox instance. */ void disconnect(); @@ -67,7 +72,7 @@ public: /** * Same as .wait() on a Redox instance. */ - void wait() { rdx_.wait(); } + void wait(); /** * Subscribe to a topic. diff --git a/test/test.cpp b/test/test.cpp index 359353e..ef93268 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -100,7 +100,6 @@ protected: unique_lock ul(cmd_waiter_lock); cmd_waiter.wait(ul, [this] { return (cmd_count == 0); }); rdx.disconnect(); - rdx.wait(); }; template @@ -178,12 +177,14 @@ TEST_F(RedoxTest, Loop) { TEST_F(RedoxTest, GetSetSync) { print_and_check_sync(rdx.commandSync("SET redox_test:a apple"), "OK"); print_and_check_sync(rdx.commandSync("GET redox_test:a"), "apple"); + rdx.disconnect(); } TEST_F(RedoxTest, DeleteSync) { print_and_check_sync(rdx.commandSync("SET redox_test:a apple"), "OK"); print_and_check_sync(rdx.commandSync("DEL redox_test:a"), 1); check_sync(rdx.commandSync("GET redox_test:a"), nullptr); + rdx.disconnect(); } TEST_F(RedoxTest, IncrSync) { @@ -192,6 +193,7 @@ TEST_F(RedoxTest, IncrSync) { check_sync(rdx.commandSync("INCR redox_test:a"), i+1); } print_and_check_sync(rdx.commandSync("GET redox_test:a"), to_string(count)); + rdx.disconnect(); } // ------------------------------------------- -- libgit2 0.21.4