diff --git a/CMakeLists.txt b/CMakeLists.txt index 9550f46..c5b0746 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,4 +89,7 @@ if (examples) add_executable(binary_data examples/binary_data.cpp ${SRC_ALL}) target_link_libraries(binary_data ${LIB_REDIS}) + add_executable(pub_sub examples/pub_sub.cpp ${SRC_ALL}) + target_link_libraries(pub_sub ${LIB_REDIS}) + endif() diff --git a/examples/pub_sub.cpp b/examples/pub_sub.cpp new file mode 100644 index 0000000..12801a2 --- /dev/null +++ b/examples/pub_sub.cpp @@ -0,0 +1,53 @@ +#include +#include +#include +#include +#include "hiredis/hiredis.h" +#include "hiredis/async.h" +#include "hiredis/adapters/libev.h" +#include +#include "../src/redox.hpp" + +using namespace std; + +int main(int argc, char *argv[]) { + + redox::Redox rdx; // Initialize Redox (default host/port) + if (!rdx.start()) return 1; // Start the event loop + + auto got_message = [](const string& topic, const string& msg) { + cout << topic << ": " << msg << endl; + }; + + auto subscribed = [](const string& topic) { + cout << "> Subscribed to " << topic << endl; + }; + + auto unsubscribed = [](const string& topic) { + cout << "> Unsubscribed from " << topic << endl; + }; + + rdx.subscribe("news", got_message, subscribed, unsubscribed); + rdx.subscribe("sports", got_message, subscribed, unsubscribed); + + redox::Redox rdx_pub; + if(!rdx_pub.start()) return 1; + + rdx_pub.publish("news", "hello!"); + rdx_pub.publish("news", "whatup"); + rdx_pub.publish("sports", "yo"); + + this_thread::sleep_for(chrono::seconds(10)); + rdx.unsubscribe("sports"); + rdx_pub.publish("sports", "yo"); + rdx_pub.publish("news", "whatup"); + + this_thread::sleep_for(chrono::seconds(10)); + rdx.unsubscribe("news"); + rdx_pub.publish("sports", "yo"); + rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) { + cout << "published to " << topic << ": " << msg << endl; + }); + + rdx.block(); +} diff --git a/src/redox.cpp b/src/redox.cpp index e1bc309..7fafbd2 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -4,6 +4,7 @@ #include #include "redox.hpp" +#include using namespace std; @@ -72,8 +73,8 @@ void Redox::init_hiredis() { Redox::Redox( const string& host, const int port, - std::function connection_callback, - std::ostream& log_stream, + function connection_callback, + ostream& log_stream, log::Level log_level ) : host(host), port(port), logger(log_stream, log_level), @@ -88,9 +89,9 @@ Redox::Redox( } Redox::Redox( - const std::string& path, - std::function connection_callback, - std::ostream& log_stream, + const string& path, + function connection_callback, + ostream& log_stream, log::Level log_level ) : host(), port(), path(path), logger(log_stream, log_level), user_connection_callback(connection_callback) { @@ -103,6 +104,10 @@ Redox::Redox( init_hiredis(); } +void break_event_loop(struct ev_loop* loop, ev_async* async, int revents) { + ev_break(loop, EVBREAK_ALL); +} + void Redox::run_event_loop() { // Events to connect to Redox @@ -124,11 +129,16 @@ void Redox::run_event_loop() { ev_async_init(&async_w, process_queued_commands); ev_async_start(evloop, &async_w); + // Set up an async watcher to break the loop + ev_async_init(&async_stop, break_event_loop); + ev_async_start(evloop, &async_stop); + running = true; running_waiter.notify_one(); // Run the event loop while (!to_exit) { +// logger.info() << "Event loop running"; ev_run(evloop, EVRUN_NOWAIT); } @@ -170,7 +180,8 @@ bool Redox::start() { void Redox::stop_signal() { to_exit = true; - ev_break(evloop, EVBREAK_ALL); + logger.debug() << "stop_signal() called, breaking event loop"; + ev_async_send(evloop, &async_stop); } void Redox::block() { @@ -241,6 +252,8 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { */ template bool Redox::submit_to_server(Command* c) { + + Redox* rdx = c->rdx; c->pending++; // Process binary data if trailing quotation. This is a limited implementation @@ -257,8 +270,8 @@ bool Redox::submit_to_server(Command* c) { string format = c->cmd.substr(0, first) + "%b"; string value = c->cmd.substr(first+1, last-first-1); - if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c->id, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { - c->rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr; + if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { + rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr; c->invoke_error(REDOX_SEND_ERROR); return false; } @@ -266,8 +279,8 @@ bool Redox::submit_to_server(Command* c) { } } - if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { - c->rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr; + if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { + rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr; c->invoke_error(REDOX_SEND_ERROR); return false; } @@ -339,19 +352,110 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r rdx->command_queue.pop(); if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command>(id)) {} - else if(rdx->process_queued_command>(id)) {} - else if(rdx->process_queued_command>(id)) {} + else if(rdx->process_queued_command(id)) {} + else if(rdx->process_queued_command>(id)) {} + else if(rdx->process_queued_command>(id)) {} + else if(rdx->process_queued_command>(id)) {} else throw runtime_error("Command pointer not found in any queue!"); } } // --------------------------------- +// Pub/Sub methods +// --------------------------------- + +void Redox::subscribe(const string& topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback +) { + + // Start pubsub mode. No non-sub/unsub commands can be emitted by this client. + pubsub_mode = true; + + command("SUBSCRIBE " + topic, + [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) { + + if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) { + + // Faster way of checking if a message or sub/unsub notification. + // If the last element is an integer, then it was a sub/unsub notification. + // If the last element is a string, then its a message. + // The goal is to avoid doing a string compare for "message" every message. + if(reply->element[2]->type == REDIS_REPLY_INTEGER) { + + if(!strncmp(reply->element[0]->str, "sub", 3)) { + if(sub_callback) sub_callback(topic); + + } else if(!strncmp(reply->element[0]->str, "uns", 3)) { + if(unsub_callback) unsub_callback(topic); + + } else logger.error() << "Unknown pubsub message: " << reply->element[1]->str; + } + + // Got a message + else if(reply->element[2]->type == REDIS_REPLY_STRING) { + char *msg = reply->element[2]->str; + if (msg && msg_callback) msg_callback(topic, reply->element[2]->str); + } + } else { + logger.error() << "Subscribe command got reply other than a 3-element array."; + } + }, + [topic, err_callback](const string &cmd, int status) { + if(err_callback) err_callback(topic, status); + }, + 1e10 // To keep the command around for a few hundred years + ); +} + +void Redox::unsubscribe(const string& topic, + function err_callback +) { + command("UNSUBSCRIBE " + topic, + nullptr, + [topic, err_callback](const string& cmd, int status) { + if(err_callback) err_callback(topic, status); + } + ); +} + +void Redox::publish(const string& topic, const string& msg, + function pub_callback, + function err_callback +) { + command("PUBLISH " + topic + " " + msg, + [topic, msg, pub_callback](const string& command, redisReply* const& reply) { + if(pub_callback) pub_callback(topic, msg); + }, + [topic, err_callback](const string& command, int status) { + if(err_callback) err_callback(topic, status); + } + ); +} + +/** +* Throw an exception for any non-pubsub commands. +*/ +void Redox::deny_non_pubsub(const std::string& cmd) { + + std::string cmd_name = cmd.substr(0, cmd.find(' ')); + + // Compare with the command's first 5 characters + if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") || + !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) { + } else { + throw std::runtime_error("In pub/sub mode, this Redox instance can only issue " + "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands."); + } +} + +// --------------------------------- // get_command_map specializations // --------------------------------- @@ -408,11 +512,11 @@ string Redox::get(const string& key) { return reply; }; -bool Redox::set(const std::string& key, const std::string& value) { +bool Redox::set(const string& key, const string& value) { return command_blocking("SET " + key + " " + value); } -bool Redox::del(const std::string& key) { +bool Redox::del(const string& key) { return command_blocking("DEL " + key); } diff --git a/src/redox.hpp b/src/redox.hpp index d3fce17..a5a9a50 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -172,10 +172,44 @@ public: */ bool del(const std::string& key); - // TODO pub/sub -// void publish(std::string channel, std::string msg); -// void subscribe(std::string channel, std::function callback); -// void unsubscribe(std::string channel); + // This is activated when subscribe is called. When active, + // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE + // throw exceptions + std::atomic_bool pubsub_mode = {false}; + + /** + * Subscribe to a topic. + * + * msg_callback: invoked whenever a message is received. + * sub_callback: invoked when successfully subscribed + * err_callback: invoked on some error state + */ + void subscribe(const std::string& topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Publish to a topic. All subscribers will be notified. + * + * pub_callback: invoked when successfully published + * err_callback: invoked on some error state + */ + void publish(const std::string& topic, const std::string& msg, + std::function pub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Unsubscribe from a topic. + * + * err_callback: invoked on some error state + */ + void unsubscribe(const std::string& topic, + std::function err_callback = nullptr + ); // Invoked by Command objects when they are completed template @@ -212,8 +246,9 @@ private: // Dynamically allocated libev event loop struct ev_loop* evloop; - // Asynchronous watcher (for processing commands) - ev_async async_w; + // Asynchronous watchers + ev_async async_w; // For processing commands + ev_async async_stop; // For breaking the loop // Number of commands processed std::atomic_long cmd_count = {0}; @@ -277,10 +312,13 @@ private: template static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); + + void deny_non_pubsub(const std::string& cmd); }; // --------------------------- + template Command* Redox::command( const std::string& cmd, @@ -295,6 +333,11 @@ Command* Redox::command( throw std::runtime_error("[ERROR] Need to start Redox before running commands!"); } + // Block if pubsub mode + if(pubsub_mode) { + deny_non_pubsub(cmd); + } + commands_created += 1; auto* c = new Command(this, commands_created, cmd, callback, error_callback, repeat, after, free_memory, logger);