Commit 3f65d70aefbb3cc695fa5562a47825a4feed8011

Authored by Hayk Martirosyan
1 parent 710acae0

Complete pub/sub implementation

subscribe/psubscribe/unsubscribe/punsubscribe methods that keep track of
subscribed topics to make sure we don't ask hiredis for bad things. It
appears all crashes are eliminated, though no stress testing has been
done.
examples/pub_sub.cpp
@@ -15,6 +15,9 @@ int main(int argc, char *argv[]) { @@ -15,6 +15,9 @@ int main(int argc, char *argv[]) {
15 redox::Redox rdx; // Initialize Redox (default host/port) 15 redox::Redox rdx; // Initialize Redox (default host/port)
16 if (!rdx.start()) return 1; // Start the event loop 16 if (!rdx.start()) return 1; // Start the event loop
17 17
  18 + redox::Redox rdx_pub;
  19 + if(!rdx_pub.start()) return 1;
  20 +
18 auto got_message = [](const string& topic, const string& msg) { 21 auto got_message = [](const string& topic, const string& msg) {
19 cout << topic << ": " << msg << endl; 22 cout << topic << ": " << msg << endl;
20 }; 23 };
@@ -27,27 +30,29 @@ int main(int argc, char *argv[]) { @@ -27,27 +30,29 @@ int main(int argc, char *argv[]) {
27 cout << "> Unsubscribed from " << topic << endl; 30 cout << "> Unsubscribed from " << topic << endl;
28 }; 31 };
29 32
30 - rdx.subscribe("news", got_message, subscribed, unsubscribed); 33 + rdx.psubscribe("news", got_message, subscribed, unsubscribed);
31 rdx.subscribe("sports", got_message, subscribed, unsubscribed); 34 rdx.subscribe("sports", got_message, subscribed, unsubscribed);
32 35
33 - redox::Redox rdx_pub;  
34 - if(!rdx_pub.start()) return 1; 36 + this_thread::sleep_for(chrono::milliseconds(20));
  37 + for(auto s : rdx.subscribed_topics()) cout << "topic: " << s << endl;
35 38
36 rdx_pub.publish("news", "hello!"); 39 rdx_pub.publish("news", "hello!");
37 rdx_pub.publish("news", "whatup"); 40 rdx_pub.publish("news", "whatup");
38 rdx_pub.publish("sports", "yo"); 41 rdx_pub.publish("sports", "yo");
39 42
40 - this_thread::sleep_for(chrono::seconds(10)); 43 + this_thread::sleep_for(chrono::seconds(1));
41 rdx.unsubscribe("sports"); 44 rdx.unsubscribe("sports");
42 rdx_pub.publish("sports", "yo"); 45 rdx_pub.publish("sports", "yo");
43 rdx_pub.publish("news", "whatup"); 46 rdx_pub.publish("news", "whatup");
44 47
45 - this_thread::sleep_for(chrono::seconds(10));  
46 - rdx.unsubscribe("news"); 48 + this_thread::sleep_for(chrono::milliseconds(1));
  49 + rdx.punsubscribe("news");
  50 +
47 rdx_pub.publish("sports", "yo"); 51 rdx_pub.publish("sports", "yo");
48 rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) { 52 rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) {
49 cout << "published to " << topic << ": " << msg << endl; 53 cout << "published to " << topic << ": " << msg << endl;
50 }); 54 });
51 - 55 + rdx_pub.publish("news", "whatup");
52 rdx.block(); 56 rdx.block();
  57 + rdx_pub.block();
53 } 58 }
src/redox.cpp
@@ -368,44 +368,68 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r @@ -368,44 +368,68 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r
368 // Pub/Sub methods 368 // Pub/Sub methods
369 // --------------------------------- 369 // ---------------------------------
370 370
371 -void Redox::subscribe(const string& topic,  
372 - function<void(const string& topic, const string& message)> msg_callback,  
373 - function<void(const string& topic)> sub_callback,  
374 - function<void(const string& topic)> unsub_callback,  
375 - function<void(const string& topic, int status)> err_callback 371 +void Redox::subscribe_raw(const string cmd_name, const string topic,
  372 + function<void(const string&, const string&)> msg_callback,
  373 + function<void(const string&)> sub_callback,
  374 + function<void(const string&)> unsub_callback,
  375 + function<void(const string&, int)> err_callback
376 ) { 376 ) {
377 377
378 // Start pubsub mode. No non-sub/unsub commands can be emitted by this client. 378 // Start pubsub mode. No non-sub/unsub commands can be emitted by this client.
379 pubsub_mode = true; 379 pubsub_mode = true;
380 380
381 - command<redisReply*>("SUBSCRIBE " + topic, 381 + command<redisReply*>(cmd_name + " " + topic,
382 [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) { 382 [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) {
383 383
384 - if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) {  
385 -  
386 - // Faster way of checking if a message or sub/unsub notification.  
387 - // If the last element is an integer, then it was a sub/unsub notification.  
388 - // If the last element is a string, then its a message.  
389 - // The goal is to avoid doing a string compare for "message" every message.  
390 - if(reply->element[2]->type == REDIS_REPLY_INTEGER) {  
391 -  
392 - if(!strncmp(reply->element[0]->str, "sub", 3)) {  
393 - if(sub_callback) sub_callback(topic); 384 + // For debugging only
  385 +// cout << "------" << endl;
  386 +// cout << cmd << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl;
  387 +// for(int i = 0; i < reply->elements; i++) {
  388 +// redisReply* r = reply->element[i];
  389 +// cout << "element " << i << ", reply type = " << r->type << " ";
  390 +// if(r->type == REDIS_REPLY_STRING) cout << r->str << endl;
  391 +// else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl;
  392 +// else cout << "some other type" << endl;
  393 +// }
  394 +// cout << "------" << endl;
  395 +
  396 + // If the last entry is an integer, then it is a [p]sub/[p]unsub command
  397 + if((reply->type == REDIS_REPLY_ARRAY) &&
  398 + (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) {
  399 +
  400 + if(!strncmp(reply->element[0]->str, "sub", 3)) {
  401 + sub_queue.insert(topic);
  402 + if(sub_callback) sub_callback(topic);
  403 +
  404 + } else if(!strncmp(reply->element[0]->str, "psub", 4)) {
  405 + psub_queue.insert(topic);
  406 + if (sub_callback) sub_callback(topic);
  407 +
  408 + } else if(!strncmp(reply->element[0]->str, "uns", 3)) {
  409 + sub_queue.erase(topic);
  410 + if (unsub_callback) unsub_callback(topic);
  411 +
  412 + } else if(!strncmp(reply->element[0]->str, "puns", 4)) {
  413 + psub_queue.erase(topic);
  414 + if (unsub_callback) unsub_callback(topic);
  415 + }
394 416
395 - } else if(!strncmp(reply->element[0]->str, "uns", 3)) {  
396 - if(unsub_callback) unsub_callback(topic); 417 + else logger.error() << "Unknown pubsub message: " << reply->element[0]->str;
  418 + }
397 419
398 - } else logger.error() << "Unknown pubsub message: " << reply->element[1]->str;  
399 - } 420 + // Message for subscribe
  421 + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) {
  422 + char *msg = reply->element[2]->str;
  423 + if (msg && msg_callback) msg_callback(topic, reply->element[2]->str);
  424 + }
400 425
401 - // Got a message  
402 - else if(reply->element[2]->type == REDIS_REPLY_STRING) {  
403 - char *msg = reply->element[2]->str;  
404 - if (msg && msg_callback) msg_callback(topic, reply->element[2]->str);  
405 - }  
406 - } else {  
407 - logger.error() << "Subscribe command got reply other than a 3-element array."; 426 + // Message for psubscribe
  427 + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) {
  428 + char *msg = reply->element[2]->str;
  429 + if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str);
408 } 430 }
  431 +
  432 + else logger.error() << "Unknown pubsub message of type " << reply->type;
409 }, 433 },
410 [topic, err_callback](const string &cmd, int status) { 434 [topic, err_callback](const string &cmd, int status) {
411 if(err_callback) err_callback(topic, status); 435 if(err_callback) err_callback(topic, status);
@@ -414,10 +438,36 @@ void Redox::subscribe(const string&amp; topic, @@ -414,10 +438,36 @@ void Redox::subscribe(const string&amp; topic,
414 ); 438 );
415 } 439 }
416 440
417 -void Redox::unsubscribe(const string& topic,  
418 - function<void(const string& topic, int status)> err_callback 441 +void Redox::subscribe(const string topic,
  442 + function<void(const string&, const string&)> msg_callback,
  443 + function<void(const string&)> sub_callback,
  444 + function<void(const string&)> unsub_callback,
  445 + function<void(const string&, int)> err_callback
  446 +) {
  447 + if(sub_queue.find(topic) != sub_queue.end()) {
  448 + logger.warning() << "Already subscribed to " << topic << "!";
  449 + return;
  450 + }
  451 + subscribe_raw("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
  452 +}
  453 +
  454 +void Redox::psubscribe(const string topic,
  455 + function<void(const string&, const string&)> msg_callback,
  456 + function<void(const string&)> sub_callback,
  457 + function<void(const string&)> unsub_callback,
  458 + function<void(const string&, int)> err_callback
  459 +) {
  460 + if(psub_queue.find(topic) != psub_queue.end()) {
  461 + logger.warning() << "Already psubscribed to " << topic << "!";
  462 + return;
  463 + }
  464 + subscribe_raw("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
  465 +}
  466 +
  467 +void Redox::unsubscribe_raw(const string cmd_name, const string topic,
  468 + function<void(const string&, int)> err_callback
419 ) { 469 ) {
420 - command<redisReply*>("UNSUBSCRIBE " + topic, 470 + command<redisReply*>(cmd_name + " " + topic,
421 nullptr, 471 nullptr,
422 [topic, err_callback](const string& cmd, int status) { 472 [topic, err_callback](const string& cmd, int status) {
423 if(err_callback) err_callback(topic, status); 473 if(err_callback) err_callback(topic, status);
@@ -425,9 +475,29 @@ void Redox::unsubscribe(const string&amp; topic, @@ -425,9 +475,29 @@ void Redox::unsubscribe(const string&amp; topic,
425 ); 475 );
426 } 476 }
427 477
428 -void Redox::publish(const string& topic, const string& msg,  
429 - function<void(const string& topic, const string& msg)> pub_callback,  
430 - function<void(const string& topic, int status)> err_callback 478 +void Redox::unsubscribe(const string topic,
  479 + function<void(const string&, int)> err_callback
  480 +) {
  481 + if(sub_queue.find(topic) == sub_queue.end()) {
  482 + logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!";
  483 + return;
  484 + }
  485 + unsubscribe_raw("UNSUBSCRIBE", topic, err_callback);
  486 +}
  487 +
  488 +void Redox::punsubscribe(const string topic,
  489 + function<void(const string&, int)> err_callback
  490 +) {
  491 + if(psub_queue.find(topic) == psub_queue.end()) {
  492 + logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!";
  493 + return;
  494 + }
  495 + unsubscribe_raw("PUNSUBSCRIBE", topic, err_callback);
  496 +}
  497 +
  498 +void Redox::publish(const string topic, const string msg,
  499 + function<void(const string&, const string&)> pub_callback,
  500 + function<void(const string&, int)> err_callback
431 ) { 501 ) {
432 command<redisReply*>("PUBLISH " + topic + " " + msg, 502 command<redisReply*>("PUBLISH " + topic + " " + msg,
433 [topic, msg, pub_callback](const string& command, redisReply* const& reply) { 503 [topic, msg, pub_callback](const string& command, redisReply* const& reply) {
@@ -442,15 +512,15 @@ void Redox::publish(const string&amp; topic, const string&amp; msg, @@ -442,15 +512,15 @@ void Redox::publish(const string&amp; topic, const string&amp; msg,
442 /** 512 /**
443 * Throw an exception for any non-pubsub commands. 513 * Throw an exception for any non-pubsub commands.
444 */ 514 */
445 -void Redox::deny_non_pubsub(const std::string& cmd) { 515 +void Redox::deny_non_pubsub(const string& cmd) {
446 516
447 - std::string cmd_name = cmd.substr(0, cmd.find(' ')); 517 + string cmd_name = cmd.substr(0, cmd.find(' '));
448 518
449 // Compare with the command's first 5 characters 519 // Compare with the command's first 5 characters
450 if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") || 520 if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") ||
451 !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) { 521 !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) {
452 } else { 522 } else {
453 - throw std::runtime_error("In pub/sub mode, this Redox instance can only issue " 523 + throw runtime_error("In pub/sub mode, this Redox instance can only issue "
454 "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands."); 524 "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands.");
455 } 525 }
456 } 526 }
src/redox.hpp
@@ -172,6 +172,10 @@ public: @@ -172,6 +172,10 @@ public:
172 */ 172 */
173 bool del(const std::string& key); 173 bool del(const std::string& key);
174 174
  175 + // ------------------------------------------------
  176 + // Publish/subscribe
  177 + // ------------------------------------------------
  178 +
175 // This is activated when subscribe is called. When active, 179 // This is activated when subscribe is called. When active,
176 // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE 180 // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE
177 // throw exceptions 181 // throw exceptions
@@ -184,11 +188,25 @@ public: @@ -184,11 +188,25 @@ public:
184 * sub_callback: invoked when successfully subscribed 188 * sub_callback: invoked when successfully subscribed
185 * err_callback: invoked on some error state 189 * err_callback: invoked on some error state
186 */ 190 */
187 - void subscribe(const std::string& topic,  
188 - std::function<void(const std::string& topic, const std::string& message)> msg_callback,  
189 - std::function<void(const std::string& topic)> sub_callback = nullptr,  
190 - std::function<void(const std::string& topic)> unsub_callback = nullptr,  
191 - std::function<void(const std::string& topic, int status)> err_callback = nullptr 191 + void subscribe(const std::string topic,
  192 + std::function<void(const std::string&, const std::string&)> msg_callback,
  193 + std::function<void(const std::string&)> sub_callback = nullptr,
  194 + std::function<void(const std::string&)> unsub_callback = nullptr,
  195 + std::function<void(const std::string&, int)> err_callback = nullptr
  196 + );
  197 +
  198 + /**
  199 + * Subscribe to a topic with a pattern.
  200 + *
  201 + * msg_callback: invoked whenever a message is received.
  202 + * sub_callback: invoked when successfully subscribed
  203 + * err_callback: invoked on some error state
  204 + */
  205 + void psubscribe(const std::string topic,
  206 + std::function<void(const std::string&, const std::string&)> msg_callback,
  207 + std::function<void(const std::string&)> sub_callback = nullptr,
  208 + std::function<void(const std::string&)> unsub_callback = nullptr,
  209 + std::function<void(const std::string&, int)> err_callback = nullptr
192 ); 210 );
193 211
194 /** 212 /**
@@ -197,9 +215,9 @@ public: @@ -197,9 +215,9 @@ public:
197 * pub_callback: invoked when successfully published 215 * pub_callback: invoked when successfully published
198 * err_callback: invoked on some error state 216 * err_callback: invoked on some error state
199 */ 217 */
200 - void publish(const std::string& topic, const std::string& msg,  
201 - std::function<void(const std::string& topic, const std::string& msg)> pub_callback = nullptr,  
202 - std::function<void(const std::string& topic, int status)> err_callback = nullptr 218 + void publish(const std::string topic, const std::string msg,
  219 + std::function<void(const std::string&, const std::string&)> pub_callback = nullptr,
  220 + std::function<void(const std::string&, int)> err_callback = nullptr
203 ); 221 );
204 222
205 /** 223 /**
@@ -207,10 +225,26 @@ public: @@ -207,10 +225,26 @@ public:
207 * 225 *
208 * err_callback: invoked on some error state 226 * err_callback: invoked on some error state
209 */ 227 */
210 - void unsubscribe(const std::string& topic,  
211 - std::function<void(const std::string& topic, int status)> err_callback = nullptr 228 + void unsubscribe(const std::string topic,
  229 + std::function<void(const std::string&, int)> err_callback = nullptr
  230 + );
  231 +
  232 + /**
  233 + * Unsubscribe from a topic with a pattern.
  234 + *
  235 + * err_callback: invoked on some error state
  236 + */
  237 + void punsubscribe(const std::string topic,
  238 + std::function<void(const std::string&, int)> err_callback = nullptr
212 ); 239 );
213 240
  241 + const std::set<std::string>& subscribed_topics() { return sub_queue; }
  242 + const std::set<std::string>& psubscribed_topics() { return psub_queue; }
  243 +
  244 + // ------------------------------------------------
  245 + // Public only for Command class
  246 + // ------------------------------------------------
  247 +
214 // Invoked by Command objects when they are completed 248 // Invoked by Command objects when they are completed
215 template<class ReplyT> 249 template<class ReplyT>
216 void remove_active_command(const long id) { 250 void remove_active_command(const long id) {
@@ -314,6 +348,25 @@ private: @@ -314,6 +348,25 @@ private:
314 static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); 348 static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents);
315 349
316 void deny_non_pubsub(const std::string& cmd); 350 void deny_non_pubsub(const std::string& cmd);
  351 +
  352 + // Base for subscribe and psubscribe
  353 + void subscribe_raw(const std::string cmd_name, const std::string topic,
  354 + std::function<void(const std::string&, const std::string&)> msg_callback,
  355 + std::function<void(const std::string&)> sub_callback = nullptr,
  356 + std::function<void(const std::string&)> unsub_callback = nullptr,
  357 + std::function<void(const std::string&, int)> err_callback = nullptr
  358 + );
  359 +
  360 + // Base for unsubscribe and punsubscribe
  361 + void unsubscribe_raw(const std::string cmd_name, const std::string topic,
  362 + std::function<void(const std::string&, int)> err_callback = nullptr
  363 + );
  364 +
  365 + // Keep track of topics because we can only unsubscribe
  366 + // from subscribed topics and punsubscribe from
  367 + // psubscribed topics, or hiredis leads to segfaults
  368 + std::set<std::string> sub_queue;
  369 + std::set<std::string> psub_queue;
317 }; 370 };
318 371
319 // --------------------------- 372 // ---------------------------