Commit bfa303d5d242fc5ec75cec22b0e789277eb2a2ea

Authored by Hayk Martirosyan
1 parent 87ff8a3e

Added block_until_stopped method

Used to block the main thread until the event loop is stopped by
calling the .stop() method. Implemented using a condition_variable,
and very useful for clients.
examples/lpush_benchmark.cpp
@@ -3,7 +3,6 @@ @@ -3,7 +3,6 @@
3 */ 3 */
4 4
5 #include <iostream> 5 #include <iostream>
6 -#include <mutex>  
7 #include "../src/redisx.hpp" 6 #include "../src/redisx.hpp"
8 7
9 using namespace std; 8 using namespace std;
@@ -18,6 +17,7 @@ int main(int argc, char* argv[]) { @@ -18,6 +17,7 @@ int main(int argc, char* argv[]) {
18 redisx::Redis rdx = {"localhost", 6379}; 17 redisx::Redis rdx = {"localhost", 6379};
19 rdx.run(); 18 rdx.run();
20 19
  20 + // TODO wait for this somehow
21 rdx.command("DEL test"); 21 rdx.command("DEL test");
22 22
23 unsigned long t0 = time_ms(); 23 unsigned long t0 = time_ms();
@@ -25,11 +25,9 @@ int main(int argc, char* argv[]) { @@ -25,11 +25,9 @@ int main(int argc, char* argv[]) {
25 25
26 int len = 1000000; 26 int len = 1000000;
27 int count = 0; 27 int count = 0;
28 - mutex task_lock;  
29 28
30 - task_lock.lock();  
31 for(int i = 1; i <= len; i++) { 29 for(int i = 1; i <= len; i++) {
32 - rdx.command<int>("lpush test 1", [&t0, &t1, &count, len, &task_lock](const string& cmd, int reply) { 30 + rdx.command<int>("lpush test 1", [&t0, &t1, &count, len, &rdx](const string& cmd, int reply) {
33 31
34 count++; 32 count++;
35 if(count == len) { 33 if(count == len) {
@@ -40,14 +38,12 @@ int main(int argc, char* argv[]) { @@ -40,14 +38,12 @@ int main(int argc, char* argv[]) {
40 cout << "Time to receive all: " << t2 - t1 << "ms" << endl; 38 cout << "Time to receive all: " << t2 - t1 << "ms" << endl;
41 cout << "Total time: " << t2 - t0 << "ms" << endl; 39 cout << "Total time: " << t2 - t0 << "ms" << endl;
42 40
43 - task_lock.unlock(); 41 + rdx.stop();
44 } 42 }
45 }); 43 });
46 } 44 }
47 t1 = time_ms(); 45 t1 = time_ms();
48 46
49 - task_lock.lock();  
50 - rdx.stop();  
51 - 47 + rdx.block_until_stopped();
52 return 0; 48 return 0;
53 }; 49 };
src/redisx.cpp
@@ -73,6 +73,10 @@ void Redis::run_blocking() { @@ -73,6 +73,10 @@ void Redis::run_blocking() {
73 73
74 // Handle exit events 74 // Handle exit events
75 ev_run(EV_DEFAULT_ EVRUN_NOWAIT); 75 ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  76 +
  77 + // Let go for block_until_stopped method
  78 + unique_lock<mutex> ul(exit_waiter_lock);
  79 + exit_waiter.notify_one();
76 } 80 }
77 81
78 void Redis::run() { 82 void Redis::run() {
@@ -85,6 +89,11 @@ void Redis::stop() { @@ -85,6 +89,11 @@ void Redis::stop() {
85 to_exit = true; 89 to_exit = true;
86 } 90 }
87 91
  92 +void Redis::block_until_stopped() {
  93 + unique_lock<mutex> ul(exit_waiter_lock);
  94 + exit_waiter.wait(ul, [this]() { return to_exit.load(); });
  95 +}
  96 +
88 template<class ReplyT> 97 template<class ReplyT>
89 bool Redis::submit_to_server(const CommandAsync<ReplyT>* cmd_obj) { 98 bool Redis::submit_to_server(const CommandAsync<ReplyT>* cmd_obj) {
90 if (redisAsyncCommand(c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { 99 if (redisAsyncCommand(c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
src/redisx.hpp
@@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
9 9
10 #include <thread> 10 #include <thread>
11 #include <mutex> 11 #include <mutex>
  12 +#include <condition_variable>
12 #include <atomic> 13 #include <atomic>
13 14
14 #include <string> 15 #include <string>
@@ -17,6 +18,7 @@ @@ -17,6 +18,7 @@
17 18
18 #include <hiredis/hiredis.h> 19 #include <hiredis/hiredis.h>
19 #include <hiredis/async.h> 20 #include <hiredis/async.h>
  21 +#include <condition_variable>
20 22
21 namespace redisx { 23 namespace redisx {
22 24
@@ -47,6 +49,7 @@ public: @@ -47,6 +49,7 @@ public:
47 void run(); 49 void run();
48 void run_blocking(); 50 void run_blocking();
49 void stop(); 51 void stop();
  52 + void block_until_stopped();
50 53
51 template<class ReplyT> 54 template<class ReplyT>
52 void command( 55 void command(
@@ -84,6 +87,8 @@ private: @@ -84,6 +87,8 @@ private:
84 redisAsyncContext *c; 87 redisAsyncContext *c;
85 88
86 std::atomic_bool to_exit; 89 std::atomic_bool to_exit;
  90 + std::mutex exit_waiter_lock;
  91 + std::condition_variable exit_waiter;
87 92
88 std::thread event_loop_thread; 93 std::thread event_loop_thread;
89 94