diff --git a/examples/pub_sub.cpp b/examples/pub_sub.cpp index 12801a2..c105d90 100644 --- a/examples/pub_sub.cpp +++ b/examples/pub_sub.cpp @@ -15,6 +15,9 @@ int main(int argc, char *argv[]) { redox::Redox rdx; // Initialize Redox (default host/port) if (!rdx.start()) return 1; // Start the event loop + redox::Redox rdx_pub; + if(!rdx_pub.start()) return 1; + auto got_message = [](const string& topic, const string& msg) { cout << topic << ": " << msg << endl; }; @@ -27,27 +30,29 @@ int main(int argc, char *argv[]) { cout << "> Unsubscribed from " << topic << endl; }; - rdx.subscribe("news", got_message, subscribed, unsubscribed); + rdx.psubscribe("news", got_message, subscribed, unsubscribed); rdx.subscribe("sports", got_message, subscribed, unsubscribed); - redox::Redox rdx_pub; - if(!rdx_pub.start()) return 1; + this_thread::sleep_for(chrono::milliseconds(20)); + for(auto s : rdx.subscribed_topics()) cout << "topic: " << s << endl; rdx_pub.publish("news", "hello!"); rdx_pub.publish("news", "whatup"); rdx_pub.publish("sports", "yo"); - this_thread::sleep_for(chrono::seconds(10)); + this_thread::sleep_for(chrono::seconds(1)); rdx.unsubscribe("sports"); rdx_pub.publish("sports", "yo"); rdx_pub.publish("news", "whatup"); - this_thread::sleep_for(chrono::seconds(10)); - rdx.unsubscribe("news"); + this_thread::sleep_for(chrono::milliseconds(1)); + rdx.punsubscribe("news"); + rdx_pub.publish("sports", "yo"); rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) { cout << "published to " << topic << ": " << msg << endl; }); - + rdx_pub.publish("news", "whatup"); rdx.block(); + rdx_pub.block(); } diff --git a/src/redox.cpp b/src/redox.cpp index 7fafbd2..f7b9669 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -368,44 +368,68 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r // Pub/Sub methods // --------------------------------- -void Redox::subscribe(const string& topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback +void Redox::subscribe_raw(const string cmd_name, const string topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback ) { // Start pubsub mode. No non-sub/unsub commands can be emitted by this client. pubsub_mode = true; - command("SUBSCRIBE " + topic, + command(cmd_name + " " + topic, [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) { - if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) { - - // Faster way of checking if a message or sub/unsub notification. - // If the last element is an integer, then it was a sub/unsub notification. - // If the last element is a string, then its a message. - // The goal is to avoid doing a string compare for "message" every message. - if(reply->element[2]->type == REDIS_REPLY_INTEGER) { - - if(!strncmp(reply->element[0]->str, "sub", 3)) { - if(sub_callback) sub_callback(topic); + // For debugging only +// cout << "------" << endl; +// cout << cmd << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl; +// for(int i = 0; i < reply->elements; i++) { +// redisReply* r = reply->element[i]; +// cout << "element " << i << ", reply type = " << r->type << " "; +// if(r->type == REDIS_REPLY_STRING) cout << r->str << endl; +// else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl; +// else cout << "some other type" << endl; +// } +// cout << "------" << endl; + + // If the last entry is an integer, then it is a [p]sub/[p]unsub command + if((reply->type == REDIS_REPLY_ARRAY) && + (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) { + + if(!strncmp(reply->element[0]->str, "sub", 3)) { + sub_queue.insert(topic); + if(sub_callback) sub_callback(topic); + + } else if(!strncmp(reply->element[0]->str, "psub", 4)) { + psub_queue.insert(topic); + if (sub_callback) sub_callback(topic); + + } else if(!strncmp(reply->element[0]->str, "uns", 3)) { + sub_queue.erase(topic); + if (unsub_callback) unsub_callback(topic); + + } else if(!strncmp(reply->element[0]->str, "puns", 4)) { + psub_queue.erase(topic); + if (unsub_callback) unsub_callback(topic); + } - } else if(!strncmp(reply->element[0]->str, "uns", 3)) { - if(unsub_callback) unsub_callback(topic); + else logger.error() << "Unknown pubsub message: " << reply->element[0]->str; + } - } else logger.error() << "Unknown pubsub message: " << reply->element[1]->str; - } + // Message for subscribe + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) { + char *msg = reply->element[2]->str; + if (msg && msg_callback) msg_callback(topic, reply->element[2]->str); + } - // Got a message - else if(reply->element[2]->type == REDIS_REPLY_STRING) { - char *msg = reply->element[2]->str; - if (msg && msg_callback) msg_callback(topic, reply->element[2]->str); - } - } else { - logger.error() << "Subscribe command got reply other than a 3-element array."; + // Message for psubscribe + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) { + char *msg = reply->element[2]->str; + if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str); } + + else logger.error() << "Unknown pubsub message of type " << reply->type; }, [topic, err_callback](const string &cmd, int status) { if(err_callback) err_callback(topic, status); @@ -414,10 +438,36 @@ void Redox::subscribe(const string& topic, ); } -void Redox::unsubscribe(const string& topic, - function err_callback +void Redox::subscribe(const string topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback +) { + if(sub_queue.find(topic) != sub_queue.end()) { + logger.warning() << "Already subscribed to " << topic << "!"; + return; + } + subscribe_raw("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); +} + +void Redox::psubscribe(const string topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback +) { + if(psub_queue.find(topic) != psub_queue.end()) { + logger.warning() << "Already psubscribed to " << topic << "!"; + return; + } + subscribe_raw("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); +} + +void Redox::unsubscribe_raw(const string cmd_name, const string topic, + function err_callback ) { - command("UNSUBSCRIBE " + topic, + command(cmd_name + " " + topic, nullptr, [topic, err_callback](const string& cmd, int status) { if(err_callback) err_callback(topic, status); @@ -425,9 +475,29 @@ void Redox::unsubscribe(const string& topic, ); } -void Redox::publish(const string& topic, const string& msg, - function pub_callback, - function err_callback +void Redox::unsubscribe(const string topic, + function err_callback +) { + if(sub_queue.find(topic) == sub_queue.end()) { + logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; + return; + } + unsubscribe_raw("UNSUBSCRIBE", topic, err_callback); +} + +void Redox::punsubscribe(const string topic, + function err_callback +) { + if(psub_queue.find(topic) == psub_queue.end()) { + logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; + return; + } + unsubscribe_raw("PUNSUBSCRIBE", topic, err_callback); +} + +void Redox::publish(const string topic, const string msg, + function pub_callback, + function err_callback ) { command("PUBLISH " + topic + " " + msg, [topic, msg, pub_callback](const string& command, redisReply* const& reply) { @@ -442,15 +512,15 @@ void Redox::publish(const string& topic, const string& msg, /** * Throw an exception for any non-pubsub commands. */ -void Redox::deny_non_pubsub(const std::string& cmd) { +void Redox::deny_non_pubsub(const string& cmd) { - std::string cmd_name = cmd.substr(0, cmd.find(' ')); + string cmd_name = cmd.substr(0, cmd.find(' ')); // Compare with the command's first 5 characters if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") || !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) { } else { - throw std::runtime_error("In pub/sub mode, this Redox instance can only issue " + throw runtime_error("In pub/sub mode, this Redox instance can only issue " "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands."); } } diff --git a/src/redox.hpp b/src/redox.hpp index a5a9a50..fadabc7 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -172,6 +172,10 @@ public: */ bool del(const std::string& key); + // ------------------------------------------------ + // Publish/subscribe + // ------------------------------------------------ + // This is activated when subscribe is called. When active, // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE // throw exceptions @@ -184,11 +188,25 @@ public: * sub_callback: invoked when successfully subscribed * err_callback: invoked on some error state */ - void subscribe(const std::string& topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr + void subscribe(const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Subscribe to a topic with a pattern. + * + * msg_callback: invoked whenever a message is received. + * sub_callback: invoked when successfully subscribed + * err_callback: invoked on some error state + */ + void psubscribe(const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr ); /** @@ -197,9 +215,9 @@ public: * pub_callback: invoked when successfully published * err_callback: invoked on some error state */ - void publish(const std::string& topic, const std::string& msg, - std::function pub_callback = nullptr, - std::function err_callback = nullptr + void publish(const std::string topic, const std::string msg, + std::function pub_callback = nullptr, + std::function err_callback = nullptr ); /** @@ -207,10 +225,26 @@ public: * * err_callback: invoked on some error state */ - void unsubscribe(const std::string& topic, - std::function err_callback = nullptr + void unsubscribe(const std::string topic, + std::function err_callback = nullptr + ); + + /** + * Unsubscribe from a topic with a pattern. + * + * err_callback: invoked on some error state + */ + void punsubscribe(const std::string topic, + std::function err_callback = nullptr ); + const std::set& subscribed_topics() { return sub_queue; } + const std::set& psubscribed_topics() { return psub_queue; } + + // ------------------------------------------------ + // Public only for Command class + // ------------------------------------------------ + // Invoked by Command objects when they are completed template void remove_active_command(const long id) { @@ -314,6 +348,25 @@ private: static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); void deny_non_pubsub(const std::string& cmd); + + // Base for subscribe and psubscribe + void subscribe_raw(const std::string cmd_name, const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + // Base for unsubscribe and punsubscribe + void unsubscribe_raw(const std::string cmd_name, const std::string topic, + std::function err_callback = nullptr + ); + + // Keep track of topics because we can only unsubscribe + // from subscribed topics and punsubscribe from + // psubscribed topics, or hiredis leads to segfaults + std::set sub_queue; + std::set psub_queue; }; // ---------------------------