Commit 3bb3949a3986e2e8e503e823cfc0a263a5588693

Authored by Hayk Martirosyan
1 parent 5d3828dd

Add more guards, rearrange logic to be more bulletproof

Fixed a couple of bugs found with a test case of 100 parallel
asynchronous clients. As of now, there are no known memory
leaks, segfaults, or deadlocks in Redox.
CMakeLists.txt
... ... @@ -29,8 +29,8 @@ set(LIB_ALL ${LIB_REDIS})
29 29 # Examples
30 30 # ---------------------------------------------------------
31 31  
32   -#add_executable(basic examples/basic.cpp ${SRC_ALL})
33   -#target_link_libraries(basic ${LIB_REDIS})
  32 +add_executable(basic examples/basic.cpp ${SRC_ALL})
  33 +target_link_libraries(basic ${LIB_REDIS})
34 34  
35 35 #add_executable(progressive examples/progressive.cpp ${SRC_ALL})
36 36 #target_link_libraries(progressive ${LIB_REDIS})
... ...
README.md
1 1 redox
2 2 ======
3 3  
4   -High-level, asynchronous, and wicked fast C++11 bindings for Redis
  4 +Modern, asynchronous, and wicked fast C++11 bindings for Redis
5 5  
6 6 Work in progress, details coming soon.
... ...
examples/basic.cpp
1 1 /**
2   -* Basic asynchronous calls using redisx.
  2 +* Basic use of Redox to set and get a Redis key.
3 3 */
4 4  
5 5 #include <iostream>
6   -#include "../src/redisx.hpp"
  6 +#include "../src/redox.hpp"
7 7  
8 8 using namespace std;
9 9  
10 10 int main(int argc, char* argv[]) {
11 11  
12   - redisx::Redis rdx = {"localhost", 6379};
  12 + redox::Redox rdx = {"localhost", 6379};
13 13  
14   - rdx.command<const string &>("SET alaska rules!", [](const string &cmd, const string &value) {
  14 + rdx.command<string>("SET alaska rules!", [](const string &cmd, const string &value) {
15 15 cout << cmd << ": " << value << endl;
16 16 });
17 17  
18   - rdx.command<const string &>("GET alaska", [](const string &cmd, const string &value) {
  18 + rdx.command<string>("GET alaska", [](const string &cmd, const string &value) {
19 19 cout << cmd << ": " << value << endl;
20 20 });
21 21  
... ...
examples/basic_threaded.cpp
... ... @@ -20,6 +20,7 @@ int main(int argc, char* argv[]) {
20 20 rdx.command<int>("INCR counter");
21 21 this_thread::sleep_for(chrono::milliseconds(1));
22 22 }
  23 + cout << "Setter thread exiting." << endl;
23 24 });
24 25  
25 26 thread getter([]() {
... ... @@ -32,6 +33,7 @@ int main(int argc, char* argv[]) {
32 33 );
33 34 this_thread::sleep_for(chrono::milliseconds(1000));
34 35 }
  36 + cout << "Getter thread exiting." << endl;
35 37 });
36 38  
37 39 setter.join();
... ...
src/command.cpp
... ... @@ -64,7 +64,7 @@ void Command&lt;char*&gt;::invoke_callback() {
64 64  
65 65 template<>
66 66 void Command<int>::invoke_callback() {
67   -// std::cout << "invoking int callback" << std::endl;
  67 +
68 68 if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY);
69 69 else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY);
70 70  
... ...
src/command.hpp
... ... @@ -74,6 +74,13 @@ public:
74 74  
75 75 void process_reply();
76 76  
  77 + ev_timer* get_timer() {
  78 + std::lock_guard<std::mutex> lg(timer_guard);
  79 + return &timer;
  80 + }
  81 +
  82 + static void free_command(Command<ReplyT>* c);
  83 +
77 84 private:
78 85  
79 86 const std::function<void(const std::string&, const ReplyT&)> callback;
... ... @@ -89,16 +96,11 @@ private:
89 96 ev_timer timer;
90 97 std::mutex timer_guard;
91 98  
92   - ev_timer* get_timer() {
93   - std::lock_guard<std::mutex> lg(timer_guard);
94   - return &timer;
95   - }
96 99  
97 100 // Make sure we don't free resources until details taken care of
98 101 std::mutex free_guard;
99 102  
100 103 void free_reply_object();
101   - static void free_command(Command<ReplyT>* c);
102 104  
103 105 void invoke_callback();
104 106 bool is_error_reply();
... ... @@ -122,11 +124,6 @@ Command&lt;ReplyT&gt;::Command(
122 124 template<class ReplyT>
123 125 void Command<ReplyT>::process_reply() {
124 126  
125   - if(cmd == "GET simple_loop:count") {
126   - std::cout << "In process_reply, cmd = " << cmd << ", reply_obj = " << reply_obj << std::endl;
127   - std::cout << "reply int: " << reply_obj->integer << std::endl;
128   - std::cout << "reply str: " << reply_obj->str << std::endl;
129   - }
130 127 free_guard.lock();
131 128  
132 129 invoke_callback();
... ... @@ -134,20 +131,44 @@ void Command&lt;ReplyT&gt;::process_reply() {
134 131 pending--;
135 132  
136 133 if(!free_memory) {
137   - std::cout << "Command memory not being freed, free_memory = " << free_memory << std::endl;
138 134 // Allow free() method to free memory
  135 +// std::cout << "Command memory not being freed, free_memory = " << free_memory << std::endl;
139 136 free_guard.unlock();
140 137 return;
141 138 }
142 139  
143   -
144 140 free_reply_object();
145 141  
146   - if((pending == 0) && (repeat == 0)) {
147   - free_command(this);
148   - } else {
149   - free_guard.unlock();
  142 +// // Free memory when all pending callbacks are received
  143 +// if((repeat != 0) && (pending == 0) && ((long)(get_timer()->data) == 0)) {
  144 +// std::cout << "Freeing command, timer stopped and pending is 0." << std::endl;
  145 +// free_command(this);
  146 +// }
  147 +//
  148 +// if((pending == 0) && (repeat == 0)) {
  149 +// free_command(this);
  150 +// } else {
  151 +// free_guard.unlock();
  152 +// }
  153 +
  154 + // Handle memory if all pending replies have arrived
  155 + if(pending == 0) {
  156 +
  157 + // Just free non-repeating commands
  158 + if (repeat == 0) {
  159 + free_command(this);
  160 + return;
  161 +
  162 + // Free repeating commands if timer is stopped
  163 + } else {
  164 + if((long)(get_timer()->data) == 0) {
  165 + free_command(this);
  166 + return;
  167 + }
  168 + }
150 169 }
  170 +
  171 + free_guard.unlock();
151 172 }
152 173  
153 174 template<class ReplyT>
... ... @@ -174,9 +195,8 @@ void Command&lt;ReplyT&gt;::free_reply_object() {
174 195  
175 196 template<class ReplyT>
176 197 void Command<ReplyT>::free_command(Command<ReplyT>* c) {
177   - c->rdx->commands_deleted += 1;
178 198 c->rdx->template remove_active_command<ReplyT>(c->id);
179   -// std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl;
  199 +// std::cout << "[INFO] Deleted Command " << c->id << " at " << c << std::endl;
180 200 delete c;
181 201 }
182 202  
... ...
src/redox.cpp
... ... @@ -66,14 +66,6 @@ Redox::Redox(const string&amp; host, const int port)
66 66  
67 67 Redox::~Redox() {
68 68  
69   -// cout << "Queue sizes: " << endl;
70   -// cout << commands_redis_reply.size() << endl;
71   -// cout << commands_string_r.size() << endl;
72   -// cout << commands_char_p.size() << endl;
73   -// cout << commands_int.size() << endl;
74   -// cout << commands_long_long_int.size() << endl;
75   -// cout << commands_null.size() << endl;
76   -
77 69 redisAsyncDisconnect(ctx);
78 70  
79 71 stop();
... ... @@ -105,14 +97,14 @@ void Redox::run_blocking() {
105 97 cout << "[INFO] Stop signal detected." << endl;
106 98  
107 99 // Run a few more times to clear out canceled events
108   -// for(int i = 0; i < 100; i++) {
109   -// ev_run(evloop, EVRUN_NOWAIT);
110   -// }
111   -
112   - // Run until all commands are processed
113   - do {
  100 + for(int i = 0; i < 100; i++) {
114 101 ev_run(evloop, EVRUN_NOWAIT);
115   - } while(commands_created != commands_deleted);
  102 + }
  103 +
  104 + if(commands_created != commands_deleted) {
  105 + cerr << "[ERROR] All commands were not freed! "
  106 + << commands_created << "/" << commands_deleted << endl;
  107 + }
116 108  
117 109 exited = true;
118 110  
... ... @@ -145,36 +137,30 @@ void Redox::stop() {
145 137 }
146 138  
147 139 template<class ReplyT>
  140 +Command<ReplyT>* Redox::find_command(long id) {
  141 +
  142 + lock_guard<mutex> lg(command_map_guard);
  143 +
  144 + auto& command_map = get_command_map<ReplyT>();
  145 + auto it = command_map.find(id);
  146 + if(it == command_map.end()) return nullptr;
  147 + return it->second;
  148 +}
  149 +
  150 +template<class ReplyT>
148 151 void command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
149 152  
150 153 Redox* rdx = (Redox*) ctx->data;
151 154 long id = (long)privdata;
152 155 redisReply* reply_obj = (redisReply*) r;
153 156  
154   - auto& command_map = rdx->get_command_map<ReplyT>();
155   - auto it = command_map.find(id);
156   - if(it == command_map.end()) {
157   - cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl;
158   - freeReplyObject(r);
159   - return;
160   - };
161   - Command<ReplyT>* c = it->second;
162   -
163   - if(!rdx->is_active_command(c->id)) {
164   - std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl;
165   - freeReplyObject(r);
  157 + Command<ReplyT>* c = rdx->find_command<ReplyT>(id);
  158 + if(c == nullptr) {
  159 +// cout << "[WARNING] Couldn't find Command " << id << " in command_map (command_callback)." << endl;
  160 + freeReplyObject(reply_obj);
166 161 return;
167 162 }
168 163  
169   - if(c->cmd == "GET simple_loop:count") {
170   - std::cout << "In command_callback = " << c->cmd << " at " << c << ", reply_obj = " << reply_obj << std::endl;
171   - std::cout << "reply type: " << reply_obj->type << std::endl;
172   - std::cout << "reply int: " << reply_obj->integer << std::endl;
173   - std::cout << "reply str: " << reply_obj->str << std::endl;
174   -// std::string s(reply_obj->str);
175   -// std::cout << "string object: " << s << std::endl;
176   - }
177   -
178 164 c->reply_obj = reply_obj;
179 165 c->process_reply();
180 166  
... ... @@ -188,9 +174,6 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
188 174 */
189 175 template<class ReplyT>
190 176 bool submit_to_server(Command<ReplyT>* c) {
191   - if(c->cmd == "GET simple_loop:count") {
192   - std::cout << "submit_to_server for cmd at " << c << ": " << c->cmd << std::endl;
193   - }
194 177 c->pending++;
195 178 if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c->id, c->cmd.c_str()) != REDIS_OK) {
196 179 cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl;
... ... @@ -206,24 +189,24 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents)
206 189 Redox* rdx = (Redox*) ev_userdata(loop);
207 190 long id = (long)timer->data;
208 191  
209   - auto& command_map = rdx->get_command_map<ReplyT>();
210   - auto it = command_map.find(id);
211   - if(it == command_map.end()) {
212   - cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl;
  192 + Command<ReplyT>* c = rdx->find_command<ReplyT>(id);
  193 + if(c == nullptr) {
  194 + cout << "[ERROR] Couldn't find Command " << id
  195 + << " in command_map (submit_command_callback)." << endl;
213 196 return;
214   - };
215   - Command<ReplyT>* c = it->second;
  197 + }
216 198  
217 199 if(c->is_completed()) {
218 200  
219   - cerr << "[INFO] Command " << c << " is completed, stopping event timer." << endl;
  201 +// cout << "[INFO] Command " << c << " is completed, stopping event timer." << endl;
220 202  
221 203 c->timer_guard.lock();
222 204 if((c->repeat != 0) || (c->after != 0))
223 205 ev_timer_stop(loop, &c->timer);
224 206 c->timer_guard.unlock();
225 207  
226   - Command<ReplyT>::free_command(c);
  208 + // Mark for memory to be freed when all callbacks are received
  209 + c->timer.data = (void*)0;
227 210  
228 211 return;
229 212 }
... ... @@ -234,15 +217,8 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents)
234 217 template<class ReplyT>
235 218 bool Redox::process_queued_command(long id) {
236 219  
237   - auto& command_map = get_command_map<ReplyT>();
238   -
239   - auto it = command_map.find(id);
240   - if(it == command_map.end()) return false;
241   - Command<ReplyT>* c = it->second;
242   -
243   - if(c->cmd == "GET simple_loop:count") {
244   - std::cout << "process_queued_command for cmd at " << c << ": " << c->cmd << std::endl;
245   - }
  220 + Command<ReplyT>* c = find_command<ReplyT>(id);
  221 + if(c == nullptr) return false;
246 222  
247 223 if((c->repeat == 0) && (c->after == 0)) {
248 224 submit_to_server<ReplyT>(c);
... ... @@ -266,6 +242,8 @@ void Redox::process_queued_commands() {
266 242 while(!command_queue.empty()) {
267 243  
268 244 long id = command_queue.front();
  245 + command_queue.pop();
  246 +
269 247 if(process_queued_command<redisReply*>(id)) {}
270 248 else if(process_queued_command<string>(id)) {}
271 249 else if(process_queued_command<char*>(id)) {}
... ... @@ -273,30 +251,38 @@ void Redox::process_queued_commands() {
273 251 else if(process_queued_command<long long int>(id)) {}
274 252 else if(process_queued_command<nullptr_t>(id)) {}
275 253 else throw runtime_error("[FATAL] Command pointer not found in any queue!");
276   -
277   - command_queue.pop();
278 254 }
279 255 }
280 256  
281 257 // ----------------------------
282 258  
283 259 template<> unordered_map<long, Command<redisReply*>*>&
284   -Redox::get_command_map() { return commands_redis_reply; }
  260 +Redox::get_command_map<redisReply*>() {
  261 +// cout << "redis reply command map at " << &commands_redis_reply << endl;
  262 + return commands_redis_reply; }
285 263  
286 264 template<> unordered_map<long, Command<string>*>&
287   -Redox::get_command_map() { return commands_string_r; }
  265 +Redox::get_command_map<string>() {
  266 +// cout << "string command map at " << &commands_string_r << endl;
  267 + return commands_string_r; }
288 268  
289 269 template<> unordered_map<long, Command<char*>*>&
290   -Redox::get_command_map() { return commands_char_p; }
  270 +Redox::get_command_map<char*>() {
  271 +// cout << "char* command map at " << &commands_char_p << endl;
  272 + return commands_char_p; }
291 273  
292 274 template<> unordered_map<long, Command<int>*>&
293   -Redox::get_command_map() { return commands_int; }
  275 +Redox::get_command_map<int>() {
  276 +// cout << "int command map at " << &commands_int << " has size: " << commands_int.size() << endl;
  277 + return commands_int; }
294 278  
295 279 template<> unordered_map<long, Command<long long int>*>&
296   -Redox::get_command_map() { return commands_long_long_int; }
  280 +Redox::get_command_map<long long int>() {
  281 +// cout << "long long int command map at " << &commands_long_long_int << endl;
  282 + return commands_long_long_int; }
297 283  
298 284 template<> unordered_map<long, Command<nullptr_t>*>&
299   -Redox::get_command_map() { return commands_null; }
  285 +Redox::get_command_map<nullptr_t>() { return commands_null; }
300 286  
301 287 // ----------------------------
302 288 // Helpers
... ...
src/redox.hpp
... ... @@ -73,17 +73,17 @@ public:
73 73 std::atomic_long commands_created = {0};
74 74 std::atomic_long commands_deleted = {0};
75 75  
76   - bool is_active_command(const long id) {
77   - return active_commands.find(id) != active_commands.end();
78   - }
79   -
80 76 template<class ReplyT>
81 77 void remove_active_command(const long id) {
82   - active_commands.erase(id);
  78 + std::lock_guard<std::mutex> lg1(command_map_guard);
83 79 get_command_map<ReplyT>().erase(id);
  80 + commands_deleted += 1;
84 81 }
85 82  
86 83 template<class ReplyT>
  84 + Command<ReplyT>* find_command(long id);
  85 +
  86 + template<class ReplyT>
87 87 std::unordered_map<long, Command<ReplyT>*>& get_command_map();
88 88  
89 89 private:
... ... @@ -114,6 +114,7 @@ private:
114 114 std::unordered_map<long, Command<int>*> commands_int;
115 115 std::unordered_map<long, Command<long long int>*> commands_long_long_int;
116 116 std::unordered_map<long, Command<std::nullptr_t>*> commands_null;
  117 + std::mutex command_map_guard;
117 118  
118 119 std::queue<long> command_queue;
119 120 std::mutex queue_guard;
... ... @@ -121,9 +122,6 @@ private:
121 122  
122 123 template<class ReplyT>
123 124 bool process_queued_command(long id);
124   -
125   - // Commands created but not yet deleted (stored by id)
126   - std::unordered_set<long> active_commands;
127 125 };
128 126  
129 127 // ---------------------------
... ... @@ -137,19 +135,19 @@ Command&lt;ReplyT&gt;* Redox::command(
137 135 double after,
138 136 bool free_memory
139 137 ) {
140   - std::lock_guard<std::mutex> lg(queue_guard);
141 138  
142 139 commands_created += 1;
143 140 auto* c = new Command<ReplyT>(this, commands_created, cmd,
144 141 callback, error_callback, repeat, after, free_memory);
145 142  
  143 + std::lock_guard<std::mutex> lg(queue_guard);
  144 + std::lock_guard<std::mutex> lg2(command_map_guard);
  145 +
146 146 get_command_map<ReplyT>()[c->id] = c;
147   - active_commands.insert(c->id);
148 147 command_queue.push(c->id);
149   - std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl;
150   - if(cmd == "GET simple_loop:count") {
151   - std::cout << "Command created at " << c << ": " << c->cmd << std::endl;
152   - }
  148 +
  149 +// std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl;
  150 +
153 151 return c;
154 152 }
155 153  
... ... @@ -161,7 +159,7 @@ bool Redox::cancel(Command&lt;ReplyT&gt;* c) {
161 159 return false;
162 160 }
163 161  
164   - std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl;
  162 +// std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl;
165 163 c->completed = true;
166 164  
167 165 return true;
... ... @@ -181,7 +179,7 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) {
181 179 Command<ReplyT>* c = command<ReplyT>(cmd,
182 180 [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) {
183 181 std::unique_lock<std::mutex> ul(m);
184   - std::cout << "success callback: " << cmd_str << std::endl;
  182 +
185 183 val = reply;
186 184 status = REDOX_OK;
187 185 ul.unlock();
... ... @@ -195,10 +193,10 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) {
195 193 },
196 194 0, 0, false // No repeats, don't free memory
197 195 );
198   - std::cout << "command blocking cv wait starting" << std::endl;
  196 +
199 197 // Wait until a callback is invoked
200 198 cv.wait(lk, [&status] { return status != REDOX_UNINIT; });
201   - std::cout << "command blocking cv wait over" << std::endl;
  199 +
202 200 c->reply_val = val;
203 201 c->reply_status = status;
204 202  
... ...