Commit bf1b71983d53fc36dfdd2c88a1a9a5ba8bbfc145

Authored by Hayk Martirosyan
1 parent e3565000

Use libev asynchronous watcher for processing commands

Finally getting rid of the busy loop in the event thread. Now, we use
an ev_async watcher that processes commands in the queue, which we
can signal in a thread-safe manner using ev_async_send.
CMakeLists.txt
... ... @@ -32,11 +32,11 @@ set(LIB_ALL ${LIB_REDIS})
32 32 add_executable(basic examples/basic.cpp ${SRC_ALL})
33 33 target_link_libraries(basic ${LIB_REDIS})
34 34  
35   -add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
36   -target_link_libraries(basic_threaded ${LIB_REDIS})
  35 +#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
  36 +#target_link_libraries(basic_threaded ${LIB_REDIS})
37 37  
38   -add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
39   -target_link_libraries(lpush_benchmark ${LIB_REDIS})
  38 +#add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
  39 +#target_link_libraries(lpush_benchmark ${LIB_REDIS})
40 40  
41 41 add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL})
42 42 target_link_libraries(speed_test_async ${LIB_REDIS})
... ... @@ -44,11 +44,11 @@ target_link_libraries(speed_test_async ${LIB_REDIS})
44 44 add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL})
45 45 target_link_libraries(speed_test_sync ${LIB_REDIS})
46 46  
47   -add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL})
48   -target_link_libraries(speed_test_async_multi ${LIB_REDIS})
  47 +#add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL})
  48 +#target_link_libraries(speed_test_async_multi ${LIB_REDIS})
49 49  
50   -add_executable(data_types examples/data_types.cpp ${SRC_ALL})
51   -target_link_libraries(data_types ${LIB_REDIS})
  50 +#add_executable(data_types examples/data_types.cpp ${SRC_ALL})
  51 +#target_link_libraries(data_types ${LIB_REDIS})
52 52  
53   -add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL})
54   -target_link_libraries(string_v_char ${LIB_REDIS})
  53 +#add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL})
  54 +#target_link_libraries(string_v_char ${LIB_REDIS})
... ...
src/redox.cpp
... ... @@ -94,17 +94,17 @@ void Redox::run_event_loop() {
94 94 connected_lock.lock();
95 95 connected_lock.unlock();
96 96  
  97 + // Set up asynchronous watcher which we signal every
  98 + // time we add a command
  99 + ev_async_init(&async_w, process_queued_commands);
  100 + ev_async_start(evloop, &async_w);
  101 +
97 102 running = true;
98 103 running_waiter.notify_one();
99 104  
100   - // Continuously create events and handle them
  105 + // Run the event loop
101 106 while (!to_exit) {
102   - process_queued_commands();
103 107 ev_run(evloop, EVRUN_NOWAIT);
104   -
105   - // Wait until notified, or check every 100 milliseconds
106   - unique_lock<mutex> ul(loop_waiter_lock);
107   - loop_waiter.wait_for(ul, chrono::milliseconds(100));
108 108 }
109 109  
110 110 cout << "[INFO] Stop signal detected." << endl;
... ... @@ -181,9 +181,6 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
181 181  
182 182 // Increment the Redox object command counter
183 183 rdx->cmd_count++;
184   -
185   - // Notify to check the event loop
186   - rdx->loop_waiter.notify_one();
187 184 }
188 185  
189 186 /**
... ... @@ -199,9 +196,6 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
199 196 return false;
200 197 }
201 198  
202   - // Notify to check the event loop
203   - c->rdx->loop_waiter.notify_one();
204   -
205 199 return true;
206 200 }
207 201  
... ... @@ -257,22 +251,24 @@ bool Redox::process_queued_command(long id) {
257 251 return true;
258 252 }
259 253  
260   -void Redox::process_queued_commands() {
  254 +void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int revents) {
  255 +
  256 + Redox* rdx = (Redox*) ev_userdata(loop);
261 257  
262   - lock_guard<mutex> lg(queue_guard);
  258 + lock_guard<mutex> lg(rdx->queue_guard);
263 259  
264   - while(!command_queue.empty()) {
  260 + while(!rdx->command_queue.empty()) {
265 261  
266   - long id = command_queue.front();
267   - command_queue.pop();
  262 + long id = rdx->command_queue.front();
  263 + rdx->command_queue.pop();
268 264  
269   - if(process_queued_command<redisReply*>(id)) {}
270   - else if(process_queued_command<string>(id)) {}
271   - else if(process_queued_command<char*>(id)) {}
272   - else if(process_queued_command<int>(id)) {}
273   - else if(process_queued_command<long long int>(id)) {}
274   - else if(process_queued_command<nullptr_t>(id)) {}
275   - else if(process_queued_command<vector<string>>(id)) {}
  265 + if(rdx->process_queued_command<redisReply*>(id)) {}
  266 + else if(rdx->process_queued_command<string>(id)) {}
  267 + else if(rdx->process_queued_command<char*>(id)) {}
  268 + else if(rdx->process_queued_command<int>(id)) {}
  269 + else if(rdx->process_queued_command<long long int>(id)) {}
  270 + else if(rdx->process_queued_command<nullptr_t>(id)) {}
  271 + else if(rdx->process_queued_command<vector<string>>(id)) {}
276 272 else throw runtime_error("[FATAL] Command pointer not found in any queue!");
277 273 }
278 274 }
... ...
src/redox.hpp
... ... @@ -166,6 +166,9 @@ private:
166 166 // Dynamically allocated libev event loop
167 167 struct ev_loop* evloop;
168 168  
  169 + // Asynchronous watcher (for processing commands)
  170 + ev_async async_w;
  171 +
169 172 // Number of commands processed
170 173 std::atomic_long cmd_count = {0};
171 174  
... ... @@ -187,10 +190,6 @@ private:
187 190 std::mutex exit_waiter_lock;
188 191 std::condition_variable exit_waiter;
189 192  
190   - // Condition variable to check the event loop
191   - std::condition_variable loop_waiter;
192   - std::mutex loop_waiter_lock;
193   -
194 193 // Maps of each Command, fetchable by the unique ID number
195 194 std::unordered_map<long, Command<redisReply*>*> commands_redis_reply;
196 195 std::unordered_map<long, Command<std::string>*> commands_string_r;
... ... @@ -211,7 +210,7 @@ private:
211 210  
212 211 std::queue<long> command_queue;
213 212 std::mutex queue_guard;
214   - void process_queued_commands();
  213 + static void process_queued_commands(struct ev_loop* loop, ev_async* async, int revents);
215 214  
216 215 template<class ReplyT>
217 216 bool process_queued_command(long id);
... ... @@ -258,6 +257,9 @@ Command&lt;ReplyT&gt;* Redox::command(
258 257 get_command_map<ReplyT>()[c->id] = c;
259 258 command_queue.push(c->id);
260 259  
  260 + // Signal the event loop to process this command
  261 + ev_async_send(evloop, &async_w);
  262 +
261 263 // std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl;
262 264  
263 265 return c;
... ...