Commit 5d3828ddb01ce95271d0e73a6bcf690c576ae8bd

Authored by Hayk Martirosyan
1 parent c2436e64

Implement unique Command ID to replace void* usage

Ran into a nasty segfault when using 100 parallel asynchronous command
loops, where a command's address would match up with a previous
command's
address. To circumvent the whole issue, Redox keeps a counter of how
many Commands it has created and assigns each Command a unique id in
the order they are created.

Changed all data structures and methods to pass around Command IDs, so
nothing uses void* anymore. I consider this a very good thing, but it
did seem to slow down performance 20-30% because of the extra lookups.
Will see if there's a solution later.
src/command.hpp
@@ -38,6 +38,7 @@ friend void submit_command_callback<ReplyT>(struct ev_loop* loop, ev_timer* time @@ -38,6 +38,7 @@ friend void submit_command_callback<ReplyT>(struct ev_loop* loop, ev_timer* time
38 public: 38 public:
39 Command( 39 Command(
40 Redox* rdx, 40 Redox* rdx,
  41 + long id,
41 const std::string& cmd, 42 const std::string& cmd,
42 const std::function<void(const std::string&, const ReplyT&)>& callback, 43 const std::function<void(const std::string&, const ReplyT&)>& callback,
43 const std::function<void(const std::string&, int status)>& error_callback, 44 const std::function<void(const std::string&, int status)>& error_callback,
@@ -47,6 +48,7 @@ public: @@ -47,6 +48,7 @@ public:
47 48
48 Redox* rdx; 49 Redox* rdx;
49 50
  51 + const long id;
50 const std::string cmd; 52 const std::string cmd;
51 const double repeat; 53 const double repeat;
52 const double after; 54 const double after;
@@ -106,11 +108,12 @@ private: @@ -106,11 +108,12 @@ private:
106 template<class ReplyT> 108 template<class ReplyT>
107 Command<ReplyT>::Command( 109 Command<ReplyT>::Command(
108 Redox* rdx, 110 Redox* rdx,
  111 + long id,
109 const std::string& cmd, 112 const std::string& cmd,
110 const std::function<void(const std::string&, const ReplyT&)>& callback, 113 const std::function<void(const std::string&, const ReplyT&)>& callback,
111 const std::function<void(const std::string&, int status)>& error_callback, 114 const std::function<void(const std::string&, int status)>& error_callback,
112 double repeat, double after, bool free_memory 115 double repeat, double after, bool free_memory
113 -) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), 116 +) : rdx(rdx), id(id), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory),
114 callback(callback), error_callback(error_callback) 117 callback(callback), error_callback(error_callback)
115 { 118 {
116 timer_guard.lock(); 119 timer_guard.lock();
@@ -119,6 +122,11 @@ Command&lt;ReplyT&gt;::Command( @@ -119,6 +122,11 @@ Command&lt;ReplyT&gt;::Command(
119 template<class ReplyT> 122 template<class ReplyT>
120 void Command<ReplyT>::process_reply() { 123 void Command<ReplyT>::process_reply() {
121 124
  125 + if(cmd == "GET simple_loop:count") {
  126 + std::cout << "In process_reply, cmd = " << cmd << ", reply_obj = " << reply_obj << std::endl;
  127 + std::cout << "reply int: " << reply_obj->integer << std::endl;
  128 + std::cout << "reply str: " << reply_obj->str << std::endl;
  129 + }
122 free_guard.lock(); 130 free_guard.lock();
123 131
124 invoke_callback(); 132 invoke_callback();
@@ -126,11 +134,13 @@ void Command&lt;ReplyT&gt;::process_reply() { @@ -126,11 +134,13 @@ void Command&lt;ReplyT&gt;::process_reply() {
126 pending--; 134 pending--;
127 135
128 if(!free_memory) { 136 if(!free_memory) {
  137 + std::cout << "Command memory not being freed, free_memory = " << free_memory << std::endl;
129 // Allow free() method to free memory 138 // Allow free() method to free memory
130 free_guard.unlock(); 139 free_guard.unlock();
131 return; 140 return;
132 } 141 }
133 142
  143 +
134 free_reply_object(); 144 free_reply_object();
135 145
136 if((pending == 0) && (repeat == 0)) { 146 if((pending == 0) && (repeat == 0)) {
@@ -165,7 +175,7 @@ void Command&lt;ReplyT&gt;::free_reply_object() { @@ -165,7 +175,7 @@ void Command&lt;ReplyT&gt;::free_reply_object() {
165 template<class ReplyT> 175 template<class ReplyT>
166 void Command<ReplyT>::free_command(Command<ReplyT>* c) { 176 void Command<ReplyT>::free_command(Command<ReplyT>* c) {
167 c->rdx->commands_deleted += 1; 177 c->rdx->commands_deleted += 1;
168 - c->rdx->remove_active_command(c); 178 + c->rdx->template remove_active_command<ReplyT>(c->id);
169 // std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl; 179 // std::cout << "[INFO] Deleted Command " << c->rdx->commands_created << " at " << c << std::endl;
170 delete c; 180 delete c;
171 } 181 }
src/redox.cpp
@@ -147,16 +147,34 @@ void Redox::stop() { @@ -147,16 +147,34 @@ void Redox::stop() {
147 template<class ReplyT> 147 template<class ReplyT>
148 void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { 148 void command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
149 149
150 - Command<ReplyT>* c = (Command<ReplyT>*) privdata;  
151 - redisReply* reply_obj = (redisReply*) r;  
152 Redox* rdx = (Redox*) ctx->data; 150 Redox* rdx = (Redox*) ctx->data;
  151 + long id = (long)privdata;
  152 + redisReply* reply_obj = (redisReply*) r;
153 153
154 - if(!rdx->is_active_command(c)) { 154 + auto& command_map = rdx->get_command_map<ReplyT>();
  155 + auto it = command_map.find(id);
  156 + if(it == command_map.end()) {
  157 + cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl;
  158 + freeReplyObject(r);
  159 + return;
  160 + };
  161 + Command<ReplyT>* c = it->second;
  162 +
  163 + if(!rdx->is_active_command(c->id)) {
155 std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl; 164 std::cout << "[INFO] Ignoring callback, command " << c << " was freed." << std::endl;
156 freeReplyObject(r); 165 freeReplyObject(r);
157 return; 166 return;
158 } 167 }
159 168
  169 + if(c->cmd == "GET simple_loop:count") {
  170 + std::cout << "In command_callback = " << c->cmd << " at " << c << ", reply_obj = " << reply_obj << std::endl;
  171 + std::cout << "reply type: " << reply_obj->type << std::endl;
  172 + std::cout << "reply int: " << reply_obj->integer << std::endl;
  173 + std::cout << "reply str: " << reply_obj->str << std::endl;
  174 +// std::string s(reply_obj->str);
  175 +// std::cout << "string object: " << s << std::endl;
  176 + }
  177 +
160 c->reply_obj = reply_obj; 178 c->reply_obj = reply_obj;
161 c->process_reply(); 179 c->process_reply();
162 180
@@ -170,8 +188,11 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { @@ -170,8 +188,11 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
170 */ 188 */
171 template<class ReplyT> 189 template<class ReplyT>
172 bool submit_to_server(Command<ReplyT>* c) { 190 bool submit_to_server(Command<ReplyT>* c) {
  191 + if(c->cmd == "GET simple_loop:count") {
  192 + std::cout << "submit_to_server for cmd at " << c << ": " << c->cmd << std::endl;
  193 + }
173 c->pending++; 194 c->pending++;
174 - if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c, c->cmd.c_str()) != REDIS_OK) { 195 + if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c->id, c->cmd.c_str()) != REDIS_OK) {
175 cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; 196 cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl;
176 c->invoke_error(REDOX_SEND_ERROR); 197 c->invoke_error(REDOX_SEND_ERROR);
177 return false; 198 return false;
@@ -182,7 +203,16 @@ bool submit_to_server(Command&lt;ReplyT&gt;* c) { @@ -182,7 +203,16 @@ bool submit_to_server(Command&lt;ReplyT&gt;* c) {
182 template<class ReplyT> 203 template<class ReplyT>
183 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { 204 void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) {
184 205
185 - auto c = (Command<ReplyT>*)timer->data; 206 + Redox* rdx = (Redox*) ev_userdata(loop);
  207 + long id = (long)timer->data;
  208 +
  209 + auto& command_map = rdx->get_command_map<ReplyT>();
  210 + auto it = command_map.find(id);
  211 + if(it == command_map.end()) {
  212 + cout << "[ERROR] Couldn't find Command " << id << " in command_map." << endl;
  213 + return;
  214 + };
  215 + Command<ReplyT>* c = it->second;
186 216
187 if(c->is_completed()) { 217 if(c->is_completed()) {
188 218
@@ -202,20 +232,23 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) @@ -202,20 +232,23 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents)
202 } 232 }
203 233
204 template<class ReplyT> 234 template<class ReplyT>
205 -bool Redox::process_queued_command(void* c_ptr) { 235 +bool Redox::process_queued_command(long id) {
206 236
207 auto& command_map = get_command_map<ReplyT>(); 237 auto& command_map = get_command_map<ReplyT>();
208 238
209 - auto it = command_map.find(c_ptr); 239 + auto it = command_map.find(id);
210 if(it == command_map.end()) return false; 240 if(it == command_map.end()) return false;
211 Command<ReplyT>* c = it->second; 241 Command<ReplyT>* c = it->second;
212 - command_map.erase(c_ptr); 242 +
  243 + if(c->cmd == "GET simple_loop:count") {
  244 + std::cout << "process_queued_command for cmd at " << c << ": " << c->cmd << std::endl;
  245 + }
213 246
214 if((c->repeat == 0) && (c->after == 0)) { 247 if((c->repeat == 0) && (c->after == 0)) {
215 submit_to_server<ReplyT>(c); 248 submit_to_server<ReplyT>(c);
216 } else { 249 } else {
217 250
218 - c->timer.data = (void*)c; 251 + c->timer.data = (void*)c->id;
219 252
220 ev_timer_init(&c->timer, submit_command_callback<ReplyT>, c->after, c->repeat); 253 ev_timer_init(&c->timer, submit_command_callback<ReplyT>, c->after, c->repeat);
221 ev_timer_start(evloop, &c->timer); 254 ev_timer_start(evloop, &c->timer);
@@ -232,13 +265,13 @@ void Redox::process_queued_commands() { @@ -232,13 +265,13 @@ void Redox::process_queued_commands() {
232 265
233 while(!command_queue.empty()) { 266 while(!command_queue.empty()) {
234 267
235 - void* c_ptr = command_queue.front();  
236 - if(process_queued_command<redisReply*>(c_ptr)) {}  
237 - else if(process_queued_command<string>(c_ptr)) {}  
238 - else if(process_queued_command<char*>(c_ptr)) {}  
239 - else if(process_queued_command<int>(c_ptr)) {}  
240 - else if(process_queued_command<long long int>(c_ptr)) {}  
241 - else if(process_queued_command<nullptr_t>(c_ptr)) {} 268 + long id = command_queue.front();
  269 + if(process_queued_command<redisReply*>(id)) {}
  270 + else if(process_queued_command<string>(id)) {}
  271 + else if(process_queued_command<char*>(id)) {}
  272 + else if(process_queued_command<int>(id)) {}
  273 + else if(process_queued_command<long long int>(id)) {}
  274 + else if(process_queued_command<nullptr_t>(id)) {}
242 else throw runtime_error("[FATAL] Command pointer not found in any queue!"); 275 else throw runtime_error("[FATAL] Command pointer not found in any queue!");
243 276
244 command_queue.pop(); 277 command_queue.pop();
@@ -247,22 +280,22 @@ void Redox::process_queued_commands() { @@ -247,22 +280,22 @@ void Redox::process_queued_commands() {
247 280
248 // ---------------------------- 281 // ----------------------------
249 282
250 -template<> unordered_map<void*, Command<redisReply*>*>& 283 +template<> unordered_map<long, Command<redisReply*>*>&
251 Redox::get_command_map() { return commands_redis_reply; } 284 Redox::get_command_map() { return commands_redis_reply; }
252 285
253 -template<> unordered_map<void*, Command<string>*>& 286 +template<> unordered_map<long, Command<string>*>&
254 Redox::get_command_map() { return commands_string_r; } 287 Redox::get_command_map() { return commands_string_r; }
255 288
256 -template<> unordered_map<void*, Command<char*>*>& 289 +template<> unordered_map<long, Command<char*>*>&
257 Redox::get_command_map() { return commands_char_p; } 290 Redox::get_command_map() { return commands_char_p; }
258 291
259 -template<> unordered_map<void*, Command<int>*>& 292 +template<> unordered_map<long, Command<int>*>&
260 Redox::get_command_map() { return commands_int; } 293 Redox::get_command_map() { return commands_int; }
261 294
262 -template<> unordered_map<void*, Command<long long int>*>& 295 +template<> unordered_map<long, Command<long long int>*>&
263 Redox::get_command_map() { return commands_long_long_int; } 296 Redox::get_command_map() { return commands_long_long_int; }
264 297
265 -template<> unordered_map<void*, Command<nullptr_t>*>& 298 +template<> unordered_map<long, Command<nullptr_t>*>&
266 Redox::get_command_map() { return commands_null; } 299 Redox::get_command_map() { return commands_null; }
267 300
268 // ---------------------------- 301 // ----------------------------
src/redox.hpp
@@ -70,17 +70,22 @@ public: @@ -70,17 +70,22 @@ public:
70 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback); 70 // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback);
71 // void unsubscribe(std::string channel); 71 // void unsubscribe(std::string channel);
72 72
73 - std::atomic_int commands_created = {0};  
74 - std::atomic_int commands_deleted = {0}; 73 + std::atomic_long commands_created = {0};
  74 + std::atomic_long commands_deleted = {0};
75 75
76 - bool is_active_command(void* c_ptr) {  
77 - return active_commands.find(c_ptr) != active_commands.end(); 76 + bool is_active_command(const long id) {
  77 + return active_commands.find(id) != active_commands.end();
78 } 78 }
79 79
80 - void remove_active_command(void* c_ptr) {  
81 - active_commands.erase(c_ptr); 80 + template<class ReplyT>
  81 + void remove_active_command(const long id) {
  82 + active_commands.erase(id);
  83 + get_command_map<ReplyT>().erase(id);
82 } 84 }
83 85
  86 + template<class ReplyT>
  87 + std::unordered_map<long, Command<ReplyT>*>& get_command_map();
  88 +
84 private: 89 private:
85 90
86 // Redox server 91 // Redox server
@@ -103,25 +108,22 @@ private: @@ -103,25 +108,22 @@ private:
103 108
104 std::thread event_loop_thread; 109 std::thread event_loop_thread;
105 110
106 - std::unordered_map<void*, Command<redisReply*>*> commands_redis_reply;  
107 - std::unordered_map<void*, Command<std::string>*> commands_string_r;  
108 - std::unordered_map<void*, Command<char*>*> commands_char_p;  
109 - std::unordered_map<void*, Command<int>*> commands_int;  
110 - std::unordered_map<void*, Command<long long int>*> commands_long_long_int;  
111 - std::unordered_map<void*, Command<std::nullptr_t>*> commands_null;  
112 -  
113 - template<class ReplyT>  
114 - std::unordered_map<void*, Command<ReplyT>*>& get_command_map(); 111 + std::unordered_map<long, Command<redisReply*>*> commands_redis_reply;
  112 + std::unordered_map<long, Command<std::string>*> commands_string_r;
  113 + std::unordered_map<long, Command<char*>*> commands_char_p;
  114 + std::unordered_map<long, Command<int>*> commands_int;
  115 + std::unordered_map<long, Command<long long int>*> commands_long_long_int;
  116 + std::unordered_map<long, Command<std::nullptr_t>*> commands_null;
115 117
116 - std::queue<void*> command_queue; 118 + std::queue<long> command_queue;
117 std::mutex queue_guard; 119 std::mutex queue_guard;
118 void process_queued_commands(); 120 void process_queued_commands();
119 121
120 template<class ReplyT> 122 template<class ReplyT>
121 - bool process_queued_command(void* cmd_ptr); 123 + bool process_queued_command(long id);
122 124
123 - // Commands created but not yet deleted  
124 - std::unordered_set<void*> active_commands; 125 + // Commands created but not yet deleted (stored by id)
  126 + std::unordered_set<long> active_commands;
125 }; 127 };
126 128
127 // --------------------------- 129 // ---------------------------
@@ -136,13 +138,18 @@ Command&lt;ReplyT&gt;* Redox::command( @@ -136,13 +138,18 @@ Command&lt;ReplyT&gt;* Redox::command(
136 bool free_memory 138 bool free_memory
137 ) { 139 ) {
138 std::lock_guard<std::mutex> lg(queue_guard); 140 std::lock_guard<std::mutex> lg(queue_guard);
139 - auto* c = new Command<ReplyT>(this, cmd, callback, error_callback, repeat, after, free_memory);  
140 - void* c_ptr = (void*)c;  
141 - get_command_map<ReplyT>()[c_ptr] = c;  
142 - command_queue.push(c_ptr);  
143 - active_commands.insert(c_ptr); 141 +
144 commands_created += 1; 142 commands_created += 1;
145 -// std::cout << "[DEBUG] Created Command " << commands_created << " at " << c << std::endl; 143 + auto* c = new Command<ReplyT>(this, commands_created, cmd,
  144 + callback, error_callback, repeat, after, free_memory);
  145 +
  146 + get_command_map<ReplyT>()[c->id] = c;
  147 + active_commands.insert(c->id);
  148 + command_queue.push(c->id);
  149 + std::cout << "[DEBUG] Created Command " << c->id << " at " << c << std::endl;
  150 + if(cmd == "GET simple_loop:count") {
  151 + std::cout << "Command created at " << c << ": " << c->cmd << std::endl;
  152 + }
146 return c; 153 return c;
147 } 154 }
148 155
@@ -154,7 +161,7 @@ bool Redox::cancel(Command&lt;ReplyT&gt;* c) { @@ -154,7 +161,7 @@ bool Redox::cancel(Command&lt;ReplyT&gt;* c) {
154 return false; 161 return false;
155 } 162 }
156 163
157 - std::cout << "[INFO] Canceling command at " << c << std::endl; 164 + std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl;
158 c->completed = true; 165 c->completed = true;
159 166
160 return true; 167 return true;
@@ -174,6 +181,7 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) { @@ -174,6 +181,7 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) {
174 Command<ReplyT>* c = command<ReplyT>(cmd, 181 Command<ReplyT>* c = command<ReplyT>(cmd,
175 [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { 182 [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) {
176 std::unique_lock<std::mutex> ul(m); 183 std::unique_lock<std::mutex> ul(m);
  184 + std::cout << "success callback: " << cmd_str << std::endl;
177 val = reply; 185 val = reply;
178 status = REDOX_OK; 186 status = REDOX_OK;
179 ul.unlock(); 187 ul.unlock();
@@ -187,10 +195,10 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) { @@ -187,10 +195,10 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) {
187 }, 195 },
188 0, 0, false // No repeats, don't free memory 196 0, 0, false // No repeats, don't free memory
189 ); 197 );
190 - 198 + std::cout << "command blocking cv wait starting" << std::endl;
191 // Wait until a callback is invoked 199 // Wait until a callback is invoked
192 cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); 200 cv.wait(lk, [&status] { return status != REDOX_UNINIT; });
193 - 201 + std::cout << "command blocking cv wait over" << std::endl;
194 c->reply_val = val; 202 c->reply_val = val;
195 c->reply_status = status; 203 c->reply_status = status;
196 204