Commit 7858507963e633d14315205d7e973227f4420cb9

Authored by Hayk Martirosyan
1 parent 916eafff

Fix memory management, concurrency issues

Several hours of multithreaded debugging to find a nasty extra
ampersand. command_blocking() now returns a Command object with
the value and status accessible through methods.

Added the option to free memory or not for Command objects when
command() is called. If free_memory = true, then the Command
object is freed after the callback or error callback returns.
If free_memory = false, then the user must call cmd->free().

To implement this, we have to pass in a blank function to the
freeObject entry in the hiredis redisReader, because otherwise
it automatically frees memory. This was not cool for the blocking
case.
examples/simple_loop.cpp
... ... @@ -6,6 +6,7 @@
6 6 #include "../src/redisx.hpp"
7 7  
8 8 using namespace std;
  9 +using namespace redisx;
9 10  
10 11 double time_s() {
11 12 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
... ... @@ -14,14 +15,22 @@ double time_s() {
14 15  
15 16 int main(int argc, char* argv[]) {
16 17  
17   - redisx::Redis rdx = {"localhost", 6379};
  18 + Redis rdx = {"localhost", 6379};
18 19 rdx.run();
19 20  
20   - rdx.command_blocking("DEL simple_loop:count");
21   - rdx.command_blocking("SET simple_loop:count 0");
  21 + Command<int>* del_cmd = rdx.command_blocking<int>("DEL simple_loop:count");
  22 + cout << "deleted key, reply: " << del_cmd->reply() << endl;
  23 + del_cmd->free();
22 24  
23   - cout << "At the start, simple_loop:count = "
24   - << rdx.command_blocking<string>("GET simple_loop:count") << endl;
  25 + Command<char*>* set_cmd = rdx.command_blocking<char*>("SET simple_loop:count 0");
  26 + cout << "set key, reply: " << set_cmd->reply() << endl;
  27 + set_cmd->free();
  28 +
  29 + Command<char*>* count_cmd = rdx.command_blocking<char*>("GET simple_loop:count");
  30 + if(count_cmd->status() == REDISX_OK) {
  31 + cout << "At the start, simple_loop:count = " << count_cmd->reply() << endl;
  32 + }
  33 + count_cmd->free();
25 34  
26 35 string cmd_str = "INCR simple_loop:count";
27 36  
... ... @@ -33,7 +42,7 @@ int main(int argc, char* argv[]) {
33 42 << "s for " << t << "s..." << endl;
34 43  
35 44 atomic_int count(0);
36   - redisx::Command<int>* c = rdx.command<int>(
  45 + Command<int>* c = rdx.command<int>(
37 46 cmd_str,
38 47 [&count](const string &cmd, const int& value) { count++; },
39 48 NULL,
... ... @@ -46,7 +55,7 @@ int main(int argc, char* argv[]) {
46 55 rdx.cancel(c);
47 56  
48 57 cout << "At the end, simple_loop:count = "
49   - << rdx.command_blocking<string>("GET simple_loop:count") << endl;
  58 + << rdx.command_blocking<string>("GET simple_loop:count")->reply() << endl;
50 59  
51 60 rdx.stop();
52 61  
... ...
src/command.hpp
... ... @@ -14,6 +14,14 @@
14 14  
15 15 namespace redisx {
16 16  
  17 +static const int REDISX_UNINIT = -1;
  18 +static const int REDISX_OK = 0;
  19 +static const int REDISX_SEND_ERROR = 1;
  20 +static const int REDISX_WRONG_TYPE = 2;
  21 +static const int REDISX_NIL_REPLY = 3;
  22 +static const int REDISX_ERROR_REPLY = 4;
  23 +static const int REDISX_TIMEOUT = 5;
  24 +
17 25 template<class ReplyT>
18 26 class Command {
19 27  
... ... @@ -21,28 +29,48 @@ friend class Redis;
21 29  
22 30 public:
23 31 Command(
24   - redisAsyncContext* c,
25   - const std::string& cmd,
26   - const std::function<void(const std::string&, const ReplyT&)>& callback,
27   - const std::function<void(const std::string&, int status)>& error_callback,
28   - double repeat, double after
  32 + redisAsyncContext* c,
  33 + const std::string& cmd,
  34 + const std::function<void(const std::string&, const ReplyT&)>& callback,
  35 + const std::function<void(const std::string&, int status)>& error_callback,
  36 + double repeat, double after,
  37 + bool free_memory
29 38 );
30 39  
31 40 const std::string cmd;
32 41 const double repeat;
33 42 const double after;
34 43  
  44 + const bool free_memory;
  45 +
35 46 redisAsyncContext* c;
  47 + redisReply* reply_obj;
36 48  
37 49 std::atomic_int pending;
38 50  
39 51 void invoke(const ReplyT& reply);
40 52 void invoke_error(int status);
41 53  
  54 + const ReplyT& reply();
  55 + int status() { return reply_status; };
  56 +
  57 + /**
  58 + * Called by the user to free the redisReply object, when the free_memory
  59 + * flag is set to false for a command.
  60 + */
  61 + void free();
  62 +
  63 + void free_reply_object();
  64 +
42 65 private:
43 66  
44 67 const std::function<void(const std::string&, const ReplyT&)> callback;
45   - const std::function<void(const std::string&, int status)>& error_callback;
  68 + const std::function<void(const std::string&, int status)> error_callback;
  69 +
  70 + // Place to store the reply value and status.
  71 + // ONLY for blocking commands
  72 + ReplyT reply_val;
  73 + int reply_status;
46 74  
47 75 std::atomic_bool completed;
48 76  
... ... @@ -61,33 +89,70 @@ Command&lt;ReplyT&gt;::Command(
61 89 const std::string& cmd,
62 90 const std::function<void(const std::string&, const ReplyT&)>& callback,
63 91 const std::function<void(const std::string&, int status)>& error_callback,
64   - double repeat, double after
65   -) : cmd(cmd), repeat(repeat), after(after), c(c), pending(0),
66   - callback(callback), error_callback(error_callback), completed(false)
  92 + double repeat, double after, bool free_memory
  93 +) : cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), c(c), reply_obj(NULL),
  94 + pending(0), callback(callback), error_callback(error_callback), completed(false)
67 95 {
68 96 timer_guard.lock();
69 97 }
70 98  
71 99 template<class ReplyT>
72   -void Command<ReplyT>::invoke(const ReplyT& reply) {
  100 +void Command<ReplyT>::invoke(const ReplyT& r) {
  101 +
  102 + if(callback) callback(cmd, r);
73 103  
74   - if(callback != NULL) callback(cmd, reply);
75 104 pending--;
76   - if((pending == 0) && (completed || (repeat == 0))) {
77   -// std::cout << "invoking success, reply: " << reply << std::endl;
78   -// std::cout << "Freeing cmd " << cmd << " in success invoke" << std::endl;
  105 + if(!free_memory) return;
  106 + if(pending != 0) return;
  107 + if(completed || (repeat == 0)) {
  108 +// std::cout << cmd << ": suicide!" << std::endl;
79 109 delete this;
80 110 }
81 111 }
82 112  
83 113 template<class ReplyT>
84 114 void Command<ReplyT>::invoke_error(int status) {
85   - if(error_callback != NULL) error_callback(cmd, status);
  115 +
  116 + if(error_callback) error_callback(cmd, status);
  117 +
86 118 pending--;
87   - if((pending == 0) && (completed || (repeat == 0))) {
88   -// std::cout << "Freeing cmd " << cmd << " in error invoke" << std::endl;
  119 + if(!free_memory) return;
  120 + if(pending != 0) return;
  121 + if(completed || (repeat == 0)) {
  122 +// std::cout << cmd << ": suicide!" << std::endl;
89 123 delete this;
90 124 }
91 125 }
92 126  
  127 +template<class ReplyT>
  128 +void Command<ReplyT>::free_reply_object() {
  129 +
  130 + if(reply_obj == NULL) {
  131 + std::cerr << "[ERROR] " << cmd << ": Attempting to double free reply object!" << std::endl;
  132 + return;
  133 + }
  134 +
  135 + freeReplyObject(reply_obj);
  136 + reply_obj = NULL;
  137 +}
  138 +
  139 +template<class ReplyT>
  140 +void Command<ReplyT>::free() {
  141 +
  142 + free_reply_object();
  143 +
  144 + // Commit suicide
  145 +// std::cout << cmd << ": suicide, by calling free()!" << std::endl;
  146 + delete this;
  147 +}
  148 +
  149 +template<class ReplyT>
  150 +const ReplyT& Command<ReplyT>::reply() {
  151 + if(reply_status != REDISX_OK) {
  152 + std::cout << "[WARNING] " << cmd
  153 + << ": Accessing value of reply with status != OK." << std::endl;
  154 + }
  155 + return reply_val;
  156 +}
  157 +
93 158 } // End namespace redis
... ...
src/redisx.cpp
... ... @@ -17,11 +17,19 @@ std::unordered_map&lt;ev_timer*, void*&gt; Redis::timer_callbacks;
17 17 // Global mutex to manage waiting for connected state
18 18 mutex connected_lock;
19 19  
  20 +/**
  21 +* Dummy function given to hiredis to use for freeing reply
  22 +* objects, so the memory can be managed here instead.
  23 +*/
  24 +void dummy_free_reply(void *reply) {}
  25 +
20 26 void connected(const redisAsyncContext *c, int status) {
21 27 if (status != REDIS_OK) {
22 28 cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl;
23 29 return;
24 30 }
  31 +
  32 + c->c.reader->fn->freeObject = dummy_free_reply;
25 33 cout << "Connected to Redis." << endl;
26 34 connected_lock.unlock();
27 35 }
... ... @@ -31,6 +39,7 @@ void disconnected(const redisAsyncContext *c, int status) {
31 39 cerr << "[ERROR] Disconnecting from Redis: " << c->errstr << endl;
32 40 return;
33 41 }
  42 + c->c.reader->fn->freeObject = freeReplyObject;
34 43 cout << "Disconnected from Redis." << endl;
35 44 connected_lock.lock();
36 45 }
... ... @@ -101,7 +110,7 @@ template&lt;class ReplyT&gt;
101 110 bool submit_to_server(Command<ReplyT>* cmd_obj) {
102 111 cmd_obj->pending++;
103 112 if (redisAsyncCommand(cmd_obj->c, command_callback<ReplyT>, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
104   - cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl;
  113 + cerr << "[ERROR] Could not send \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl;
105 114 cmd_obj->invoke_error(REDISX_SEND_ERROR);
106 115 return false;
107 116 }
... ... @@ -134,6 +143,9 @@ bool Redis::process_queued_command(void* cmd_ptr) {
134 143 } else {
135 144 // TODO manage memory somehow
136 145 cmd_obj->timer = new ev_timer();
  146 +
  147 + // TODO use cmd_obj->timer->data instead of timer callbacks!!!!!
  148 +
137 149 timer_callbacks[cmd_obj->timer] = (void*)cmd_obj;
138 150 ev_timer_init(cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat);
139 151 ev_timer_start(EV_DEFAULT_ cmd_obj->timer);
... ... @@ -175,12 +187,6 @@ template&lt;&gt;
175 187 void invoke_callback(Command<redisReply*>* cmd_obj, redisReply* reply) {
176 188 cmd_obj->invoke(reply);
177 189 }
178   -template<> redisReply* Redis::copy_reply(const redisReply*& reply) {
179   - // TODO get rid of this it is dumb.
180   - auto* copy = new redisReply;
181   - *copy = *reply;
182   - return copy;
183   -}
184 190  
185 191 template<> unordered_map<void*, Command<string>*>& Redis::get_command_map() { return commands_string_r; }
186 192 template<>
... ... @@ -194,7 +200,6 @@ void invoke_callback(Command&lt;string&gt;* cmd_obj, redisReply* reply) {
194 200 string s = reply->str;
195 201 cmd_obj->invoke(s);
196 202 }
197   -template<> string Redis::copy_reply(const string& reply) { return reply; }
198 203  
199 204 template<> unordered_map<void*, Command<char*>*>& Redis::get_command_map() { return commands_char_p; }
200 205 template<>
... ... @@ -206,13 +211,6 @@ void invoke_callback(Command&lt;char*&gt;* cmd_obj, redisReply* reply) {
206 211 }
207 212 cmd_obj->invoke(reply->str);
208 213 }
209   -template<> char* Redis::copy_reply(const char*& reply) {
210   - // Here, reply MUST be null terminated!
211   - size_t len = strlen(reply);
212   - auto* copy = new char[len+1];
213   - strcpy(copy, reply);
214   - return copy;
215   -}
216 214  
217 215 template<> unordered_map<void*, Command<int>*>& Redis::get_command_map() { return commands_int; }
218 216 template<>
... ... @@ -236,9 +234,6 @@ void invoke_callback(Command&lt;long long int&gt;* cmd_obj, redisReply* reply) {
236 234 cmd_obj->invoke(reply->integer);
237 235 }
238 236  
239   -
240   -
241   -
242 237 // ----------------------------
243 238 // Helpers
244 239 // ----------------------------
... ... @@ -251,35 +246,4 @@ void Redis::command_blocking(const string&amp; cmd) {
251 246 command_blocking<redisReply*>(cmd);
252 247 }
253 248  
254   -//void Redis::get(const char* key, function<void(const string&, const char*)> callback) {
255   -// string cmd = string("GET ") + key;
256   -// command<const char*>(cmd.c_str(), callback);
257   -//}
258   -//
259   -//void Redis::set(const char* key, const char* value) {
260   -// string cmd = string("SET ") + key + " " + value;
261   -// command<const char*>(cmd.c_str(), [](const string& command, const char* reply) {
262   -// if(strcmp(reply, "OK"))
263   -// cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl;
264   -// });
265   -//}
266   -//
267   -//void Redis::set(const char* key, const char* value, function<void(const string&, const char*)> callback) {
268   -// string cmd = string("SET ") + key + " " + value;
269   -// command<const char*>(cmd.c_str(), callback);
270   -//}
271   -//
272   -//void Redis::del(const char* key) {
273   -// string cmd = string("DEL ") + key;
274   -// command<long long int>(cmd.c_str(), [](const string& command, long long int num_deleted) {
275   -// if(num_deleted != 1)
276   -// cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl;
277   -// });
278   -//}
279   -//
280   -//void Redis::del(const char* key, function<void(const string&, long long int)> callback) {
281   -// string cmd = string("DEL ") + key;
282   -// command<long long int>(cmd.c_str(), callback);
283   -//}
284   -
285 249 } // End namespace redis
... ...
src/redisx.hpp
... ... @@ -22,14 +22,6 @@
22 22  
23 23 #include "command.hpp"
24 24  
25   -static const int REDISX_UNINIT = -1;
26   -static const int REDISX_OK = 0;
27   -static const int REDISX_SEND_ERROR = 1;
28   -static const int REDISX_WRONG_TYPE = 2;
29   -static const int REDISX_NIL_REPLY = 3;
30   -static const int REDISX_ERROR_REPLY = 4;
31   -static const int REDISX_TIMEOUT = 5;
32   -
33 25 namespace redisx {
34 26  
35 27 class Redis {
... ... @@ -46,11 +38,12 @@ public:
46 38  
47 39 template<class ReplyT>
48 40 Command<ReplyT>* command(
49   - const std::string& cmd,
50   - const std::function<void(const std::string&, const ReplyT&)>& callback = NULL,
51   - const std::function<void(const std::string&, int status)>& error_callback = NULL,
52   - double repeat = 0.0,
53   - double after = 0.0
  41 + const std::string& cmd,
  42 + const std::function<void(const std::string&, const ReplyT&)>& callback = NULL,
  43 + const std::function<void(const std::string&, int status)>& error_callback = NULL,
  44 + double repeat = 0.0,
  45 + double after = 0.0,
  46 + bool free_memory = true
54 47 );
55 48  
56 49 template<class ReplyT>
... ... @@ -59,20 +52,12 @@ public:
59 52 void command(const std::string& command);
60 53  
61 54 template<class ReplyT>
62   - ReplyT command_blocking(const std::string& cmd);
  55 + Command<ReplyT>* command_blocking(const std::string& cmd);
63 56  
64 57 void command_blocking(const std::string& command);
65 58  
66 59 long num_commands_processed();
67 60  
68   -// void get(const char* key, std::function<void(const std::string&, const char*)> callback);
69   -//
70   -// void set(const char* key, const char* value);
71   -// void set(const char* key, const char* value, std::function<void(const std::string&, const char*)> callback);
72   -//
73   -// void del(const char* key);
74   -// void del(const char* key, std::function<void(const std::string&, long long int)> callback);
75   -
76 61 // void publish(std::string channel, std::string msg);
77 62 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback);
78 63 // void unsubscribe(std::string channel);
... ... @@ -113,9 +98,6 @@ private:
113 98  
114 99 template<class ReplyT>
115 100 bool process_queued_command(void* cmd_ptr);
116   -
117   - template<class ReplyT>
118   - ReplyT copy_reply(const ReplyT& reply);
119 101 };
120 102  
121 103 // ---------------------------
... ... @@ -129,34 +111,36 @@ void invoke_callback(
129 111 template<class ReplyT>
130 112 void command_callback(redisAsyncContext *c, void *r, void *privdata) {
131 113  
132   - redisReply *reply = (redisReply *) r;
133 114 auto *cmd_obj = (Command<ReplyT> *) privdata;
  115 + cmd_obj->reply_obj = (redisReply *) r;
134 116  
135   - if (reply->type == REDIS_REPLY_ERROR) {
136   - std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl;
  117 + if (cmd_obj->reply_obj->type == REDIS_REPLY_ERROR) {
  118 + std::cerr << "[ERROR redisx.hpp:121] " << cmd_obj->cmd << ": " << cmd_obj->reply_obj->str << std::endl;
137 119 cmd_obj->invoke_error(REDISX_ERROR_REPLY);
138   - return;
139   - }
140 120  
141   - if(reply->type == REDIS_REPLY_NIL) {
  121 + } else if(cmd_obj->reply_obj->type == REDIS_REPLY_NIL) {
142 122 std::cerr << "[WARNING] " << cmd_obj->cmd << ": Nil reply." << std::endl;
143 123 cmd_obj->invoke_error(REDISX_NIL_REPLY);
144   - return;
  124 +
  125 + } else {
  126 + invoke_callback<ReplyT>(cmd_obj, cmd_obj->reply_obj);
145 127 }
146 128  
147   - invoke_callback<ReplyT>(cmd_obj, reply);
  129 + // Free the reply object unless told not to
  130 + if(cmd_obj->free_memory) cmd_obj->free_reply_object();
148 131 }
149 132  
150 133 template<class ReplyT>
151 134 Command<ReplyT>* Redis::command(
152   - const std::string& cmd,
153   - const std::function<void(const std::string&, const ReplyT&)>& callback,
154   - const std::function<void(const std::string&, int status)>& error_callback,
155   - double repeat,
156   - double after
  135 + const std::string& cmd,
  136 + const std::function<void(const std::string&, const ReplyT&)>& callback,
  137 + const std::function<void(const std::string&, int status)>& error_callback,
  138 + double repeat,
  139 + double after,
  140 + bool free_memory
157 141 ) {
158 142 std::lock_guard<std::mutex> lg(queue_guard);
159   - auto* cmd_obj = new Command<ReplyT>(c, cmd, callback, error_callback, repeat, after);
  143 + auto* cmd_obj = new Command<ReplyT>(c, cmd, callback, error_callback, repeat, after, free_memory);
160 144 get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj;
161 145 command_queue.push((void*)cmd_obj);
162 146 return cmd_obj;
... ... @@ -183,43 +167,39 @@ bool Redis::cancel(Command&lt;ReplyT&gt;* cmd_obj) {
183 167 }
184 168  
185 169 template<class ReplyT>
186   -ReplyT Redis::command_blocking(const std::string& cmd) {
187   - std::mutex m;
188   - std::condition_variable cv;
  170 +Command<ReplyT>* Redis::command_blocking(const std::string& cmd) {
  171 +
189 172 ReplyT val;
190 173 std::atomic_int status(REDISX_UNINIT);
191 174  
192   - // There is a memory issue here, because after the callback returns
193   - // all memory is cleared.
194   - // TODO right now just don't use char* or redisReply* for blocking
195   - // Later, maybe specialize a function to copy the pointer types to
196   - // the heap
  175 + std::condition_variable cv;
  176 + std::mutex m;
  177 +
  178 + std::unique_lock<std::mutex> lk(m);
197 179  
198   - command<ReplyT>(cmd,
199   - [&cv, &val, &status](const std::string& cmd_str, const ReplyT& reply) {
200   - std::cout << "cmd: " << cmd_str << std::endl;
201   - std::cout << "invoking success, reply: " << reply << std::endl;
  180 + Command<ReplyT>* cmd_obj = command<ReplyT>(cmd,
  181 + [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) {
  182 + std::unique_lock<std::mutex> lk(m);
202 183 val = reply;
203   - std::cout << "invoking success, reply copied: " << val << std::endl;
204 184 status = REDISX_OK;
  185 + lk.unlock();
205 186 cv.notify_one();
206 187 },
207   - [&cv, &status](const std::string& cmd_str, int error) {
  188 + [&status, &m, &cv](const std::string& cmd_str, int error) {
  189 + std::unique_lock<std::mutex> lk(m);
208 190 status = error;
  191 + lk.unlock();
209 192 cv.notify_one();
210   - }
  193 + },
  194 + 0, 0, false // No repeats, don't free memory
211 195 );
212 196  
213   - std::unique_lock<std::mutex> ul(m);
214   - cv.wait(ul, [&status] { return status != REDISX_UNINIT; });
  197 + cv.wait(lk, [&status] { return status != REDISX_UNINIT; });
215 198  
216   - std::cout << "invoking success, after wait: " << val << std::endl;
217   - std::cout << "response: " << status << std::endl;
218   - if(status == REDISX_OK) return val;
219   - else if(status == REDISX_NIL_REPLY) return val;
220   - else throw std::runtime_error(
221   - "[ERROR] " + cmd + ": redisx error code " + std::to_string(status.load())
222   - );
  199 + cmd_obj->reply_val = val;
  200 + cmd_obj->reply_status = status;
  201 +
  202 + return cmd_obj;
223 203 }
224 204  
225 205 } // End namespace redis
... ...