From bf1b71983d53fc36dfdd2c88a1a9a5ba8bbfc145 Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Sun, 11 Jan 2015 00:17:40 -0800 Subject: [PATCH] Use libev asynchronous watcher for processing commands --- CMakeLists.txt | 20 ++++++++++---------- src/redox.cpp | 44 ++++++++++++++++++++------------------------ src/redox.hpp | 12 +++++++----- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 62fc5e4..8c25593 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,11 +32,11 @@ set(LIB_ALL ${LIB_REDIS}) add_executable(basic examples/basic.cpp ${SRC_ALL}) target_link_libraries(basic ${LIB_REDIS}) -add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) -target_link_libraries(basic_threaded ${LIB_REDIS}) +#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) +#target_link_libraries(basic_threaded ${LIB_REDIS}) -add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) -target_link_libraries(lpush_benchmark ${LIB_REDIS}) +#add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) +#target_link_libraries(lpush_benchmark ${LIB_REDIS}) add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) target_link_libraries(speed_test_async ${LIB_REDIS}) @@ -44,11 +44,11 @@ target_link_libraries(speed_test_async ${LIB_REDIS}) add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) target_link_libraries(speed_test_sync ${LIB_REDIS}) -add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) -target_link_libraries(speed_test_async_multi ${LIB_REDIS}) +#add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) +#target_link_libraries(speed_test_async_multi ${LIB_REDIS}) -add_executable(data_types examples/data_types.cpp ${SRC_ALL}) -target_link_libraries(data_types ${LIB_REDIS}) +#add_executable(data_types examples/data_types.cpp ${SRC_ALL}) +#target_link_libraries(data_types ${LIB_REDIS}) -add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL}) -target_link_libraries(string_v_char ${LIB_REDIS}) +#add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL}) +#target_link_libraries(string_v_char ${LIB_REDIS}) diff --git a/src/redox.cpp b/src/redox.cpp index ae0431d..cd06522 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -94,17 +94,17 @@ void Redox::run_event_loop() { connected_lock.lock(); connected_lock.unlock(); + // Set up asynchronous watcher which we signal every + // time we add a command + ev_async_init(&async_w, process_queued_commands); + ev_async_start(evloop, &async_w); + running = true; running_waiter.notify_one(); - // Continuously create events and handle them + // Run the event loop while (!to_exit) { - process_queued_commands(); ev_run(evloop, EVRUN_NOWAIT); - - // Wait until notified, or check every 100 milliseconds - unique_lock ul(loop_waiter_lock); - loop_waiter.wait_for(ul, chrono::milliseconds(100)); } cout << "[INFO] Stop signal detected." << endl; @@ -181,9 +181,6 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { // Increment the Redox object command counter rdx->cmd_count++; - - // Notify to check the event loop - rdx->loop_waiter.notify_one(); } /** @@ -199,9 +196,6 @@ bool Redox::submit_to_server(Command* c) { return false; } - // Notify to check the event loop - c->rdx->loop_waiter.notify_one(); - return true; } @@ -257,22 +251,24 @@ bool Redox::process_queued_command(long id) { return true; } -void Redox::process_queued_commands() { +void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int revents) { + + Redox* rdx = (Redox*) ev_userdata(loop); - lock_guard lg(queue_guard); + lock_guard lg(rdx->queue_guard); - while(!command_queue.empty()) { + while(!rdx->command_queue.empty()) { - long id = command_queue.front(); - command_queue.pop(); + long id = rdx->command_queue.front(); + rdx->command_queue.pop(); - if(process_queued_command(id)) {} - else if(process_queued_command(id)) {} - else if(process_queued_command(id)) {} - else if(process_queued_command(id)) {} - else if(process_queued_command(id)) {} - else if(process_queued_command(id)) {} - else if(process_queued_command>(id)) {} + if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command>(id)) {} else throw runtime_error("[FATAL] Command pointer not found in any queue!"); } } diff --git a/src/redox.hpp b/src/redox.hpp index 01771c3..4d713a8 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -166,6 +166,9 @@ private: // Dynamically allocated libev event loop struct ev_loop* evloop; + // Asynchronous watcher (for processing commands) + ev_async async_w; + // Number of commands processed std::atomic_long cmd_count = {0}; @@ -187,10 +190,6 @@ private: std::mutex exit_waiter_lock; std::condition_variable exit_waiter; - // Condition variable to check the event loop - std::condition_variable loop_waiter; - std::mutex loop_waiter_lock; - // Maps of each Command, fetchable by the unique ID number std::unordered_map*> commands_redis_reply; std::unordered_map*> commands_string_r; @@ -211,7 +210,7 @@ private: std::queue command_queue; std::mutex queue_guard; - void process_queued_commands(); + static void process_queued_commands(struct ev_loop* loop, ev_async* async, int revents); template bool process_queued_command(long id); @@ -258,6 +257,9 @@ Command* Redox::command( get_command_map()[c->id] = c; command_queue.push(c->id); + // Signal the event loop to process this command + ev_async_send(evloop, &async_w); + // std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl; return c; -- libgit2 0.21.4