diff --git a/README.md b/README.md index 5bbbc6c..a2f950b 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,6 @@ redisx ====== Asynchronous, minimalistic, and highly effective C++11 bindings for Redis +Asynchronous, minimalistic, and wicked fast C++11 bindings for Redis + +Work in progress, details coming soon. diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp index 69d3ab2..128c1e3 100644 --- a/examples/simple_loop.cpp +++ b/examples/simple_loop.cpp @@ -8,8 +8,8 @@ using namespace std; double time_s() { - unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::milliseconds(1); - return (double)ms / 1000; + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); + return (double)ms / 1e6; } int main(int argc, char* argv[]) { @@ -20,28 +20,31 @@ int main(int argc, char* argv[]) { string cmd_str = "SET alaska rules!"; double freq = 10000; // Hz - double t_end = 5; - - double dt = 1 / freq; - double t0 = time_s(); - int count = 0; + double dt = 1 / freq; // s + double t = 5; // s cout << "Running \"" << cmd_str << "\" at dt = " << dt - << "s for " << t_end << "s..." << endl; + << "s for " << t << "s..." << endl; - rdx.command( + int count = 0; + redisx::Command* c = rdx.command( cmd_str, - [&count, &rdx, t0, t_end](const string &cmd, const string &value) { + [&count](const string &cmd, const string &value) { count++; - if(time_s() - t0 >= t_end) rdx.stop(); }, dt, dt ); - rdx.block_until_stopped(); - double actual_freq = (double)count / t_end; - cout << "Sent " << count << " commands in " << t_end<< "s, " + double t0 = time_s(); + this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); + rdx.cancel(c); + rdx.stop(); + + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; + + cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; return 0; diff --git a/src/command.hpp b/src/command.hpp new file mode 100644 index 0000000..01d93aa --- /dev/null +++ b/src/command.hpp @@ -0,0 +1,67 @@ +/** +* Redis C++11 wrapper. +*/ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace redisx { + +template +class Command { +public: + Command( + redisAsyncContext* c, + const std::string& cmd, + const std::function& callback, + double repeat, double after + ); + + redisAsyncContext* c; + const std::string cmd; + const std::function callback; + double repeat; + double after; + bool done; + ev_timer* timer; + std::mutex timer_guard; + + void invoke(ReplyT reply); + + void free_if_done(); + ev_timer* get_timer() { + std::lock_guard lg(timer_guard); + return timer; + } +}; + +template +Command::Command( + redisAsyncContext* c, + const std::string& cmd, + const std::function& callback, + double repeat, double after +) : c(c), cmd(cmd), callback(callback), repeat(repeat), after(after), done(false) { + timer_guard.lock(); +} + +template +void Command::invoke(ReplyT reply) { + if(callback != NULL) callback(cmd, reply); + if((repeat == 0)) done = true; +} + +template +void Command::free_if_done() { + if(done) { + std::cout << "Deleting Command: " << cmd << std::endl; + delete this; + }; +} +} // End namespace redis diff --git a/src/redisx.cpp b/src/redisx.cpp index b940201..1ee636b 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -13,7 +13,7 @@ namespace redisx { // Global mutex to manage waiting for connected state mutex connected_lock; -// Map of ev_timer events to pointers to CommandAsync objects +// Map of ev_timer events to pointers to Command objects // Used to get the object back from the timer watcher callback unordered_map timer_callbacks; @@ -99,7 +99,7 @@ void Redis::block_until_stopped() { * true if succeeded, false otherwise. */ template -bool submit_to_server(CommandAsync* cmd_obj) { +bool submit_to_server(Command* cmd_obj) { if (redisAsyncCommand(cmd_obj->c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; cmd_obj->free_if_done(); @@ -110,7 +110,7 @@ bool submit_to_server(CommandAsync* cmd_obj) { template void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { - auto cmd_obj = (CommandAsync*)timer_callbacks.at(timer); + auto cmd_obj = (Command*)timer_callbacks.at(timer); submit_to_server(cmd_obj); } @@ -121,7 +121,7 @@ bool Redis::process_queued_command(void* cmd_ptr) { auto it = command_map.find(cmd_ptr); if(it == command_map.end()) return false; - CommandAsync* cmd_obj = it->second; + Command* cmd_obj = it->second; command_map.erase(cmd_ptr); if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) { @@ -131,11 +131,14 @@ bool Redis::process_queued_command(void* cmd_ptr) { timer_callbacks[cmd_obj->timer] = (void*)cmd_obj; ev_timer_init(cmd_obj->timer, submit_command_callback, cmd_obj->after, cmd_obj->repeat); ev_timer_start(EV_DEFAULT_ cmd_obj->timer); + + cmd_obj->timer_guard.unlock(); } return true; } + void Redis::process_queued_commands() { lock_guard lg(queue_guard); @@ -162,15 +165,15 @@ long Redis::num_commands_processed() { // ---------------------------- -template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } +template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } template<> -void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); } -template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } +template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } template<> -void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; return; @@ -179,9 +182,9 @@ void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply->str); } -template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } +template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } template<> -void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; return; @@ -189,9 +192,9 @@ void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply->str); } -template<> unordered_map*>& Redis::get_command_map() { return commands_int; } +template<> unordered_map*>& Redis::get_command_map() { return commands_int; } template<> -void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; return; @@ -199,9 +202,9 @@ void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke((int)reply->integer); } -template<> unordered_map*>& Redis::get_command_map() { return commands_long_long_int; } +template<> unordered_map*>& Redis::get_command_map() { return commands_long_long_int; } template<> -void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; return; diff --git a/src/redisx.hpp b/src/redisx.hpp index ccead25..1be96d8 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -20,37 +20,9 @@ #include #include -namespace redisx { +#include "command.hpp" -template -class CommandAsync { -public: - CommandAsync( - redisAsyncContext* c, - const std::string& cmd, - const std::function& callback, - double repeat, double after - ) : c(c), cmd(cmd), callback(callback), repeat(repeat), after(after), done(false) {} - - redisAsyncContext* c; - const std::string cmd; - const std::function callback; - double repeat; - double after; - bool done; - ev_timer* timer; - - void invoke(ReplyT reply) { - if(callback != NULL) callback(cmd, reply); - if((repeat == 0)) done = true; - } - void free_if_done() { - if(done) { - std::cout << "Deleting CommandAsync: " << cmd << std::endl; - delete this; - }; - } -}; +namespace redisx { class Redis { @@ -65,7 +37,7 @@ public: void block_until_stopped(); template - void command( + Command* command( const std::string& cmd, const std::function& callback = NULL, double repeat = 0.0, @@ -74,6 +46,9 @@ public: void command(const char* command); + template + bool cancel(Command* cmd_obj); + long num_commands_processed(); // void get(const char* key, std::function callback); @@ -105,14 +80,14 @@ private: std::thread event_loop_thread; - std::unordered_map*> commands_redis_reply; - std::unordered_map*> commands_string_r; - std::unordered_map*> commands_char_p; - std::unordered_map*> commands_int; - std::unordered_map*> commands_long_long_int; + std::unordered_map*> commands_redis_reply; + std::unordered_map*> commands_string_r; + std::unordered_map*> commands_char_p; + std::unordered_map*> commands_int; + std::unordered_map*> commands_long_long_int; template - std::unordered_map*>& get_command_map(); + std::unordered_map*>& get_command_map(); std::queue command_queue; std::mutex queue_guard; @@ -126,7 +101,7 @@ private: template void invoke_callback( - CommandAsync* cmd_obj, + Command* cmd_obj, redisReply* reply ); @@ -134,7 +109,7 @@ template void command_callback(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = (redisReply *) r; - auto *cmd_obj = (CommandAsync *) privdata; + auto *cmd_obj = (Command *) privdata; if (reply->type == REDIS_REPLY_ERROR) { std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl; @@ -153,7 +128,7 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) { } template -void Redis::command( +Command* Redis::command( const std::string& cmd, const std::function& callback, double repeat, @@ -161,9 +136,21 @@ void Redis::command( ) { std::lock_guard lg(queue_guard); - auto* cmd_obj = new CommandAsync(c, cmd, callback, repeat, after); + auto* cmd_obj = new Command(c, cmd, callback, repeat, after); get_command_map()[(void*)cmd_obj] = cmd_obj; command_queue.push((void*)cmd_obj); + return cmd_obj; +} + +template +bool Redis::cancel(Command* cmd_obj) { + + // TODO erase from global timer_callbacks + + if((cmd_obj->repeat != 0) || (cmd_obj->after != 0)) + ev_timer_stop(EV_DEFAULT_ cmd_obj->get_timer()); + + return true; } } // End namespace redis