From c2436e640194976559eb112d892af7c3675b852e Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Mon, 29 Dec 2014 03:27:56 -0500 Subject: [PATCH] Clean Valgrind reports, improved speed examples --- CMakeLists.txt | 19 +++++++++++-------- examples/basic_threaded.cpp | 18 ++++++++++-------- examples/simple_loop.cpp | 74 -------------------------------------------------------------------------- examples/simple_sync_loop.cpp | 49 ------------------------------------------------- examples/speed_test_async.cpp | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ examples/speed_test_async_multi.cpp | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ examples/speed_test_sync.cpp | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/command.cpp | 2 +- src/command.hpp | 86 ++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------- src/redox.cpp | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------- src/redox.hpp | 47 +++++++++++++++++++++++++++++++++++------------ 11 files changed, 414 insertions(+), 222 deletions(-) delete mode 100644 examples/simple_loop.cpp delete mode 100644 examples/simple_sync_loop.cpp create mode 100644 examples/speed_test_async.cpp create mode 100644 examples/speed_test_async_multi.cpp create mode 100644 examples/speed_test_sync.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d31d8ee..6b69882 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,8 @@ cmake_minimum_required(VERSION 2.8.4) project(redox) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") # set(CMAKE_VERBOSE_MAKEFILE ON) # --------------------------------------------------------- @@ -35,14 +35,17 @@ set(LIB_ALL ${LIB_REDIS}) #add_executable(progressive examples/progressive.cpp ${SRC_ALL}) #target_link_libraries(progressive ${LIB_REDIS}) -#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) -#target_link_libraries(basic_threaded ${LIB_REDIS}) +add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) +target_link_libraries(basic_threaded ${LIB_REDIS}) #add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) #target_link_libraries(lpush_benchmark ${LIB_REDIS}) -add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL}) -target_link_libraries(simple_loop ${LIB_REDIS}) +add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) +target_link_libraries(speed_test_async ${LIB_REDIS}) -add_executable(simple_sync_loop examples/simple_sync_loop.cpp ${SRC_ALL}) -target_link_libraries(simple_sync_loop ${LIB_REDIS}) +add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) +target_link_libraries(speed_test_sync ${LIB_REDIS}) + +add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) +target_link_libraries(speed_test_async_multi ${LIB_REDIS}) diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index e03abe3..7071d78 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -1,30 +1,30 @@ /** -* Basic asynchronous calls using redisx. +* */ #include -#include #include -#include "../src/redisx.hpp" +#include +#include "../src/redox.hpp" using namespace std; -redisx::Redis rdx = {"localhost", 6379}; +redox::Redox rdx = {"localhost", 6379}; int main(int argc, char* argv[]) { rdx.run(); thread setter([]() { - while(true) { - rdx.command("INCR counter"); + for(int i = 0; i < 5000; i++) { + rdx.command("INCR counter"); this_thread::sleep_for(chrono::milliseconds(1)); } }); thread getter([]() { - while(true) { - rdx.command( + for(int i = 0; i < 5; i++) { + rdx.command( "GET counter", [](const string& cmd, const string& value) { cout << cmd << ": " << value << endl; @@ -37,5 +37,7 @@ int main(int argc, char* argv[]) { setter.join(); getter.join(); + rdx.stop(); + return 0; }; diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp deleted file mode 100644 index 4d44700..0000000 --- a/examples/simple_loop.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/** -* Basic asynchronous calls using redox. -*/ - -#include -#include "../src/redox.hpp" - -using namespace std; -using namespace redox; - -double time_s() { - unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); - return (double)ms / 1e6; -} - -int main(int argc, char* argv[]) { - - Redox rdx = {"localhost", 6379}; - rdx.run(); - - if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl; - else cerr << "Failed to delete simple_loop:count" << endl; - - Command* null_cmd = rdx.command_blocking("GET WFEOIEFJ"); - if(null_cmd->status() == REDOX_OK) cout << "got nonexistent key." << endl; - else cerr << "error with null cmd: " << null_cmd->status() << endl; - null_cmd->free(); - - Command* set_cmd = rdx.command_blocking("SET simple_loop:count 0"); - cout << "set key, reply: " << set_cmd->reply() << endl; - set_cmd->free(); - - Command* count_cmd = rdx.command_blocking("GET simple_loop:count"); - if(count_cmd->status() == REDOX_OK) { - cout << "At the start, simple_loop:count = " << count_cmd->reply() << endl; - } - count_cmd->free(); - - string cmd_str = "INCR simple_loop:count"; - - double freq = 10000; // Hz - double dt = 1 / freq; // s - double t = 3; // s - - cout << "Running \"" << cmd_str << "\" at dt = " << dt - << "s for " << t << "s..." << endl; - - atomic_int count(0); - Command* c = rdx.command( - cmd_str, - [&count](const string &cmd, const int& value) { count++; }, - NULL, - dt, - 0 - ); - - double t0 = time_s(); - this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); - rdx.cancel(c); - - cout << "At the end, simple_loop:count = " - << rdx.command_blocking("GET simple_loop:count")->reply() << endl; - - rdx.stop(); - - double t_elapsed = time_s() - t0; - double actual_freq = (double)count / t_elapsed; - - cout << "Sent " << count << " commands in " << t_elapsed << "s, " - << "that's " << actual_freq << " commands/s." << endl; - - cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl; - return 0; -} diff --git a/examples/simple_sync_loop.cpp b/examples/simple_sync_loop.cpp deleted file mode 100644 index 98bb63a..0000000 --- a/examples/simple_sync_loop.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/** -* Basic synchronous calls using redox. -*/ - -#include -#include "../src/redox.hpp" - -using namespace std; -using namespace redox; - -double time_s() { - unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); - return (double)ms / 1e6; -} - -int main(int argc, char* argv[]) { - - Redox rdx = {"localhost", 6379}; - rdx.run(); - - if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl; - else cerr << "Failed to delete simple_loop:count" << endl; - - string cmd_str = "INCR simple_loop:count"; - - int count = 50000; - double t0 = time_s(); - - cout << "Running \"" << cmd_str << "\" " << count << " times." << endl; - - for(int i = 0; i < count; i++) { - Command* c = rdx.command_blocking(cmd_str); - if(c->status() != REDOX_OK) cerr << "Bad reply, code: " << c->status() << endl; - } - - cout << "At the end, simple_loop:count = " - << rdx.command_blocking("GET simple_loop:count")->reply() << endl; - - rdx.stop(); - - double t_elapsed = time_s() - t0; - double actual_freq = (double)count / t_elapsed; - - cout << "Sent " << count << " commands in " << t_elapsed << "s, " - << "that's " << actual_freq << " commands/s." << endl; - - cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl; - return 0; -} diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp new file mode 100644 index 0000000..eca4ff8 --- /dev/null +++ b/examples/speed_test_async.cpp @@ -0,0 +1,68 @@ +/** +* Redox test +* ---------- +* Increment a key on Redis using an asynchronous command on a timer. +*/ + +#include +#include "../src/redox.hpp" + +using namespace std; +using namespace redox; + +double time_s() { + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); + return (double)ms / 1e6; +} + +int main(int argc, char* argv[]) { + + Redox rdx = {"localhost", 6379}; + rdx.run(); + + if(rdx.command_blocking("SET simple_loop:count 0")) { + cout << "Reset the counter to zero." << endl; + } else { + cerr << "Failed to reset counter." << endl; + return 1; + } + + string cmd_str = "INCR simple_loop:count"; + double freq = 400000; // Hz + double dt = 1 / freq; // s + double t = 5; // s + + cout << "Sending \"" << cmd_str << "\" asynchronously every " + << dt << "s for " << t << "s..." << endl; + + double t0 = time_s(); + atomic_int count(0); + + Command* c = rdx.command( + cmd_str, + [&count, &rdx](const string &cmd, const int& value) { count++; }, + [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; }, + dt + ); + + // Wait for t time, then stop the command. + this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); + rdx.cancel(c); + + // Get the final value of the counter + auto get_cmd = rdx.command_blocking("GET simple_loop:count"); + long final_count = stol(get_cmd->reply()); + get_cmd->free(); + + rdx.stop(); + + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; + + cout << "Sent " << count << " commands in " << t_elapsed << "s, " + << "that's " << actual_freq << " commands/s." << endl; + + cout << "Final value of counter: " << final_count << endl; + + return 0; +} diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp new file mode 100644 index 0000000..a2c6c4a --- /dev/null +++ b/examples/speed_test_async_multi.cpp @@ -0,0 +1,73 @@ +/** +* Redox test +* ---------- +* Increment a key on Redis using an asynchronous command on a timer. +*/ + +#include +#include +#include "../src/redox.hpp" + +using namespace std; +using namespace redox; + +double time_s() { + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); + return (double)ms / 1e6; +} + +int main(int argc, char* argv[]) { + + Redox rdx = {"localhost", 6379}; + rdx.run(); + + if(rdx.command_blocking("SET simple_loop:count 0")) { + cout << "Reset the counter to zero." << endl; + } else { + cerr << "Failed to reset counter." << endl; + return 1; + } + + string cmd_str = "INCR simple_loop:count"; + double freq = 10000; // Hz + double dt = 1 / freq; // s + double t = 5; // s + int parallel = 100; + + cout << "Sending \"" << cmd_str << "\" asynchronously every " + << dt << "s for " << t << "s..." << endl; + + double t0 = time_s(); + atomic_int count(0); + + vector*> commands; + for(int i = 0; i < parallel; i++) { + commands.push_back(rdx.command( + cmd_str, + [&count, &rdx](const string &cmd, const int& value) { count++; }, + [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; }, + dt + )); + } + + // Wait for t time, then stop the command. + this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); + for(auto c : commands) rdx.cancel(c); + + // Get the final value of the counter + auto get_cmd = rdx.command_blocking("GET simple_loop:count"); + long final_count = stol(get_cmd->reply()); + get_cmd->free(); + + rdx.stop(); + + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; + + cout << "Sent " << count << " commands in " << t_elapsed << "s, " + << "that's " << actual_freq << " commands/s." << endl; + + cout << "Final value of counter: " << final_count << endl; + + return 0; +} diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp new file mode 100644 index 0000000..891a8b7 --- /dev/null +++ b/examples/speed_test_sync.cpp @@ -0,0 +1,61 @@ +/** +* Redox test +* ---------- +* Increment a key on Redis using synchronous commands in a loop. +*/ + +#include +#include "../src/redox.hpp" + +using namespace std; +using namespace redox; + +double time_s() { + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); + return (double)ms / 1e6; +} + +int main(int argc, char* argv[]) { + + Redox rdx = {"localhost", 6379}; + rdx.run(); + + if(rdx.command_blocking("SET simple_loop:count 0")) { + cout << "Reset the counter to zero." << endl; + } else { + cerr << "Failed to reset counter." << endl; + return 1; + } + + string cmd_str = "INCR simple_loop:count"; + double t = 5; // s + + cout << "Sending \"" << cmd_str << "\" synchronously for " << t << "s..." << endl; + + double t0 = time_s(); + double t_end = t0 + t; + int count = 0; + + while(time_s() < t_end) { + Command* c = rdx.command_blocking(cmd_str); + if(c->status() != REDOX_OK) cerr << "Bad reply, code: " << c->status() << endl; + c->free(); + count++; + } + + auto get_cmd = rdx.command_blocking("GET simple_loop:count"); + long final_count = stol(get_cmd->reply()); + get_cmd->free(); + + rdx.stop(); + + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; + + cout << "Sent " << count << " commands in " << t_elapsed << "s, " + << "that's " << actual_freq << " commands/s." << endl; + + cout << "Final value of counter: " << final_count << endl; + + return 0; +} diff --git a/src/command.cpp b/src/command.cpp index d83851c..5ec4595 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -64,7 +64,7 @@ void Command::invoke_callback() { template<> void Command::invoke_callback() { - +// std::cout << "invoking int callback" << std::endl; if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); diff --git a/src/command.hpp b/src/command.hpp index 97ab117..56ebd06 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -26,10 +26,15 @@ static const int REDOX_TIMEOUT = 5; class Redox; template +void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); + +template class Command { friend class Redox; +friend void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); + public: Command( Redox* rdx, @@ -48,15 +53,16 @@ public: const bool free_memory; - redisReply* reply_obj; + redisReply* reply_obj = nullptr; - std::atomic_int pending; + std::atomic_int pending = {0}; void invoke(const ReplyT& reply); void invoke_error(int status); const ReplyT& reply(); int status() { return reply_status; }; + bool is_completed() { return completed; } /** * Called by the user to free the redisReply object, when the free_memory @@ -64,7 +70,7 @@ public: */ void free(); - static void command_callback(redisAsyncContext *c, void *r, void *privdata); + void process_reply(); private: @@ -76,7 +82,7 @@ private: ReplyT reply_val; int reply_status; - std::atomic_bool completed; + std::atomic_bool completed = {false}; ev_timer timer; std::mutex timer_guard; @@ -86,7 +92,11 @@ private: return &timer; } + // Make sure we don't free resources until details taken care of + std::mutex free_guard; + void free_reply_object(); + static void free_command(Command* c); void invoke_callback(); bool is_error_reply(); @@ -100,74 +110,74 @@ Command::Command( const std::function& callback, const std::function& error_callback, double repeat, double after, bool free_memory -) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), reply_obj(NULL), - pending(0), callback(callback), error_callback(error_callback), completed(false) +) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), + callback(callback), error_callback(error_callback) { timer_guard.lock(); } template -void Command::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { +void Command::process_reply() { + + free_guard.lock(); + + invoke_callback(); - auto *c = (Command *) privdata; - c->reply_obj = (redisReply *) r; - c->invoke_callback(); + pending--; + + if(!free_memory) { + // Allow free() method to free memory + free_guard.unlock(); + return; + } - // Free the reply object unless told not to - if(c->free_memory) c->free_reply_object(); + free_reply_object(); - // Increment the Redox object command counter - c->rdx->incr_cmd_count(); + if((pending == 0) && (repeat == 0)) { + free_command(this); + } else { + free_guard.unlock(); + } } template void Command::invoke(const ReplyT& r) { - if(callback) callback(cmd, r); - - pending--; - if(!free_memory) return; - if(pending != 0) return; - if(completed || (repeat == 0)) { -// std::cout << cmd << ": suicide!" << std::endl; - delete this; - } } template void Command::invoke_error(int status) { - if(error_callback) error_callback(cmd, status); - - pending--; - if(!free_memory) return; - if(pending != 0) return; - if(completed || (repeat == 0)) { -// std::cout << cmd << ": suicide!" << std::endl; - delete this; - } } template void Command::free_reply_object() { - if(reply_obj == NULL) { - std::cerr << "[ERROR] " << cmd << ": Attempting to double free reply object!" << std::endl; + if(reply_obj == nullptr) { + std::cerr << "[ERROR] " << cmd << ": Attempting to double free reply object." << std::endl; return; } freeReplyObject(reply_obj); - reply_obj = NULL; + reply_obj = nullptr; +} + +template +void Command::free_command(Command* c) { + c->rdx->commands_deleted += 1; + c->rdx->remove_active_command(c); +// std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl; + delete c; } template void Command::free() { + free_guard.lock(); free_reply_object(); + free_guard.unlock(); - // Commit suicide -// std::cout << cmd << ": suicide, by calling free()!" << std::endl; - delete this; + free_command(this); } template diff --git a/src/redox.cpp b/src/redox.cpp index fa8f5f2..8dfd441 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -3,48 +3,47 @@ */ #include -#include #include "redox.hpp" using namespace std; namespace redox { -// Global mutex to manage waiting for connected state -// TODO get rid of this as the only global variable? -mutex connected_lock; +void Redox::connected(const redisAsyncContext *ctx, int status) { -void Redox::connected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { - cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl; + cerr << "[ERROR] Connecting to Redis: " << ctx->errstr << endl; return; } // Disable hiredis automatically freeing reply objects - c->c.reader->fn->freeObject = [](void* reply) {}; + ctx->c.reader->fn->freeObject = [](void* reply) {}; - cout << "Connected to Redis." << endl; - connected_lock.unlock(); + Redox* rdx = (Redox*) ctx->data; + rdx->connected_lock.unlock(); + + cout << "[INFO] Connected to Redis." << endl; } -void Redox::disconnected(const redisAsyncContext *c, int status) { +void Redox::disconnected(const redisAsyncContext *ctx, int status) { if (status != REDIS_OK) { - cerr << "[ERROR] Disconnecting from Redis: " << c->errstr << endl; + cerr << "[ERROR] Disconnecting from Redis: " << ctx->errstr << endl; return; } // Re-enable hiredis automatically freeing reply objects - c->c.reader->fn->freeObject = freeReplyObject; + ctx->c.reader->fn->freeObject = freeReplyObject; - cout << "Disconnected from Redis." << endl; - connected_lock.lock(); + Redox* rdx = (Redox*) ctx->data; + rdx->connected_lock.unlock(); + + cout << "[INFO] Disconnected from Redis." << endl; } Redox::Redox(const string& host, const int port) - : host(host), port(port), cmd_count(0), to_exit(false) { + : host(host), port(port) { lock_guard lg(queue_guard); - connected_lock.lock(); signal(SIGPIPE, SIG_IGN); @@ -54,48 +53,115 @@ Redox::Redox(const string& host, const int port) return; } - redisLibevAttach(EV_DEFAULT_ ctx); + evloop = ev_loop_new(EVFLAG_AUTO); + ev_set_userdata(evloop, (void*)this); + + redisLibevAttach(evloop, ctx); redisAsyncSetConnectCallback(ctx, Redox::connected); redisAsyncSetDisconnectCallback(ctx, Redox::disconnected); + + ctx->data = (void*)this; + connected_lock.lock(); } Redox::~Redox() { + +// cout << "Queue sizes: " << endl; +// cout << commands_redis_reply.size() << endl; +// cout << commands_string_r.size() << endl; +// cout << commands_char_p.size() << endl; +// cout << commands_int.size() << endl; +// cout << commands_long_long_int.size() << endl; +// cout << commands_null.size() << endl; + redisAsyncDisconnect(ctx); + stop(); + + if(event_loop_thread.joinable()) + event_loop_thread.join(); + + ev_loop_destroy(evloop); + + std::cout << "[INFO] Redox created " << commands_created + << " Commands and freed " << commands_deleted << "." << std::endl; } void Redox::run_blocking() { // Events to connect to Redox - ev_run(EV_DEFAULT_ EVRUN_NOWAIT); - lock_guard lg(connected_lock); + ev_run(evloop, EVRUN_NOWAIT); + + // Block until connected to Redis + connected_lock.lock(); + connected_lock.unlock(); // Continuously create events and handle them while (!to_exit) { process_queued_commands(); - ev_run(EV_DEFAULT_ EVRUN_NOWAIT); + ev_run(evloop, EVRUN_NOWAIT); } - // Handle exit events - ev_run(EV_DEFAULT_ EVRUN_NOWAIT); + cout << "[INFO] Stop signal detected." << endl; + + // Run a few more times to clear out canceled events +// for(int i = 0; i < 100; i++) { +// ev_run(evloop, EVRUN_NOWAIT); +// } + + // Run until all commands are processed + do { + ev_run(evloop, EVRUN_NOWAIT); + } while(commands_created != commands_deleted); + + exited = true; // Let go for block_until_stopped method exit_waiter.notify_one(); + + cout << "[INFO] Event thread exited." << endl; } void Redox::run() { event_loop_thread = thread([this] { run_blocking(); }); - event_loop_thread.detach(); + + // Don't return until connected + lock_guard lg(connected_lock); } -void Redox::stop() { +void Redox::stop_signal() { to_exit = true; } void Redox::block() { unique_lock ul(exit_waiter_lock); - exit_waiter.wait(ul, [this]() { return to_exit.load(); }); + exit_waiter.wait(ul, [this] { return exited.load(); }); +} + +void Redox::stop() { + stop_signal(); + block(); +} + +template +void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { + + Command* c = (Command*) privdata; + redisReply* reply_obj = (redisReply*) r; + Redox* rdx = (Redox*) ctx->data; + + if(!rdx->is_active_command(c)) { + std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl; + freeReplyObject(r); + return; + } + + c->reply_obj = reply_obj; + c->process_reply(); + + // Increment the Redox object command counter + rdx->incr_cmd_count(); } /** @@ -105,7 +171,7 @@ void Redox::block() { template bool submit_to_server(Command* c) { c->pending++; - if (redisAsyncCommand(c->rdx->ctx, c->command_callback, (void*)c, c->cmd.c_str()) != REDIS_OK) { + if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c, c->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; c->invoke_error(REDOX_SEND_ERROR); return false; @@ -116,13 +182,22 @@ bool submit_to_server(Command* c) { template void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { - // Check if canceled - if(timer->data == NULL) { - cerr << "[WARNING] Skipping event, has been canceled." << endl; + auto c = (Command*)timer->data; + + if(c->is_completed()) { + + cerr << "[INFO] Command " << c << " is completed, stopping event timer." << endl; + + c->timer_guard.lock(); + if((c->repeat != 0) || (c->after != 0)) + ev_timer_stop(loop, &c->timer); + c->timer_guard.unlock(); + + Command::free_command(c); + return; } - auto c = (Command*)timer->data; submit_to_server(c); } @@ -143,7 +218,7 @@ bool Redox::process_queued_command(void* c_ptr) { c->timer.data = (void*)c; ev_timer_init(&c->timer, submit_command_callback, c->after, c->repeat); - ev_timer_start(EV_DEFAULT_ &c->timer); + ev_timer_start(evloop, &c->timer); c->timer_guard.unlock(); } @@ -195,7 +270,7 @@ Redox::get_command_map() { return commands_null; } // ---------------------------- void Redox::command(const string& cmd) { - command(cmd, NULL); + command(cmd); } bool Redox::command_blocking(const string& cmd) { diff --git a/src/redox.hpp b/src/redox.hpp index aef5d66..93ca115 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -35,8 +36,10 @@ public: void run(); void run_blocking(); - void stop(); + + void stop_signal(); void block(); + void stop(); template Command* command( @@ -67,16 +70,34 @@ public: // void subscribe(std::string channel, std::function callback); // void unsubscribe(std::string channel); + std::atomic_int commands_created = {0}; + std::atomic_int commands_deleted = {0}; + + bool is_active_command(void* c_ptr) { + return active_commands.find(c_ptr) != active_commands.end(); + } + + void remove_active_command(void* c_ptr) { + active_commands.erase(c_ptr); + } + private: // Redox server std::string host; int port; + // Block run() until redis is connected + std::mutex connected_lock; + + // Dynamically allocated libev event loop + struct ev_loop* evloop; + // Number of commands processed - std::atomic_long cmd_count; + std::atomic_long cmd_count = {0}; - std::atomic_bool to_exit; + std::atomic_bool to_exit = {false}; // Signal to exit + std::atomic_bool exited = {false}; // Event thread exited std::mutex exit_waiter_lock; std::condition_variable exit_waiter; @@ -98,6 +119,9 @@ private: template bool process_queued_command(void* cmd_ptr); + + // Commands created but not yet deleted + std::unordered_set active_commands; }; // --------------------------- @@ -113,25 +137,24 @@ Command* Redox::command( ) { std::lock_guard lg(queue_guard); auto* c = new Command(this, cmd, callback, error_callback, repeat, after, free_memory); - get_command_map()[(void*)c] = c; - command_queue.push((void*)c); + void* c_ptr = (void*)c; + get_command_map()[c_ptr] = c; + command_queue.push(c_ptr); + active_commands.insert(c_ptr); + commands_created += 1; +// std::cout << "[DEBUG] Created Command " << commands_created << " at " << c << std::endl; return c; } template bool Redox::cancel(Command* c) { - if(c == NULL) { + if(c == nullptr) { std::cerr << "[ERROR] Canceling null command." << std::endl; return false; } - c->timer.data = NULL; - - std::lock_guard lg(c->timer_guard); - if((c->repeat != 0) || (c->after != 0)) - ev_timer_stop(EV_DEFAULT_ &c->timer); - + std::cout << "[INFO] Canceling command at " << c << std::endl; c->completed = true; return true; -- libgit2 0.21.4