diff --git a/src/command.hpp b/src/command.hpp index 56ebd06..429de6b 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -38,6 +38,7 @@ friend void submit_command_callback(struct ev_loop* loop, ev_timer* time public: Command( Redox* rdx, + long id, const std::string& cmd, const std::function& callback, const std::function& error_callback, @@ -47,6 +48,7 @@ public: Redox* rdx; + const long id; const std::string cmd; const double repeat; const double after; @@ -106,11 +108,12 @@ private: template Command::Command( Redox* rdx, + long id, const std::string& cmd, const std::function& callback, const std::function& error_callback, double repeat, double after, bool free_memory -) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), +) : rdx(rdx), id(id), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), callback(callback), error_callback(error_callback) { timer_guard.lock(); @@ -119,6 +122,11 @@ Command::Command( template void Command::process_reply() { + if(cmd == "GET simple_loop:count") { + std::cout << "In process_reply, cmd = " << cmd << ", reply_obj = " << reply_obj << std::endl; + std::cout << "reply int: " << reply_obj->integer << std::endl; + std::cout << "reply str: " << reply_obj->str << std::endl; + } free_guard.lock(); invoke_callback(); @@ -126,11 +134,13 @@ void Command::process_reply() { pending--; if(!free_memory) { + std::cout << "Command memory not being freed, free_memory = " << free_memory << std::endl; // Allow free() method to free memory free_guard.unlock(); return; } + free_reply_object(); if((pending == 0) && (repeat == 0)) { @@ -165,7 +175,7 @@ void Command::free_reply_object() { template void Command::free_command(Command* c) { c->rdx->commands_deleted += 1; - c->rdx->remove_active_command(c); + c->rdx->template remove_active_command(c->id); // std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl; delete c; } diff --git a/src/redox.cpp b/src/redox.cpp index 8dfd441..ad94937 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -147,16 +147,34 @@ void Redox::stop() { template void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { - Command* c = (Command*) privdata; - redisReply* reply_obj = (redisReply*) r; Redox* rdx = (Redox*) ctx->data; + long id = (long)privdata; + redisReply* reply_obj = (redisReply*) r; - if(!rdx->is_active_command(c)) { + auto& command_map = rdx->get_command_map(); + auto it = command_map.find(id); + if(it == command_map.end()) { + cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl; + freeReplyObject(r); + return; + }; + Command* c = it->second; + + if(!rdx->is_active_command(c->id)) { std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl; freeReplyObject(r); return; } + if(c->cmd == "GET simple_loop:count") { + std::cout << "In command_callback = " << c->cmd << " at " << c << ", reply_obj = " << reply_obj << std::endl; + std::cout << "reply type: " << reply_obj->type << std::endl; + std::cout << "reply int: " << reply_obj->integer << std::endl; + std::cout << "reply str: " << reply_obj->str << std::endl; +// std::string s(reply_obj->str); +// std::cout << "string object: " << s << std::endl; + } + c->reply_obj = reply_obj; c->process_reply(); @@ -170,8 +188,11 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { */ template bool submit_to_server(Command* c) { + if(c->cmd == "GET simple_loop:count") { + std::cout << "submit_to_server for cmd at " << c << ": " << c->cmd << std::endl; + } c->pending++; - if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c, c->cmd.c_str()) != REDIS_OK) { + if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; c->invoke_error(REDOX_SEND_ERROR); return false; @@ -182,7 +203,16 @@ bool submit_to_server(Command* c) { template void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { - auto c = (Command*)timer->data; + Redox* rdx = (Redox*) ev_userdata(loop); + long id = (long)timer->data; + + auto& command_map = rdx->get_command_map(); + auto it = command_map.find(id); + if(it == command_map.end()) { + cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl; + return; + }; + Command* c = it->second; if(c->is_completed()) { @@ -202,20 +232,23 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) } template -bool Redox::process_queued_command(void* c_ptr) { +bool Redox::process_queued_command(long id) { auto& command_map = get_command_map(); - auto it = command_map.find(c_ptr); + auto it = command_map.find(id); if(it == command_map.end()) return false; Command* c = it->second; - command_map.erase(c_ptr); + + if(c->cmd == "GET simple_loop:count") { + std::cout << "process_queued_command for cmd at " << c << ": " << c->cmd << std::endl; + } if((c->repeat == 0) && (c->after == 0)) { submit_to_server(c); } else { - c->timer.data = (void*)c; + c->timer.data = (void*)c->id; ev_timer_init(&c->timer, submit_command_callback, c->after, c->repeat); ev_timer_start(evloop, &c->timer); @@ -232,13 +265,13 @@ void Redox::process_queued_commands() { while(!command_queue.empty()) { - void* c_ptr = command_queue.front(); - if(process_queued_command(c_ptr)) {} - else if(process_queued_command(c_ptr)) {} - else if(process_queued_command(c_ptr)) {} - else if(process_queued_command(c_ptr)) {} - else if(process_queued_command(c_ptr)) {} - else if(process_queued_command(c_ptr)) {} + long id = command_queue.front(); + if(process_queued_command(id)) {} + else if(process_queued_command(id)) {} + else if(process_queued_command(id)) {} + else if(process_queued_command(id)) {} + else if(process_queued_command(id)) {} + else if(process_queued_command(id)) {} else throw runtime_error("[FATAL] Command pointer not found in any queue!"); command_queue.pop(); @@ -247,22 +280,22 @@ void Redox::process_queued_commands() { // ---------------------------- -template<> unordered_map*>& +template<> unordered_map*>& Redox::get_command_map() { return commands_redis_reply; } -template<> unordered_map*>& +template<> unordered_map*>& Redox::get_command_map() { return commands_string_r; } -template<> unordered_map*>& +template<> unordered_map*>& Redox::get_command_map() { return commands_char_p; } -template<> unordered_map*>& +template<> unordered_map*>& Redox::get_command_map() { return commands_int; } -template<> unordered_map*>& +template<> unordered_map*>& Redox::get_command_map() { return commands_long_long_int; } -template<> unordered_map*>& +template<> unordered_map*>& Redox::get_command_map() { return commands_null; } // ---------------------------- diff --git a/src/redox.hpp b/src/redox.hpp index 93ca115..dc6d4b4 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -70,17 +70,22 @@ public: // void subscribe(std::string channel, std::function callback); // void unsubscribe(std::string channel); - std::atomic_int commands_created = {0}; - std::atomic_int commands_deleted = {0}; + std::atomic_long commands_created = {0}; + std::atomic_long commands_deleted = {0}; - bool is_active_command(void* c_ptr) { - return active_commands.find(c_ptr) != active_commands.end(); + bool is_active_command(const long id) { + return active_commands.find(id) != active_commands.end(); } - void remove_active_command(void* c_ptr) { - active_commands.erase(c_ptr); + template + void remove_active_command(const long id) { + active_commands.erase(id); + get_command_map().erase(id); } + template + std::unordered_map*>& get_command_map(); + private: // Redox server @@ -103,25 +108,22 @@ private: std::thread event_loop_thread; - std::unordered_map*> commands_redis_reply; - std::unordered_map*> commands_string_r; - std::unordered_map*> commands_char_p; - std::unordered_map*> commands_int; - std::unordered_map*> commands_long_long_int; - std::unordered_map*> commands_null; - - template - std::unordered_map*>& get_command_map(); + std::unordered_map*> commands_redis_reply; + std::unordered_map*> commands_string_r; + std::unordered_map*> commands_char_p; + std::unordered_map*> commands_int; + std::unordered_map*> commands_long_long_int; + std::unordered_map*> commands_null; - std::queue command_queue; + std::queue command_queue; std::mutex queue_guard; void process_queued_commands(); template - bool process_queued_command(void* cmd_ptr); + bool process_queued_command(long id); - // Commands created but not yet deleted - std::unordered_set active_commands; + // Commands created but not yet deleted (stored by id) + std::unordered_set active_commands; }; // --------------------------- @@ -136,13 +138,18 @@ Command* Redox::command( bool free_memory ) { std::lock_guard lg(queue_guard); - auto* c = new Command(this, cmd, callback, error_callback, repeat, after, free_memory); - void* c_ptr = (void*)c; - get_command_map()[c_ptr] = c; - command_queue.push(c_ptr); - active_commands.insert(c_ptr); + commands_created += 1; -// std::cout << "[DEBUG] Created Command " << commands_created << " at " << c << std::endl; + auto* c = new Command(this, commands_created, cmd, + callback, error_callback, repeat, after, free_memory); + + get_command_map()[c->id] = c; + active_commands.insert(c->id); + command_queue.push(c->id); + std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl; + if(cmd == "GET simple_loop:count") { + std::cout << "Command created at " << c << ": " << c->cmd << std::endl; + } return c; } @@ -154,7 +161,7 @@ bool Redox::cancel(Command* c) { return false; } - std::cout << "[INFO] Canceling command at " << c << std::endl; + std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl; c->completed = true; return true; @@ -174,6 +181,7 @@ Command* Redox::command_blocking(const std::string& cmd) { Command* c = command(cmd, [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { std::unique_lock ul(m); + std::cout << "success callback: " << cmd_str << std::endl; val = reply; status = REDOX_OK; ul.unlock(); @@ -187,10 +195,10 @@ Command* Redox::command_blocking(const std::string& cmd) { }, 0, 0, false // No repeats, don't free memory ); - + std::cout << "command blocking cv wait starting" << std::endl; // Wait until a callback is invoked cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); - + std::cout << "command blocking cv wait over" << std::endl; c->reply_val = val; c->reply_status = status;