diff --git a/CMakeLists.txt b/CMakeLists.txt index 7fc3cbf..ce08a59 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,3 +42,6 @@ set(LIB_ALL ${LIB_REDIS}) add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL}) target_link_libraries(simple_loop ${LIB_REDIS}) + +add_executable(simple_sync_loop examples/simple_sync_loop.cpp ${SRC_ALL}) +target_link_libraries(simple_sync_loop ${LIB_REDIS}) diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp index 5263f75..dab032d 100644 --- a/examples/simple_loop.cpp +++ b/examples/simple_loop.cpp @@ -17,10 +17,12 @@ int main(int argc, char* argv[]) { Redis rdx = {"localhost", 6379}; rdx.run(); - - Command* del_cmd = rdx.command_blocking("DEL simple_loop:count"); - cout << "deleted key, reply: " << del_cmd->reply() << endl; - del_cmd->free(); +// +// Command* del_cmd = rdx.command_blocking("DEL simple_loop:count"); +// cout << "deleted key, reply: " << del_cmd->reply() << endl; +// del_cmd->free(); + if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl; + else cerr << "Failed to delete simple_loop:count" << endl; Command* set_cmd = rdx.command_blocking("SET simple_loop:count 0"); cout << "set key, reply: " << set_cmd->reply() << endl; diff --git a/examples/simple_sync_loop.cpp b/examples/simple_sync_loop.cpp new file mode 100644 index 0000000..9b26231 --- /dev/null +++ b/examples/simple_sync_loop.cpp @@ -0,0 +1,48 @@ +/** +* Basic asynchronous calls using redisx. +*/ + +#include +#include "../src/redisx.hpp" + +using namespace std; +using namespace redisx; + +double time_s() { + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); + return (double)ms / 1e6; +} + +int main(int argc, char* argv[]) { + + Redis rdx = {"localhost", 6379}; + rdx.run(); + + if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl; + else cerr << "Failed to delete simple_loop:count" << endl; + + string cmd_str = "INCR simple_loop:count"; + + int count = 50000; + double t0 = time_s(); + + cout << "Running \"" << cmd_str << "\" " << count << " times." << endl; + + for(int i = 0; i < count; i++) { + Command* c = rdx.command_blocking(cmd_str); + if(c->status() != REDIS_OK) cerr << "Bad reply, code: " << c->status() << endl; + } + + cout << "At the end, simple_loop:count = " + << rdx.command_blocking("GET simple_loop:count")->reply() << endl; + + rdx.stop(); + + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; + + cout << "Sent " << count << " commands in " << t_elapsed << "s, " + << "that's " << actual_freq << " commands/s." << endl; + + return 0; +} diff --git a/src/command.hpp b/src/command.hpp index 1f8bbd4..87be8c0 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -74,12 +74,12 @@ private: std::atomic_bool completed; - ev_timer* timer; + ev_timer timer; std::mutex timer_guard; ev_timer* get_timer() { std::lock_guard lg(timer_guard); - return timer; + return &timer; } }; diff --git a/src/redisx.cpp b/src/redisx.cpp index 49bd2fe..4cc171d 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -11,10 +11,8 @@ using namespace std; namespace redisx { -// Default construct the static map -std::unordered_map Redis::timer_callbacks; - // Global mutex to manage waiting for connected state +// TODO get rid of this as the only global variable? mutex connected_lock; /** @@ -119,12 +117,14 @@ bool submit_to_server(Command* cmd_obj) { template void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { - auto cmd_obj = (Command*)Redis::timer_callbacks.at(timer); - if(cmd_obj == NULL) { + + // Check if canceled + if(timer->data == NULL) { cerr << "[WARNING] Skipping event, has been canceled." << endl; - Redis::timer_callbacks.erase(timer); return; } + + auto cmd_obj = (Command*)timer->data; submit_to_server(cmd_obj); } @@ -141,14 +141,11 @@ bool Redis::process_queued_command(void* cmd_ptr) { if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) { submit_to_server(cmd_obj); } else { - // TODO manage memory somehow - cmd_obj->timer = new ev_timer(); - // TODO use cmd_obj->timer->data instead of timer callbacks!!!!! + cmd_obj->timer.data = (void*)cmd_obj; - timer_callbacks[cmd_obj->timer] = (void*)cmd_obj; - ev_timer_init(cmd_obj->timer, submit_command_callback, cmd_obj->after, cmd_obj->repeat); - ev_timer_start(EV_DEFAULT_ cmd_obj->timer); + ev_timer_init(&cmd_obj->timer, submit_command_callback, cmd_obj->after, cmd_obj->repeat); + ev_timer_start(EV_DEFAULT_ &cmd_obj->timer); cmd_obj->timer_guard.unlock(); } @@ -242,8 +239,11 @@ void Redis::command(const string& cmd) { command(cmd, NULL); } -void Redis::command_blocking(const string& cmd) { - command_blocking(cmd); +bool Redis::command_blocking(const string& cmd) { + Command* c = command_blocking(cmd); + bool succeeded = (c->status() == REDISX_OK); + c->free(); + return succeeded; } } // End namespace redis diff --git a/src/redisx.hpp b/src/redisx.hpp index 3281568..d17a308 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -49,12 +49,11 @@ public: template bool cancel(Command* cmd_obj); - void command(const std::string& command); - template Command* command_blocking(const std::string& cmd); - void command_blocking(const std::string& command); + void command(const std::string& command); + bool command_blocking(const std::string& command); long num_commands_processed(); @@ -62,10 +61,6 @@ public: // void subscribe(std::string channel, std::function callback); // void unsubscribe(std::string channel); - // Map of ev_timer events to pointers to Command objects - // Used to get the object back from the timer watcher callback - static std::unordered_map timer_callbacks; - private: // Redis server @@ -154,13 +149,12 @@ bool Redis::cancel(Command* cmd_obj) { return false; } - timer_callbacks.at(cmd_obj->timer) = NULL; + cmd_obj->timer.data = NULL; std::lock_guard lg(cmd_obj->timer_guard); if((cmd_obj->repeat != 0) || (cmd_obj->after != 0)) - ev_timer_stop(EV_DEFAULT_ cmd_obj->timer); + ev_timer_stop(EV_DEFAULT_ &cmd_obj->timer); - delete cmd_obj->timer; cmd_obj->completed = true; return true; @@ -179,21 +173,22 @@ Command* Redis::command_blocking(const std::string& cmd) { Command* cmd_obj = command(cmd, [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { - std::unique_lock lk(m); + std::unique_lock ul(m); val = reply; status = REDISX_OK; - lk.unlock(); + ul.unlock(); cv.notify_one(); }, [&status, &m, &cv](const std::string& cmd_str, int error) { - std::unique_lock lk(m); + std::unique_lock ul(m); status = error; - lk.unlock(); + ul.unlock(); cv.notify_one(); }, 0, 0, false // No repeats, don't free memory ); + // Wait until a callback is invoked cv.wait(lk, [&status] { return status != REDISX_UNINIT; }); cmd_obj->reply_val = val;