diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b69882..828db03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,8 +29,8 @@ set(LIB_ALL ${LIB_REDIS}) # Examples # --------------------------------------------------------- -#add_executable(basic examples/basic.cpp ${SRC_ALL}) -#target_link_libraries(basic ${LIB_REDIS}) +add_executable(basic examples/basic.cpp ${SRC_ALL}) +target_link_libraries(basic ${LIB_REDIS}) #add_executable(progressive examples/progressive.cpp ${SRC_ALL}) #target_link_libraries(progressive ${LIB_REDIS}) diff --git a/README.md b/README.md index 5124e73..a964a13 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ redox ====== -High-level, asynchronous, and wicked fast C++11 bindings for Redis +Modern, asynchronous, and wicked fast C++11 bindings for Redis Work in progress, details coming soon. diff --git a/examples/basic.cpp b/examples/basic.cpp index 1d40ae4..e10d471 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -1,21 +1,21 @@ /** -* Basic asynchronous calls using redisx. +* Basic use of Redox to set and get a Redis key. */ #include -#include "../src/redisx.hpp" +#include "../src/redox.hpp" using namespace std; int main(int argc, char* argv[]) { - redisx::Redis rdx = {"localhost", 6379}; + redox::Redox rdx = {"localhost", 6379}; - rdx.command("SET alaska rules!", [](const string &cmd, const string &value) { + rdx.command("SET alaska rules!", [](const string &cmd, const string &value) { cout << cmd << ": " << value << endl; }); - rdx.command("GET alaska", [](const string &cmd, const string &value) { + rdx.command("GET alaska", [](const string &cmd, const string &value) { cout << cmd << ": " << value << endl; }); diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index 7071d78..23242fc 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -20,6 +20,7 @@ int main(int argc, char* argv[]) { rdx.command("INCR counter"); this_thread::sleep_for(chrono::milliseconds(1)); } + cout << "Setter thread exiting." << endl; }); thread getter([]() { @@ -32,6 +33,7 @@ int main(int argc, char* argv[]) { ); this_thread::sleep_for(chrono::milliseconds(1000)); } + cout << "Getter thread exiting." << endl; }); setter.join(); diff --git a/src/command.cpp b/src/command.cpp index 5ec4595..d83851c 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -64,7 +64,7 @@ void Command::invoke_callback() { template<> void Command::invoke_callback() { -// std::cout << "invoking int callback" << std::endl; + if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); diff --git a/src/command.hpp b/src/command.hpp index 429de6b..71656f9 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -74,6 +74,13 @@ public: void process_reply(); + ev_timer* get_timer() { + std::lock_guard lg(timer_guard); + return &timer; + } + + static void free_command(Command* c); + private: const std::function callback; @@ -89,16 +96,11 @@ private: ev_timer timer; std::mutex timer_guard; - ev_timer* get_timer() { - std::lock_guard lg(timer_guard); - return &timer; - } // Make sure we don't free resources until details taken care of std::mutex free_guard; void free_reply_object(); - static void free_command(Command* c); void invoke_callback(); bool is_error_reply(); @@ -122,11 +124,6 @@ Command::Command( template void Command::process_reply() { - if(cmd == "GET simple_loop:count") { - std::cout << "In process_reply, cmd = " << cmd << ", reply_obj = " << reply_obj << std::endl; - std::cout << "reply int: " << reply_obj->integer << std::endl; - std::cout << "reply str: " << reply_obj->str << std::endl; - } free_guard.lock(); invoke_callback(); @@ -134,20 +131,44 @@ void Command::process_reply() { pending--; if(!free_memory) { - std::cout << "Command memory not being freed, free_memory = " << free_memory << std::endl; // Allow free() method to free memory +// std::cout << "Command memory not being freed, free_memory = " << free_memory << std::endl; free_guard.unlock(); return; } - free_reply_object(); - if((pending == 0) && (repeat == 0)) { - free_command(this); - } else { - free_guard.unlock(); +// // Free memory when all pending callbacks are received +// if((repeat != 0) && (pending == 0) && ((long)(get_timer()->data) == 0)) { +// std::cout << "Freeing command, timer stopped and pending is 0." << std::endl; +// free_command(this); +// } +// +// if((pending == 0) && (repeat == 0)) { +// free_command(this); +// } else { +// free_guard.unlock(); +// } + + // Handle memory if all pending replies have arrived + if(pending == 0) { + + // Just free non-repeating commands + if (repeat == 0) { + free_command(this); + return; + + // Free repeating commands if timer is stopped + } else { + if((long)(get_timer()->data) == 0) { + free_command(this); + return; + } + } } + + free_guard.unlock(); } template @@ -174,9 +195,8 @@ void Command::free_reply_object() { template void Command::free_command(Command* c) { - c->rdx->commands_deleted += 1; c->rdx->template remove_active_command(c->id); -// std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl; +// std::cout << "[INFO] Deleted Command " << c->id << " at " << c << std::endl; delete c; } diff --git a/src/redox.cpp b/src/redox.cpp index ad94937..750e54c 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -66,14 +66,6 @@ Redox::Redox(const string& host, const int port) Redox::~Redox() { -// cout << "Queue sizes: " << endl; -// cout << commands_redis_reply.size() << endl; -// cout << commands_string_r.size() << endl; -// cout << commands_char_p.size() << endl; -// cout << commands_int.size() << endl; -// cout << commands_long_long_int.size() << endl; -// cout << commands_null.size() << endl; - redisAsyncDisconnect(ctx); stop(); @@ -105,14 +97,14 @@ void Redox::run_blocking() { cout << "[INFO] Stop signal detected." << endl; // Run a few more times to clear out canceled events -// for(int i = 0; i < 100; i++) { -// ev_run(evloop, EVRUN_NOWAIT); -// } - - // Run until all commands are processed - do { + for(int i = 0; i < 100; i++) { ev_run(evloop, EVRUN_NOWAIT); - } while(commands_created != commands_deleted); + } + + if(commands_created != commands_deleted) { + cerr << "[ERROR] All commands were not freed! " + << commands_created << "/" << commands_deleted << endl; + } exited = true; @@ -145,36 +137,30 @@ void Redox::stop() { } template +Command* Redox::find_command(long id) { + + lock_guard lg(command_map_guard); + + auto& command_map = get_command_map(); + auto it = command_map.find(id); + if(it == command_map.end()) return nullptr; + return it->second; +} + +template void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { Redox* rdx = (Redox*) ctx->data; long id = (long)privdata; redisReply* reply_obj = (redisReply*) r; - auto& command_map = rdx->get_command_map(); - auto it = command_map.find(id); - if(it == command_map.end()) { - cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl; - freeReplyObject(r); - return; - }; - Command* c = it->second; - - if(!rdx->is_active_command(c->id)) { - std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl; - freeReplyObject(r); + Command* c = rdx->find_command(id); + if(c == nullptr) { +// cout << "[WARNING] Couldn't find Command " << id << " in command_map (command_callback)." << endl; + freeReplyObject(reply_obj); return; } - if(c->cmd == "GET simple_loop:count") { - std::cout << "In command_callback = " << c->cmd << " at " << c << ", reply_obj = " << reply_obj << std::endl; - std::cout << "reply type: " << reply_obj->type << std::endl; - std::cout << "reply int: " << reply_obj->integer << std::endl; - std::cout << "reply str: " << reply_obj->str << std::endl; -// std::string s(reply_obj->str); -// std::cout << "string object: " << s << std::endl; - } - c->reply_obj = reply_obj; c->process_reply(); @@ -188,9 +174,6 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { */ template bool submit_to_server(Command* c) { - if(c->cmd == "GET simple_loop:count") { - std::cout << "submit_to_server for cmd at " << c << ": " << c->cmd << std::endl; - } c->pending++; if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; @@ -206,24 +189,24 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) Redox* rdx = (Redox*) ev_userdata(loop); long id = (long)timer->data; - auto& command_map = rdx->get_command_map(); - auto it = command_map.find(id); - if(it == command_map.end()) { - cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl; + Command* c = rdx->find_command(id); + if(c == nullptr) { + cout << "[ERROR] Couldn't find Command " << id + << " in command_map (submit_command_callback)." << endl; return; - }; - Command* c = it->second; + } if(c->is_completed()) { - cerr << "[INFO] Command " << c << " is completed, stopping event timer." << endl; +// cout << "[INFO] Command " << c << " is completed, stopping event timer." << endl; c->timer_guard.lock(); if((c->repeat != 0) || (c->after != 0)) ev_timer_stop(loop, &c->timer); c->timer_guard.unlock(); - Command::free_command(c); + // Mark for memory to be freed when all callbacks are received + c->timer.data = (void*)0; return; } @@ -234,15 +217,8 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) template bool Redox::process_queued_command(long id) { - auto& command_map = get_command_map(); - - auto it = command_map.find(id); - if(it == command_map.end()) return false; - Command* c = it->second; - - if(c->cmd == "GET simple_loop:count") { - std::cout << "process_queued_command for cmd at " << c << ": " << c->cmd << std::endl; - } + Command* c = find_command(id); + if(c == nullptr) return false; if((c->repeat == 0) && (c->after == 0)) { submit_to_server(c); @@ -266,6 +242,8 @@ void Redox::process_queued_commands() { while(!command_queue.empty()) { long id = command_queue.front(); + command_queue.pop(); + if(process_queued_command(id)) {} else if(process_queued_command(id)) {} else if(process_queued_command(id)) {} @@ -273,30 +251,38 @@ void Redox::process_queued_commands() { else if(process_queued_command(id)) {} else if(process_queued_command(id)) {} else throw runtime_error("[FATAL] Command pointer not found in any queue!"); - - command_queue.pop(); } } // ---------------------------- template<> unordered_map*>& -Redox::get_command_map() { return commands_redis_reply; } +Redox::get_command_map() { +// cout << "redis reply command map at " << &commands_redis_reply << endl; + return commands_redis_reply; } template<> unordered_map*>& -Redox::get_command_map() { return commands_string_r; } +Redox::get_command_map() { +// cout << "string command map at " << &commands_string_r << endl; + return commands_string_r; } template<> unordered_map*>& -Redox::get_command_map() { return commands_char_p; } +Redox::get_command_map() { +// cout << "char* command map at " << &commands_char_p << endl; + return commands_char_p; } template<> unordered_map*>& -Redox::get_command_map() { return commands_int; } +Redox::get_command_map() { +// cout << "int command map at " << &commands_int << " has size: " << commands_int.size() << endl; + return commands_int; } template<> unordered_map*>& -Redox::get_command_map() { return commands_long_long_int; } +Redox::get_command_map() { +// cout << "long long int command map at " << &commands_long_long_int << endl; + return commands_long_long_int; } template<> unordered_map*>& -Redox::get_command_map() { return commands_null; } +Redox::get_command_map() { return commands_null; } // ---------------------------- // Helpers diff --git a/src/redox.hpp b/src/redox.hpp index dc6d4b4..82e3c8b 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -73,17 +73,17 @@ public: std::atomic_long commands_created = {0}; std::atomic_long commands_deleted = {0}; - bool is_active_command(const long id) { - return active_commands.find(id) != active_commands.end(); - } - template void remove_active_command(const long id) { - active_commands.erase(id); + std::lock_guard lg1(command_map_guard); get_command_map().erase(id); + commands_deleted += 1; } template + Command* find_command(long id); + + template std::unordered_map*>& get_command_map(); private: @@ -114,6 +114,7 @@ private: std::unordered_map*> commands_int; std::unordered_map*> commands_long_long_int; std::unordered_map*> commands_null; + std::mutex command_map_guard; std::queue command_queue; std::mutex queue_guard; @@ -121,9 +122,6 @@ private: template bool process_queued_command(long id); - - // Commands created but not yet deleted (stored by id) - std::unordered_set active_commands; }; // --------------------------- @@ -137,19 +135,19 @@ Command* Redox::command( double after, bool free_memory ) { - std::lock_guard lg(queue_guard); commands_created += 1; auto* c = new Command(this, commands_created, cmd, callback, error_callback, repeat, after, free_memory); + std::lock_guard lg(queue_guard); + std::lock_guard lg2(command_map_guard); + get_command_map()[c->id] = c; - active_commands.insert(c->id); command_queue.push(c->id); - std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl; - if(cmd == "GET simple_loop:count") { - std::cout << "Command created at " << c << ": " << c->cmd << std::endl; - } + +// std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl; + return c; } @@ -161,7 +159,7 @@ bool Redox::cancel(Command* c) { return false; } - std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl; +// std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl; c->completed = true; return true; @@ -181,7 +179,7 @@ Command* Redox::command_blocking(const std::string& cmd) { Command* c = command(cmd, [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { std::unique_lock ul(m); - std::cout << "success callback: " << cmd_str << std::endl; + val = reply; status = REDOX_OK; ul.unlock(); @@ -195,10 +193,10 @@ Command* Redox::command_blocking(const std::string& cmd) { }, 0, 0, false // No repeats, don't free memory ); - std::cout << "command blocking cv wait starting" << std::endl; + // Wait until a callback is invoked cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); - std::cout << "command blocking cv wait over" << std::endl; + c->reply_val = val; c->reply_status = status;