From 2f207093573a7257c44ffdbfbe70f3e596de254c Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Wed, 24 Dec 2014 04:17:23 -0500 Subject: [PATCH] Fixed two major issues, the event loop and memory management --- CMakeLists.txt | 4 ++-- examples/basic.cpp | 69 ++++++++++++++++++++++++++++++++++++--------------------------------- src/redisx.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------- src/redisx.hpp | 67 ++++++++++++++++++++++++++++++++++++++++++++++++------------------- 4 files changed, 155 insertions(+), 85 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e36451a..da4cd10 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 2.8.4) project(redisx) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") # set(CMAKE_VERBOSE_MAKEFILE ON) # --------------------------------------------------------- @@ -20,7 +20,7 @@ set(SRC_ALL ${SRC_CORE}) # Libraries # --------------------------------------------------------- -set(LIB_REDIS hiredis event) +set(LIB_REDIS hiredis ev pthread) set(LIB_ALL ${LIB_REDIS}) # --------------------------------------------------------- diff --git a/examples/basic.cpp b/examples/basic.cpp index 4f165dd..c8e3a86 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -19,6 +19,7 @@ unsigned long time_ms() { int main(int argc, char* argv[]) { redisx::Redis r = {REDIS_HOST, REDIS_PORT}; + r.start(); r.command("GET blah", [](const string& cmd, const string& value) { cout << "[COMMAND] " << cmd << ": " << value << endl; @@ -28,48 +29,50 @@ int main(int argc, char* argv[]) { cout << "[COMMAND] " << cmd << ": " << value << endl; }); - r.command("LPUSH yahoo 1 2 3 4 f w", [](const string& cmd, const redisReply* reply) { - cout << "[COMMAND] " << cmd << ": " << reply->integer << endl; - }); +// r.command("LPUSH yahoo 1 2 3 4 f w", [](const string& cmd, const redisReply* reply) { +// cout << "[COMMAND] " << cmd << ": " << reply->integer << endl; +// }); + r.get("blahqwefwqefef", [](const string& cmd, const char* value) { cout << "[GET] blah: " << value << endl; }); +// +// r.set("name", "lolfewef"); +// +// r.command("SET blah wefoijewfojiwef"); +// +// r.del("name"); +// r.del("wefoipjweojiqw", [](const string& cmd, long long int num_deleted) { +// cout << "num deleted: " << num_deleted << endl; +// }); - r.set("name", "lolfewef"); +// r.command_loop("LPUSH count 1", 0, 1000); - r.command("SET blah wefoijewfojiwef"); + unsigned long t0 = time_ms(); + unsigned long t1 = t0; - r.del("name"); - r.del("wefoipjweojiqw", [](const string& cmd, long long int num_deleted) { - cout << "num deleted: " << num_deleted << endl; - }); + int len = 1000000; + int count = 0; - r.command_loop("LPUSH count 1", 0, 1); - r.command_loop("LPUSH count2 1", 0, 1); + for(int i = 0; i < len; i++) { + r.command("lpush test 1", [&t0, &t1, &count, len](const string& cmd, int reply) { -// unsigned long t0 = time_ms(); -// unsigned long t1 = t0; -// -// int len = 1000000; -// int count = 0; -// -// for(int i = 0; i < len; i++) { -// r.command("set blah wefoiwef", [&t0, &t1, &count, len](const string& cmd, const string& reply) { -// -// count++; -// if(count == len) { -// cout << cmd << ": " << reply << endl; -// cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl; -// cout << "Time to receive all: " << time_ms() - t1 << "ms" << endl; -// cout << "Total time: " << time_ms() - t0 << "ms" << endl; -// } -// }); -// } -// t1 = time_ms(); - - thread loop([&r] { r.start(); }); - loop.join(); + count++; + if(count == len) { + cout << cmd << ": " << reply << endl; + + unsigned long t2 = time_ms(); + cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl; + cout << "Time to receive all: " << t2 - t1 << "ms" << endl; + cout << "Total time: " << t2 - t0 << "ms" << endl; + } + }); + } + t1 = time_ms(); + + //r.start(); + while(true) {} return 0; }; diff --git a/src/redisx.cpp b/src/redisx.cpp index 6d71a44..bdaa99d 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -5,7 +5,9 @@ #include #include #include -#include +#include +#include +#include #include #include #include "redisx.hpp" @@ -14,12 +16,15 @@ using namespace std; namespace redisx { +mutex connected_lock; + void connected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); return; } printf("Connected...\n"); + connected_lock.unlock(); } void disconnected(const redisAsyncContext *c, int status) { @@ -28,12 +33,20 @@ void disconnected(const redisAsyncContext *c, int status) { return; } printf("Disconnected...\n"); + connected_lock.lock(); } Redis::Redis(const string& host, const int port) : host(host), port(port), io_ops(0) { +// evthread_use_pthreads(); +// evthread_enable_lock_debuging(); +// event_enable_debug_mode(); + + lock_guard lg(evlock); + connected_lock.lock(); + signal(SIGPIPE, SIG_IGN); - base = event_base_new(); +// base = event_base_new(); c = redisAsyncConnect(host.c_str(), port); if (c->err) { @@ -41,7 +54,8 @@ Redis::Redis(const string& host, const int port) : host(host), port(port), io_op return; } - redisLibeventAttach(c, base); + redisLibevAttach(EV_DEFAULT_ c); +// redisLibeventAttach(c, base); redisAsyncSetConnectCallback(c, connected); redisAsyncSetDisconnectCallback(c, disconnected); } @@ -51,57 +65,78 @@ Redis::~Redis() { } void Redis::start() { - event_base_dispatch(base); + + event_loop_thread = thread([this] { + ev_run(EV_DEFAULT_ EVRUN_NOWAIT); + connected_lock.lock(); + + while (true) { + process_queued_commands(); + ev_run(EV_DEFAULT_ EVRUN_NOWAIT); + } + }); + event_loop_thread.detach(); } +template +bool Redis::process_queued_command(void* cmd_ptr) { -// ---------------------------- + auto& command_map = get_command_map(); -void Redis::command(const char* cmd) { - int status = redisAsyncCommand(c, NULL, NULL, cmd); - if (status != REDIS_OK) { - cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl; - return; + auto it = command_map.find(cmd_ptr); + if(it == command_map.end()) return false; + CommandAsync* cmd_obj = it->second; + command_map.erase(cmd_ptr); + + if (redisAsyncCommand(c, command_callback, cmd_ptr, cmd_obj->cmd.c_str()) != REDIS_OK) { + cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl; + delete cmd_obj; } + + return true; } -// ---------------------------- +void Redis::process_queued_commands() { + lock_guard lg(evlock); + + while(!command_queue.empty()) { -void e_callback(evutil_socket_t fd, short what, void *arg) { + void* cmd_ptr = command_queue.front(); + if(process_queued_command(cmd_ptr)) {} + else if(process_queued_command(cmd_ptr)) {} + else if(process_queued_command(cmd_ptr)) {} + else if(process_queued_command(cmd_ptr)) {} + else if(process_queued_command(cmd_ptr)) {} + else throw runtime_error("[FATAL] Command pointer not found in any queue!"); - const char* cmd = "LPUSH count 1"; + command_queue.pop(); + } +} - redisAsyncContext* c = (redisAsyncContext*)arg; +// ---------------------------- + +// TODO update +void Redis::command(const char* cmd) { + evlock.lock(); int status = redisAsyncCommand(c, NULL, NULL, cmd); + evlock.unlock(); + if (status != REDIS_OK) { cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl; return; } } -struct event* Redis::command_loop(const char* cmd, long interval_s, long interval_us) { - - struct event* e; - if(interval_s == 0 && interval_us == 0) { - e = event_new(base, -1, EV_PERSIST, e_callback, c); - event_add(e, NULL); - } else { - struct timeval e_time = {interval_s, interval_us}; - e = event_new(base, -1, EV_TIMEOUT | EV_PERSIST, e_callback, c); - event_add(e, &e_time); - } - - return e; -} - // ---------------------------- +template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); } +template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { @@ -112,6 +147,7 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* rep cmd_obj->invoke(reply->str); } +template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { @@ -121,6 +157,7 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply cmd_obj->invoke(reply->str); } +template<> unordered_map*>& Redis::get_command_map() { return commands_int; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { @@ -130,6 +167,7 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke((int)reply->integer); } +template<> unordered_map*>& Redis::get_command_map() { return commands_long_long_int; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { @@ -154,7 +192,7 @@ void Redis::set(const char* key, const char* value) { }); } -void Redis::set(const char* key, const char* value, std::function callback) { +void Redis::set(const char* key, const char* value, function callback) { string cmd = string("SET ") + key + " " + value; command(cmd.c_str(), callback); } @@ -167,7 +205,7 @@ void Redis::del(const char* key) { }); } -void Redis::del(const char* key, std::function callback) { +void Redis::del(const char* key, function callback) { string cmd = string("DEL ") + key; command(cmd.c_str(), callback); } diff --git a/src/redisx.hpp b/src/redisx.hpp index 951bbbb..3af590b 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -9,9 +9,32 @@ #include #include #include +#include +#include +#include +#include namespace redisx { +class CommandAsyncGeneric { +public: + redisCallbackFn* fn; + void* privdata; + const char* cmd; + CommandAsyncGeneric(redisCallbackFn* fn, void* privdata, const char* cmd) + : fn(fn), privdata(privdata), cmd(cmd) {} +}; + +template +class CommandAsync { +public: + CommandAsync(const std::string& cmd, const std::function& callback) + : cmd(cmd), callback(callback) {} + const std::string cmd; + const std::function callback; + void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); } +}; + class Redis { public: @@ -29,7 +52,7 @@ public: void command(const char* command); - struct event* command_loop(const char* command, long interval_s, long interval_us); +// struct event* command_loop(const char* command, long interval_s, long interval_us); void get(const char* key, std::function callback); @@ -54,18 +77,29 @@ private: struct event_base *base; redisAsyncContext *c; -}; -template -class CommandAsync { -public: - CommandAsync(const std::string& cmd, const std::function& callback) - : cmd(cmd), callback(callback) {} - const std::string cmd; - const std::function callback; - void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); } + std::mutex evlock; + + std::thread event_loop_thread; + + template + std::unordered_map*>& get_command_map(); + + std::unordered_map*> commands_redis_reply; + std::unordered_map*> commands_string_r; + std::unordered_map*> commands_char_p; + std::unordered_map*> commands_int; + std::unordered_map*> commands_long_long_int; + + std::queue command_queue; + void process_queued_commands(); + + template + bool process_queued_command(void* cmd_ptr); }; +// --------------------------- + template void invoke_callback( const CommandAsync* cmd_obj, @@ -97,15 +131,10 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) { template void Redis::command(const std::string& cmd, const std::function& callback) { - auto *cmd_obj = new CommandAsync(cmd, callback); - - int status = redisAsyncCommand(c, command_callback, (void*)cmd_obj, cmd.c_str()); - if (status != REDIS_OK) { - std::cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << std::endl; - delete cmd_obj; - return; - } + std::lock_guard lg(evlock); + auto* cmd_obj = new CommandAsync(cmd, callback); + get_command_map()[(void*)cmd_obj] = cmd_obj; + command_queue.push((void*)cmd_obj); } - } // End namespace redis -- libgit2 0.21.4