From c43f27479eb37068f0dd74f7ffec752749b9e4cd Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Sun, 25 Jan 2015 21:16:56 -0800 Subject: [PATCH] Overhaul callback system to be a better interface --- CMakeLists.txt | 6 +++--- examples/basic.cpp | 6 +++++- examples/basic_threaded.cpp | 6 ++++-- examples/binary_data.cpp | 21 ++++++++++++--------- examples/data_types.cpp | 35 ++++++++++++++++------------------- examples/lpush_benchmark.cpp | 8 ++++++-- examples/multi-client.cpp | 4 ++++ examples/pub_sub.cpp | 8 ++------ examples/speed_test_async.cpp | 38 ++++++++++++++++++++++---------------- examples/speed_test_async_multi.cpp | 21 +++++++++++++-------- examples/speed_test_pubsub.cpp | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ examples/speed_test_sync.cpp | 16 +++++++--------- examples/string_vs_charp.cpp | 49 ------------------------------------------------- src/command.cpp | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------------------------------------------------------------------------------------- src/command.hpp | 59 ++++++++++++++++++++++++++++++++++++----------------------- src/redox.cpp | 65 ++++++++++++++++++++++++++++++++++++----------------------------- src/redox.hpp | 52 ++++++++++++++++++---------------------------------- test/test.cpp | 2 +- 18 files changed, 315 insertions(+), 335 deletions(-) create mode 100644 examples/speed_test_pubsub.cpp delete mode 100644 examples/string_vs_charp.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c5b0746..e13002a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,9 +80,6 @@ if (examples) add_executable(data_types examples/data_types.cpp ${SRC_ALL}) target_link_libraries(data_types ${LIB_REDIS}) - add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL}) - target_link_libraries(string_v_char ${LIB_REDIS}) - add_executable(multi_client examples/multi-client.cpp ${SRC_ALL}) target_link_libraries(multi_client ${LIB_REDIS}) @@ -92,4 +89,7 @@ if (examples) add_executable(pub_sub examples/pub_sub.cpp ${SRC_ALL}) target_link_libraries(pub_sub ${LIB_REDIS}) + add_executable(speed_test_pubsub examples/speed_test_pubsub ${SRC_ALL}) + target_link_libraries(speed_test_pubsub ${LIB_REDIS}) + endif() diff --git a/examples/basic.cpp b/examples/basic.cpp index ef11af1..b6b78e7 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -6,10 +6,13 @@ #include "../src/redox.hpp" using namespace std; +using redox::Redox; +using redox::Command; int main(int argc, char* argv[]) { - redox::Redox rdx = {"localhost", 6379}; // Initialize Redox + Redox rdx = {"localhost", 6379, nullptr, cout, redox::log::Info}; // Initialize Redox + if(!rdx.start()) return 1; // Start the event loop rdx.del("occupation"); @@ -20,4 +23,5 @@ int main(int argc, char* argv[]) { cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl; rdx.stop(); // Shut down the event loop + return 0; } diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index 733f50e..c13e824 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -8,6 +8,8 @@ #include "../src/redox.hpp" using namespace std; +using redox::Redox; +using redox::Command; redox::Redox rdx = {"localhost", 6379}; @@ -27,8 +29,8 @@ int main(int argc, char* argv[]) { for(int i = 0; i < 5; i++) { rdx.command( "GET counter", - [](const string& cmd, const string& value) { - cout << cmd << ": " << value << endl; + [](Command& c) { + if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl; } ); this_thread::sleep_for(chrono::milliseconds(1000)); diff --git a/examples/binary_data.cpp b/examples/binary_data.cpp index 2d68c65..33ed426 100644 --- a/examples/binary_data.cpp +++ b/examples/binary_data.cpp @@ -8,6 +8,8 @@ #include "../src/redox.hpp" using namespace std; +using redox::Redox; +using redox::Command; /** * Random string generator. @@ -27,18 +29,19 @@ int main(int argc, char* argv[]) { string binary_data = random_string(10000); - auto c = rdx.command_blocking("SET binary \"" + binary_data + "\""); - if(c->ok()) cout << "Reply: " << c->reply() << endl; - else cerr << "Failed to set key! Status: " << c->status() << endl; - c->free(); + auto& c = rdx.command_blocking("SET binary \"" + binary_data + "\""); + if(c.ok()) cout << "Reply: " << c.reply() << endl; + else cerr << "Failed to set key! Status: " << c.status() << endl; + c.free(); - c = rdx.command_blocking("GET binary"); - if(c->ok()) { - if(c->reply() == binary_data) cout << "Binary data matches!" << endl; + auto& c2 = rdx.command_blocking("GET binary"); + if(c2.ok()) { + if(c2.reply() == binary_data) cout << "Binary data matches!" << endl; else cerr << "Binary data differs!" << endl; } - else cerr << "Failed to get key! Status: " << c->status() << endl; - c->free(); + else cerr << "Failed to get key! Status: " << c2.status() << endl; + c2.free(); rdx.stop(); // Shut down the event loop + return 0; } diff --git a/examples/data_types.cpp b/examples/data_types.cpp index d714b66..4c0d88e 100644 --- a/examples/data_types.cpp +++ b/examples/data_types.cpp @@ -9,6 +9,8 @@ #include using namespace std; +using redox::Redox; +using redox::Command; int main(int argc, char* argv[]) { @@ -20,39 +22,34 @@ int main(int argc, char* argv[]) { rdx.command_blocking("LPUSH mylist 1 2 3 4 5 6 7 8 9 10"); rdx.command>("LRANGE mylist 0 4", - [](const string& cmd, const vector& reply){ + [](Command>& c){ + if(!c.ok()) return; cout << "Last 5 elements as a vector: "; - for(const string& s : reply) cout << s << " "; + for (const string& s : c.reply()) cout << s << " "; cout << endl; - }, - [](const string& cmd, int status) { - cerr << "Error with LRANGE: " << status << endl; } ); rdx.command>("LRANGE mylist 0 4", - [](const string& cmd, const unordered_set& reply){ - cout << "Last 5 elements as an unordered set: "; - for(const string& s : reply) cout << s << " "; + [](Command>& c){ + if(!c.ok()) return; + cout << "Last 5 elements as a hash: "; + for (const string& s : c.reply()) cout << s << " "; cout << endl; - }, - [](const string& cmd, int status) { - cerr << "Error with LRANGE: " << status << endl; } ); rdx.command>("LRANGE mylist 0 4", - [&rdx](const string& cmd, const set& reply){ - cout << "Last 5 elements as a set: "; - for(const string& s : reply) cout << s << " "; - cout << endl; - rdx.stop_signal(); - }, - [&rdx](const string& cmd, int status) { - cerr << "Error with LRANGE: " << status << endl; + [&rdx](Command>& c) { + if(c.ok()) { + cout << "Last 5 elements as a set: "; + for (const string& s : c.reply()) cout << s << " "; + cout << endl; + } rdx.stop_signal(); } ); rdx.block(); // Shut down the event loop + return 0; } diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index 362d901..226d7f0 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -6,6 +6,8 @@ #include "../src/redox.hpp" using namespace std; +using redox::Redox; +using redox::Command; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -26,12 +28,14 @@ int main(int argc, char* argv[]) { atomic_int count = {0}; for(int i = 1; i <= len; i++) { - rdx.command("lpush test 1", [&t0, &t1, &count, len, &rdx](const string& cmd, int reply) { + rdx.command("lpush test 1", [&t0, &t1, &count, len, &rdx](Command& c) { + + if(!c.ok()) return; count += 1; if(count == len) { - cout << cmd << ": " << reply << endl; + cout << c.cmd() << ": " << c.reply() << endl; double t2 = time_s(); cout << "Time to queue async commands: " << t1 - t0 << "s" << endl; diff --git a/examples/multi-client.cpp b/examples/multi-client.cpp index 4c3c1f7..7d186db 100644 --- a/examples/multi-client.cpp +++ b/examples/multi-client.cpp @@ -6,6 +6,8 @@ #include "../src/redox.hpp" using namespace std; +using redox::Redox; +using redox::Command; int main(int argc, char* argv[]) { @@ -23,4 +25,6 @@ int main(int argc, char* argv[]) { rdx1.stop(); rdx2.stop(); rdx3.stop(); + + return 0; } diff --git a/examples/pub_sub.cpp b/examples/pub_sub.cpp index c105d90..6301539 100644 --- a/examples/pub_sub.cpp +++ b/examples/pub_sub.cpp @@ -1,10 +1,4 @@ -#include #include -#include -#include -#include "hiredis/hiredis.h" -#include "hiredis/async.h" -#include "hiredis/adapters/libev.h" #include #include "../src/redox.hpp" @@ -55,4 +49,6 @@ int main(int argc, char *argv[]) { rdx_pub.publish("news", "whatup"); rdx.block(); rdx_pub.block(); + + return 0; } diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index 52de1a1..ab7df0b 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -8,7 +8,8 @@ #include "../src/redox.hpp" using namespace std; -using namespace redox; +using redox::Redox; +using redox::Command; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -17,7 +18,7 @@ double time_s() { int main(int argc, char* argv[]) { - Redox rdx = {"localhost", 6379}; + Redox rdx = {"/var/run/redis/redis.sock", nullptr}; if(!rdx.start()) return 1; if(rdx.command_blocking("SET simple_loop:count 0")) { @@ -38,31 +39,36 @@ int main(int argc, char* argv[]) { double t0 = time_s(); atomic_int count(0); - Command* c = rdx.command( + Command& cmd = rdx.command( cmd_str, - [&count, &rdx](const string &cmd, const int& value) { count++; }, - [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; }, + [&count, &rdx](Command& c) { + if(!c.ok()) { + cerr << "Bad reply: " << c.status() << endl; + } + count++; + }, dt ); // Wait for t time, then stop the command. this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); - c->cancel(); + cmd.cancel(); - // Get the final value of the counter - auto get_cmd = rdx.command_blocking("GET simple_loop:count"); - long final_count = stol(get_cmd->reply()); - get_cmd->free(); + rdx.command("GET simple_loop:count", [&](Command& c) { + if(!c.ok()) return; + long final_count = stol(c.reply()); - rdx.stop(); + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; - double t_elapsed = time_s() - t0; - double actual_freq = (double)count / t_elapsed; + cout << "Sent " << count << " commands in " << t_elapsed << "s, " + << "that's " << actual_freq << " commands/s." << endl; - cout << "Sent " << count << " commands in " << t_elapsed << "s, " - << "that's " << actual_freq << " commands/s." << endl; + cout << "Final value of counter: " << final_count << endl; - cout << "Final value of counter: " << final_count << endl; + rdx.stop_signal(); + }); + rdx.block(); return 0; } diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index a2dea41..582ca99 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -9,7 +9,8 @@ #include "../src/redox.hpp" using namespace std; -using namespace redox; +using redox::Redox; +using redox::Command; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -42,26 +43,30 @@ int main(int argc, char* argv[]) { vector*> commands; for(int i = 0; i < parallel; i++) { - commands.push_back(rdx.command( + commands.push_back(&rdx.command( cmd_str, - [&count, &rdx](const string &cmd, const int& value) { count++; }, - [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; }, + [&count, &rdx](Command& c) { + if(!c.ok()) { + cerr << "Bad reply: " << c.status() << endl; + } + count++; + }, dt )); } // Wait for t time, then stop the command. this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); - for(auto c : commands) c->cancel(); + for(auto& c : commands) c->cancel(); + + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; // Get the final value of the counter long final_count = stol(rdx.get("simple_loop:count")); rdx.stop(); - double t_elapsed = time_s() - t0; - double actual_freq = (double)count / t_elapsed; - cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; diff --git a/examples/speed_test_pubsub.cpp b/examples/speed_test_pubsub.cpp new file mode 100644 index 0000000..4ae726c --- /dev/null +++ b/examples/speed_test_pubsub.cpp @@ -0,0 +1,52 @@ +#include +#include "../src/redox.hpp" + +using namespace std; +using redox::Redox; +using redox::Command; + +double time_s() { + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); + return (double)ms / 1e6; +} + +int main(int argc, char *argv[]) { + + Redox rdx_pub; + Redox rdx_sub; + + if(!rdx_pub.start()) return 1; + if(!rdx_sub.start()) return 1; + + atomic_int count(0); + auto got_message = [&count](const string& topic, const string& msg) { + count += 1; + }; + + auto subscribed = [](const string& topic) { + + }; + + auto unsubscribed = [](const string& topic) { + cout << "> Unsubscribed from " << topic << endl; + }; + + rdx_sub.subscribe("speedtest", got_message, subscribed, unsubscribed); + + double t0 = time_s(); + double t1 = t0; + double tspan = 5; + + while(t1 - t0 < tspan) { + rdx_pub.publish("speedtest", "hello"); + t1 = time_s(); + } + this_thread::sleep_for(chrono::milliseconds(1000)); + rdx_pub.stop(); + rdx_sub.stop(); + + double t = t1 - t0; + cout << "Total of messages sent in " << t << "s is " << count << endl; + double msg_per_s = count / t; + cout << "Messages per second: " << msg_per_s << endl; +} diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp index 6cc3876..4cd152c 100644 --- a/examples/speed_test_sync.cpp +++ b/examples/speed_test_sync.cpp @@ -37,21 +37,19 @@ int main(int argc, char* argv[]) { int count = 0; while(time_s() < t_end) { - Command* c = rdx.command_blocking(cmd_str); - if(!c->ok()) cerr << "Bad reply, code: " << c->status() << endl; - c->free(); + Command& c = rdx.command_blocking(cmd_str); + if(!c.ok()) cerr << "Bad reply, code: " << c.status() << endl; + c.free(); count++; } - auto get_cmd = rdx.command_blocking("GET simple_loop:count"); - long final_count = stol(get_cmd->reply()); - get_cmd->free(); - - rdx.stop(); - double t_elapsed = time_s() - t0; double actual_freq = (double)count / t_elapsed; + long final_count = stol(rdx.get("simple_loop:count")); + + rdx.stop(); + cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; diff --git a/examples/string_vs_charp.cpp b/examples/string_vs_charp.cpp deleted file mode 100644 index 769926d..0000000 --- a/examples/string_vs_charp.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/** -* Redox test -* ---------- -* Increment a key on Redis using an asynchronous command on a timer. -*/ - -#include -#include "../src/redox.hpp" - -using namespace std; -using namespace redox; - -double time_s() { - unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); - return (double)ms / 1e6; -} - -int main(int argc, char* argv[]) { - - Redox rdx; - if(!rdx.start()) return 1; - - rdx.del("stringtest"); - rdx.set("stringtest", "value"); - - int count = 1000000; - double t0 = time_s(); - - string cmd_str = "GET stringtest"; - for(int i = 0; i < count; i++) { - rdx.command( - cmd_str, - [](const string &cmd, string const& value) { - value; - }, - [](const string &cmd, int status) { - cerr << "Bad reply: " << status << endl; - } - ); - } - - rdx.stop(); - - double t_elapsed = time_s() - t0; - - cout << "Sent " << count << " commands in " << t_elapsed << "s." << endl; - - return 0; -} diff --git a/src/command.cpp b/src/command.cpp index ad62bc5..435963c 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -18,11 +18,10 @@ Command::Command( Redox* rdx, long id, const std::string& cmd, - const std::function& callback, - const std::function& error_callback, + const std::function&)>& callback, double repeat, double after, bool free_memory, log::Logger& logger ) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), - success_callback_(callback), error_callback_(error_callback), logger_(logger) { + callback_(callback), logger_(logger) { timer_guard_.lock(); } @@ -32,7 +31,8 @@ void Command::processReply(redisReply* r) { free_guard_.lock(); reply_obj_ = r; - handleCallback(); + parseReplyObject(); + invoke(); pending_--; @@ -96,7 +96,7 @@ void Command::freeCommand(Command* c) { template -const ReplyT& Command::reply() { +const ReplyT& Command::reply() const { if (!ok()) { logger_.warning() << cmd_ << ": Accessing value of reply with status != OK."; } @@ -104,179 +104,133 @@ const ReplyT& Command::reply() { } template -bool Command::isErrorReply() { +bool Command::isExpectedReply(int type) { + + if(reply_obj_->type == type) { + reply_status_ = OK_REPLY; + return true; + } + + if(checkErrorReply() || checkNilReply()) return false; + + logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type + << ", expected type " << type << "."; + reply_status_ = WRONG_TYPE; + return false; +} + +template +bool Command::isExpectedReply(int typeA, int typeB) { + + if((reply_obj_->type == typeA) || (reply_obj_->type == typeB)) { + reply_status_ = OK_REPLY; + return true; + } + + if(checkErrorReply() || checkNilReply()) return false; + + logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type + << ", expected type " << typeA << " or " << typeB << "."; + reply_status_ = WRONG_TYPE; + return false; +} + +template +bool Command::checkErrorReply() { if (reply_obj_->type == REDIS_REPLY_ERROR) { logger_.error() << cmd_ << ": " << reply_obj_->str; + reply_status_ = ERROR_REPLY; return true; } return false; } template -bool Command::isNilReply() { +bool Command::checkNilReply() { if (reply_obj_->type == REDIS_REPLY_NIL) { logger_.warning() << cmd_ << ": Nil reply."; + reply_status_ = NIL_REPLY; return true; } return false; } // ---------------------------------------------------------------------------- -// Specializations of handleCallback for all data types +// Specializations of parseReplyObject for all expected return types // ---------------------------------------------------------------------------- template<> -void Command::handleCallback() { - invokeSuccess(reply_obj_); +void Command::parseReplyObject() { + reply_val_ = reply_obj_; } template<> -void Command::handleCallback() { - - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); +void Command::parseReplyObject() { - else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) { - logger_.error() << cmd_ << ": Received non-string reply."; - invokeError(REDOX_WRONG_TYPE); - - } else { - string s(reply_obj_->str, reply_obj_->len); - invokeSuccess(s); - } + if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return; + reply_val_ = {reply_obj_->str, static_cast(reply_obj_->len)}; } template<> -void Command::handleCallback() { +void Command::parseReplyObject() { - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); + if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return; + reply_val_ = reply_obj_->str; +} - else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) { - logger_.error() << cmd_ << ": Received non-string reply."; - invokeError(REDOX_WRONG_TYPE); +template<> +void Command::parseReplyObject() { - } else { - invokeSuccess(reply_obj_->str); - } + if(!isExpectedReply(REDIS_REPLY_INTEGER)) return; + reply_val_ = (int) reply_obj_->integer; } template<> -void Command::handleCallback() { +void Command::parseReplyObject() { - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); + if(!isExpectedReply(REDIS_REPLY_INTEGER)) return; + reply_val_ = reply_obj_->integer; +} - else if(reply_obj_->type != REDIS_REPLY_INTEGER) { - logger_.error() << cmd_ << ": Received non-integer reply."; - invokeError(REDOX_WRONG_TYPE); +template<> +void Command::parseReplyObject() { - } else { - invokeSuccess((int) reply_obj_->integer); - } + if(!isExpectedReply(REDIS_REPLY_NIL)) return; + reply_val_ = nullptr; } template<> -void Command::handleCallback() { +void Command>::parseReplyObject() { - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); + if(!isExpectedReply(REDIS_REPLY_ARRAY)) return; - else if(reply_obj_->type != REDIS_REPLY_INTEGER) { - logger_.error() << cmd_ << ": Received non-integer reply."; - invokeError(REDOX_WRONG_TYPE); - - } else { - invokeSuccess(reply_obj_->integer); + for(size_t i = 0; i < reply_obj_->elements; i++) { + redisReply* r = *(reply_obj_->element + i); + reply_val_.emplace_back(r->str, r->len); } } template<> -void Command::handleCallback() { - - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); +void Command>::parseReplyObject() { - else if(reply_obj_->type != REDIS_REPLY_NIL) { - logger_.error() << cmd_ << ": Received non-nil reply."; - invokeError(REDOX_WRONG_TYPE); + if(!isExpectedReply(REDIS_REPLY_ARRAY)) return; - } else { - invokeSuccess(nullptr); + for(size_t i = 0; i < reply_obj_->elements; i++) { + redisReply* r = *(reply_obj_->element + i); + reply_val_.emplace(r->str, r->len); } } - template<> -void Command>::handleCallback() { - - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - - else if(reply_obj_->type != REDIS_REPLY_ARRAY) { - logger_.error() << cmd_ << ": Received non-array reply."; - invokeError(REDOX_WRONG_TYPE); - - } else { - vector v; - size_t count = reply_obj_->elements; - for(size_t i = 0; i < count; i++) { - redisReply* r = *(reply_obj_->element + i); - if(r->type != REDIS_REPLY_STRING) { - logger_.error() << cmd_ << ": Received non-array reply."; - invokeError(REDOX_WRONG_TYPE); - } - v.emplace_back(r->str, r->len); - } - invokeSuccess(v); - } -} +void Command>::parseReplyObject() { -template<> -void Command>::handleCallback() { - - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - - else if(reply_obj_->type != REDIS_REPLY_ARRAY) { - logger_.error() << cmd_ << ": Received non-array reply."; - invokeError(REDOX_WRONG_TYPE); - - } else { - unordered_set v; - size_t count = reply_obj_->elements; - for(size_t i = 0; i < count; i++) { - redisReply* r = *(reply_obj_->element + i); - if(r->type != REDIS_REPLY_STRING) { - logger_.error() << cmd_ << ": Received non-array reply."; - invokeError(REDOX_WRONG_TYPE); - } - v.emplace(r->str, r->len); - } - invokeSuccess(v); - } -} + if(!isExpectedReply(REDIS_REPLY_ARRAY)) return; -template<> -void Command>::handleCallback() { - - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - - else if(reply_obj_->type != REDIS_REPLY_ARRAY) { - logger_.error() << cmd_ << ": Received non-array reply."; - invokeError(REDOX_WRONG_TYPE); - - } else { - set v; - size_t count = reply_obj_->elements; - for(size_t i = 0; i < count; i++) { - redisReply* r = *(reply_obj_->element + i); - if(r->type != REDIS_REPLY_STRING) { - logger_.error() << cmd_ << ": Received non-array reply."; - invokeError(REDOX_WRONG_TYPE); - } - v.emplace(r->str, r->len); - } - invokeSuccess(v); + for(size_t i = 0; i < reply_obj_->elements; i++) { + redisReply* r = *(reply_obj_->element + i); + reply_val_.emplace(r->str, r->len); } } diff --git a/src/command.hpp b/src/command.hpp index a49a07e..4961a12 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -16,21 +16,33 @@ namespace redox { -static const int REDOX_UNINIT = -1; -static const int REDOX_OK = 0; -static const int REDOX_SEND_ERROR = 1; -static const int REDOX_WRONG_TYPE = 2; -static const int REDOX_NIL_REPLY = 3; -static const int REDOX_ERROR_REPLY = 4; -static const int REDOX_TIMEOUT = 5; - class Redox; +//class Command; + +//template +//using CallbackT = std::function&)>; +/** +* The Command class represents a single command string to be sent to +* a Redis server, for both synchronous and asynchronous usage. It manages +* all of the state relevant to a single command string. A Command can also +* represent a deferred or looping command, in which case the success or +* error callbacks are invoked more than once. +*/ template class Command { public: + // Reply codes + static const int NO_REPLY = -1; // No reply yet + static const int OK_REPLY = 0; // Successful reply of the expected type + static const int NIL_REPLY = 1; // Got a nil reply + static const int ERROR_REPLY = 2; // Got an error reply + static const int SEND_ERROR = 3; // Could not send to server + static const int WRONG_TYPE = 4; // Got reply, but it was not the expected type + static const int TIMEOUT = 5; // No reply, timed out + /** * Frees memory allocated by this command. Commands with free_memory = false * must be freed by the user. @@ -45,25 +57,27 @@ public: /** * Returns true if the command has been canceled. */ - bool canceled() { return canceled_; } + bool canceled() const { return canceled_; } /** * Returns the reply status of this command. * Use ONLY with command_blocking. */ - int status() { return reply_status_; }; + int status() const { return reply_status_; }; /** * Returns true if this command got a successful reply. * Use ONLY with command_blocking. */ - bool ok() { return reply_status_ == REDOX_OK; } + bool ok() const { return reply_status_ == OK_REPLY; } /** * Returns the reply value, if the reply was successful (ok() == true). * Use ONLY with command_blocking. */ - const ReplyT& reply(); + const ReplyT& reply() const; + + const std::string& cmd() const { return cmd_; }; // Allow public access to constructed data Redox* const rdx_; @@ -79,8 +93,7 @@ private: Redox* rdx, long id, const std::string& cmd, - const std::function& callback, - const std::function& error_callback, + const std::function&)>& callback, double repeat, double after, bool free_memory, log::Logger& logger @@ -91,14 +104,15 @@ private: // Invoke a user callback from the reply object. This method is specialized // for each ReplyT of Command. - void handleCallback(); + void parseReplyObject(); - // Directly invoke the user callbacks if the exist - void invokeSuccess(const ReplyT& reply) { if (success_callback_) success_callback_(cmd_, reply); } - void invokeError(int status) { if (error_callback_) error_callback_(cmd_, status); } + // Directly invoke the user callback if it exists + void invoke() { if(callback_) callback_(*this); } - bool isErrorReply(); - bool isNilReply(); + bool checkErrorReply(); + bool checkNilReply(); + bool isExpectedReply(int type); + bool isExpectedReply(int typeA, int typeB); // Delete the provided Command object and deregister as an active // command from its Redox instance. @@ -110,9 +124,8 @@ private: // The last server reply redisReply* reply_obj_ = nullptr; - // Callbacks on success and error - const std::function success_callback_; - const std::function error_callback_; + // User callback + const std::function&)> callback_; // Place to store the reply value and status. // ONLY for blocking commands diff --git a/src/redox.cpp b/src/redox.cpp index 3d26f3a..9a4a69d 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -272,7 +272,8 @@ bool Redox::submit_to_server(Command* c) { string value = c->cmd_.substr(first+1, last-first-1); if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; - c->invokeError(REDOX_SEND_ERROR); + c->reply_status_ = Command::SEND_ERROR; + c->invoke(); return false; } return true; @@ -281,7 +282,8 @@ bool Redox::submit_to_server(Command* c) { if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) { rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; - c->invokeError(REDOX_SEND_ERROR); + c->reply_status_ = Command::SEND_ERROR; + c->invoke(); return false; } @@ -379,7 +381,14 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, pubsub_mode = true; command(cmd_name + " " + topic, - [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) { + [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command& c) { + + if(!c.ok()) { + if(err_callback) err_callback(topic, c.status()); + return; + } + + redisReply* reply = c.reply(); // For debugging only // cout << "------" << endl; @@ -398,19 +407,19 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) { if(!strncmp(reply->element[0]->str, "sub", 3)) { - sub_queue.insert(topic); + subscribed_topics_.insert(topic); if(sub_callback) sub_callback(topic); } else if(!strncmp(reply->element[0]->str, "psub", 4)) { - psub_queue.insert(topic); + psubscribed_topics_.insert(topic); if (sub_callback) sub_callback(topic); } else if(!strncmp(reply->element[0]->str, "uns", 3)) { - sub_queue.erase(topic); + subscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); } else if(!strncmp(reply->element[0]->str, "puns", 4)) { - psub_queue.erase(topic); + psubscribed_topics_.erase(topic); if (unsub_callback) unsub_callback(topic); } @@ -431,9 +440,6 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, else logger.error() << "Unknown pubsub message of type " << reply->type; }, - [topic, err_callback](const string &cmd, int status) { - if(err_callback) err_callback(topic, status); - }, 1e10 // To keep the command around for a few hundred years ); } @@ -444,7 +450,7 @@ void Redox::subscribe(const string topic, function unsub_callback, function err_callback ) { - if(sub_queue.find(topic) != sub_queue.end()) { + if(subscribed_topics_.find(topic) != subscribed_topics_.end()) { logger.warning() << "Already subscribed to " << topic << "!"; return; } @@ -457,7 +463,7 @@ void Redox::psubscribe(const string topic, function unsub_callback, function err_callback ) { - if(psub_queue.find(topic) != psub_queue.end()) { + if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) { logger.warning() << "Already psubscribed to " << topic << "!"; return; } @@ -468,9 +474,10 @@ void Redox::unsubscribe_raw(const string cmd_name, const string topic, function err_callback ) { command(cmd_name + " " + topic, - nullptr, - [topic, err_callback](const string& cmd, int status) { - if(err_callback) err_callback(topic, status); + [topic, err_callback](Command& c) { + if(!c.ok()) { + if (err_callback) err_callback(topic, c.status()); + } } ); } @@ -478,7 +485,7 @@ void Redox::unsubscribe_raw(const string cmd_name, const string topic, void Redox::unsubscribe(const string topic, function err_callback ) { - if(sub_queue.find(topic) == sub_queue.end()) { + if(subscribed_topics_.find(topic) == subscribed_topics_.end()) { logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; return; } @@ -488,7 +495,7 @@ void Redox::unsubscribe(const string topic, void Redox::punsubscribe(const string topic, function err_callback ) { - if(psub_queue.find(topic) == psub_queue.end()) { + if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) { logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; return; } @@ -500,11 +507,11 @@ void Redox::publish(const string topic, const string msg, function err_callback ) { command("PUBLISH " + topic + " " + msg, - [topic, msg, pub_callback](const string& command, redisReply* const& reply) { + [topic, msg, err_callback, pub_callback](Command& c) { + if(!c.ok()) { + if(err_callback) err_callback(topic, c.status()); + } if(pub_callback) pub_callback(topic, msg); - }, - [topic, err_callback](const string& command, int status) { - if(err_callback) err_callback(topic, status); } ); } @@ -565,20 +572,20 @@ void Redox::command(const string& cmd) { } bool Redox::command_blocking(const string& cmd) { - Command* c = command_blocking(cmd); - bool succeeded = c->ok(); - c->free(); + Command& c = command_blocking(cmd); + bool succeeded = c.ok(); + c.free(); return succeeded; } string Redox::get(const string& key) { - auto c = command_blocking("GET " + key); - if(!c->ok()) { - throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c->status())); + Command& c = command_blocking("GET " + key); + if(!c.ok()) { + throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status())); } - string reply = c->reply(); - c->free(); + string reply = c.reply(); + c.free(); return reply; }; diff --git a/src/redox.hpp b/src/redox.hpp index 3b99bf9..b268b13 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -105,15 +105,15 @@ public: * user is responsible for calling free() on the Command object. */ template - Command* command( + Command& command( const std::string& cmd, - const std::function& callback = nullptr, - const std::function& error_callback = nullptr, + const std::function&)>& callback = nullptr, double repeat = 0.0, double after = 0.0, bool free_memory = true ); + /** * A wrapper around command() for synchronous use. Waits for a reply, populates it * into the Command object, and returns when complete. The user can retrieve the @@ -122,7 +122,7 @@ public: * the call succeeded. */ template - Command* command_blocking(const std::string& cmd); + Command& command_blocking(const std::string& cmd); /** * Return the total number of successful commands processed by this Redox instance. @@ -238,8 +238,8 @@ public: std::function err_callback = nullptr ); - const std::set& subscribed_topics() { return sub_queue; } - const std::set& psubscribed_topics() { return psub_queue; } + const std::set& subscribed_topics() { return subscribed_topics_; } + const std::set& psubscribed_topics() { return psubscribed_topics_; } // ------------------------------------------------ // Public only for Command class @@ -365,18 +365,17 @@ private: // Keep track of topics because we can only unsubscribe // from subscribed topics and punsubscribe from // psubscribed topics, or hiredis leads to segfaults - std::set sub_queue; - std::set psub_queue; + std::set subscribed_topics_; + std::set psubscribed_topics_; }; // --------------------------- template -Command* Redox::command( +Command& Redox::command( const std::string& cmd, - const std::function& callback, - const std::function& error_callback, + const std::function&)>& callback, double repeat, double after, bool free_memory @@ -393,7 +392,7 @@ Command* Redox::command( commands_created += 1; auto* c = new Command(this, commands_created, cmd, - callback, error_callback, repeat, after, free_memory, logger); + callback, repeat, after, free_memory, logger); std::lock_guard lg(queue_guard); std::lock_guard lg2(command_map_guard); @@ -406,41 +405,26 @@ Command* Redox::command( // logger.debug() << "Created Command " << c->id << " at " << c; - return c; + return *c; } template -Command* Redox::command_blocking(const std::string& cmd) { - - ReplyT val; - std::atomic_int status(REDOX_UNINIT); +Command& Redox::command_blocking(const std::string& cmd) { std::condition_variable cv; std::mutex m; - std::unique_lock lk(m); + std::atomic_bool done = {false}; - Command* c = command(cmd, - [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { - std::unique_lock ul(m); - val = reply; - status = REDOX_OK; - ul.unlock(); - cv.notify_one(); - }, - [&status, &m, &cv](const std::string& cmd_str, int error) { - std::unique_lock ul(m); - status = error; - ul.unlock(); + Command& c = command(cmd, + [&cv, &done](Command& cmd_obj) { + done = true; cv.notify_one(); }, 0, 0, false // No repeats, don't free memory ); - cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); - c->reply_val_ = val; - c->reply_status_ = status; - + cv.wait(lk, [&done]() { return done.load(); }); return c; } diff --git a/test/test.cpp b/test/test.cpp index 2c84905..6cc4521 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -100,7 +100,7 @@ protected: void print_and_check_sync(Command* c, const ReplyT& value) { ASSERT_TRUE(c->ok()); EXPECT_EQ(c->reply(), value); - cout << "[SYNC] " << c->cmd << ": " << c->reply() << endl; + cout << "[SYNC] " << c->cmd_ << ": " << c->reply() << endl; c->free(); } }; -- libgit2 0.21.4