Commit 8b1dda93975f91a837c5920d933db429f8b2b198

Authored by Hayk Martirosyan
1 parent 89325318

Full pubsub implementation

subscribe/unsubscribe/publish with proper and working callbacks. Guard
added that throws an exception if a non-pubsub command is issued after a
subscribe.

Also removed all std:: prefixes from redox.cpp. Just made the decision
for "using namespace std" there for readability. Not a header, of
course.

Also added another async watcher for breaking the loop, since ev_break
doesn't actually do anything when called outside of an ev_run callback.
CMakeLists.txt
@@ -89,4 +89,7 @@ if (examples) @@ -89,4 +89,7 @@ if (examples)
89 add_executable(binary_data examples/binary_data.cpp ${SRC_ALL}) 89 add_executable(binary_data examples/binary_data.cpp ${SRC_ALL})
90 target_link_libraries(binary_data ${LIB_REDIS}) 90 target_link_libraries(binary_data ${LIB_REDIS})
91 91
  92 + add_executable(pub_sub examples/pub_sub.cpp ${SRC_ALL})
  93 + target_link_libraries(pub_sub ${LIB_REDIS})
  94 +
92 endif() 95 endif()
examples/pub_sub.cpp 0 → 100644
  1 +#include <stdio.h>
  2 +#include <stdlib.h>
  3 +#include <string.h>
  4 +#include <signal.h>
  5 +#include "hiredis/hiredis.h"
  6 +#include "hiredis/async.h"
  7 +#include "hiredis/adapters/libev.h"
  8 +#include <iostream>
  9 +#include "../src/redox.hpp"
  10 +
  11 +using namespace std;
  12 +
  13 +int main(int argc, char *argv[]) {
  14 +
  15 + redox::Redox rdx; // Initialize Redox (default host/port)
  16 + if (!rdx.start()) return 1; // Start the event loop
  17 +
  18 + auto got_message = [](const string& topic, const string& msg) {
  19 + cout << topic << ": " << msg << endl;
  20 + };
  21 +
  22 + auto subscribed = [](const string& topic) {
  23 + cout << "> Subscribed to " << topic << endl;
  24 + };
  25 +
  26 + auto unsubscribed = [](const string& topic) {
  27 + cout << "> Unsubscribed from " << topic << endl;
  28 + };
  29 +
  30 + rdx.subscribe("news", got_message, subscribed, unsubscribed);
  31 + rdx.subscribe("sports", got_message, subscribed, unsubscribed);
  32 +
  33 + redox::Redox rdx_pub;
  34 + if(!rdx_pub.start()) return 1;
  35 +
  36 + rdx_pub.publish("news", "hello!");
  37 + rdx_pub.publish("news", "whatup");
  38 + rdx_pub.publish("sports", "yo");
  39 +
  40 + this_thread::sleep_for(chrono::seconds(10));
  41 + rdx.unsubscribe("sports");
  42 + rdx_pub.publish("sports", "yo");
  43 + rdx_pub.publish("news", "whatup");
  44 +
  45 + this_thread::sleep_for(chrono::seconds(10));
  46 + rdx.unsubscribe("news");
  47 + rdx_pub.publish("sports", "yo");
  48 + rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) {
  49 + cout << "published to " << topic << ": " << msg << endl;
  50 + });
  51 +
  52 + rdx.block();
  53 +}
src/redox.cpp
@@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
4 4
5 #include <signal.h> 5 #include <signal.h>
6 #include "redox.hpp" 6 #include "redox.hpp"
  7 +#include <string.h>
7 8
8 using namespace std; 9 using namespace std;
9 10
@@ -72,8 +73,8 @@ void Redox::init_hiredis() { @@ -72,8 +73,8 @@ void Redox::init_hiredis() {
72 73
73 Redox::Redox( 74 Redox::Redox(
74 const string& host, const int port, 75 const string& host, const int port,
75 - std::function<void(int)> connection_callback,  
76 - std::ostream& log_stream, 76 + function<void(int)> connection_callback,
  77 + ostream& log_stream,
77 log::Level log_level 78 log::Level log_level
78 ) : host(host), port(port), 79 ) : host(host), port(port),
79 logger(log_stream, log_level), 80 logger(log_stream, log_level),
@@ -88,9 +89,9 @@ Redox::Redox( @@ -88,9 +89,9 @@ Redox::Redox(
88 } 89 }
89 90
90 Redox::Redox( 91 Redox::Redox(
91 - const std::string& path,  
92 - std::function<void(int)> connection_callback,  
93 - std::ostream& log_stream, 92 + const string& path,
  93 + function<void(int)> connection_callback,
  94 + ostream& log_stream,
94 log::Level log_level 95 log::Level log_level
95 ) : host(), port(), path(path), logger(log_stream, log_level), 96 ) : host(), port(), path(path), logger(log_stream, log_level),
96 user_connection_callback(connection_callback) { 97 user_connection_callback(connection_callback) {
@@ -103,6 +104,10 @@ Redox::Redox( @@ -103,6 +104,10 @@ Redox::Redox(
103 init_hiredis(); 104 init_hiredis();
104 } 105 }
105 106
  107 +void break_event_loop(struct ev_loop* loop, ev_async* async, int revents) {
  108 + ev_break(loop, EVBREAK_ALL);
  109 +}
  110 +
106 void Redox::run_event_loop() { 111 void Redox::run_event_loop() {
107 112
108 // Events to connect to Redox 113 // Events to connect to Redox
@@ -124,11 +129,16 @@ void Redox::run_event_loop() { @@ -124,11 +129,16 @@ void Redox::run_event_loop() {
124 ev_async_init(&async_w, process_queued_commands); 129 ev_async_init(&async_w, process_queued_commands);
125 ev_async_start(evloop, &async_w); 130 ev_async_start(evloop, &async_w);
126 131
  132 + // Set up an async watcher to break the loop
  133 + ev_async_init(&async_stop, break_event_loop);
  134 + ev_async_start(evloop, &async_stop);
  135 +
127 running = true; 136 running = true;
128 running_waiter.notify_one(); 137 running_waiter.notify_one();
129 138
130 // Run the event loop 139 // Run the event loop
131 while (!to_exit) { 140 while (!to_exit) {
  141 +// logger.info() << "Event loop running";
132 ev_run(evloop, EVRUN_NOWAIT); 142 ev_run(evloop, EVRUN_NOWAIT);
133 } 143 }
134 144
@@ -170,7 +180,8 @@ bool Redox::start() { @@ -170,7 +180,8 @@ bool Redox::start() {
170 180
171 void Redox::stop_signal() { 181 void Redox::stop_signal() {
172 to_exit = true; 182 to_exit = true;
173 - ev_break(evloop, EVBREAK_ALL); 183 + logger.debug() << "stop_signal() called, breaking event loop";
  184 + ev_async_send(evloop, &async_stop);
174 } 185 }
175 186
176 void Redox::block() { 187 void Redox::block() {
@@ -241,6 +252,8 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { @@ -241,6 +252,8 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
241 */ 252 */
242 template<class ReplyT> 253 template<class ReplyT>
243 bool Redox::submit_to_server(Command<ReplyT>* c) { 254 bool Redox::submit_to_server(Command<ReplyT>* c) {
  255 +
  256 + Redox* rdx = c->rdx;
244 c->pending++; 257 c->pending++;
245 258
246 // Process binary data if trailing quotation. This is a limited implementation 259 // Process binary data if trailing quotation. This is a limited implementation
@@ -257,8 +270,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) { @@ -257,8 +270,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
257 270
258 string format = c->cmd.substr(0, first) + "%b"; 271 string format = c->cmd.substr(0, first) + "%b";
259 string value = c->cmd.substr(first+1, last-first-1); 272 string value = c->cmd.substr(first+1, last-first-1);
260 - if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c->id, format.c_str(), value.c_str(), value.size()) != REDIS_OK) {  
261 - c->rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr; 273 + if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id, format.c_str(), value.c_str(), value.size()) != REDIS_OK) {
  274 + rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr;
262 c->invoke_error(REDOX_SEND_ERROR); 275 c->invoke_error(REDOX_SEND_ERROR);
263 return false; 276 return false;
264 } 277 }
@@ -266,8 +279,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) { @@ -266,8 +279,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
266 } 279 }
267 } 280 }
268 281
269 - if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c->id, c->cmd.c_str()) != REDIS_OK) {  
270 - c->rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr; 282 + if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id, c->cmd.c_str()) != REDIS_OK) {
  283 + rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr;
271 c->invoke_error(REDOX_SEND_ERROR); 284 c->invoke_error(REDOX_SEND_ERROR);
272 return false; 285 return false;
273 } 286 }
@@ -339,19 +352,110 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r @@ -339,19 +352,110 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r
339 rdx->command_queue.pop(); 352 rdx->command_queue.pop();
340 353
341 if(rdx->process_queued_command<redisReply*>(id)) {} 354 if(rdx->process_queued_command<redisReply*>(id)) {}
342 - else if(rdx->process_queued_command<std::string>(id)) {} 355 + else if(rdx->process_queued_command<string>(id)) {}
343 else if(rdx->process_queued_command<char*>(id)) {} 356 else if(rdx->process_queued_command<char*>(id)) {}
344 else if(rdx->process_queued_command<int>(id)) {} 357 else if(rdx->process_queued_command<int>(id)) {}
345 else if(rdx->process_queued_command<long long int>(id)) {} 358 else if(rdx->process_queued_command<long long int>(id)) {}
346 - else if(rdx->process_queued_command<std::nullptr_t>(id)) {}  
347 - else if(rdx->process_queued_command<std::vector<std::string>>(id)) {}  
348 - else if(rdx->process_queued_command<std::set<std::string>>(id)) {}  
349 - else if(rdx->process_queued_command<std::unordered_set<std::string>>(id)) {} 359 + else if(rdx->process_queued_command<nullptr_t>(id)) {}
  360 + else if(rdx->process_queued_command<vector<string>>(id)) {}
  361 + else if(rdx->process_queued_command<std::set<string>>(id)) {}
  362 + else if(rdx->process_queued_command<unordered_set<string>>(id)) {}
350 else throw runtime_error("Command pointer not found in any queue!"); 363 else throw runtime_error("Command pointer not found in any queue!");
351 } 364 }
352 } 365 }
353 366
354 // --------------------------------- 367 // ---------------------------------
  368 +// Pub/Sub methods
  369 +// ---------------------------------
  370 +
  371 +void Redox::subscribe(const string& topic,
  372 + function<void(const string& topic, const string& message)> msg_callback,
  373 + function<void(const string& topic)> sub_callback,
  374 + function<void(const string& topic)> unsub_callback,
  375 + function<void(const string& topic, int status)> err_callback
  376 +) {
  377 +
  378 + // Start pubsub mode. No non-sub/unsub commands can be emitted by this client.
  379 + pubsub_mode = true;
  380 +
  381 + command<redisReply*>("SUBSCRIBE " + topic,
  382 + [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) {
  383 +
  384 + if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) {
  385 +
  386 + // Faster way of checking if a message or sub/unsub notification.
  387 + // If the last element is an integer, then it was a sub/unsub notification.
  388 + // If the last element is a string, then its a message.
  389 + // The goal is to avoid doing a string compare for "message" every message.
  390 + if(reply->element[2]->type == REDIS_REPLY_INTEGER) {
  391 +
  392 + if(!strncmp(reply->element[0]->str, "sub", 3)) {
  393 + if(sub_callback) sub_callback(topic);
  394 +
  395 + } else if(!strncmp(reply->element[0]->str, "uns", 3)) {
  396 + if(unsub_callback) unsub_callback(topic);
  397 +
  398 + } else logger.error() << "Unknown pubsub message: " << reply->element[1]->str;
  399 + }
  400 +
  401 + // Got a message
  402 + else if(reply->element[2]->type == REDIS_REPLY_STRING) {
  403 + char *msg = reply->element[2]->str;
  404 + if (msg && msg_callback) msg_callback(topic, reply->element[2]->str);
  405 + }
  406 + } else {
  407 + logger.error() << "Subscribe command got reply other than a 3-element array.";
  408 + }
  409 + },
  410 + [topic, err_callback](const string &cmd, int status) {
  411 + if(err_callback) err_callback(topic, status);
  412 + },
  413 + 1e10 // To keep the command around for a few hundred years
  414 + );
  415 +}
  416 +
  417 +void Redox::unsubscribe(const string& topic,
  418 + function<void(const string& topic, int status)> err_callback
  419 +) {
  420 + command<redisReply*>("UNSUBSCRIBE " + topic,
  421 + nullptr,
  422 + [topic, err_callback](const string& cmd, int status) {
  423 + if(err_callback) err_callback(topic, status);
  424 + }
  425 + );
  426 +}
  427 +
  428 +void Redox::publish(const string& topic, const string& msg,
  429 + function<void(const string& topic, const string& msg)> pub_callback,
  430 + function<void(const string& topic, int status)> err_callback
  431 +) {
  432 + command<redisReply*>("PUBLISH " + topic + " " + msg,
  433 + [topic, msg, pub_callback](const string& command, redisReply* const& reply) {
  434 + if(pub_callback) pub_callback(topic, msg);
  435 + },
  436 + [topic, err_callback](const string& command, int status) {
  437 + if(err_callback) err_callback(topic, status);
  438 + }
  439 + );
  440 +}
  441 +
  442 +/**
  443 +* Throw an exception for any non-pubsub commands.
  444 +*/
  445 +void Redox::deny_non_pubsub(const std::string& cmd) {
  446 +
  447 + std::string cmd_name = cmd.substr(0, cmd.find(' '));
  448 +
  449 + // Compare with the command's first 5 characters
  450 + if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") ||
  451 + !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) {
  452 + } else {
  453 + throw std::runtime_error("In pub/sub mode, this Redox instance can only issue "
  454 + "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands.");
  455 + }
  456 +}
  457 +
  458 +// ---------------------------------
355 // get_command_map specializations 459 // get_command_map specializations
356 // --------------------------------- 460 // ---------------------------------
357 461
@@ -408,11 +512,11 @@ string Redox::get(const string&amp; key) { @@ -408,11 +512,11 @@ string Redox::get(const string&amp; key) {
408 return reply; 512 return reply;
409 }; 513 };
410 514
411 -bool Redox::set(const std::string& key, const std::string& value) { 515 +bool Redox::set(const string& key, const string& value) {
412 return command_blocking("SET " + key + " " + value); 516 return command_blocking("SET " + key + " " + value);
413 } 517 }
414 518
415 -bool Redox::del(const std::string& key) { 519 +bool Redox::del(const string& key) {
416 return command_blocking("DEL " + key); 520 return command_blocking("DEL " + key);
417 } 521 }
418 522
src/redox.hpp
@@ -172,10 +172,44 @@ public: @@ -172,10 +172,44 @@ public:
172 */ 172 */
173 bool del(const std::string& key); 173 bool del(const std::string& key);
174 174
175 - // TODO pub/sub  
176 -// void publish(std::string channel, std::string msg);  
177 -// void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback);  
178 -// void unsubscribe(std::string channel); 175 + // This is activated when subscribe is called. When active,
  176 + // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE
  177 + // throw exceptions
  178 + std::atomic_bool pubsub_mode = {false};
  179 +
  180 + /**
  181 + * Subscribe to a topic.
  182 + *
  183 + * msg_callback: invoked whenever a message is received.
  184 + * sub_callback: invoked when successfully subscribed
  185 + * err_callback: invoked on some error state
  186 + */
  187 + void subscribe(const std::string& topic,
  188 + std::function<void(const std::string& topic, const std::string& message)> msg_callback,
  189 + std::function<void(const std::string& topic)> sub_callback = nullptr,
  190 + std::function<void(const std::string& topic)> unsub_callback = nullptr,
  191 + std::function<void(const std::string& topic, int status)> err_callback = nullptr
  192 + );
  193 +
  194 + /**
  195 + * Publish to a topic. All subscribers will be notified.
  196 + *
  197 + * pub_callback: invoked when successfully published
  198 + * err_callback: invoked on some error state
  199 + */
  200 + void publish(const std::string& topic, const std::string& msg,
  201 + std::function<void(const std::string& topic, const std::string& msg)> pub_callback = nullptr,
  202 + std::function<void(const std::string& topic, int status)> err_callback = nullptr
  203 + );
  204 +
  205 + /**
  206 + * Unsubscribe from a topic.
  207 + *
  208 + * err_callback: invoked on some error state
  209 + */
  210 + void unsubscribe(const std::string& topic,
  211 + std::function<void(const std::string& topic, int status)> err_callback = nullptr
  212 + );
179 213
180 // Invoked by Command objects when they are completed 214 // Invoked by Command objects when they are completed
181 template<class ReplyT> 215 template<class ReplyT>
@@ -212,8 +246,9 @@ private: @@ -212,8 +246,9 @@ private:
212 // Dynamically allocated libev event loop 246 // Dynamically allocated libev event loop
213 struct ev_loop* evloop; 247 struct ev_loop* evloop;
214 248
215 - // Asynchronous watcher (for processing commands)  
216 - ev_async async_w; 249 + // Asynchronous watchers
  250 + ev_async async_w; // For processing commands
  251 + ev_async async_stop; // For breaking the loop
217 252
218 // Number of commands processed 253 // Number of commands processed
219 std::atomic_long cmd_count = {0}; 254 std::atomic_long cmd_count = {0};
@@ -277,10 +312,13 @@ private: @@ -277,10 +312,13 @@ private:
277 312
278 template<class ReplyT> 313 template<class ReplyT>
279 static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); 314 static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents);
  315 +
  316 + void deny_non_pubsub(const std::string& cmd);
280 }; 317 };
281 318
282 // --------------------------- 319 // ---------------------------
283 320
  321 +
284 template<class ReplyT> 322 template<class ReplyT>
285 Command<ReplyT>* Redox::command( 323 Command<ReplyT>* Redox::command(
286 const std::string& cmd, 324 const std::string& cmd,
@@ -295,6 +333,11 @@ Command&lt;ReplyT&gt;* Redox::command( @@ -295,6 +333,11 @@ Command&lt;ReplyT&gt;* Redox::command(
295 throw std::runtime_error("[ERROR] Need to start Redox before running commands!"); 333 throw std::runtime_error("[ERROR] Need to start Redox before running commands!");
296 } 334 }
297 335
  336 + // Block if pubsub mode
  337 + if(pubsub_mode) {
  338 + deny_non_pubsub(cmd);
  339 + }
  340 +
298 commands_created += 1; 341 commands_created += 1;
299 auto* c = new Command<ReplyT>(this, commands_created, cmd, 342 auto* c = new Command<ReplyT>(this, commands_created, cmd,
300 callback, error_callback, repeat, after, free_memory, logger); 343 callback, error_callback, repeat, after, free_memory, logger);