Commit c2436e640194976559eb112d892af7c3675b852e

Authored by Hayk Martirosyan
1 parent 46752c1e

Clean Valgrind reports, improved speed examples

Take that, hours of debugging memory leaks! Did lots of structural
tweaks to more smartly keep track of commands and make sure everything
is evenutally freed from the heap.
CMakeLists.txt
1 1 cmake_minimum_required(VERSION 2.8.4)
2 2 project(redox)
3 3  
4   -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
5   -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
  4 +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3")
  5 +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer")
6 6 # set(CMAKE_VERBOSE_MAKEFILE ON)
7 7  
8 8 # ---------------------------------------------------------
... ... @@ -35,14 +35,17 @@ set(LIB_ALL ${LIB_REDIS})
35 35 #add_executable(progressive examples/progressive.cpp ${SRC_ALL})
36 36 #target_link_libraries(progressive ${LIB_REDIS})
37 37  
38   -#add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
39   -#target_link_libraries(basic_threaded ${LIB_REDIS})
  38 +add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL})
  39 +target_link_libraries(basic_threaded ${LIB_REDIS})
40 40  
41 41 #add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
42 42 #target_link_libraries(lpush_benchmark ${LIB_REDIS})
43 43  
44   -add_executable(simple_loop examples/simple_loop.cpp ${SRC_ALL})
45   -target_link_libraries(simple_loop ${LIB_REDIS})
  44 +add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL})
  45 +target_link_libraries(speed_test_async ${LIB_REDIS})
46 46  
47   -add_executable(simple_sync_loop examples/simple_sync_loop.cpp ${SRC_ALL})
48   -target_link_libraries(simple_sync_loop ${LIB_REDIS})
  47 +add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL})
  48 +target_link_libraries(speed_test_sync ${LIB_REDIS})
  49 +
  50 +add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL})
  51 +target_link_libraries(speed_test_async_multi ${LIB_REDIS})
... ...
examples/basic_threaded.cpp
1 1 /**
2   -* Basic asynchronous calls using redisx.
  2 +*
3 3 */
4 4  
5 5 #include <iostream>
6   -#include <thread>
7 6 #include <chrono>
8   -#include "../src/redisx.hpp"
  7 +#include <thread>
  8 +#include "../src/redox.hpp"
9 9  
10 10 using namespace std;
11 11  
12   -redisx::Redis rdx = {"localhost", 6379};
  12 +redox::Redox rdx = {"localhost", 6379};
13 13  
14 14 int main(int argc, char* argv[]) {
15 15  
16 16 rdx.run();
17 17  
18 18 thread setter([]() {
19   - while(true) {
20   - rdx.command("INCR counter");
  19 + for(int i = 0; i < 5000; i++) {
  20 + rdx.command<int>("INCR counter");
21 21 this_thread::sleep_for(chrono::milliseconds(1));
22 22 }
23 23 });
24 24  
25 25 thread getter([]() {
26   - while(true) {
27   - rdx.command<const string &>(
  26 + for(int i = 0; i < 5; i++) {
  27 + rdx.command<string>(
28 28 "GET counter",
29 29 [](const string& cmd, const string& value) {
30 30 cout << cmd << ": " << value << endl;
... ... @@ -37,5 +37,7 @@ int main(int argc, char* argv[]) {
37 37 setter.join();
38 38 getter.join();
39 39  
  40 + rdx.stop();
  41 +
40 42 return 0;
41 43 };
... ...
examples/simple_loop.cpp renamed to examples/speed_test_async.cpp
1 1 /**
2   -* Basic asynchronous calls using redox.
  2 +* Redox test
  3 +* ----------
  4 +* Increment a key on Redis using an asynchronous command on a timer.
3 5 */
4 6  
5 7 #include <iostream>
... ... @@ -18,48 +20,39 @@ int main(int argc, char* argv[]) {
18 20 Redox rdx = {"localhost", 6379};
19 21 rdx.run();
20 22  
21   - if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl;
22   - else cerr << "Failed to delete simple_loop:count" << endl;
23   -
24   - Command<nullptr_t>* null_cmd = rdx.command_blocking<nullptr_t>("GET WFEOIEFJ");
25   - if(null_cmd->status() == REDOX_OK) cout << "got nonexistent key." << endl;
26   - else cerr << "error with null cmd: " << null_cmd->status() << endl;
27   - null_cmd->free();
28   -
29   - Command<char*>* set_cmd = rdx.command_blocking<char*>("SET simple_loop:count 0");
30   - cout << "set key, reply: " << set_cmd->reply() << endl;
31   - set_cmd->free();
32   -
33   - Command<char*>* count_cmd = rdx.command_blocking<char*>("GET simple_loop:count");
34   - if(count_cmd->status() == REDOX_OK) {
35   - cout << "At the start, simple_loop:count = " << count_cmd->reply() << endl;
  23 + if(rdx.command_blocking("SET simple_loop:count 0")) {
  24 + cout << "Reset the counter to zero." << endl;
  25 + } else {
  26 + cerr << "Failed to reset counter." << endl;
  27 + return 1;
36 28 }
37   - count_cmd->free();
38 29  
39 30 string cmd_str = "INCR simple_loop:count";
40   -
41   - double freq = 10000; // Hz
  31 + double freq = 400000; // Hz
42 32 double dt = 1 / freq; // s
43   - double t = 3; // s
  33 + double t = 5; // s
44 34  
45   - cout << "Running \"" << cmd_str << "\" at dt = " << dt
46   - << "s for " << t << "s..." << endl;
  35 + cout << "Sending \"" << cmd_str << "\" asynchronously every "
  36 + << dt << "s for " << t << "s..." << endl;
47 37  
  38 + double t0 = time_s();
48 39 atomic_int count(0);
  40 +
49 41 Command<int>* c = rdx.command<int>(
50 42 cmd_str,
51   - [&count](const string &cmd, const int& value) { count++; },
52   - NULL,
53   - dt,
54   - 0
  43 + [&count, &rdx](const string &cmd, const int& value) { count++; },
  44 + [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; },
  45 + dt
55 46 );
56 47  
57   - double t0 = time_s();
  48 + // Wait for t time, then stop the command.
58 49 this_thread::sleep_for(chrono::microseconds((int)(t*1e6)));
59 50 rdx.cancel(c);
60 51  
61   - cout << "At the end, simple_loop:count = "
62   - << rdx.command_blocking<string>("GET simple_loop:count")->reply() << endl;
  52 + // Get the final value of the counter
  53 + auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count");
  54 + long final_count = stol(get_cmd->reply());
  55 + get_cmd->free();
63 56  
64 57 rdx.stop();
65 58  
... ... @@ -69,6 +62,7 @@ int main(int argc, char* argv[]) {
69 62 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
70 63 << "that's " << actual_freq << " commands/s." << endl;
71 64  
72   - cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl;
  65 + cout << "Final value of counter: " << final_count << endl;
  66 +
73 67 return 0;
74 68 }
... ...
examples/speed_test_async_multi.cpp 0 โ†’ 100644
  1 +/**
  2 +* Redox test
  3 +* ----------
  4 +* Increment a key on Redis using an asynchronous command on a timer.
  5 +*/
  6 +
  7 +#include <iostream>
  8 +#include <vector>
  9 +#include "../src/redox.hpp"
  10 +
  11 +using namespace std;
  12 +using namespace redox;
  13 +
  14 +double time_s() {
  15 + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
  16 + return (double)ms / 1e6;
  17 +}
  18 +
  19 +int main(int argc, char* argv[]) {
  20 +
  21 + Redox rdx = {"localhost", 6379};
  22 + rdx.run();
  23 +
  24 + if(rdx.command_blocking("SET simple_loop:count 0")) {
  25 + cout << "Reset the counter to zero." << endl;
  26 + } else {
  27 + cerr << "Failed to reset counter." << endl;
  28 + return 1;
  29 + }
  30 +
  31 + string cmd_str = "INCR simple_loop:count";
  32 + double freq = 10000; // Hz
  33 + double dt = 1 / freq; // s
  34 + double t = 5; // s
  35 + int parallel = 100;
  36 +
  37 + cout << "Sending \"" << cmd_str << "\" asynchronously every "
  38 + << dt << "s for " << t << "s..." << endl;
  39 +
  40 + double t0 = time_s();
  41 + atomic_int count(0);
  42 +
  43 + vector<Command<int>*> commands;
  44 + for(int i = 0; i < parallel; i++) {
  45 + commands.push_back(rdx.command<int>(
  46 + cmd_str,
  47 + [&count, &rdx](const string &cmd, const int& value) { count++; },
  48 + [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; },
  49 + dt
  50 + ));
  51 + }
  52 +
  53 + // Wait for t time, then stop the command.
  54 + this_thread::sleep_for(chrono::microseconds((int)(t*1e6)));
  55 + for(auto c : commands) rdx.cancel(c);
  56 +
  57 + // Get the final value of the counter
  58 + auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count");
  59 + long final_count = stol(get_cmd->reply());
  60 + get_cmd->free();
  61 +
  62 + rdx.stop();
  63 +
  64 + double t_elapsed = time_s() - t0;
  65 + double actual_freq = (double)count / t_elapsed;
  66 +
  67 + cout << "Sent " << count << " commands in " << t_elapsed << "s, "
  68 + << "that's " << actual_freq << " commands/s." << endl;
  69 +
  70 + cout << "Final value of counter: " << final_count << endl;
  71 +
  72 + return 0;
  73 +}
... ...
examples/simple_sync_loop.cpp renamed to examples/speed_test_sync.cpp
1 1 /**
2   -* Basic synchronous calls using redox.
  2 +* Redox test
  3 +* ----------
  4 +* Increment a key on Redis using synchronous commands in a loop.
3 5 */
4 6  
5 7 #include <iostream>
... ... @@ -18,23 +20,32 @@ int main(int argc, char* argv[]) {
18 20 Redox rdx = {"localhost", 6379};
19 21 rdx.run();
20 22  
21   - if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl;
22   - else cerr << "Failed to delete simple_loop:count" << endl;
  23 + if(rdx.command_blocking("SET simple_loop:count 0")) {
  24 + cout << "Reset the counter to zero." << endl;
  25 + } else {
  26 + cerr << "Failed to reset counter." << endl;
  27 + return 1;
  28 + }
23 29  
24 30 string cmd_str = "INCR simple_loop:count";
  31 + double t = 5; // s
25 32  
26   - int count = 50000;
27   - double t0 = time_s();
  33 + cout << "Sending \"" << cmd_str << "\" synchronously for " << t << "s..." << endl;
28 34  
29   - cout << "Running \"" << cmd_str << "\" " << count << " times." << endl;
  35 + double t0 = time_s();
  36 + double t_end = t0 + t;
  37 + int count = 0;
30 38  
31   - for(int i = 0; i < count; i++) {
  39 + while(time_s() < t_end) {
32 40 Command<int>* c = rdx.command_blocking<int>(cmd_str);
33 41 if(c->status() != REDOX_OK) cerr << "Bad reply, code: " << c->status() << endl;
  42 + c->free();
  43 + count++;
34 44 }
35 45  
36   - cout << "At the end, simple_loop:count = "
37   - << rdx.command_blocking<string>("GET simple_loop:count")->reply() << endl;
  46 + auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count");
  47 + long final_count = stol(get_cmd->reply());
  48 + get_cmd->free();
38 49  
39 50 rdx.stop();
40 51  
... ... @@ -42,8 +53,9 @@ int main(int argc, char* argv[]) {
42 53 double actual_freq = (double)count / t_elapsed;
43 54  
44 55 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
45   - << "that's " << actual_freq << " commands/s." << endl;
  56 + << "that's " << actual_freq << " commands/s." << endl;
  57 +
  58 + cout << "Final value of counter: " << final_count << endl;
46 59  
47   - cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl;
48 60 return 0;
49 61 }
... ...
src/command.cpp
... ... @@ -64,7 +64,7 @@ void Command&lt;char*&gt;::invoke_callback() {
64 64  
65 65 template<>
66 66 void Command<int>::invoke_callback() {
67   -
  67 +// std::cout << "invoking int callback" << std::endl;
68 68 if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY);
69 69 else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY);
70 70  
... ...
src/command.hpp
... ... @@ -26,10 +26,15 @@ static const int REDOX_TIMEOUT = 5;
26 26 class Redox;
27 27  
28 28 template<class ReplyT>
  29 +void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents);
  30 +
  31 +template<class ReplyT>
29 32 class Command {
30 33  
31 34 friend class Redox;
32 35  
  36 +friend void submit_command_callback<ReplyT>(struct ev_loop* loop, ev_timer* timer, int revents);
  37 +
33 38 public:
34 39 Command(
35 40 Redox* rdx,
... ... @@ -48,15 +53,16 @@ public:
48 53  
49 54 const bool free_memory;
50 55  
51   - redisReply* reply_obj;
  56 + redisReply* reply_obj = nullptr;
52 57  
53   - std::atomic_int pending;
  58 + std::atomic_int pending = {0};
54 59  
55 60 void invoke(const ReplyT& reply);
56 61 void invoke_error(int status);
57 62  
58 63 const ReplyT& reply();
59 64 int status() { return reply_status; };
  65 + bool is_completed() { return completed; }
60 66  
61 67 /**
62 68 * Called by the user to free the redisReply object, when the free_memory
... ... @@ -64,7 +70,7 @@ public:
64 70 */
65 71 void free();
66 72  
67   - static void command_callback(redisAsyncContext *c, void *r, void *privdata);
  73 + void process_reply();
68 74  
69 75 private:
70 76  
... ... @@ -76,7 +82,7 @@ private:
76 82 ReplyT reply_val;
77 83 int reply_status;
78 84  
79   - std::atomic_bool completed;
  85 + std::atomic_bool completed = {false};
80 86  
81 87 ev_timer timer;
82 88 std::mutex timer_guard;
... ... @@ -86,7 +92,11 @@ private:
86 92 return &timer;
87 93 }
88 94  
  95 + // Make sure we don't free resources until details taken care of
  96 + std::mutex free_guard;
  97 +
89 98 void free_reply_object();
  99 + static void free_command(Command<ReplyT>* c);
90 100  
91 101 void invoke_callback();
92 102 bool is_error_reply();
... ... @@ -100,74 +110,74 @@ Command&lt;ReplyT&gt;::Command(
100 110 const std::function<void(const std::string&, const ReplyT&)>& callback,
101 111 const std::function<void(const std::string&, int status)>& error_callback,
102 112 double repeat, double after, bool free_memory
103   -) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), reply_obj(NULL),
104   - pending(0), callback(callback), error_callback(error_callback), completed(false)
  113 +) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory),
  114 + callback(callback), error_callback(error_callback)
105 115 {
106 116 timer_guard.lock();
107 117 }
108 118  
109 119 template<class ReplyT>
110   -void Command<ReplyT>::command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
  120 +void Command<ReplyT>::process_reply() {
  121 +
  122 + free_guard.lock();
  123 +
  124 + invoke_callback();
111 125  
112   - auto *c = (Command<ReplyT> *) privdata;
113   - c->reply_obj = (redisReply *) r;
114   - c->invoke_callback();
  126 + pending--;
  127 +
  128 + if(!free_memory) {
  129 + // Allow free() method to free memory
  130 + free_guard.unlock();
  131 + return;
  132 + }
115 133  
116   - // Free the reply object unless told not to
117   - if(c->free_memory) c->free_reply_object();
  134 + free_reply_object();
118 135  
119   - // Increment the Redox object command counter
120   - c->rdx->incr_cmd_count();
  136 + if((pending == 0) && (repeat == 0)) {
  137 + free_command(this);
  138 + } else {
  139 + free_guard.unlock();
  140 + }
121 141 }
122 142  
123 143 template<class ReplyT>
124 144 void Command<ReplyT>::invoke(const ReplyT& r) {
125   -
126 145 if(callback) callback(cmd, r);
127   -
128   - pending--;
129   - if(!free_memory) return;
130   - if(pending != 0) return;
131   - if(completed || (repeat == 0)) {
132   -// std::cout << cmd << ": suicide!" << std::endl;
133   - delete this;
134   - }
135 146 }
136 147  
137 148 template<class ReplyT>
138 149 void Command<ReplyT>::invoke_error(int status) {
139   -
140 150 if(error_callback) error_callback(cmd, status);
141   -
142   - pending--;
143   - if(!free_memory) return;
144   - if(pending != 0) return;
145   - if(completed || (repeat == 0)) {
146   -// std::cout << cmd << ": suicide!" << std::endl;
147   - delete this;
148   - }
149 151 }
150 152  
151 153 template<class ReplyT>
152 154 void Command<ReplyT>::free_reply_object() {
153 155  
154   - if(reply_obj == NULL) {
155   - std::cerr << "[ERROR] " << cmd << ": Attempting to double free reply object!" << std::endl;
  156 + if(reply_obj == nullptr) {
  157 + std::cerr << "[ERROR] " << cmd << ": Attempting to double free reply object." << std::endl;
156 158 return;
157 159 }
158 160  
159 161 freeReplyObject(reply_obj);
160   - reply_obj = NULL;
  162 + reply_obj = nullptr;
  163 +}
  164 +
  165 +template<class ReplyT>
  166 +void Command<ReplyT>::free_command(Command<ReplyT>* c) {
  167 + c->rdx->commands_deleted += 1;
  168 + c->rdx->remove_active_command(c);
  169 +// std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl;
  170 + delete c;
161 171 }
162 172  
163 173 template<class ReplyT>
164 174 void Command<ReplyT>::free() {
165 175  
  176 + free_guard.lock();
166 177 free_reply_object();
  178 + free_guard.unlock();
167 179  
168   - // Commit suicide
169   -// std::cout << cmd << ": suicide, by calling free()!" << std::endl;
170   - delete this;
  180 + free_command(this);
171 181 }
172 182  
173 183 template<class ReplyT>
... ...
src/redox.cpp
... ... @@ -3,48 +3,47 @@
3 3 */
4 4  
5 5 #include <signal.h>
6   -#include <string.h>
7 6 #include "redox.hpp"
8 7  
9 8 using namespace std;
10 9  
11 10 namespace redox {
12 11  
13   -// Global mutex to manage waiting for connected state
14   -// TODO get rid of this as the only global variable?
15   -mutex connected_lock;
  12 +void Redox::connected(const redisAsyncContext *ctx, int status) {
16 13  
17   -void Redox::connected(const redisAsyncContext *c, int status) {
18 14 if (status != REDIS_OK) {
19   - cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl;
  15 + cerr << "[ERROR] Connecting to Redis: " << ctx->errstr << endl;
20 16 return;
21 17 }
22 18  
23 19 // Disable hiredis automatically freeing reply objects
24   - c->c.reader->fn->freeObject = [](void* reply) {};
  20 + ctx->c.reader->fn->freeObject = [](void* reply) {};
25 21  
26   - cout << "Connected to Redis." << endl;
27   - connected_lock.unlock();
  22 + Redox* rdx = (Redox*) ctx->data;
  23 + rdx->connected_lock.unlock();
  24 +
  25 + cout << "[INFO] Connected to Redis." << endl;
28 26 }
29 27  
30   -void Redox::disconnected(const redisAsyncContext *c, int status) {
  28 +void Redox::disconnected(const redisAsyncContext *ctx, int status) {
31 29 if (status != REDIS_OK) {
32   - cerr << "[ERROR] Disconnecting from Redis: " << c->errstr << endl;
  30 + cerr << "[ERROR] Disconnecting from Redis: " << ctx->errstr << endl;
33 31 return;
34 32 }
35 33  
36 34 // Re-enable hiredis automatically freeing reply objects
37   - c->c.reader->fn->freeObject = freeReplyObject;
  35 + ctx->c.reader->fn->freeObject = freeReplyObject;
38 36  
39   - cout << "Disconnected from Redis." << endl;
40   - connected_lock.lock();
  37 + Redox* rdx = (Redox*) ctx->data;
  38 + rdx->connected_lock.unlock();
  39 +
  40 + cout << "[INFO] Disconnected from Redis." << endl;
41 41 }
42 42  
43 43 Redox::Redox(const string& host, const int port)
44   - : host(host), port(port), cmd_count(0), to_exit(false) {
  44 + : host(host), port(port) {
45 45  
46 46 lock_guard<mutex> lg(queue_guard);
47   - connected_lock.lock();
48 47  
49 48 signal(SIGPIPE, SIG_IGN);
50 49  
... ... @@ -54,48 +53,115 @@ Redox::Redox(const string&amp; host, const int port)
54 53 return;
55 54 }
56 55  
57   - redisLibevAttach(EV_DEFAULT_ ctx);
  56 + evloop = ev_loop_new(EVFLAG_AUTO);
  57 + ev_set_userdata(evloop, (void*)this);
  58 +
  59 + redisLibevAttach(evloop, ctx);
58 60 redisAsyncSetConnectCallback(ctx, Redox::connected);
59 61 redisAsyncSetDisconnectCallback(ctx, Redox::disconnected);
  62 +
  63 + ctx->data = (void*)this;
  64 + connected_lock.lock();
60 65 }
61 66  
62 67 Redox::~Redox() {
  68 +
  69 +// cout << "Queue sizes: " << endl;
  70 +// cout << commands_redis_reply.size() << endl;
  71 +// cout << commands_string_r.size() << endl;
  72 +// cout << commands_char_p.size() << endl;
  73 +// cout << commands_int.size() << endl;
  74 +// cout << commands_long_long_int.size() << endl;
  75 +// cout << commands_null.size() << endl;
  76 +
63 77 redisAsyncDisconnect(ctx);
  78 +
64 79 stop();
  80 +
  81 + if(event_loop_thread.joinable())
  82 + event_loop_thread.join();
  83 +
  84 + ev_loop_destroy(evloop);
  85 +
  86 + std::cout << "[INFO] Redox created " << commands_created
  87 + << " Commands and freed " << commands_deleted << "." << std::endl;
65 88 }
66 89  
67 90 void Redox::run_blocking() {
68 91  
69 92 // Events to connect to Redox
70   - ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
71   - lock_guard<mutex> lg(connected_lock);
  93 + ev_run(evloop, EVRUN_NOWAIT);
  94 +
  95 + // Block until connected to Redis
  96 + connected_lock.lock();
  97 + connected_lock.unlock();
72 98  
73 99 // Continuously create events and handle them
74 100 while (!to_exit) {
75 101 process_queued_commands();
76   - ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  102 + ev_run(evloop, EVRUN_NOWAIT);
77 103 }
78 104  
79   - // Handle exit events
80   - ev_run(EV_DEFAULT_ EVRUN_NOWAIT);
  105 + cout << "[INFO] Stop signal detected." << endl;
  106 +
  107 + // Run a few more times to clear out canceled events
  108 +// for(int i = 0; i < 100; i++) {
  109 +// ev_run(evloop, EVRUN_NOWAIT);
  110 +// }
  111 +
  112 + // Run until all commands are processed
  113 + do {
  114 + ev_run(evloop, EVRUN_NOWAIT);
  115 + } while(commands_created != commands_deleted);
  116 +
  117 + exited = true;
81 118  
82 119 // Let go for block_until_stopped method
83 120 exit_waiter.notify_one();
  121 +
  122 + cout << "[INFO] Event thread exited." << endl;
84 123 }
85 124  
86 125 void Redox::run() {
87 126  
88 127 event_loop_thread = thread([this] { run_blocking(); });
89   - event_loop_thread.detach();
  128 +
  129 + // Don't return until connected
  130 + lock_guard<mutex> lg(connected_lock);
90 131 }
91 132  
92   -void Redox::stop() {
  133 +void Redox::stop_signal() {
93 134 to_exit = true;
94 135 }
95 136  
96 137 void Redox::block() {
97 138 unique_lock<mutex> ul(exit_waiter_lock);
98   - exit_waiter.wait(ul, [this]() { return to_exit.load(); });
  139 + exit_waiter.wait(ul, [this] { return exited.load(); });
  140 +}
  141 +
  142 +void Redox::stop() {
  143 + stop_signal();
  144 + block();
  145 +}
  146 +
  147 +template<class ReplyT>
  148 +void command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
  149 +
  150 + Command<ReplyT>* c = (Command<ReplyT>*) privdata;
  151 + redisReply* reply_obj = (redisReply*) r;
  152 + Redox* rdx = (Redox*) ctx->data;
  153 +
  154 + if(!rdx->is_active_command(c)) {
  155 + std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl;
  156 + freeReplyObject(r);
  157 + return;
  158 + }
  159 +
  160 + c->reply_obj = reply_obj;
  161 + c->process_reply();
  162 +
  163 + // Increment the Redox object command counter
  164 + rdx->incr_cmd_count();
99 165 }
100 166  
101 167 /**
... ... @@ -105,7 +171,7 @@ void Redox::block() {
105 171 template<class ReplyT>
106 172 bool submit_to_server(Command<ReplyT>* c) {
107 173 c->pending++;
108   - if (redisAsyncCommand(c->rdx->ctx, c->command_callback, (void*)c, c->cmd.c_str()) != REDIS_OK) {
  174 + if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c, c->cmd.c_str()) != REDIS_OK) {
109 175 cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl;
110 176 c->invoke_error(REDOX_SEND_ERROR);
111 177 return false;
... ... @@ -116,13 +182,22 @@ bool submit_to_server(Command&lt;ReplyT&gt;* c) {
116 182 template<class ReplyT>
117 183 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) {
118 184  
119   - // Check if canceled
120   - if(timer->data == NULL) {
121   - cerr << "[WARNING] Skipping event, has been canceled." << endl;
  185 + auto c = (Command<ReplyT>*)timer->data;
  186 +
  187 + if(c->is_completed()) {
  188 +
  189 + cerr << "[INFO] Command " << c << " is completed, stopping event timer." << endl;
  190 +
  191 + c->timer_guard.lock();
  192 + if((c->repeat != 0) || (c->after != 0))
  193 + ev_timer_stop(loop, &c->timer);
  194 + c->timer_guard.unlock();
  195 +
  196 + Command<ReplyT>::free_command(c);
  197 +
122 198 return;
123 199 }
124 200  
125   - auto c = (Command<ReplyT>*)timer->data;
126 201 submit_to_server<ReplyT>(c);
127 202 }
128 203  
... ... @@ -143,7 +218,7 @@ bool Redox::process_queued_command(void* c_ptr) {
143 218 c->timer.data = (void*)c;
144 219  
145 220 ev_timer_init(&c->timer, submit_command_callback<ReplyT>, c->after, c->repeat);
146   - ev_timer_start(EV_DEFAULT_ &c->timer);
  221 + ev_timer_start(evloop, &c->timer);
147 222  
148 223 c->timer_guard.unlock();
149 224 }
... ... @@ -195,7 +270,7 @@ Redox::get_command_map() { return commands_null; }
195 270 // ----------------------------
196 271  
197 272 void Redox::command(const string& cmd) {
198   - command<redisReply*>(cmd, NULL);
  273 + command<redisReply*>(cmd);
199 274 }
200 275  
201 276 bool Redox::command_blocking(const string& cmd) {
... ...
src/redox.hpp
... ... @@ -15,6 +15,7 @@
15 15 #include <string>
16 16 #include <queue>
17 17 #include <unordered_map>
  18 +#include <unordered_set>
18 19  
19 20 #include <hiredis/hiredis.h>
20 21 #include <hiredis/async.h>
... ... @@ -35,8 +36,10 @@ public:
35 36  
36 37 void run();
37 38 void run_blocking();
38   - void stop();
  39 +
  40 + void stop_signal();
39 41 void block();
  42 + void stop();
40 43  
41 44 template<class ReplyT>
42 45 Command<ReplyT>* command(
... ... @@ -67,16 +70,34 @@ public:
67 70 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback);
68 71 // void unsubscribe(std::string channel);
69 72  
  73 + std::atomic_int commands_created = {0};
  74 + std::atomic_int commands_deleted = {0};
  75 +
  76 + bool is_active_command(void* c_ptr) {
  77 + return active_commands.find(c_ptr) != active_commands.end();
  78 + }
  79 +
  80 + void remove_active_command(void* c_ptr) {
  81 + active_commands.erase(c_ptr);
  82 + }
  83 +
70 84 private:
71 85  
72 86 // Redox server
73 87 std::string host;
74 88 int port;
75 89  
  90 + // Block run() until redis is connected
  91 + std::mutex connected_lock;
  92 +
  93 + // Dynamically allocated libev event loop
  94 + struct ev_loop* evloop;
  95 +
76 96 // Number of commands processed
77   - std::atomic_long cmd_count;
  97 + std::atomic_long cmd_count = {0};
78 98  
79   - std::atomic_bool to_exit;
  99 + std::atomic_bool to_exit = {false}; // Signal to exit
  100 + std::atomic_bool exited = {false}; // Event thread exited
80 101 std::mutex exit_waiter_lock;
81 102 std::condition_variable exit_waiter;
82 103  
... ... @@ -98,6 +119,9 @@ private:
98 119  
99 120 template<class ReplyT>
100 121 bool process_queued_command(void* cmd_ptr);
  122 +
  123 + // Commands created but not yet deleted
  124 + std::unordered_set<void*> active_commands;
101 125 };
102 126  
103 127 // ---------------------------
... ... @@ -113,25 +137,24 @@ Command&lt;ReplyT&gt;* Redox::command(
113 137 ) {
114 138 std::lock_guard<std::mutex> lg(queue_guard);
115 139 auto* c = new Command<ReplyT>(this, cmd, callback, error_callback, repeat, after, free_memory);
116   - get_command_map<ReplyT>()[(void*)c] = c;
117   - command_queue.push((void*)c);
  140 + void* c_ptr = (void*)c;
  141 + get_command_map<ReplyT>()[c_ptr] = c;
  142 + command_queue.push(c_ptr);
  143 + active_commands.insert(c_ptr);
  144 + commands_created += 1;
  145 +// std::cout << "[DEBUG] Created Command " << commands_created << " at " << c << std::endl;
118 146 return c;
119 147 }
120 148  
121 149 template<class ReplyT>
122 150 bool Redox::cancel(Command<ReplyT>* c) {
123 151  
124   - if(c == NULL) {
  152 + if(c == nullptr) {
125 153 std::cerr << "[ERROR] Canceling null command." << std::endl;
126 154 return false;
127 155 }
128 156  
129   - c->timer.data = NULL;
130   -
131   - std::lock_guard<std::mutex> lg(c->timer_guard);
132   - if((c->repeat != 0) || (c->after != 0))
133   - ev_timer_stop(EV_DEFAULT_ &c->timer);
134   -
  157 + std::cout << "[INFO] Canceling command at " << c << std::endl;
135 158 c->completed = true;
136 159  
137 160 return true;
... ...