From 87ff8a3ebb58630143841b2927cd28beca4d1b3f Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Thu, 25 Dec 2014 02:23:11 -0500 Subject: [PATCH] Improved structure, stop handling, cleanups --- CMakeLists.txt | 12 +++++++++--- examples/basic_threaded.cpp | 41 +++++++++++++++++++++++++++++++++++++++++ examples/lpush_benchmark.cpp | 48 ++++++++++++------------------------------------ examples/progressive.cpp | 34 ++++++++++++++++++++++++++++++++++ src/redisx.cpp | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------- src/redisx.hpp | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------- 6 files changed, 216 insertions(+), 147 deletions(-) create mode 100644 examples/basic_threaded.cpp create mode 100644 examples/progressive.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d3424f2..9cdc60b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,8 @@ cmake_minimum_required(VERSION 2.8.4) project(redisx) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")#-g -fno-omit-frame-pointer") +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") # set(CMAKE_VERBOSE_MAKEFILE ON) # --------------------------------------------------------- @@ -27,9 +28,14 @@ set(LIB_ALL ${LIB_REDIS}) # Examples # --------------------------------------------------------- -add_executable(basic examples/basic.cpp ${SRC_ALL}) -target_link_libraries(basic ${LIB_REDIS}) +#add_executable(basic examples/basic.cpp ${SRC_ALL}) +#target_link_libraries(basic ${LIB_REDIS}) +#add_executable(progressive examples/progressive.cpp ${SRC_ALL}) +#target_link_libraries(progressive ${LIB_REDIS}) + +add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) +target_link_libraries(basic_threaded ${LIB_REDIS}) add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) target_link_libraries(lpush_benchmark ${LIB_REDIS}) diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp new file mode 100644 index 0000000..e03abe3 --- /dev/null +++ b/examples/basic_threaded.cpp @@ -0,0 +1,41 @@ +/** +* Basic asynchronous calls using redisx. +*/ + +#include +#include +#include +#include "../src/redisx.hpp" + +using namespace std; + +redisx::Redis rdx = {"localhost", 6379}; + +int main(int argc, char* argv[]) { + + rdx.run(); + + thread setter([]() { + while(true) { + rdx.command("INCR counter"); + this_thread::sleep_for(chrono::milliseconds(1)); + } + }); + + thread getter([]() { + while(true) { + rdx.command( + "GET counter", + [](const string& cmd, const string& value) { + cout << cmd << ": " << value << endl; + } + ); + this_thread::sleep_for(chrono::milliseconds(1000)); + } + }); + + setter.join(); + getter.join(); + + return 0; +}; diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index 6742273..99d17a2 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -3,13 +3,11 @@ */ #include +#include #include "../src/redisx.hpp" using namespace std; -static const string REDIS_HOST = "localhost"; -static const int REDIS_PORT = 6379; - unsigned long time_ms() { return chrono::system_clock::now().time_since_epoch() /chrono::milliseconds(1); @@ -17,45 +15,21 @@ unsigned long time_ms() { int main(int argc, char* argv[]) { - redisx::Redis r = {REDIS_HOST, REDIS_PORT}; - r.run(); - - r.command("GET blah", [](const string& cmd, const string& value) { - cout << "[COMMAND] " << cmd << ": " << value << endl; - }); - - r.command("GET blah", [](const string& cmd, const char* value) { - cout << "[COMMAND] " << cmd << ": " << value << endl; - }); - -// r.command("LPUSH yahoo 1 2 3 4 f w", [](const string& cmd, const redisReply* reply) { -// cout << "[COMMAND] " << cmd << ": " << reply->integer << endl; -// }); + redisx::Redis rdx = {"localhost", 6379}; + rdx.run(); - - r.get("blahqwefwqefef", [](const string& cmd, const char* value) { - cout << "[GET] blah: " << value << endl; - }); -// -// r.set("name", "lolfewef"); -// -// r.command("SET blah wefoijewfojiwef"); -// -// r.del("name"); -// r.del("wefoipjweojiqw", [](const string& cmd, long long int num_deleted) { -// cout << "num deleted: " << num_deleted << endl; -// }); - -// r.command_loop("LPUSH count 1", 0, 1000); + rdx.command("DEL test"); unsigned long t0 = time_ms(); unsigned long t1 = t0; - int len = 10000000; + int len = 1000000; int count = 0; + mutex task_lock; + task_lock.lock(); for(int i = 1; i <= len; i++) { - r.command("lpush test 1", [&t0, &t1, &count, len](const string& cmd, int reply) { + rdx.command("lpush test 1", [&t0, &t1, &count, len, &task_lock](const string& cmd, int reply) { count++; if(count == len) { @@ -65,13 +39,15 @@ int main(int argc, char* argv[]) { cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl; cout << "Time to receive all: " << t2 - t1 << "ms" << endl; cout << "Total time: " << t2 - t0 << "ms" << endl; + + task_lock.unlock(); } }); } t1 = time_ms(); - //r.start(); - while(true) {} + task_lock.lock(); + rdx.stop(); return 0; }; diff --git a/examples/progressive.cpp b/examples/progressive.cpp new file mode 100644 index 0000000..a7dce38 --- /dev/null +++ b/examples/progressive.cpp @@ -0,0 +1,34 @@ +/** +* Basic asynchronous calls using redisx. +*/ + +#include +#include "../src/redisx.hpp" + +using namespace std; + +redisx::Redis rdx = {"localhost", 6379}; + +void print_key(const string& key) { + rdx.command("GET " + key, [key](const string& cmd, const string& value) { + cout << "[GET] " << key << ": \"" << value << '\"' << endl; + }); +} + +void set_key(const string& key, const string& value) { + string cmd_str = "SET " + key + " " + value; + rdx.command(cmd_str, [key, value](const string& cmd, const string& reply) { + cout << "[SET] " << key << ": \"" << value << '\"' << endl; + }); +} + +int main(int argc, char* argv[]) { + + set_key("name", "Bob"); + print_key("name"); + set_key("name", "Steve"); + print_key("name"); + + rdx.run_blocking(); + return 0; +}; diff --git a/src/redisx.cpp b/src/redisx.cpp index 133f2ba..03f9edf 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include "redisx.hpp" using namespace std; @@ -36,17 +35,13 @@ void disconnected(const redisAsyncContext *c, int status) { connected_lock.lock(); } -Redis::Redis(const string& host, const int port) : host(host), port(port), io_ops(0) { +Redis::Redis(const string& host, const int port) + : host(host), port(port), io_ops(0), to_exit(false) { -// evthread_use_pthreads(); -// evthread_enable_lock_debuging(); -// event_enable_debug_mode(); - - lock_guard lg(evlock); + lock_guard lg(queue_guard); connected_lock.lock(); signal(SIGPIPE, SIG_IGN); -// base = event_base_new(); c = redisAsyncConnect(host.c_str(), port); if (c->err) { @@ -55,38 +50,49 @@ Redis::Redis(const string& host, const int port) : host(host), port(port), io_op } redisLibevAttach(EV_DEFAULT_ c); -// redisLibeventAttach(c, base); redisAsyncSetConnectCallback(c, connected); redisAsyncSetDisconnectCallback(c, disconnected); } Redis::~Redis() { redisAsyncDisconnect(c); + stop(); } -void Redis::run() { +void Redis::run_blocking() { - event_loop_thread = thread([this] { + // Events to connect to Redis + ev_run(EV_DEFAULT_ EVRUN_NOWAIT); + lock_guard lg(connected_lock); + + // Continuously create events and handle them + while (!to_exit) { + process_queued_commands(); ev_run(EV_DEFAULT_ EVRUN_NOWAIT); - connected_lock.lock(); + } - while (true) { - process_queued_commands(); - ev_run(EV_DEFAULT_ EVRUN_NOWAIT); - } - }); - event_loop_thread.detach(); + // Handle exit events + ev_run(EV_DEFAULT_ EVRUN_NOWAIT); } -void Redis::run_blocking() { +void Redis::run() { - ev_run(EV_DEFAULT_ EVRUN_NOWAIT); - connected_lock.lock(); + event_loop_thread = thread([this] { run_blocking(); }); + event_loop_thread.detach(); +} - while (true) { - process_queued_commands(); - ev_run(EV_DEFAULT_ EVRUN_NOWAIT); +void Redis::stop() { + to_exit = true; +} + +template +bool Redis::submit_to_server(const CommandAsync* cmd_obj) { + if (redisAsyncCommand(c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { + cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl; + delete cmd_obj; + return false; } + return true; } template @@ -99,16 +105,14 @@ bool Redis::process_queued_command(void* cmd_ptr) { CommandAsync* cmd_obj = it->second; command_map.erase(cmd_ptr); - if (redisAsyncCommand(c, command_callback, cmd_ptr, cmd_obj->cmd.c_str()) != REDIS_OK) { - cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl; - delete cmd_obj; - } + submit_to_server(cmd_obj); return true; } void Redis::process_queued_commands() { - lock_guard lg(evlock); + + lock_guard lg(queue_guard); while(!command_queue.empty()) { @@ -126,21 +130,6 @@ void Redis::process_queued_commands() { // ---------------------------- -// TODO update -void Redis::command(const char* cmd) { - - evlock.lock(); - int status = redisAsyncCommand(c, NULL, NULL, cmd); - evlock.unlock(); - - if (status != REDIS_OK) { - cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl; - return; - } -} - -// ---------------------------- - template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { @@ -189,36 +178,42 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* rep } // ---------------------------- +// Helpers +// ---------------------------- -void Redis::get(const char* key, function callback) { - string cmd = string("GET ") + key; - command(cmd.c_str(), callback); -} - -void Redis::set(const char* key, const char* value) { - string cmd = string("SET ") + key + " " + value; - command(cmd.c_str(), [](const string& command, const char* reply) { - if(strcmp(reply, "OK")) - cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl; - }); -} - -void Redis::set(const char* key, const char* value, function callback) { - string cmd = string("SET ") + key + " " + value; - command(cmd.c_str(), callback); -} - -void Redis::del(const char* key) { - string cmd = string("DEL ") + key; - command(cmd.c_str(), [](const string& command, long long int num_deleted) { - if(num_deleted != 1) - cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl; - }); -} - -void Redis::del(const char* key, function callback) { - string cmd = string("DEL ") + key; - command(cmd.c_str(), callback); -} +void Redis::command(const char* cmd) { + command(cmd, NULL); +} + +//void Redis::get(const char* key, function callback) { +// string cmd = string("GET ") + key; +// command(cmd.c_str(), callback); +//} +// +//void Redis::set(const char* key, const char* value) { +// string cmd = string("SET ") + key + " " + value; +// command(cmd.c_str(), [](const string& command, const char* reply) { +// if(strcmp(reply, "OK")) +// cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl; +// }); +//} +// +//void Redis::set(const char* key, const char* value, function callback) { +// string cmd = string("SET ") + key + " " + value; +// command(cmd.c_str(), callback); +//} +// +//void Redis::del(const char* key) { +// string cmd = string("DEL ") + key; +// command(cmd.c_str(), [](const string& command, long long int num_deleted) { +// if(num_deleted != 1) +// cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl; +// }); +//} +// +//void Redis::del(const char* key, function callback) { +// string cmd = string("DEL ") + key; +// command(cmd.c_str(), callback); +//} } // End namespace redis diff --git a/src/redisx.hpp b/src/redisx.hpp index 14d7e1e..e8886de 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -4,35 +4,36 @@ #pragma once -#include -#include #include -#include -#include +#include + +#include #include +#include + +#include #include -#include #include -#include -namespace redisx { +#include +#include -class CommandAsyncGeneric { -public: - redisCallbackFn* fn; - void* privdata; - const char* cmd; - CommandAsyncGeneric(redisCallbackFn* fn, void* privdata, const char* cmd) - : fn(fn), privdata(privdata), cmd(cmd) {} -}; +namespace redisx { template class CommandAsync { public: - CommandAsync(const std::string& cmd, const std::function& callback) - : cmd(cmd), callback(callback) {} + CommandAsync( + const std::string& cmd, + const std::function& callback, + double repeat, double after + ) : cmd(cmd), callback(callback), repeat(repeat), after(after) {} + const std::string cmd; const std::function callback; + double repeat; + double after; + void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); } }; @@ -45,24 +46,27 @@ public: void run(); void run_blocking(); + void stop(); template void command( const std::string& cmd, - const std::function& callback + const std::function& callback = NULL, + double repeat = 0.0, + double after = 0.0 ); void command(const char* command); // struct event* command_loop(const char* command, long interval_s, long interval_us); - void get(const char* key, std::function callback); - - void set(const char* key, const char* value); - void set(const char* key, const char* value, std::function callback); - - void del(const char* key); - void del(const char* key, std::function callback); +// void get(const char* key, std::function callback); +// +// void set(const char* key, const char* value); +// void set(const char* key, const char* value, std::function callback); +// +// void del(const char* key); +// void del(const char* key, std::function callback); // void publish(std::string channel, std::string msg); // void subscribe(std::string channel, std::function callback); @@ -77,27 +81,34 @@ private: // Number of IOs performed long io_ops; - struct event_base *base; redisAsyncContext *c; - std::mutex evlock; + std::atomic_bool to_exit; std::thread event_loop_thread; - template - std::unordered_map*>& get_command_map(); - std::unordered_map*> commands_redis_reply; std::unordered_map*> commands_string_r; std::unordered_map*> commands_char_p; std::unordered_map*> commands_int; std::unordered_map*> commands_long_long_int; + template + std::unordered_map*>& get_command_map(); + std::queue command_queue; + std::mutex queue_guard; void process_queued_commands(); template bool process_queued_command(void* cmd_ptr); + + /** + * Submit an asynchronous command to the Redis server. Return + * true if succeeded, false otherwise. + */ + template + bool submit_to_server(const CommandAsync* cmd_obj); }; // --------------------------- @@ -131,12 +142,18 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) { } template -void Redis::command(const std::string& cmd, const std::function& callback) { - - std::lock_guard lg(evlock); - auto* cmd_obj = new CommandAsync(cmd, callback); +void Redis::command( + const std::string& cmd, + const std::function& callback, + double repeat, + double after +) { + + std::lock_guard lg(queue_guard); + auto* cmd_obj = new CommandAsync(cmd, callback, repeat, after); get_command_map()[(void*)cmd_obj] = cmd_obj; command_queue.push((void*)cmd_obj); } + } // End namespace redis -- libgit2 0.21.4