diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index 99d17a2..3d20af7 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -3,7 +3,6 @@ */ #include -#include #include "../src/redisx.hpp" using namespace std; @@ -18,6 +17,7 @@ int main(int argc, char* argv[]) { redisx::Redis rdx = {"localhost", 6379}; rdx.run(); + // TODO wait for this somehow rdx.command("DEL test"); unsigned long t0 = time_ms(); @@ -25,11 +25,9 @@ int main(int argc, char* argv[]) { int len = 1000000; int count = 0; - mutex task_lock; - task_lock.lock(); for(int i = 1; i <= len; i++) { - rdx.command("lpush test 1", [&t0, &t1, &count, len, &task_lock](const string& cmd, int reply) { + rdx.command("lpush test 1", [&t0, &t1, &count, len, &rdx](const string& cmd, int reply) { count++; if(count == len) { @@ -40,14 +38,12 @@ int main(int argc, char* argv[]) { cout << "Time to receive all: " << t2 - t1 << "ms" << endl; cout << "Total time: " << t2 - t0 << "ms" << endl; - task_lock.unlock(); + rdx.stop(); } }); } t1 = time_ms(); - task_lock.lock(); - rdx.stop(); - + rdx.block_until_stopped(); return 0; }; diff --git a/src/redisx.cpp b/src/redisx.cpp index 03f9edf..180b34a 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -73,6 +73,10 @@ void Redis::run_blocking() { // Handle exit events ev_run(EV_DEFAULT_ EVRUN_NOWAIT); + + // Let go for block_until_stopped method + unique_lock ul(exit_waiter_lock); + exit_waiter.notify_one(); } void Redis::run() { @@ -85,6 +89,11 @@ void Redis::stop() { to_exit = true; } +void Redis::block_until_stopped() { + unique_lock ul(exit_waiter_lock); + exit_waiter.wait(ul, [this]() { return to_exit.load(); }); +} + template bool Redis::submit_to_server(const CommandAsync* cmd_obj) { if (redisAsyncCommand(c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { diff --git a/src/redisx.hpp b/src/redisx.hpp index e8886de..8e7af95 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -17,6 +18,7 @@ #include #include +#include namespace redisx { @@ -47,6 +49,7 @@ public: void run(); void run_blocking(); void stop(); + void block_until_stopped(); template void command( @@ -84,6 +87,8 @@ private: redisAsyncContext *c; std::atomic_bool to_exit; + std::mutex exit_waiter_lock; + std::condition_variable exit_waiter; std::thread event_loop_thread;