diff --git a/README.md b/README.md index 6bba6b8..6b66942 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,8 @@ details so you can move on to the interesting part of your project. * Automatic pipelining, even for synchronous calls from separate threads * Low-level access when needed * Accessible and robust error handling - * Logs to any ostream at a user-controlled log level + * Configurable logging level and output to any ostream + * Full support for binary data (keys and values) * Fast - developed for robotics applications * 100% clean Valgrind reports @@ -76,10 +77,12 @@ a reply is received from the server. #### Asynchronous commands In a high-performance application, we don't want to wait for a reply, but instead -do other work. -The `command` method accepts a Redis command and a callback to be invoked when a reply is received. - - rdx.command("GET hello", [](Command& c) { +do other work. At the core of Redox is a generic asynchronous API for executing +any Redis command and providing a reply callback. The `command` method accepts a +Redis command in the form of an STL vector of strings, and a callback to be invoked +when a reply is received or if there is an error. + + rdx.command({"GET", "hello"}, [](Command& c) { if(c.ok()) { cout << "Hello, async " << c.reply() << endl; } else { @@ -91,7 +94,8 @@ This statement tells redox to run the command `GET hello`. The `` templa parameter means that we want the reply to be put into a string and that we expect the server to respond with something that can be put into a string. The full list of reply types is listed in this document and covers convenient access to anything -returned from the Redis protocol. +returned from the Redis protocol. The input vector can contain arbitrary binary +data. The second argument is a callback function that accepts a reference to a Command object of the requested reply type. The Command object contains the reply and any error @@ -108,17 +112,17 @@ Here is a simple example of running `GET hello` asynchronously ten times: // Block until connected, localhost by default if(!rdx.connect()) return 1; - + auto got_reply = [](Command& c) { if(!c.ok()) return; cout << c.cmd() << ": " << c.reply() << endl; }; - - for(int i = 0; i < 10; i++) rdx.command("GET hello", got_reply); - + + for(int i = 0; i < 10; i++) rdx.command({"GET", "hello"}, got_reply); + // Do useful work this_thread::sleep_for(chrono::milliseconds(10)); - + rdx.disconnect(); // Block until disconnected The `.command()` method returns immediately, so this program doesn't wait for a reply @@ -136,7 +140,7 @@ shut down after we get all replies, we could do something like this: if(count == total) rdx.stop(); // Signal to shut down }; - for(int i = 0; i < total; i++) rdx.command("GET hello", got_reply); + for(int i = 0; i < total; i++) rdx.command({"GET", "hello"}, got_reply); // Do useful work @@ -157,13 +161,13 @@ between synchronous commands in different threads. The `commandSync` method prov a similar API to `command`, but instead of a callback returns a Command object when a reply is received. - Command& c = rdx.commandSync("GET hello"); + Command& c = rdx.commandSync({"GET", "hello"}); if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl; c.free(); When using synchronous commands, the user is responsible for freeing the memory of -the Command object by calling `c.free()`. The `c.cmd()` method just returns the -command string (`GET hello` in this case). +the Command object by calling `c.free()`. The `c.cmd()` method just returns a string +representation of the command (`GET hello` in this case). #### Looping and delayed commands We often want to run commands on regular invervals. Redox provides the `commandLoop` @@ -173,7 +177,7 @@ commands in a loop, because it only creates a single Command object. to repeat the command. It then runs the command on the given interval until the user calls `c.free()`. - Command& cmd = rdx.commandLoop("GET hello", [](Command& c) { + Command& cmd = rdx.commandLoop({"GET", "hello"}, [](Command& c) { if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl; }, 0.1); @@ -185,7 +189,7 @@ Finally, `commandDelayed` runs a command after a specified delay (in seconds). I not return a command object, because the memory is automatically freed after the callback is invoked. - rdx.commandDelayed("GET hello", [](Command& c) { + rdx.commandDelayed({"GET", "hello"}, [](Command& c) { if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl; }, 1); this_thread::sleep_for(chrono::seconds(2)); @@ -219,6 +223,12 @@ receives messages and provides subscribe/unsubscribe and psubscribe/punsubscribe sub.disconnect(); rdx.disconnect(); +#### strToVec and vecToStr +Redox provides helper methods to convert between a string command and +a vector of strings as needed by its API. `rdx.strToVec("GET foo")` +will return an `std::vector` containing `GET` and `foo` +as entries. `rdx.vecToStr({"GET", "foo"})` will return the string `GET foo`. + ## Reply types These the available template parameters in redox and the Redis [return types](http://redis.io/topics/protocol) they can hold. diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index b217196..5cbd940 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -19,7 +19,7 @@ int main(int argc, char* argv[]) { thread setter([]() { for(int i = 0; i < 5000; i++) { - rdx.command("INCR counter"); + rdx.command({"INCR", "counter"}); this_thread::sleep_for(chrono::milliseconds(1)); } cout << "Setter thread exiting." << endl; @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) { thread getter([]() { for(int i = 0; i < 5; i++) { rdx.command( - "GET counter", + {"GET", "counter"}, [](Command& c) { if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl; } diff --git a/examples/binary_data.cpp b/examples/binary_data.cpp index 313f416..dfc80b5 100644 --- a/examples/binary_data.cpp +++ b/examples/binary_data.cpp @@ -25,16 +25,17 @@ int main(int argc, char* argv[]) { redox::Redox rdx; // Initialize Redox if(!rdx.connect("localhost", 6379)) return 1; // Start the event loop - rdx.del("binary"); - + string binary_key = random_string(100); string binary_data = random_string(10000); - auto& c = rdx.commandSync("SET binary \"" + binary_data + "\""); + rdx.del(binary_key); + + auto& c = rdx.commandSync({"SET", binary_key, binary_data}); if(c.ok()) cout << "Reply: " << c.reply() << endl; else cerr << "Failed to set key! Status: " << c.status() << endl; c.free(); - auto& c2 = rdx.commandSync("GET binary"); + auto& c2 = rdx.commandSync({"GET", binary_key}); if(c2.ok()) { if(c2.reply() == binary_data) cout << "Binary data matches!" << endl; else cerr << "Binary data differs!" << endl; diff --git a/examples/data_types.cpp b/examples/data_types.cpp index 2d857b9..c39fb7f 100644 --- a/examples/data_types.cpp +++ b/examples/data_types.cpp @@ -3,10 +3,10 @@ */ #include -#include "redox.hpp" #include #include #include +#include "redox.hpp" using namespace std; using redox::Redox; @@ -19,9 +19,9 @@ int main(int argc, char* argv[]) { rdx.del("mylist"); - rdx.commandSync("LPUSH mylist 1 2 3 4 5 6 7 8 9 10"); + rdx.commandSync(rdx.strToVec("LPUSH mylist 1 2 3 4 5 6 7 8 9 10")); - rdx.command>("LRANGE mylist 0 4", + rdx.command>({"LRANGE", "mylist", "0", "4"}, [](Command>& c){ if(!c.ok()) return; cout << "Last 5 elements as a vector: "; @@ -30,7 +30,7 @@ int main(int argc, char* argv[]) { } ); - rdx.command>("LRANGE mylist 0 4", + rdx.command>(rdx.strToVec("LRANGE mylist 0 4"), [](Command>& c){ if(!c.ok()) return; cout << "Last 5 elements as a hash: "; @@ -39,7 +39,7 @@ int main(int argc, char* argv[]) { } ); - rdx.command>("LRANGE mylist 0 4", + rdx.command>(rdx.strToVec("LRANGE mylist 0 4"), [&rdx](Command>& c) { if(c.ok()) { cout << "Last 5 elements as a set: "; diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index 82b38ad..ec99b2c 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) { atomic_int count = {0}; for(int i = 1; i <= len; i++) { - rdx.command("lpush test 1", [&t0, &t1, &count, len, &rdx](Command& c) { + rdx.command({"lpush", "test", "1"}, [&t0, &t1, &count, len, &rdx](Command& c) { if(!c.ok()) return; diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index 315ddb0..f375384 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -19,29 +19,28 @@ double time_s() { int main(int argc, char* argv[]) { Redox rdx; - if(!rdx.connect_unix("/var/run/redis/redis.sock")) return 1; + if(!rdx.connectUnix("/var/run/redis/redis.sock")) return 1; - bool status = rdx.commandSync("SET simple_loop:count 0"); - if(status) { + if(rdx.set("simple_loop:count", "0")) { cout << "Reset the counter to zero." << endl; } else { cerr << "Failed to reset counter." << endl; return 1; } - string cmd_str = "INCR simple_loop:count"; + vector cmd_vec = {"INCR", "simple_loop:count"}; double freq = 400000; // Hz double dt = 1 / freq; // s double t = 5; // s - cout << "Sending \"" << cmd_str << "\" asynchronously every " + cout << "Sending \"" << rdx.vecToStr(cmd_vec) << "\" asynchronously every " << dt << "s for " << t << "s..." << endl; double t0 = time_s(); atomic_int count(0); Command& cmd = rdx.commandLoop( - cmd_str, + cmd_vec, [&count, &rdx](Command& c) { if (!c.ok()) { cerr << "Bad reply: " << c.status() << endl; @@ -67,4 +66,4 @@ int main(int argc, char* argv[]) { rdx.disconnect(); return 0; -} +}; diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index a5243e0..8c85699 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -19,7 +19,7 @@ double time_s() { int main(int argc, char* argv[]) { - Redox rdx; + Redox rdx = {cout, redox::log::Debug}; if(!rdx.connect("localhost", 6379)) return 1; if(rdx.set("simple_loop:count", "0")) { @@ -29,13 +29,13 @@ int main(int argc, char* argv[]) { return 1; } - string cmd_str = "INCR simple_loop:count"; + vector cmd_vec = {"INCR", "simple_loop:count"}; double freq = 10000; // Hz double dt = 1 / freq; // s double t = 5; // s int parallel = 100; - cout << "Sending \"" << cmd_str << "\" asynchronously every " + cout << "Sending \"" << rdx.vecToStr(cmd_vec) << "\" asynchronously every " << dt << "s for " << t << "s..." << endl; double t0 = time_s(); @@ -44,7 +44,7 @@ int main(int argc, char* argv[]) { vector*> commands; for(int i = 0; i < parallel; i++) { commands.push_back(&rdx.commandLoop( - cmd_str, + cmd_vec, [&count, &rdx](Command& c) { if (!c.ok()) { cerr << "Bad reply: " << c.status() << endl; @@ -72,4 +72,4 @@ int main(int argc, char* argv[]) { rdx.disconnect(); return 0; -} +}; diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp index 6215563..31350b6 100644 --- a/examples/speed_test_sync.cpp +++ b/examples/speed_test_sync.cpp @@ -20,24 +20,23 @@ int main(int argc, char* argv[]) { Redox rdx; if(!rdx.connect("localhost", 6379)) return 1; - if(rdx.commandSync("SET simple_loop:count 0")) { + if(rdx.commandSync({"SET", "simple_loop:count", "0"})) { cout << "Reset the counter to zero." << endl; } else { cerr << "Failed to reset counter." << endl; return 1; } - string cmd_str = "INCR simple_loop:count"; double t = 5; // s - cout << "Sending \"" << cmd_str << "\" synchronously for " << t << "s..." << endl; + cout << "Sending \"" << "INCR simple_loop:count" << "\" synchronously for " << t << "s..." << endl; double t0 = time_s(); double t_end = t0 + t; int count = 0; while(time_s() < t_end) { - Command& c = rdx.commandSync(cmd_str); + Command& c = rdx.commandSync({"INCR", "simple_loop:count"}); if(!c.ok()) cerr << "Bad reply, code: " << c.status() << endl; c.free(); count++; diff --git a/include/redox/client.hpp b/include/redox/client.hpp index 00b2edc..175f9be 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -55,11 +55,12 @@ class Redox { public: // Connection states - static const int NOT_YET_CONNECTED = 0; - static const int CONNECTED = 1; - static const int DISCONNECTED = 2; - static const int CONNECT_ERROR = 3; - static const int DISCONNECT_ERROR = 4; + static const int NOT_YET_CONNECTED = 0; // Starting state + static const int CONNECTED = 1; // Successfully connected + static const int DISCONNECTED = 2; // Successfully disconnected + static const int CONNECT_ERROR = 3; // Error connecting + static const int DISCONNECT_ERROR = 4; // Disconnected on error + static const int INIT_ERROR = 5; // Failed to init data structures // ------------------------------------------------ // Core public API @@ -92,7 +93,7 @@ public: * Connects to Redis over a unix socket and starts an event loop in a separate * thread. Returns true once everything is ready, or false on failure. */ - bool connect_unix( + bool connectUnix( const std::string& path = REDIS_DEFAULT_PATH, std::function connection_callback = nullptr); @@ -119,30 +120,32 @@ public: * exactly once. The Command object is provided to the callback, and the * memory for it is automatically freed when the callback returns. */ + template void command( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback = nullptr ); /** * Asynchronously runs a command and ignores any errors or replies. */ - void command(const std::string& cmd); + void command(const std::vector& cmd); /** * Synchronously runs a command, returning the Command object only once * a reply is received or there is an error. The user is responsible for * calling .free() on the returned Command object. */ + template - Command& commandSync(const std::string& cmd); + Command& commandSync(const std::vector& cmd); /** * Synchronously runs a command, returning only once a reply is received * or there's an error. Returns true on successful reply, false on error. */ - bool commandSync(const std::string& cmd); + bool commandSync(const std::vector& cmd); /** * Creates an asynchronous command that is run every [repeat] seconds, @@ -150,9 +153,10 @@ public: * command is run only once. The user is responsible for calling .free() * on the returned Command object. */ + template Command& commandLoop( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double repeat, double after = 0.0 @@ -164,15 +168,32 @@ public: * or error, and the Command object memory is automatically freed * after the callback returns. */ + template void commandDelayed( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double after ); // ------------------------------------------------ - // Wrapper methods for convenience only + // Utility methods + // ------------------------------------------------ + + /** + * Given a vector of strings, returns a string of the concatenated elements, separated + * by the delimiter. Useful for printing out a command string from a vector. + */ + std::string vecToStr(const std::vector& vec, const char delimiter = ' '); + + /** + * Given a command string, returns a vector of strings by splitting the input by + * the delimiter. Useful for turning a string input into a command. + */ + std::vector strToVec(const std::string& s, const char delimiter = ' '); + + // ------------------------------------------------ + // Command wrapper methods for convenience only // ------------------------------------------------ /** @@ -225,9 +246,12 @@ private: // One stop shop for creating commands. The base of all public // methods that run commands. + + // One stop shop for creating commands. The base of all public + // methods that run commands. template Command& createCommand( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback = nullptr, double repeat = 0.0, double after = 0.0, @@ -236,8 +260,8 @@ private: // Setup code for the constructors // Return true on success, false on failure - bool init_ev(); - bool init_hiredis(); + bool initEv(); + bool initHiredis(); // Callbacks invoked on server connection/disconnection static void connectedCallback(const redisAsyncContext* c, int status); @@ -380,7 +404,7 @@ private: template Command& Redox::createCommand( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double repeat, double after, @@ -411,7 +435,7 @@ Command& Redox::createCommand( template void Redox::command( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback ) { createCommand(cmd, callback); @@ -419,7 +443,7 @@ void Redox::command( template Command& Redox::commandLoop( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double repeat, double after @@ -429,7 +453,7 @@ Command& Redox::commandLoop( template void Redox::commandDelayed( - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double after ) { @@ -437,7 +461,7 @@ void Redox::commandDelayed( } template -Command& Redox::commandSync(const std::string& cmd) { +Command& Redox::commandSync(const std::vector& cmd) { auto& c = createCommand(cmd, nullptr, 0, 0, false); c.wait(); return c; diff --git a/include/redox/command.hpp b/include/redox/command.hpp index 6c574a7..6da27eb 100644 --- a/include/redox/command.hpp +++ b/include/redox/command.hpp @@ -89,12 +89,12 @@ public: /** * Returns the command string represented by this object. */ - const std::string& cmd() const { return cmd_; }; + std::string cmd() const; // Allow public access to constructed data Redox* const rdx_; const long id_; - const std::string cmd_; + const std::vector cmd_; const double repeat_; const double after_; const bool free_memory_; @@ -104,7 +104,7 @@ private: Command( Redox* rdx, long id, - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double repeat, double after, bool free_memory, diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp index 2b1b373..ff54ce0 100644 --- a/include/redox/subscriber.hpp +++ b/include/redox/subscriber.hpp @@ -52,12 +52,12 @@ public: } /** - * Same as .connect_unix() on a Redox instance. + * Same as .connectUnix() on a Redox instance. */ bool connect_unix( const std::string& path = REDIS_DEFAULT_PATH, std::function connection_callback = nullptr) { - return rdx_.connect_unix(path, connection_callback); + return rdx_.connectUnix(path, connection_callback); } /** diff --git a/src/client.cpp b/src/client.cpp index f05615d..165782a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -19,6 +19,7 @@ */ #include +#include #include "client.hpp" using namespace std; @@ -28,7 +29,7 @@ namespace redox { Redox::Redox( ostream& log_stream, log::Level log_level -) : logger_(log_stream, log_level) {} +) : logger_(log_stream, log_level), evloop_(nullptr) {} bool Redox::connect( const std::string& host, const int port, @@ -39,12 +40,12 @@ bool Redox::connect( port_ = port; user_connection_callback_ = connection_callback; - if(!init_ev()) return false; + if(!initEv()) return false; // Connect over TCP ctx_ = redisAsyncConnect(host.c_str(), port); - if(!init_hiredis()) return false; + if(!initHiredis()) return false; event_loop_thread_ = thread([this] { runEventLoop(); }); @@ -59,7 +60,7 @@ bool Redox::connect( return connect_state_ == CONNECTED; } -bool Redox::connect_unix( +bool Redox::connectUnix( const std::string& path, std::function connection_callback ) { @@ -67,12 +68,12 @@ bool Redox::connect_unix( path_ = path; user_connection_callback_ = connection_callback; - if(!init_ev()) return false; + if(!initEv()) return false; // Connect over unix sockets ctx_ = redisAsyncConnectUnix(path.c_str()); - if(!init_hiredis()) return false; + if(!initHiredis()) return false; event_loop_thread_ = thread([this] { runEventLoop(); }); @@ -106,10 +107,11 @@ void Redox::wait() { Redox::~Redox() { // Bring down the event loop - stop(); + if(running_ == true) { stop(); } if(event_loop_thread_.joinable()) event_loop_thread_.join(); - ev_loop_destroy(evloop_); + + if(evloop_ != nullptr) ev_loop_destroy(evloop_); } void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { @@ -148,30 +150,52 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } -bool Redox::init_ev() { +bool Redox::initEv() { signal(SIGPIPE, SIG_IGN); evloop_ = ev_loop_new(EVFLAG_AUTO); + if(evloop_ == nullptr) { + logger_.fatal() << "Could not create a libev event loop."; + connect_state_ = INIT_ERROR; + connect_waiter_.notify_all(); + return false; + } ev_set_userdata(evloop_, (void*)this); // Back-reference return true; } -bool Redox::init_hiredis() { +bool Redox::initHiredis() { ctx_->data = (void*)this; // Back-reference if (ctx_->err) { - logger_.error() << "Could not create a hiredis context: " << ctx_->errstr; - connect_state_ = CONNECT_ERROR; + logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; + connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } // Attach event loop to hiredis - redisLibevAttach(evloop_, ctx_); + if(redisLibevAttach(evloop_, ctx_) != REDIS_OK) { + logger_.fatal() << "Could not attach libev event loop to hiredis."; + connect_state_ = INIT_ERROR; + connect_waiter_.notify_all(); + return false; + } // Set the callbacks to be invoked on server connection/disconnection - redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback); - redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback); + if(redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { + logger_.fatal() << "Could not attach connect callback to hiredis."; + connect_state_ = INIT_ERROR; + connect_waiter_.notify_all(); + return false; + } + + if(redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { + logger_.fatal() << "Could not attach disconnect callback to hiredis."; + connect_state_ = INIT_ERROR; + connect_waiter_.notify_all(); + return false; + } return true; } @@ -249,7 +273,6 @@ void Redox::runEventLoop() { logger_.info() << "Event thread exited."; } - template Command* Redox::findCommand(long id) { @@ -284,32 +307,21 @@ bool Redox::submitToServer(Command* c) { Redox* rdx = c->rdx_; c->pending_++; - // Process binary data if trailing quotation. This is a limited implementation - // to allow binary data between the first and the last quotes of the command string, - // if the very last character of the command is a quote ('"'). - if(c->cmd_[c->cmd_.size()-1] == '"') { - - // Indices of the quotes - size_t first = c->cmd_.find('"'); - size_t last = c->cmd_.size()-1; - - // Proceed only if the first and last quotes are different - if(first != last) { - - string format = c->cmd_.substr(0, first) + "%b"; - string value = c->cmd_.substr(first+1, last-first-1); - if (redisAsyncCommand(rdx->ctx_, commandCallback, (void*) c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { - rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr; - c->reply_status_ = Command::SEND_ERROR; - c->invoke(); - return false; - } - return true; - } - } - - if (redisAsyncCommand(rdx->ctx_, commandCallback, (void*) c->id_, c->cmd_.c_str()) != REDIS_OK) { - rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr; + // Construct a char** from the vector + vector argv; + transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argv), + [](const string& s){ return s.c_str(); } + ); + + // Construct a size_t* of string lengths from the vector + vector argvlen; + transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argvlen), + [](const string& s) { return s.size(); } + ); + + if(redisAsyncCommandArgv(rdx->ctx_, commandCallback, (void*) c->id_, + argv.size(), &argv[0], &argvlen[0]) != REDIS_OK) { + rdx->logger_.error() << "Could not send \"" << c->cmd() << "\": " << rdx->ctx_->errstr; c->reply_status_ = Command::SEND_ERROR; c->invoke(); return false; @@ -498,11 +510,31 @@ Redox::getCommandMap>() { return commands_unordered_set_st // Helpers // ---------------------------- -void Redox::command(const std::string& cmd) { +string Redox::vecToStr(const vector& vec, const char delimiter) { + string str; + for(size_t i = 0; i < vec.size() - 1; i++) + str += vec[i] + delimiter; + str += vec[vec.size()-1]; + return str; +} + +vector Redox::strToVec(const string& s, const char delimiter) { + vector vec; + size_t last = 0; + size_t next = 0; + while ((next = s.find(delimiter, last)) != string::npos) { + vec.push_back(s.substr(last, next-last)); + last = next + 1; + } + vec.push_back(s.substr(last)); + return vec; +} + +void Redox::command(const std::vector& cmd) { command(cmd, nullptr); } -bool Redox::commandSync(const string& cmd) { +bool Redox::commandSync(const std::vector& cmd) { auto& c = commandSync(cmd); bool succeeded = c.ok(); c.free(); @@ -511,7 +543,7 @@ bool Redox::commandSync(const string& cmd) { string Redox::get(const string& key) { - Command& c = commandSync("GET \"" + key + '"'); + Command& c = commandSync({"GET", key}); if(!c.ok()) { throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status())); } @@ -521,15 +553,15 @@ string Redox::get(const string& key) { }; bool Redox::set(const string& key, const string& value) { - return commandSync("SET " + key + " \"" + value + '"'); + return commandSync({"SET", key, value}); } bool Redox::del(const string& key) { - return commandSync("DEL \"" + key + '"'); + return commandSync({"DEL", key}); } void Redox::publish(const string& topic, const string& msg) { - command("PUBLISH " + topic + " \"" + msg + '"'); + command({"PUBLISH", topic, msg}); } } // End namespace redis diff --git a/src/command.cpp b/src/command.cpp index 9587545..d8d2d5b 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -33,7 +33,7 @@ template Command::Command( Redox* rdx, long id, - const std::string& cmd, + const std::vector& cmd, const std::function&)>& callback, double repeat, double after, bool free_memory, log::Logger& logger ) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), @@ -113,12 +113,17 @@ template ReplyT Command::reply() { std::lock_guard lg(reply_guard_); if (!ok()) { - logger_.warning() << cmd_ << ": Accessing reply value while status != OK."; + logger_.warning() << cmd() << ": Accessing reply value while status != OK."; } return reply_val_; } template +std::string Command::cmd() const { + return rdx_->vecToStr(cmd_); +}; + +template bool Command::isExpectedReply(int type) { if(reply_obj_->type == type) { @@ -128,7 +133,7 @@ bool Command::isExpectedReply(int type) { if(checkErrorReply() || checkNilReply()) return false; - logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type + logger_.error() << cmd() << ": Received reply of type " << reply_obj_->type << ", expected type " << type << "."; reply_status_ = WRONG_TYPE; return false; @@ -144,7 +149,7 @@ bool Command::isExpectedReply(int typeA, int typeB) { if(checkErrorReply() || checkNilReply()) return false; - logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type + logger_.error() << cmd() << ": Received reply of type " << reply_obj_->type << ", expected type " << typeA << " or " << typeB << "."; reply_status_ = WRONG_TYPE; return false; @@ -154,7 +159,7 @@ template bool Command::checkErrorReply() { if (reply_obj_->type == REDIS_REPLY_ERROR) { - logger_.error() << cmd_ << ": " << reply_obj_->str; + logger_.error() << cmd() << ": " << reply_obj_->str; reply_status_ = ERROR_REPLY; return true; } @@ -165,7 +170,7 @@ template bool Command::checkNilReply() { if (reply_obj_->type == REDIS_REPLY_NIL) { - logger_.warning() << cmd_ << ": Nil reply."; + logger_.warning() << cmd() << ": Nil reply."; reply_status_ = NIL_REPLY; return true; } diff --git a/src/subscriber.cpp b/src/subscriber.cpp index 5deb0de..fdba9a4 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -99,7 +99,7 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, function err_callback ) { - Command& sub_cmd = rdx_.commandLoop(cmd_name + " " + topic, + Command& sub_cmd = rdx_.commandLoop({cmd_name, topic}, [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command& c) { if (!c.ok()) { @@ -191,7 +191,7 @@ void Subscriber::psubscribe(const string topic, void Subscriber::unsubscribeBase(const string cmd_name, const string topic, function err_callback ) { - rdx_.command(cmd_name + " " + topic, + rdx_.command({cmd_name, topic}, [topic, err_callback](Command& c) { if(!c.ok()) { if (err_callback) err_callback(topic, c.status()); diff --git a/test/test.cpp b/test/test.cpp index 46dadd7..73fd603 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -44,7 +44,7 @@ protected: rdx.connect("localhost", 6379); // Clear all keys used by the tests here - rdx.command("DEL redox_test:a"); + rdx.command({"DEL", "redox_test:a"}); } virtual ~RedoxTest() { } @@ -113,7 +113,7 @@ protected: void print_and_check_sync(Command& c, const ReplyT& value) { ASSERT_TRUE(c.ok()); EXPECT_EQ(c.reply(), value); - cout << "[SYNC] " << c.cmd_ << ": " << c.reply() << endl; + cout << "[SYNC] " << c.cmd() << ": " << c.reply() << endl; c.free(); } }; @@ -123,31 +123,31 @@ protected: // ------------------------------------------- TEST_F(RedoxTest, GetSet) { - rdx.command("SET redox_test:a apple", print_and_check("OK")); - rdx.command("GET redox_test:a", print_and_check("apple")); + rdx.command({"SET", "redox_test:a", "apple"}, print_and_check("OK")); + rdx.command({"GET", "redox_test:a"}, print_and_check("apple")); wait_for_replies(); } TEST_F(RedoxTest, Delete) { - rdx.command("SET redox_test:a apple", print_and_check("OK")); - rdx.command("DEL redox_test:a", print_and_check(1)); - rdx.command("GET redox_test:a", check(nullptr)); + rdx.command({"SET", "redox_test:a", "apple"}, print_and_check("OK")); + rdx.command({"DEL", "redox_test:a"}, print_and_check(1)); + rdx.command({"GET", "redox_test:a"}, check(nullptr)); wait_for_replies(); } TEST_F(RedoxTest, Incr) { int count = 100; for(int i = 0; i < count; i++) { - rdx.command("INCR redox_test:a", check(i+1)); + rdx.command({"INCR", "redox_test:a"}, check(i+1)); } - rdx.command("GET redox_test:a", print_and_check(to_string(count))); + rdx.command({"GET", "redox_test:a"}, print_and_check(to_string(count))); wait_for_replies(); } TEST_F(RedoxTest, Delayed) { - rdx.commandDelayed("INCR redox_test:a", check(1), 0.1); + rdx.commandDelayed({"INCR", "redox_test:a"}, check(1), 0.1); this_thread::sleep_for(chrono::milliseconds(150)); - rdx.command("GET redox_test:a", print_and_check(to_string(1))); + rdx.command({"GET", "redox_test:a"}, print_and_check(to_string(1))); wait_for_replies(); } @@ -155,7 +155,7 @@ TEST_F(RedoxTest, Loop) { int count = 0; int target_count = 20; double dt = 0.005; - Command& cmd = rdx.commandLoop("INCR redox_test:a", + Command& cmd = rdx.commandLoop({"INCR", "redox_test:a"}, [this, &count](Command& c) { check(++count)(c); }, @@ -166,7 +166,7 @@ TEST_F(RedoxTest, Loop) { this_thread::sleep_for(std::chrono::duration(wait_time)); cmd.free(); - rdx.command("GET redox_test:a", print_and_check(to_string(target_count))); + rdx.command({"GET", "redox_test:a"}, print_and_check(to_string(target_count))); wait_for_replies(); } @@ -175,24 +175,24 @@ TEST_F(RedoxTest, Loop) { // ------------------------------------------- TEST_F(RedoxTest, GetSetSync) { - print_and_check_sync(rdx.commandSync("SET redox_test:a apple"), "OK"); - print_and_check_sync(rdx.commandSync("GET redox_test:a"), "apple"); + print_and_check_sync(rdx.commandSync({"SET", "redox_test:a", "apple"}), "OK"); + print_and_check_sync(rdx.commandSync({"GET", "redox_test:a"}), "apple"); rdx.disconnect(); } TEST_F(RedoxTest, DeleteSync) { - print_and_check_sync(rdx.commandSync("SET redox_test:a apple"), "OK"); - print_and_check_sync(rdx.commandSync("DEL redox_test:a"), 1); - check_sync(rdx.commandSync("GET redox_test:a"), nullptr); + print_and_check_sync(rdx.commandSync({"SET", "redox_test:a", "apple"}), "OK"); + print_and_check_sync(rdx.commandSync({"DEL", "redox_test:a"}), 1); + check_sync(rdx.commandSync({"GET", "redox_test:a"}), nullptr); rdx.disconnect(); } TEST_F(RedoxTest, IncrSync) { int count = 100; for(int i = 0; i < count; i++) { - check_sync(rdx.commandSync("INCR redox_test:a"), i+1); + check_sync(rdx.commandSync({"INCR", "redox_test:a"}), i+1); } - print_and_check_sync(rdx.commandSync("GET redox_test:a"), to_string(count)); + print_and_check_sync(rdx.commandSync({"GET", "redox_test:a"}), to_string(count)); rdx.disconnect(); }