diff --git a/CMakeLists.txt b/CMakeLists.txt index 9cdc60b..7fc3cbf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,8 @@ cmake_minimum_required(VERSION 2.8.4) project(redisx) -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") # set(CMAKE_VERBOSE_MAKEFILE ON) # --------------------------------------------------------- @@ -34,8 +34,11 @@ set(LIB_ALL ${LIB_REDIS}) #add_executable(progressive examples/progressive.cpp ${SRC_ALL}) #target_link_libraries(progressive ${LIB_REDIS}) -add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) -target_link_libraries(basic_threaded ${LIB_REDIS}) +#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) +#target_link_libraries(basic_threaded ${LIB_REDIS}) -add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) -target_link_libraries(lpush_benchmark ${LIB_REDIS}) +#add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) +#target_link_libraries(lpush_benchmark ${LIB_REDIS}) + +add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL}) +target_link_libraries(simple_loop ${LIB_REDIS}) diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp new file mode 100644 index 0000000..69d3ab2 --- /dev/null +++ b/examples/simple_loop.cpp @@ -0,0 +1,48 @@ +/** +* Basic asynchronous calls using redisx. +*/ + +#include +#include "../src/redisx.hpp" + +using namespace std; + +double time_s() { + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::milliseconds(1); + return (double)ms / 1000; +} + +int main(int argc, char* argv[]) { + + redisx::Redis rdx = {"localhost", 6379}; + rdx.run(); + + string cmd_str = "SET alaska rules!"; + + double freq = 10000; // Hz + double t_end = 5; + + double dt = 1 / freq; + double t0 = time_s(); + int count = 0; + + cout << "Running \"" << cmd_str << "\" at dt = " << dt + << "s for " << t_end << "s..." << endl; + + rdx.command( + cmd_str, + [&count, &rdx, t0, t_end](const string &cmd, const string &value) { + count++; + if(time_s() - t0 >= t_end) rdx.stop(); + }, + dt, + dt + ); + + rdx.block_until_stopped(); + double actual_freq = (double)count / t_end; + cout << "Sent " << count << " commands in " << t_end<< "s, " + << "that's " << actual_freq << " commands/s." << endl; + + return 0; +} diff --git a/src/redisx.cpp b/src/redisx.cpp index 373eec5..b940201 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -3,20 +3,20 @@ */ #include -#include -#include -#include -#include -#include -#include +//#include #include "redisx.hpp" using namespace std; namespace redisx { +// Global mutex to manage waiting for connected state mutex connected_lock; +// Map of ev_timer events to pointers to CommandAsync objects +// Used to get the object back from the timer watcher callback +unordered_map timer_callbacks; + void connected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl; @@ -94,17 +94,27 @@ void Redis::block_until_stopped() { exit_waiter.wait(ul, [this]() { return to_exit.load(); }); } +/** +* Submit an asynchronous command to the Redis server. Return +* true if succeeded, false otherwise. +*/ template -bool Redis::submit_to_server(const CommandAsync* cmd_obj) { - if (redisAsyncCommand(c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { - cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl; - delete cmd_obj; +bool submit_to_server(CommandAsync* cmd_obj) { + if (redisAsyncCommand(cmd_obj->c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { + cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; + cmd_obj->free_if_done(); return false; } return true; } template +void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { + auto cmd_obj = (CommandAsync*)timer_callbacks.at(timer); + submit_to_server(cmd_obj); +} + +template bool Redis::process_queued_command(void* cmd_ptr) { auto& command_map = get_command_map(); @@ -114,7 +124,14 @@ bool Redis::process_queued_command(void* cmd_ptr) { CommandAsync* cmd_obj = it->second; command_map.erase(cmd_ptr); - submit_to_server(cmd_obj); + if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) { + submit_to_server(cmd_obj); + } else { + cmd_obj->timer = new ev_timer(); + timer_callbacks[cmd_obj->timer] = (void*)cmd_obj; + ev_timer_init(cmd_obj->timer, submit_command_callback, cmd_obj->after, cmd_obj->repeat); + ev_timer_start(EV_DEFAULT_ cmd_obj->timer); + } return true; } @@ -147,13 +164,13 @@ long Redis::num_commands_processed() { template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } template<> -void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); } template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } template<> -void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; return; @@ -164,7 +181,7 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* rep template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } template<> -void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; return; @@ -174,7 +191,7 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply template<> unordered_map*>& Redis::get_command_map() { return commands_int; } template<> -void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; return; @@ -184,7 +201,7 @@ void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { template<> unordered_map*>& Redis::get_command_map() { return commands_long_long_int; } template<> -void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { +void invoke_callback(CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; return; diff --git a/src/redisx.hpp b/src/redisx.hpp index b9bb11d..ccead25 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -18,7 +18,7 @@ #include #include -#include +#include namespace redisx { @@ -26,17 +26,30 @@ template class CommandAsync { public: CommandAsync( + redisAsyncContext* c, const std::string& cmd, const std::function& callback, double repeat, double after - ) : cmd(cmd), callback(callback), repeat(repeat), after(after) {} + ) : c(c), cmd(cmd), callback(callback), repeat(repeat), after(after), done(false) {} + redisAsyncContext* c; const std::string cmd; const std::function callback; double repeat; double after; + bool done; + ev_timer* timer; - void invoke(ReplyT reply) const {if(callback != NULL) callback(cmd, reply); } + void invoke(ReplyT reply) { + if(callback != NULL) callback(cmd, reply); + if((repeat == 0)) done = true; + } + void free_if_done() { + if(done) { + std::cout << "Deleting CommandAsync: " << cmd << std::endl; + delete this; + }; + } }; class Redis { @@ -63,8 +76,6 @@ public: long num_commands_processed(); -// struct event* command_loop(const char* command, long interval_s, long interval_us); - // void get(const char* key, std::function callback); // // void set(const char* key, const char* value); @@ -109,20 +120,13 @@ private: template bool process_queued_command(void* cmd_ptr); - - /** - * Submit an asynchronous command to the Redis server. Return - * true if succeeded, false otherwise. - */ - template - bool submit_to_server(const CommandAsync* cmd_obj); }; // --------------------------- template void invoke_callback( - const CommandAsync* cmd_obj, + CommandAsync* cmd_obj, redisReply* reply ); @@ -134,18 +138,18 @@ void command_callback(redisAsyncContext *c, void *r, void *privdata) { if (reply->type == REDIS_REPLY_ERROR) { std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl; - delete cmd_obj; + cmd_obj->free_if_done(); return; } if(reply->type == REDIS_REPLY_NIL) { std::cerr << "[WARNING] " << cmd_obj->cmd << ": Nil reply." << std::endl; - delete cmd_obj; + cmd_obj->free_if_done(); return; // cmd_obj->invoke(NULL); } invoke_callback(cmd_obj, reply); - delete cmd_obj; + cmd_obj->free_if_done(); } template @@ -157,10 +161,9 @@ void Redis::command( ) { std::lock_guard lg(queue_guard); - auto* cmd_obj = new CommandAsync(cmd, callback, repeat, after); + auto* cmd_obj = new CommandAsync(c, cmd, callback, repeat, after); get_command_map()[(void*)cmd_obj] = cmd_obj; command_queue.push((void*)cmd_obj); } - } // End namespace redis