From b4f96a55f52e5a064ad3722cd58754ca25388763 Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Mon, 26 Jan 2015 17:25:28 -0800 Subject: [PATCH] Improve client API, split off Subscriber class --- CMakeLists.txt | 1 + examples/basic.cpp | 3 +-- examples/basic_threaded.cpp | 4 +--- examples/binary_data.cpp | 7 +++---- examples/data_types.cpp | 8 ++++---- examples/lpush_benchmark.cpp | 9 +++------ examples/multi-client.cpp | 6 +----- examples/pub_sub.cpp | 48 ++++++++++++++++++++++++------------------------ examples/speed_test_async.cpp | 9 ++++----- examples/speed_test_async_multi.cpp | 12 +++++------- examples/speed_test_pubsub.cpp | 15 +++++++++------ examples/speed_test_sync.cpp | 8 +++----- src/command.hpp | 2 +- src/redox.cpp | 445 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/redox.hpp | 312 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/subscriber.cpp | 163 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/subscriber.hpp | 135 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ test/test.cpp | 43 +++++++++++++++++-------------------------- 18 files changed, 618 insertions(+), 612 deletions(-) create mode 100644 src/subscriber.cpp create mode 100644 src/subscriber.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e13002a..ba78727 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,6 +24,7 @@ set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) set(SRC_CORE ${SRC_DIR}/redox.cpp ${SRC_DIR}/command.cpp + ${SRC_DIR}/subscriber.cpp ) set(SRC_LOGGER ${SRC_DIR}/utils/logger.cpp) diff --git a/examples/basic.cpp b/examples/basic.cpp index b6b78e7..866880c 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -13,7 +13,7 @@ int main(int argc, char* argv[]) { Redox rdx = {"localhost", 6379, nullptr, cout, redox::log::Info}; // Initialize Redox - if(!rdx.start()) return 1; // Start the event loop + if(!rdx.connect()) return 1; // Start the event loop rdx.del("occupation"); @@ -22,6 +22,5 @@ int main(int argc, char* argv[]) { cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl; - rdx.stop(); // Shut down the event loop return 0; } diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index c13e824..51ae5bd 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -15,7 +15,7 @@ redox::Redox rdx = {"localhost", 6379}; int main(int argc, char* argv[]) { - if(!rdx.start()) return 1; + if(!rdx.connect()) return 1; thread setter([]() { for(int i = 0; i < 5000; i++) { @@ -41,7 +41,5 @@ int main(int argc, char* argv[]) { setter.join(); getter.join(); - rdx.stop(); - return 0; }; diff --git a/examples/binary_data.cpp b/examples/binary_data.cpp index 33ed426..d6d7c9b 100644 --- a/examples/binary_data.cpp +++ b/examples/binary_data.cpp @@ -23,18 +23,18 @@ std::string random_string(size_t length) { int main(int argc, char* argv[]) { redox::Redox rdx = {"localhost", 6379}; // Initialize Redox - if(!rdx.start()) return 1; // Start the event loop + if(!rdx.connect()) return 1; // Start the event loop rdx.del("binary"); string binary_data = random_string(10000); - auto& c = rdx.command_blocking("SET binary \"" + binary_data + "\""); + auto& c = rdx.commandSync("SET binary \"" + binary_data + "\""); if(c.ok()) cout << "Reply: " << c.reply() << endl; else cerr << "Failed to set key! Status: " << c.status() << endl; c.free(); - auto& c2 = rdx.command_blocking("GET binary"); + auto& c2 = rdx.commandSync("GET binary"); if(c2.ok()) { if(c2.reply() == binary_data) cout << "Binary data matches!" << endl; else cerr << "Binary data differs!" << endl; @@ -42,6 +42,5 @@ int main(int argc, char* argv[]) { else cerr << "Failed to get key! Status: " << c2.status() << endl; c2.free(); - rdx.stop(); // Shut down the event loop return 0; } diff --git a/examples/data_types.cpp b/examples/data_types.cpp index 4c0d88e..1c17c6e 100644 --- a/examples/data_types.cpp +++ b/examples/data_types.cpp @@ -15,11 +15,11 @@ using redox::Command; int main(int argc, char* argv[]) { redox::Redox rdx; // Initialize Redox (default host/port) - if(!rdx.start()) return 1; // Start the event loop + if(!rdx.connect()) return 1; // Start the event loop rdx.del("mylist"); - rdx.command_blocking("LPUSH mylist 1 2 3 4 5 6 7 8 9 10"); + rdx.commandSync("LPUSH mylist 1 2 3 4 5 6 7 8 9 10"); rdx.command>("LRANGE mylist 0 4", [](Command>& c){ @@ -46,10 +46,10 @@ int main(int argc, char* argv[]) { for (const string& s : c.reply()) cout << s << " "; cout << endl; } - rdx.stop_signal(); + rdx.disconnect(); } ); - rdx.block(); // Shut down the event loop + rdx.wait(); return 0; } diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index 226d7f0..65bd2e4 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -17,7 +17,7 @@ double time_s() { int main(int argc, char* argv[]) { redox::Redox rdx; - if(!rdx.start()) return 1; + if(!rdx.connect()) return 1; rdx.del("test"); @@ -43,15 +43,12 @@ int main(int argc, char* argv[]) { cout << "Total time: " << t2 - t0 << "s" << endl; cout << "Result: " << (double)len / (t2-t0) << " commands/s" << endl; - rdx.stop_signal(); + rdx.disconnect(); } }); } t1 = time_s(); - rdx.block(); - - cout << "Commands processed: " << rdx.num_commands_processed() << endl; - + rdx.wait(); return 0; }; diff --git a/examples/multi-client.cpp b/examples/multi-client.cpp index 7d186db..a35882b 100644 --- a/examples/multi-client.cpp +++ b/examples/multi-client.cpp @@ -13,7 +13,7 @@ int main(int argc, char* argv[]) { redox::Redox rdx1, rdx2, rdx3; - if(!rdx1.start() || !rdx2.start() || !rdx3.start()) return 1; + if(!rdx1.connect() || !rdx2.connect() || !rdx3.connect()) return 1; rdx1.del("occupation"); @@ -22,9 +22,5 @@ int main(int argc, char* argv[]) { cout << "key = occupation, value = \"" << rdx3.get("occupation") << "\"" << endl; - rdx1.stop(); - rdx2.stop(); - rdx3.stop(); - return 0; } diff --git a/examples/pub_sub.cpp b/examples/pub_sub.cpp index 6301539..1b1ef20 100644 --- a/examples/pub_sub.cpp +++ b/examples/pub_sub.cpp @@ -1,16 +1,17 @@ #include #include #include "../src/redox.hpp" +#include "../src/subscriber.hpp" using namespace std; int main(int argc, char *argv[]) { - redox::Redox rdx; // Initialize Redox (default host/port) - if (!rdx.start()) return 1; // Start the event loop + redox::Redox publisher; // Initialize Redox (default host/port) + if (!publisher.connect()) return 1; // Start the event loop - redox::Redox rdx_pub; - if(!rdx_pub.start()) return 1; + redox::Subscriber subscriber; + if(!subscriber.connect()) return 1; auto got_message = [](const string& topic, const string& msg) { cout << topic << ": " << msg << endl; @@ -24,31 +25,30 @@ int main(int argc, char *argv[]) { cout << "> Unsubscribed from " << topic << endl; }; - rdx.psubscribe("news", got_message, subscribed, unsubscribed); - rdx.subscribe("sports", got_message, subscribed, unsubscribed); + subscriber.psubscribe("news", got_message, subscribed, unsubscribed); + subscriber.subscribe("sports", got_message, subscribed, unsubscribed); - this_thread::sleep_for(chrono::milliseconds(20)); - for(auto s : rdx.subscribed_topics()) cout << "topic: " << s << endl; + this_thread::sleep_for(chrono::milliseconds(10)); - rdx_pub.publish("news", "hello!"); - rdx_pub.publish("news", "whatup"); - rdx_pub.publish("sports", "yo"); + publisher.publish("news", "one"); + publisher.publish("news", "two", [](const string& topic, const string& msg) { + cout << "published to " << topic << ": " << msg << endl; + }); + publisher.publish("sports", "three"); - this_thread::sleep_for(chrono::seconds(1)); - rdx.unsubscribe("sports"); - rdx_pub.publish("sports", "yo"); - rdx_pub.publish("news", "whatup"); + this_thread::sleep_for(chrono::milliseconds(10)); + subscriber.unsubscribe("sports"); + publisher.publish("sports", "\"UH OH\""); + publisher.publish("news", "four"); - this_thread::sleep_for(chrono::milliseconds(1)); - rdx.punsubscribe("news"); + this_thread::sleep_for(chrono::milliseconds(10)); + subscriber.punsubscribe("news"); + this_thread::sleep_for(chrono::milliseconds(10)); - rdx_pub.publish("sports", "yo"); - rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) { - cout << "published to " << topic << ": " << msg << endl; - }); - rdx_pub.publish("news", "whatup"); - rdx.block(); - rdx_pub.block(); + publisher.publish("sports", "\"UH OH\""); + publisher.publish("news", "\"UH OH\""); + + this_thread::sleep_for(chrono::milliseconds(10)); return 0; } diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index c17a66a..890289c 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -19,9 +19,9 @@ double time_s() { int main(int argc, char* argv[]) { Redox rdx = {"/var/run/redis/redis.sock", nullptr}; - if(!rdx.start()) return 1; + if(!rdx.connect()) return 1; - bool status = rdx.command_blocking("SET simple_loop:count 0"); + bool status = rdx.commandSync("SET simple_loop:count 0"); if(status) { cout << "Reset the counter to zero." << endl; } else { @@ -40,10 +40,10 @@ int main(int argc, char* argv[]) { double t0 = time_s(); atomic_int count(0); - Command& cmd = rdx.command_looping( + Command& cmd = rdx.commandLoop( cmd_str, [&count, &rdx](Command& c) { - if(!c.ok()) { + if (!c.ok()) { cerr << "Bad reply: " << c.status() << endl; } count++; @@ -65,6 +65,5 @@ int main(int argc, char* argv[]) { cout << "Final value of counter: " << final_count << endl; - rdx.stop(); return 0; } diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index 3ff8aa6..9cd7652 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -20,7 +20,7 @@ double time_s() { int main(int argc, char* argv[]) { Redox rdx = {"localhost", 6379}; - if(!rdx.start()) return 1; + if(!rdx.connect()) return 1; if(rdx.set("simple_loop:count", "0")) { cout << "Reset the counter to zero." << endl; @@ -43,15 +43,15 @@ int main(int argc, char* argv[]) { vector*> commands; for(int i = 0; i < parallel; i++) { - commands.push_back(&rdx.command_looping( - cmd_str, + commands.push_back(&rdx.commandLoop( + cmd_str, [&count, &rdx](Command& c) { - if(!c.ok()) { + if (!c.ok()) { cerr << "Bad reply: " << c.status() << endl; } count++; }, - dt + dt )); } @@ -65,8 +65,6 @@ int main(int argc, char* argv[]) { // Get the final value of the counter long final_count = stol(rdx.get("simple_loop:count")); - rdx.stop(); - cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; diff --git a/examples/speed_test_pubsub.cpp b/examples/speed_test_pubsub.cpp index 4ae726c..3903112 100644 --- a/examples/speed_test_pubsub.cpp +++ b/examples/speed_test_pubsub.cpp @@ -1,9 +1,11 @@ #include #include "../src/redox.hpp" +#include "../src/subscriber.hpp" using namespace std; using redox::Redox; using redox::Command; +using redox::Subscriber; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -13,10 +15,10 @@ double time_s() { int main(int argc, char *argv[]) { Redox rdx_pub; - Redox rdx_sub; + Subscriber rdx_sub; - if(!rdx_pub.start()) return 1; - if(!rdx_sub.start()) return 1; + if(!rdx_pub.connect()) return 1; + if(!rdx_sub.connect()) return 1; atomic_int count(0); auto got_message = [&count](const string& topic, const string& msg) { @@ -41,12 +43,13 @@ int main(int argc, char *argv[]) { rdx_pub.publish("speedtest", "hello"); t1 = time_s(); } - this_thread::sleep_for(chrono::milliseconds(1000)); - rdx_pub.stop(); - rdx_sub.stop(); + + this_thread::sleep_for(chrono::milliseconds(10)); double t = t1 - t0; cout << "Total of messages sent in " << t << "s is " << count << endl; double msg_per_s = count / t; cout << "Messages per second: " << msg_per_s << endl; + + return 0; } diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp index 4cd152c..3674c20 100644 --- a/examples/speed_test_sync.cpp +++ b/examples/speed_test_sync.cpp @@ -18,9 +18,9 @@ double time_s() { int main(int argc, char* argv[]) { Redox rdx = {"localhost", 6379}; - if(!rdx.start()) return 1; + if(!rdx.connect()) return 1; - if(rdx.command_blocking("SET simple_loop:count 0")) { + if(rdx.commandSync("SET simple_loop:count 0")) { cout << "Reset the counter to zero." << endl; } else { cerr << "Failed to reset counter." << endl; @@ -37,7 +37,7 @@ int main(int argc, char* argv[]) { int count = 0; while(time_s() < t_end) { - Command& c = rdx.command_blocking(cmd_str); + Command& c = rdx.commandSync(cmd_str); if(!c.ok()) cerr << "Bad reply, code: " << c.status() << endl; c.free(); count++; @@ -48,8 +48,6 @@ int main(int argc, char* argv[]) { long final_count = stol(rdx.get("simple_loop:count")); - rdx.stop(); - cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; diff --git a/src/command.hpp b/src/command.hpp index b2466f5..044b0aa 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -54,7 +54,7 @@ public: /** * This method returns once this command's callback has been invoked * (or would have been invoked if there is none) since the last call - * to block(). If it is the first call, then returns once the callback + * to wait(). If it is the first call, then returns once the callback * is invoked for the first time. */ void wait(); diff --git a/src/redox.cpp b/src/redox.cpp index 2ecdbea..f58416b 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -3,72 +3,72 @@ */ #include -#include "redox.hpp" #include +#include "redox.hpp" using namespace std; namespace redox { -void Redox::connected_callback(const redisAsyncContext *ctx, int status) { +void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { Redox* rdx = (Redox*) ctx->data; if (status != REDIS_OK) { - rdx->logger.fatal() << "Could not connect to Redis: " << ctx->errstr; - rdx->connect_state = REDOX_CONNECT_ERROR; + rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; + rdx->connect_state_ = CONNECT_ERROR; } else { // Disable hiredis automatically freeing reply objects ctx->c.reader->fn->freeObject = [](void *reply) {}; - rdx->connect_state = REDOX_CONNECTED; - rdx->logger.info() << "Connected to Redis."; + rdx->connect_state_ = CONNECTED; + rdx->logger_.info() << "Connected to Redis."; } - rdx->connect_waiter.notify_all(); - if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state); + rdx->connect_waiter_.notify_all(); + if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } -void Redox::disconnected_callback(const redisAsyncContext *ctx, int status) { +void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { Redox* rdx = (Redox*) ctx->data; if (status != REDIS_OK) { - rdx->logger.error() << "Could not disconnect from Redis: " << ctx->errstr; - rdx->connect_state = REDOX_DISCONNECT_ERROR; + rdx->logger_.error() << "Could not disconnect from Redis: " << ctx->errstr; + rdx->connect_state_ = DISCONNECT_ERROR; } else { - rdx->logger.info() << "Disconnected from Redis as planned."; - rdx->connect_state = REDOX_DISCONNECTED; + rdx->logger_.info() << "Disconnected from Redis as planned."; + rdx->connect_state_ = DISCONNECTED; } - rdx->stop_signal(); - rdx->connect_waiter.notify_all(); - if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state); + rdx->disconnect(); + rdx->connect_waiter_.notify_all(); + if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } void Redox::init_ev() { signal(SIGPIPE, SIG_IGN); - evloop = ev_loop_new(EVFLAG_AUTO); - ev_set_userdata(evloop, (void*)this); // Back-reference + evloop_ = ev_loop_new(EVFLAG_AUTO); + ev_set_userdata(evloop_, (void*)this); // Back-reference } void Redox::init_hiredis() { - ctx->data = (void*)this; // Back-reference + ctx_->data = (void*)this; // Back-reference - if (ctx->err) { - logger.error() << "Could not create a hiredis context: " << ctx->errstr; - connect_state = REDOX_CONNECT_ERROR; - connect_waiter.notify_all(); + if (ctx_->err) { + logger_.error() << "Could not create a hiredis context: " << ctx_->errstr; + connect_state_ = CONNECT_ERROR; + connect_waiter_.notify_all(); return; } // Attach event loop to hiredis - redisLibevAttach(evloop, ctx); + redisLibevAttach(evloop_, ctx_); // Set the callbacks to be invoked on server connection/disconnection - redisAsyncSetConnectCallback(ctx, Redox::connected_callback); - redisAsyncSetDisconnectCallback(ctx, Redox::disconnected_callback); + redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback); + redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback); } Redox::Redox( @@ -76,14 +76,14 @@ Redox::Redox( function connection_callback, ostream& log_stream, log::Level log_level -) : host(host), port(port), - logger(log_stream, log_level), - user_connection_callback(connection_callback) { +) : host_(host), port_(port), + logger_(log_stream, log_level), + user_connection_callback_(connection_callback) { init_ev(); // Connect over TCP - ctx = redisAsyncConnect(host.c_str(), port); + ctx_ = redisAsyncConnect(host.c_str(), port); init_hiredis(); } @@ -93,13 +93,13 @@ Redox::Redox( function connection_callback, ostream& log_stream, log::Level log_level -) : host(), port(), path(path), logger(log_stream, log_level), - user_connection_callback(connection_callback) { +) : host_(), port_(), path_(path), logger_(log_stream, log_level), + user_connection_callback_(connection_callback) { init_ev(); // Connect over unix sockets - ctx = redisAsyncConnectUnix(path.c_str()); + ctx_ = redisAsyncConnectUnix(path.c_str()); init_hiredis(); } @@ -108,142 +108,127 @@ void break_event_loop(struct ev_loop* loop, ev_async* async, int revents) { ev_break(loop, EVBREAK_ALL); } -void Redox::run_event_loop() { +void Redox::runEventLoop() { // Events to connect to Redox - ev_run(evloop, EVRUN_NOWAIT); + ev_run(evloop_, EVRUN_NOWAIT); // Block until connected to Redis, or error - unique_lock ul(connect_lock); - connect_waiter.wait(ul, [this] { return connect_state != REDOX_NOT_YET_CONNECTED; }); + unique_lock ul(connect_lock_); + connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); // Handle connection error - if(connect_state != REDOX_CONNECTED) { - logger.warning() << "Did not connect, event loop exiting."; - running_waiter.notify_one(); + if(connect_state_ != CONNECTED) { + logger_.warning() << "Did not connect, event loop exiting."; + running_waiter_.notify_one(); return; } // Set up asynchronous watcher which we signal every // time we add a command - ev_async_init(&async_w, process_queued_commands); - ev_async_start(evloop, &async_w); + ev_async_init(&watcher_command_, proccessQueuedCommands); + ev_async_start(evloop_, &watcher_command_); // Set up an async watcher to break the loop - ev_async_init(&async_stop, break_event_loop); - ev_async_start(evloop, &async_stop); + ev_async_init(&watcher_stop_, break_event_loop); + ev_async_start(evloop_, &watcher_stop_); - running = true; - running_waiter.notify_one(); + running_ = true; + running_waiter_.notify_one(); // Run the event loop - while (!to_exit) { + while (!to_exit_) { // logger.info() << "Event loop running"; - ev_run(evloop, EVRUN_NOWAIT); + ev_run(evloop_, EVRUN_NOWAIT); } - logger.info() << "Stop signal detected."; + logger_.info() << "Stop signal detected. Disconnecting from Redis."; + if(connect_state_ == CONNECTED) redisAsyncDisconnect(ctx_); - // Run a few more times to clear out canceled events + // Run a few more times to disconnect and clear out canceled events for(int i = 0; i < 100; i++) { - ev_run(evloop, EVRUN_NOWAIT); + ev_run(evloop_, EVRUN_NOWAIT); } - if(commands_created != commands_deleted) { - logger.error() << "All commands were not freed! " - << commands_deleted << "/" << commands_created; + if(commands_created_ != commands_deleted_) { + logger_.error() << "All commands were not freed! " + << commands_deleted_ << "/" << commands_created_; } - exited = true; - running = false; + exited_ = true; + running_ = false; // Let go for block_until_stopped method - exit_waiter.notify_one(); + exit_waiter_.notify_one(); - logger.info() << "Event thread exited."; + logger_.info() << "Event thread exited."; } -bool Redox::start() { +bool Redox::connect() { - event_loop_thread = thread([this] { run_event_loop(); }); + event_loop_thread_ = thread([this] { runEventLoop(); }); // Block until connected and running the event loop, or until // a connection error happens and the event loop exits - unique_lock ul(running_waiter_lock); - running_waiter.wait(ul, [this] { - return running.load() || connect_state == REDOX_CONNECT_ERROR; + unique_lock ul(running_waiter_lock_); + running_waiter_.wait(ul, [this] { + return running_.load() || connect_state_ == CONNECT_ERROR; }); // Return if succeeded - return connect_state == REDOX_CONNECTED; -} - -void Redox::stop_signal() { - to_exit = true; - logger.debug() << "stop_signal() called, breaking event loop"; - ev_async_send(evloop, &async_stop); -} - -void Redox::block() { - unique_lock ul(exit_waiter_lock); - exit_waiter.wait(ul, [this] { return exited.load(); }); + return connect_state_ == CONNECTED; } -void Redox::stop() { - stop_signal(); - block(); +void Redox::disconnect() { + to_exit_ = true; + logger_.debug() << "disconnect() called, breaking event loop"; + ev_async_send(evloop_, &watcher_stop_); } -void Redox::disconnect() { - stop_signal(); - if(connect_state == REDOX_CONNECTED) { - redisAsyncDisconnect(ctx); - block(); - } +void Redox::wait() { + unique_lock ul(exit_waiter_lock_); + exit_waiter_.wait(ul, [this] { return exited_.load(); }); } Redox::~Redox() { disconnect(); - if(event_loop_thread.joinable()) - event_loop_thread.join(); + if(event_loop_thread_.joinable()) + event_loop_thread_.join(); - ev_loop_destroy(evloop); + ev_loop_destroy(evloop_); - logger.info() << "Redox created " << commands_created - << " Commands and freed " << commands_deleted << "."; + logger_.info() << "Redox created " << commands_created_ + << " Commands and freed " << commands_deleted_ << "."; } template -Command* Redox::find_command(long id) { +Command* Redox::findCommand(long id) { - lock_guard lg(command_map_guard); + lock_guard lg(command_map_guard_); - auto& command_map = get_command_map(); + auto& command_map = getCommandMap(); auto it = command_map.find(id); if(it == command_map.end()) return nullptr; return it->second; } template -void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { +void Redox::commandCallback(redisAsyncContext* ctx, void* r, void* privdata) { Redox* rdx = (Redox*) ctx->data; long id = (long)privdata; redisReply* reply_obj = (redisReply*) r; - Command* c = rdx->find_command(id); + Command* c = rdx->findCommand(id); if(c == nullptr) { -// rdx->logger.warning() << "Couldn't find Command " << id << " in command_map (command_callback)."; +// rdx->logger.warning() << "Couldn't find Command " << id << " in command_map (commandCallback)."; freeReplyObject(reply_obj); return; } c->processReply(reply_obj); - - // Increment the Redox object command counter - rdx->cmd_count++; } /** @@ -251,7 +236,7 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { * true if succeeded, false otherwise. */ template -bool Redox::submit_to_server(Command* c) { +bool Redox::submitToServer(Command* c) { Redox* rdx = c->rdx_; c->pending_++; @@ -270,8 +255,8 @@ bool Redox::submit_to_server(Command* c) { string format = c->cmd_.substr(0, first) + "%b"; string value = c->cmd_.substr(first+1, last-first-1); - if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { - rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; + if (redisAsyncCommand(rdx->ctx_, commandCallback < ReplyT > , (void*) c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { + rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr; c->reply_status_ = Command::SEND_ERROR; c->invoke(); return false; @@ -280,8 +265,8 @@ bool Redox::submit_to_server(Command* c) { } } - if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) { - rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; + if (redisAsyncCommand(rdx->ctx_, commandCallback < ReplyT > , (void*) c->id_, c->cmd_.c_str()) != REDIS_OK) { + rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr; c->reply_status_ = Command::SEND_ERROR; c->invoke(); return false; @@ -291,15 +276,15 @@ bool Redox::submit_to_server(Command* c) { } template -void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { +void Redox::submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); long id = (long)timer->data; - Command* c = rdx->find_command(id); + Command* c = rdx->findCommand(id); if(c == nullptr) { - rdx->logger.error() << "Couldn't find Command " << id - << " in command_map (submit_command_callback)."; + rdx->logger_.error() << "Couldn't find Command " << id + << " in command_map (submitCommandCallback)."; return; } @@ -318,23 +303,23 @@ void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int r return; } - submit_to_server(c); + submitToServer(c); } template -bool Redox::process_queued_command(long id) { +bool Redox::proccessQueuedCommand(long id) { - Command* c = find_command(id); + Command* c = findCommand(id); if(c == nullptr) return false; if((c->repeat_ == 0) && (c->after_ == 0)) { - submit_to_server(c); + submitToServer(c); } else { c->timer_.data = (void*)c->id_; - ev_timer_init(&c->timer_, submit_command_callback, c->after_, c->repeat_); - ev_timer_start(evloop, &c->timer_); + ev_timer_init(&c->timer_, submitCommandCallback , c->after_, c->repeat_); + ev_timer_start(evloop_, &c->timer_); c->timer_guard_.unlock(); } @@ -342,7 +327,7 @@ bool Redox::process_queued_command(long id) { return true; } -void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int revents) { +void Redox::proccessQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); @@ -353,224 +338,60 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r long id = rdx->command_queue.front(); rdx->command_queue.pop(); - if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command(id)) {} - else if(rdx->process_queued_command>(id)) {} - else if(rdx->process_queued_command>(id)) {} - else if(rdx->process_queued_command>(id)) {} + if(rdx->proccessQueuedCommand(id)) {} + else if(rdx->proccessQueuedCommand(id)) {} + else if(rdx->proccessQueuedCommand(id)) {} + else if(rdx->proccessQueuedCommand(id)) {} + else if(rdx->proccessQueuedCommand(id)) {} + else if(rdx->proccessQueuedCommand(id)) {} + else if(rdx->proccessQueuedCommand>(id)) {} + else if(rdx->proccessQueuedCommand>(id)) {} + else if(rdx->proccessQueuedCommand>(id)) {} else throw runtime_error("Command pointer not found in any queue!"); } } // --------------------------------- -// Pub/Sub methods -// --------------------------------- - -void Redox::subscribe_raw(const string cmd_name, const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback -) { - - // Start pubsub mode. No non-sub/unsub commands can be emitted by this client. - pubsub_mode = true; - - command_looping(cmd_name + " " + topic, - [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command& c) { - - if(!c.ok()) { - if(err_callback) err_callback(topic, c.status()); - return; - } - - redisReply* reply = c.reply(); - - // For debugging only -// cout << "------" << endl; -// cout << cmd << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl; -// for(int i = 0; i < reply->elements; i++) { -// redisReply* r = reply->element[i]; -// cout << "element " << i << ", reply type = " << r->type << " "; -// if(r->type == REDIS_REPLY_STRING) cout << r->str << endl; -// else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl; -// else cout << "some other type" << endl; -// } -// cout << "------" << endl; - - // TODO cancel this command on unsubscription? - - // If the last entry is an integer, then it is a [p]sub/[p]unsub command - if((reply->type == REDIS_REPLY_ARRAY) && - (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) { - - if(!strncmp(reply->element[0]->str, "sub", 3)) { - subscribed_topics_.insert(topic); - if(sub_callback) sub_callback(topic); - - } else if(!strncmp(reply->element[0]->str, "psub", 4)) { - psubscribed_topics_.insert(topic); - if (sub_callback) sub_callback(topic); - - } else if(!strncmp(reply->element[0]->str, "uns", 3)) { - subscribed_topics_.erase(topic); - if (unsub_callback) unsub_callback(topic); - - } else if(!strncmp(reply->element[0]->str, "puns", 4)) { - psubscribed_topics_.erase(topic); - if (unsub_callback) unsub_callback(topic); - } - - else logger.error() << "Unknown pubsub message: " << reply->element[0]->str; - } - - // Message for subscribe - else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) { - char *msg = reply->element[2]->str; - if (msg && msg_callback) msg_callback(topic, reply->element[2]->str); - } - - // Message for psubscribe - else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) { - char *msg = reply->element[2]->str; - if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str); - } - - else logger.error() << "Unknown pubsub message of type " << reply->type; - }, - 1e10 // To keep the command around for a few hundred years - ); -} - -void Redox::subscribe(const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback -) { - if(subscribed_topics_.find(topic) != subscribed_topics_.end()) { - logger.warning() << "Already subscribed to " << topic << "!"; - return; - } - subscribe_raw("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); -} - -void Redox::psubscribe(const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback -) { - if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) { - logger.warning() << "Already psubscribed to " << topic << "!"; - return; - } - subscribe_raw("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); -} - -void Redox::unsubscribe_raw(const string cmd_name, const string topic, - function err_callback -) { - command(cmd_name + " " + topic, - [topic, err_callback](Command& c) { - if(!c.ok()) { - if (err_callback) err_callback(topic, c.status()); - } - } - ); -} - -void Redox::unsubscribe(const string topic, - function err_callback -) { - if(subscribed_topics_.find(topic) == subscribed_topics_.end()) { - logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; - return; - } - unsubscribe_raw("UNSUBSCRIBE", topic, err_callback); -} - -void Redox::punsubscribe(const string topic, - function err_callback -) { - if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) { - logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; - return; - } - unsubscribe_raw("PUNSUBSCRIBE", topic, err_callback); -} - -void Redox::publish(const string topic, const string msg, - function pub_callback, - function err_callback -) { - command("PUBLISH " + topic + " " + msg, - [topic, msg, err_callback, pub_callback](Command& c) { - if(!c.ok()) { - if(err_callback) err_callback(topic, c.status()); - } - if(pub_callback) pub_callback(topic, msg); - } - ); -} - -/** -* Throw an exception for any non-pubsub commands. -*/ -void Redox::deny_non_pubsub(const string& cmd) { - - string cmd_name = cmd.substr(0, cmd.find(' ')); - - // Compare with the command's first 5 characters - if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") || - !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) { - } else { - throw runtime_error("In pub/sub mode, this Redox instance can only issue " - "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands."); - } -} - -// --------------------------------- // get_command_map specializations // --------------------------------- template<> unordered_map*>& -Redox::get_command_map() { return commands_redis_reply; } +Redox::getCommandMap() { return commands_redis_reply_; } template<> unordered_map*>& -Redox::get_command_map() { return commands_string_r; } +Redox::getCommandMap() { return commands_string_; } template<> unordered_map*>& -Redox::get_command_map() { return commands_char_p; } +Redox::getCommandMap() { return commands_char_p_; } template<> unordered_map*>& -Redox::get_command_map() { return commands_int; } +Redox::getCommandMap() { return commands_int_; } template<> unordered_map*>& -Redox::get_command_map() { return commands_long_long_int; } +Redox::getCommandMap() { return commands_long_long_int_; } template<> unordered_map*>& -Redox::get_command_map() { return commands_null; } +Redox::getCommandMap() { return commands_null_; } template<> unordered_map>*>& -Redox::get_command_map>() { return commands_vector_string; } +Redox::getCommandMap>() { return commands_vector_string_; } template<> unordered_map>*>& -Redox::get_command_map>() { return commands_set_string; } +Redox::getCommandMap>() { return commands_set_string_; } template<> unordered_map>*>& -Redox::get_command_map>() { return commands_unordered_set_string; } +Redox::getCommandMap>() { return commands_unordered_set_string_; } // ---------------------------- // Helpers // ---------------------------- -bool Redox::command_blocking(const string& cmd) { - auto& c = command_blocking(cmd); +void Redox::command(const std::string& cmd) { + command(cmd, nullptr); +} + +bool Redox::commandSync(const string& cmd) { + auto& c = commandSync(cmd); bool succeeded = c.ok(); c.free(); return succeeded; @@ -578,7 +399,7 @@ bool Redox::command_blocking(const string& cmd) { string Redox::get(const string& key) { - Command& c = command_blocking("GET " + key); + Command& c = commandSync("GET " + key); if(!c.ok()) { throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status())); } @@ -588,11 +409,25 @@ string Redox::get(const string& key) { }; bool Redox::set(const string& key, const string& value) { - return command_blocking("SET " + key + " " + value); + return commandSync("SET " + key + " " + value); } bool Redox::del(const string& key) { - return command_blocking("DEL " + key); + return commandSync("DEL " + key); +} + +void Redox::publish(const string topic, const string msg, + function pub_callback, + function err_callback +) { + command("PUBLISH " + topic + " " + msg, + [topic, msg, err_callback, pub_callback](Command& c) { + if(!c.ok()) { + if(err_callback) err_callback(topic, c.status()); + } + if(pub_callback) pub_callback(topic, msg); + } + ); } } // End namespace redis diff --git a/src/redox.hpp b/src/redox.hpp index 16fc8a4..a72d364 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -27,23 +27,25 @@ namespace redox { -// Default to a local Redis server static const std::string REDIS_DEFAULT_HOST = "localhost"; static const int REDIS_DEFAULT_PORT = 6379; -// Connection status -static const int REDOX_NOT_YET_CONNECTED = 0; -static const int REDOX_CONNECTED = 1; -static const int REDOX_DISCONNECTED = 2; -static const int REDOX_CONNECT_ERROR = 3; -static const int REDOX_DISCONNECT_ERROR = 4; - +/** +* Redox intro here. +*/ class Redox { public: + // Connection states + static const int NOT_YET_CONNECTED = 0; + static const int CONNECTED = 1; + static const int DISCONNECTED = 2; + static const int CONNECT_ERROR = 3; + static const int DISCONNECT_ERROR = 4; + /** - * Initialize everything, connect over TCP to a Redis server. + * Initializes everything, connects over TCP to a Redis server. */ Redox( const std::string& host = REDIS_DEFAULT_HOST, @@ -54,7 +56,7 @@ public: ); /** - * Initialize everything, connect over unix sockets to a Redis server. + * Initializes everything, connects over unix sockets to a Redis server. */ Redox( const std::string& path, @@ -62,29 +64,29 @@ public: std::ostream& log_stream = std::cout, log::Level log_level = log::Info ); - ~Redox(); /** - * Connect to Redis and start the event loop in a separate thread. Returns - * true if and when everything is ready to go, or false on failure. + * Disconnects from the Redis server, shuts down the event loop, and cleans up. + * Internally calls disconnect() and wait(). */ - bool start(); + ~Redox(); /** - * Signal the event loop to stop processing commands and shut down. + * Connects to Redis and starts an event loop in a separate thread. Returns + * true once everything is ready, or false on failure. */ - void stop_signal(); + bool connect(); /** - * Wait for the event loop to exit, then return. + * Signal the event loop thread to disconnect from Redis and shut down. */ - void block(); + void disconnect(); /** - * Signal the event loop to stop, wait for all pending commands to be processed, - * and shut everything down. A simple combination of stop_signal() and block(). + * Blocks until the event loop exits and disconnection is complete, then returns. + * Usually no need to call manually as it is handled in the destructor. */ - void stop(); + void wait(); /** * Asynchronously runs a command and invokes the callback when a reply is @@ -101,7 +103,7 @@ public: /** * Asynchronously runs a command and ignores any errors or replies. */ - void command(const std::string& cmd) { command(cmd, nullptr); } + void command(const std::string& cmd); /** * Synchronously runs a command, returning the Command object only once @@ -109,150 +111,62 @@ public: * calling the Command object's .free() method when done with it. */ template - Command& command_blocking(const std::string& cmd); + Command& commandSync(const std::string& cmd); /** * Synchronously runs a command, returning only once a reply is received - * or there's an error. The return value is true if the command got a - * successful reply, and false if something went wrong. + * or there's an error. Returns true on successful reply, false on error. */ - bool command_blocking(const std::string& cmd); + bool commandSync(const std::string& cmd); + /** + * Creates an asynchronous command that is run every [repeat] seconds, + * with the first one run in [after] seconds. If [repeat] is 0, the + * command is run only once. + */ template - Command& command_looping( + Command& commandLoop( const std::string& cmd, const std::function&)>& callback, double repeat, double after = 0.0 ); - /** - * A wrapper around command() for synchronous use. Waits for a reply, populates it - * into the Command object, and returns when complete. The user can retrieve the - * results from the Command object - ok() will tell you if the call succeeded, - * status() will give the error code, and reply() will return the reply data if - * the call succeeded. - */ -// template -// Command& command_blocking(const std::string& cmd); - - /** - * Return the total number of successful commands processed by this Redox instance. - */ - long num_commands_processed() { return cmd_count; } - - // Hiredis context, left public to allow low-level access - redisAsyncContext *ctx; - - /** - * If connected, disconnect from the Redis server. Usually not necessary to invoke - * manually, as it is called in the destructor. - */ - void disconnect(); - // ------------------------------------------------ // Wrapper methods for convenience only // ------------------------------------------------ /** - * Non-templated version of command in case you really don't care - * about the reply and just want to send something off. - */ -// void command(const std::string& command); - - /** - * Non-templated version of command_blocking in case you really don't - * care about the reply. Returns true if succeeded, false if error. - */ -// bool command_blocking(const std::string& command); - - /** * Redis GET command wrapper - return the value for the given key, or throw - * an exception if there is an error. Blocking call, of course. + * an exception if there is an error. Blocking call. */ std::string get(const std::string& key); /** * Redis SET command wrapper - set the value for the given key. Return - * true if succeeded, false if error. + * true if succeeded, false if error. Blocking call. */ bool set(const std::string& key, const std::string& value); /** * Redis DEL command wrapper - delete the given key. Return true if succeeded, - * false if error. + * false if error. Blocking call. */ bool del(const std::string& key); - // ------------------------------------------------ - // Publish/subscribe - // ------------------------------------------------ - - // This is activated when subscribe is called. When active, - // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE - // throw exceptions - std::atomic_bool pubsub_mode = {false}; - - /** - * Subscribe to a topic. - * - * msg_callback: invoked whenever a message is received. - * sub_callback: invoked when successfully subscribed - * err_callback: invoked on some error state - */ - void subscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); - - /** - * Subscribe to a topic with a pattern. - * - * msg_callback: invoked whenever a message is received. - * sub_callback: invoked when successfully subscribed - * err_callback: invoked on some error state - */ - void psubscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); - /** * Publish to a topic. All subscribers will be notified. * * pub_callback: invoked when successfully published * err_callback: invoked on some error state - */ - void publish(const std::string topic, const std::string msg, - std::function pub_callback = nullptr, - std::function err_callback = nullptr - ); - - /** - * Unsubscribe from a topic. * - * err_callback: invoked on some error state + * // TODO */ - void unsubscribe(const std::string topic, - std::function err_callback = nullptr - ); - - /** - * Unsubscribe from a topic with a pattern. - * - * err_callback: invoked on some error state - */ - void punsubscribe(const std::string topic, - std::function err_callback = nullptr + void publish(const std::string topic, const std::string msg, + std::function pub_callback = nullptr, + std::function err_callback = nullptr ); - const std::set& subscribed_topics() { return subscribed_topics_; } - const std::set& psubscribed_topics() { return psubscribed_topics_; } - // ------------------------------------------------ // Public only for Command class // ------------------------------------------------ @@ -260,20 +174,23 @@ public: // Invoked by Command objects when they are completed template void remove_active_command(const long id) { - std::lock_guard lg1(command_map_guard); - get_command_map().erase(id); - commands_deleted += 1; + std::lock_guard lg1(command_map_guard_); + getCommandMap().erase(id); + commands_deleted_ += 1; } + // Hiredis context, left public to allow low-level access + redisAsyncContext * ctx_; + // Redox server over TCP - const std::string host; - const int port; + const std::string host_; + const int port_; // Redox server over unix - const std::string path; + const std::string path_; // Logger - log::Logger logger; + log::Logger logger_; private: @@ -291,108 +208,90 @@ private: void init_hiredis(); // Manage connection state - std::atomic_int connect_state = {REDOX_NOT_YET_CONNECTED}; - std::mutex connect_lock; - std::condition_variable connect_waiter; + std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; + std::mutex connect_lock_; + std::condition_variable connect_waiter_; // User connect/disconnect callbacks - std::function user_connection_callback; + std::function user_connection_callback_; // Dynamically allocated libev event loop - struct ev_loop* evloop; + struct ev_loop* evloop_; // Asynchronous watchers - ev_async async_w; // For processing commands - ev_async async_stop; // For breaking the loop - - // Number of commands processed - std::atomic_long cmd_count = {0}; + ev_async watcher_command_; // For processing commands + ev_async watcher_stop_; // For breaking the loop // Track of Command objects allocated. Also provides unique Command IDs. - std::atomic_long commands_created = {0}; - std::atomic_long commands_deleted = {0}; + std::atomic_long commands_created_ = {0}; + std::atomic_long commands_deleted_ = {0}; // Separate thread to have a non-blocking event loop - std::thread event_loop_thread; + std::thread event_loop_thread_; // Variable and CV to know when the event loop starts running - std::atomic_bool running = {false}; - std::mutex running_waiter_lock; - std::condition_variable running_waiter; + std::atomic_bool running_ = {false}; + std::mutex running_waiter_lock_; + std::condition_variable running_waiter_; // Variable and CV to know when the event loop stops running - std::atomic_bool to_exit = {false}; // Signal to exit - std::atomic_bool exited = {false}; // Event thread exited - std::mutex exit_waiter_lock; - std::condition_variable exit_waiter; + std::atomic_bool to_exit_ = {false}; // Signal to exit + std::atomic_bool exited_ = {false}; // Event thread exited + std::mutex exit_waiter_lock_; + std::condition_variable exit_waiter_; // Maps of each Command, fetchable by the unique ID number - std::unordered_map*> commands_redis_reply; - std::unordered_map*> commands_string_r; - std::unordered_map*> commands_char_p; - std::unordered_map*> commands_int; - std::unordered_map*> commands_long_long_int; - std::unordered_map*> commands_null; - std::unordered_map>*> commands_vector_string; - std::unordered_map>*> commands_set_string; - std::unordered_map>*> commands_unordered_set_string; - std::mutex command_map_guard; // Guards access to all of the above + // In C++14, member variable templates will replace all of these types + // with a single templated declaration + // --------- + // template + // std::unordered_map*> commands_; + // --------- + std::unordered_map*> commands_redis_reply_; + std::unordered_map*> commands_string_; + std::unordered_map*> commands_char_p_; + std::unordered_map*> commands_int_; + std::unordered_map*> commands_long_long_int_; + std::unordered_map*> commands_null_; + std::unordered_map>*> commands_vector_string_; + std::unordered_map>*> commands_set_string_; + std::unordered_map>*> commands_unordered_set_string_; + std::mutex command_map_guard_; // Guards access to all of the above // Return the correct map from the above, based on the template specialization template - std::unordered_map*>& get_command_map(); + std::unordered_map*>& getCommandMap(); // Return the given Command from the relevant command map, or nullptr if not there template - Command* find_command(long id); + Command* findCommand(long id); std::queue command_queue; std::mutex queue_guard; - static void process_queued_commands(struct ev_loop* loop, ev_async* async, int revents); + static void proccessQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); template - bool process_queued_command(long id); + bool proccessQueuedCommand(long id); - void run_event_loop(); + void runEventLoop(); // Callbacks invoked on server connection/disconnection - static void connected_callback(const redisAsyncContext *c, int status); - static void disconnected_callback(const redisAsyncContext *c, int status); + static void connectedCallback(const redisAsyncContext* c, int status); + static void disconnectedCallback(const redisAsyncContext* c, int status); template - static void command_callback(redisAsyncContext *ctx, void *r, void *privdata); + static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata); template - static bool submit_to_server(Command* c); + static bool submitToServer(Command* c); template - static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); + static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents); - void deny_non_pubsub(const std::string& cmd); - - // Base for subscribe and psubscribe - void subscribe_raw(const std::string cmd_name, const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); - - // Base for unsubscribe and punsubscribe - void unsubscribe_raw(const std::string cmd_name, const std::string topic, - std::function err_callback = nullptr - ); - - // Keep track of topics because we can only unsubscribe - // from subscribed topics and punsubscribe from - // psubscribed topics, or hiredis leads to segfaults - std::set subscribed_topics_; - std::set psubscribed_topics_; }; // --------------------------- - template Command& Redox::createCommand( const std::string& cmd, @@ -402,27 +301,22 @@ Command& Redox::createCommand( bool free_memory ) { - if(!running) { - throw std::runtime_error("[ERROR] Need to start Redox before running commands!"); - } - - // Block if pubsub mode - if(pubsub_mode) { - deny_non_pubsub(cmd); + if(!running_) { + throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); } - commands_created += 1; - auto* c = new Command(this, commands_created, cmd, - callback, repeat, after, free_memory, logger); + commands_created_ += 1; + auto* c = new Command(this, commands_created_, cmd, + callback, repeat, after, free_memory, logger_); std::lock_guard lg(queue_guard); - std::lock_guard lg2(command_map_guard); + std::lock_guard lg2(command_map_guard_); - get_command_map()[c->id_] = c; + getCommandMap()[c->id_] = c; command_queue.push(c->id_); // Signal the event loop to process this command - ev_async_send(evloop, &async_w); + ev_async_send(evloop_, &watcher_command_); // logger.debug() << "Created Command " << c->id << " at " << c; @@ -438,7 +332,7 @@ void Redox::command( } template -Command& Redox::command_looping( +Command& Redox::commandLoop( const std::string& cmd, const std::function&)>& callback, double repeat, @@ -448,7 +342,7 @@ Command& Redox::command_looping( } template -Command& Redox::command_blocking(const std::string& cmd) { +Command& Redox::commandSync(const std::string& cmd) { auto& c = createCommand(cmd, nullptr, 0, 0, false); c.wait(); return c; diff --git a/src/subscriber.cpp b/src/subscriber.cpp new file mode 100644 index 0000000..8662baa --- /dev/null +++ b/src/subscriber.cpp @@ -0,0 +1,163 @@ +/** +* Redis C++11 wrapper. +*/ + +#include +#include "subscriber.hpp" + +using namespace std; + +namespace redox { + +Subscriber::Subscriber( + const std::string& host, const int port, + std::function connection_callback, + std::ostream& log_stream, log::Level log_level +) : rdx_(host, port, connection_callback, log_stream, log_level), + logger_(rdx_.logger_) {} + +Subscriber::Subscriber( + const std::string& path, + std::function connection_callback, + std::ostream& log_stream, log::Level log_level +) : rdx_(path, connection_callback, log_stream, log_level), + logger_(rdx_.logger_) {} + + +// For debugging only +void debugReply(Command c) { + + redisReply* reply = c.reply(); + + cout << "------" << endl; + cout << c.cmd() << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl; + for(size_t i = 0; i < reply->elements; i++) { + redisReply* r = reply->element[i]; + cout << "element " << i << ", reply type = " << r->type << " "; + if(r->type == REDIS_REPLY_STRING) cout << r->str << endl; + else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl; + else cout << "some other type" << endl; + } + cout << "------" << endl; +} + +void Subscriber::subscribeBase(const string cmd_name, const string topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback +) { + + rdx_.commandLoop(cmd_name + " " + topic, + [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command& c) { + + if (!c.ok()) { + if (err_callback) err_callback(topic, c.status()); + return; + } + + redisReply* reply = c.reply(); + + // TODO cancel this command on unsubscription? + + // If the last entry is an integer, then it is a [p]sub/[p]unsub command + if ((reply->type == REDIS_REPLY_ARRAY) && + (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) { + + if (!strncmp(reply->element[0]->str, "sub", 3)) { + subscribed_topics_.insert(topic); + if (sub_callback) sub_callback(topic); + + } else if (!strncmp(reply->element[0]->str, "psub", 4)) { + psubscribed_topics_.insert(topic); + if (sub_callback) sub_callback(topic); + + } else if (!strncmp(reply->element[0]->str, "uns", 3)) { + subscribed_topics_.erase(topic); + if (unsub_callback) unsub_callback(topic); + + } else if (!strncmp(reply->element[0]->str, "puns", 4)) { + psubscribed_topics_.erase(topic); + if (unsub_callback) unsub_callback(topic); + } + + else logger_.error() << "Unknown pubsub message: " << reply->element[0]->str; + } + + // Message for subscribe + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) { + char* msg = reply->element[2]->str; + if (msg && msg_callback) msg_callback(topic, reply->element[2]->str); + } + + // Message for psubscribe + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) { + char* msg = reply->element[2]->str; + if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str); + } + + else logger_.error() << "Unknown pubsub message of type " << reply->type; + }, + 1e10 // To keep the command around for a few hundred years + ); +} + +void Subscriber::subscribe(const string topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback +) { + if(subscribed_topics_.find(topic) != subscribed_topics_.end()) { + logger_.warning() << "Already subscribed to " << topic << "!"; + return; + } + subscribeBase("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); +} + +void Subscriber::psubscribe(const string topic, + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback +) { + if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) { + logger_.warning() << "Already psubscribed to " << topic << "!"; + return; + } + subscribeBase("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); +} + +void Subscriber::unsubscribeBase(const string cmd_name, const string topic, + function err_callback +) { + rdx_.command(cmd_name + " " + topic, + [topic, err_callback](Command& c) { + if(!c.ok()) { + if (err_callback) err_callback(topic, c.status()); + } + } + ); +} + +void Subscriber::unsubscribe(const string topic, + function err_callback +) { + if(subscribed_topics_.find(topic) == subscribed_topics_.end()) { + logger_.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; + return; + } + unsubscribeBase("UNSUBSCRIBE", topic, err_callback); +} + +void Subscriber::punsubscribe(const string topic, + function err_callback +) { + if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) { + logger_.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; + return; + } + unsubscribeBase("PUNSUBSCRIBE", topic, err_callback); +} + +} // End namespace diff --git a/src/subscriber.hpp b/src/subscriber.hpp new file mode 100644 index 0000000..6be8a04 --- /dev/null +++ b/src/subscriber.hpp @@ -0,0 +1,135 @@ +/** +* Redis C++11 wrapper. +*/ + +#pragma once + +#include "redox.hpp" + +namespace redox { + +class Subscriber { + +public: + + /** + * Initializes everything, connects over TCP to a Redis server. + */ + Subscriber( + const std::string& host = REDIS_DEFAULT_HOST, + const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Info + ); + + /** + * Initializes everything, connects over unix sockets to a Redis server. + */ + Subscriber( + const std::string& path, + std::function connection_callback, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Info + ); + + /** + * Same as .connect() on a Redox instance. + */ + bool connect() { return rdx_.connect(); } + + /** + * Same as .disconnect() on a Redox instance. + */ + void disconnect() { return rdx_.disconnect(); } + + /** + * Same as .wait() on a Redox instance. + */ + void wait() { return rdx_.wait(); } + + /** + * Subscribe to a topic. + * + * msg_callback: invoked whenever a message is received. + * sub_callback: invoked when successfully subscribed + * err_callback: invoked on some error state + */ + void subscribe(const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Subscribe to a topic with a pattern. + * + * msg_callback: invoked whenever a message is received. + * sub_callback: invoked when successfully subscribed + * err_callback: invoked on some error state + */ + void psubscribe(const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Unsubscribe from a topic. + * + * err_callback: invoked on some error state + */ + void unsubscribe(const std::string topic, + std::function err_callback = nullptr + ); + + /** + * Unsubscribe from a topic with a pattern. + * + * err_callback: invoked on some error state + */ + void punsubscribe(const std::string topic, + std::function err_callback = nullptr + ); + + /** + * Return the topics that were subscribed() to. + */ + const std::set& subscribedTopics() { return subscribed_topics_; } + + /** + * Return the topic patterns that were psubscribed() to. + */ + const std::set& psubscribedTopics() { return psubscribed_topics_; } + +private: + + // Base for subscribe and psubscribe + void subscribeBase(const std::string cmd_name, const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + // Base for unsubscribe and punsubscribe + void unsubscribeBase(const std::string cmd_name, const std::string topic, + std::function err_callback = nullptr + ); + + // Underlying Redis client + Redox rdx_; + + // Keep track of topics because we can only unsubscribe + // from subscribed topics and punsubscribe from + // psubscribed topics, or hiredis leads to segfaults + std::set subscribed_topics_; + std::set psubscribed_topics_; + + // Reference to rdx_.logger_ for convenience + log::Logger& logger_; +}; + +} // End namespace diff --git a/test/test.cpp b/test/test.cpp index 525594a..d3cef32 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -25,18 +25,13 @@ protected: RedoxTest() { // Connect to the server - rdx.start(); + rdx.connect(); // Clear all keys used by the tests here rdx.command("DEL redox_test:a"); } - virtual ~RedoxTest() { - - // Block until the event loop exits. - // Each test is responsible for calling the stop_signal() - rdx.block(); - } + virtual ~RedoxTest() { } // CV and counter to wait for async commands to complete atomic_int cmd_count = {0}; @@ -85,10 +80,9 @@ protected: * Wait until all async commands that used check() as a callback * complete. */ - void wait_and_stop() { + void wait_for_replies() { unique_lock ul(cmd_waiter_lock); cmd_waiter.wait(ul, [this] { return (cmd_count == 0); }); - rdx.stop_signal(); }; template @@ -114,14 +108,14 @@ protected: TEST_F(RedoxTest, GetSet) { rdx.command("SET redox_test:a apple", print_and_check("OK")); rdx.command("GET redox_test:a", print_and_check("apple")); - wait_and_stop(); + wait_for_replies(); } TEST_F(RedoxTest, Delete) { rdx.command("SET redox_test:a apple", print_and_check("OK")); rdx.command("DEL redox_test:a", print_and_check(1)); rdx.command("GET redox_test:a", check(nullptr)); - wait_and_stop(); + wait_for_replies(); } TEST_F(RedoxTest, Incr) { @@ -130,22 +124,22 @@ TEST_F(RedoxTest, Incr) { rdx.command("INCR redox_test:a", check(i+1)); } rdx.command("GET redox_test:a", print_and_check(to_string(count))); - wait_and_stop(); + wait_for_replies(); } TEST_F(RedoxTest, Delayed) { - Command& c = rdx.command_looping("INCR redox_test:a", check(1), 0, 0.1); + Command& c = rdx.commandLoop("INCR redox_test:a", check(1), 0, 0.1); this_thread::sleep_for(chrono::milliseconds(150)); c.cancel(); rdx.command("GET redox_test:a", print_and_check(to_string(1))); - wait_and_stop(); + wait_for_replies(); } TEST_F(RedoxTest, Loop) { int count = 0; int target_count = 100; double dt = 0.001; - Command& cmd = rdx.command_looping("INCR redox_test:a", + Command& cmd = rdx.commandLoop("INCR redox_test:a", [this, &count](Command& c) { check(++count)(c); }, @@ -157,7 +151,7 @@ TEST_F(RedoxTest, Loop) { cmd.cancel(); rdx.command("GET redox_test:a", print_and_check(to_string(target_count))); - wait_and_stop(); + wait_for_replies(); } // ------------------------------------------- @@ -165,25 +159,22 @@ TEST_F(RedoxTest, Loop) { // ------------------------------------------- TEST_F(RedoxTest, GetSetSync) { - print_and_check_sync(rdx.command_blocking("SET redox_test:a apple"), "OK"); - print_and_check_sync(rdx.command_blocking("GET redox_test:a"), "apple"); - rdx.stop_signal(); + print_and_check_sync(rdx.commandSync("SET redox_test:a apple"), "OK"); + print_and_check_sync(rdx.commandSync("GET redox_test:a"), "apple"); } TEST_F(RedoxTest, DeleteSync) { - print_and_check_sync(rdx.command_blocking("SET redox_test:a apple"), "OK"); - print_and_check_sync(rdx.command_blocking("DEL redox_test:a"), 1); - check_sync(rdx.command_blocking("GET redox_test:a"), nullptr); - rdx.stop_signal(); + print_and_check_sync(rdx.commandSync("SET redox_test:a apple"), "OK"); + print_and_check_sync(rdx.commandSync("DEL redox_test:a"), 1); + check_sync(rdx.commandSync("GET redox_test:a"), nullptr); } TEST_F(RedoxTest, IncrSync) { int count = 100; for(int i = 0; i < count; i++) { - check_sync(rdx.command_blocking("INCR redox_test:a"), i+1); + check_sync(rdx.commandSync("INCR redox_test:a"), i+1); } - print_and_check_sync(rdx.command_blocking("GET redox_test:a"), to_string(count)); - rdx.stop_signal(); + print_and_check_sync(rdx.commandSync("GET redox_test:a"), to_string(count)); } // ------------------------------------------- -- libgit2 0.21.4