diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index ab7df0b..9de6e92 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -21,7 +21,8 @@ int main(int argc, char* argv[]) { Redox rdx = {"/var/run/redis/redis.sock", nullptr}; if(!rdx.start()) return 1; - if(rdx.command_blocking("SET simple_loop:count 0")) { + bool status = rdx.command_blocking("SET simple_loop:count 0"); + if(status) { cout << "Reset the counter to zero." << endl; } else { cerr << "Failed to reset counter." << endl; @@ -39,7 +40,7 @@ int main(int argc, char* argv[]) { double t0 = time_s(); atomic_int count(0); - Command& cmd = rdx.command( + Command& cmd = rdx.command_looping( cmd_str, [&count, &rdx](Command& c) { if(!c.ok()) { @@ -54,21 +55,17 @@ int main(int argc, char* argv[]) { this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); cmd.cancel(); - rdx.command("GET simple_loop:count", [&](Command& c) { - if(!c.ok()) return; - long final_count = stol(c.reply()); + long final_count = stol(rdx.get("simple_loop:count")); - double t_elapsed = time_s() - t0; - double actual_freq = (double)count / t_elapsed; - cout << "Sent " << count << " commands in " << t_elapsed << "s, " - << "that's " << actual_freq << " commands/s." << endl; + double t_elapsed = time_s() - t0; + double actual_freq = (double)count / t_elapsed; - cout << "Final value of counter: " << final_count << endl; + cout << "Sent " << count << " commands in " << t_elapsed << "s, " + << "that's " << actual_freq << " commands/s." << endl; - rdx.stop_signal(); - }); + cout << "Final value of counter: " << final_count << endl; - rdx.block(); + rdx.stop(); return 0; } diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index 582ca99..3ff8aa6 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) { vector*> commands; for(int i = 0; i < parallel; i++) { - commands.push_back(&rdx.command( + commands.push_back(&rdx.command_looping( cmd_str, [&count, &rdx](Command& c) { if(!c.ok()) { diff --git a/src/command.cpp b/src/command.cpp index 435963c..3736b5a 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -26,6 +26,17 @@ Command::Command( } template +Command& Command::block() { + std::unique_lock lk(blocker_lock_); + blocker_.wait(lk, [this]() { + logger_.info() << "checking blocker: " << blocking_done_; + return blocking_done_.load(); }); + logger_.info() << "returning from block"; + blocking_done_ = {false}; + return *this; +} + +template void Command::processReply(redisReply* r) { free_guard_.lock(); @@ -33,9 +44,13 @@ void Command::processReply(redisReply* r) { reply_obj_ = r; parseReplyObject(); invoke(); - +// logger_.info() << "reply status " << reply_status_; pending_--; + blocking_done_ = true; +// logger_.info() << "notifying blocker"; + blocker_.notify_all(); + // Allow free() method to free memory if (!free_memory_) { // logger.trace() << "Command memory not being freed, free_memory = " << free_memory; @@ -163,19 +178,18 @@ bool Command::checkNilReply() { template<> void Command::parseReplyObject() { + if(!checkErrorReply()) reply_status_ = OK_REPLY; reply_val_ = reply_obj_; } template<> void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return; reply_val_ = {reply_obj_->str, static_cast(reply_obj_->len)}; } template<> void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return; reply_val_ = reply_obj_->str; } diff --git a/src/command.hpp b/src/command.hpp index 4961a12..321f507 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -17,10 +18,6 @@ namespace redox { class Redox; -//class Command; - -//template -//using CallbackT = std::function&)>; /** * The Command class represents a single command string to be sent to @@ -55,25 +52,30 @@ public: void cancel() { canceled_ = true; } /** + * This method returns once this command's callback has been invoked + * (or would have been invoked if there is none) since the last call + * to block(). If it is the first call, then returns once the callback + * is invoked for the first time. + */ + Command& block(); + + /** * Returns true if the command has been canceled. */ bool canceled() const { return canceled_; } /** * Returns the reply status of this command. - * Use ONLY with command_blocking. */ - int status() const { return reply_status_; }; + int status() const { return reply_status_; } /** * Returns true if this command got a successful reply. - * Use ONLY with command_blocking. */ bool ok() const { return reply_status_ == OK_REPLY; } /** * Returns the reply value, if the reply was successful (ok() == true). - * Use ONLY with command_blocking. */ const ReplyT& reply() const; @@ -145,6 +147,11 @@ private: // Make sure we don't free resources until details taken care of std::mutex free_guard_; + // For synchronous use + std::condition_variable blocker_; + std::mutex blocker_lock_; + std::atomic_bool blocking_done_ = {false}; + // Passed on from Redox class log::Logger& logger_; diff --git a/src/redox.cpp b/src/redox.cpp index 9a4a69d..2ecdbea 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -380,7 +380,7 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, // Start pubsub mode. No non-sub/unsub commands can be emitted by this client. pubsub_mode = true; - command(cmd_name + " " + topic, + command_looping(cmd_name + " " + topic, [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command& c) { if(!c.ok()) { @@ -402,6 +402,8 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, // } // cout << "------" << endl; + // TODO cancel this command on unsubscription? + // If the last entry is an integer, then it is a [p]sub/[p]unsub command if((reply->type == REDIS_REPLY_ARRAY) && (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) { @@ -567,12 +569,8 @@ Redox::get_command_map>() { return commands_unordered_set_ // Helpers // ---------------------------- -void Redox::command(const string& cmd) { - command(cmd); -} - bool Redox::command_blocking(const string& cmd) { - Command& c = command_blocking(cmd); + auto& c = command_blocking(cmd); bool succeeded = c.ok(); c.free(); return succeeded; diff --git a/src/redox.hpp b/src/redox.hpp index b268b13..de96811 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -87,32 +87,44 @@ public: void stop(); /** - * Create an asynchronous Redis command to be executed. Return a pointer to a - * Command object that represents this command. If the command succeeded, the - * callback is invoked with a reference to the reply. If something went wrong, - * the error_callback is invoked with an error_code. One of the two is guaranteed - * to be invoked. The method is templated by the expected data type of the reply, - * and can be one of {redisReply*, string, char*, int, long long int, nullptr_t}. - * - * cmd: The command to be run. - * callback: A function invoked on a successful reply from the server. - * error_callback: A function invoked on some error state. - * repeat: If non-zero, executes the command continuously at the given rate - * in seconds, until cancel() is called on the Command object. - * after: If non-zero, executes the command after the given delay in seconds. - * free_memory: If true (default), Redox automatically frees the Command object and - * reply from the server after a callback is invoked. If false, the - * user is responsible for calling free() on the Command object. + * Asynchronously runs a command and invokes the callback when a reply is + * received or there is an error. The callback is guaranteed to be invoked + * exactly once. The Command object is provided to the callback, and the + * memory for it is automatically freed when the callback returns. */ template - Command& command( - const std::string& cmd, - const std::function&)>& callback = nullptr, - double repeat = 0.0, - double after = 0.0, - bool free_memory = true + void command( + const std::string& cmd, + const std::function&)>& callback = nullptr ); + /** + * Asynchronously runs a command and ignores any errors or replies. + */ + void command(const std::string& cmd) { command(cmd, nullptr); } + + /** + * Synchronously runs a command, returning the Command object only once + * a reply is received or there is an error. The user is responsible for + * calling the Command object's .free() method when done with it. + */ + template + Command& command_blocking(const std::string& cmd); + + /** + * Synchronously runs a command, returning only once a reply is received + * or there's an error. The return value is true if the command got a + * successful reply, and false if something went wrong. + */ + bool command_blocking(const std::string& cmd); + + template + Command& command_looping( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after = 0.0 + ); /** * A wrapper around command() for synchronous use. Waits for a reply, populates it @@ -121,8 +133,8 @@ public: * status() will give the error code, and reply() will return the reply data if * the call succeeded. */ - template - Command& command_blocking(const std::string& cmd); +// template +// Command& command_blocking(const std::string& cmd); /** * Return the total number of successful commands processed by this Redox instance. @@ -146,13 +158,13 @@ public: * Non-templated version of command in case you really don't care * about the reply and just want to send something off. */ - void command(const std::string& command); +// void command(const std::string& command); /** * Non-templated version of command_blocking in case you really don't * care about the reply. Returns true if succeeded, false if error. */ - bool command_blocking(const std::string& command); +// bool command_blocking(const std::string& command); /** * Redis GET command wrapper - return the value for the given key, or throw @@ -265,6 +277,15 @@ public: private: + template + Command& createCommand( + const std::string& cmd, + const std::function&)>& callback = nullptr, + double repeat = 0.0, + double after = 0.0, + bool free_memory = true + ); + // Setup code for the constructors void init_ev(); void init_hiredis(); @@ -373,7 +394,7 @@ private: template -Command& Redox::command( +Command& Redox::createCommand( const std::string& cmd, const std::function&)>& callback, double repeat, @@ -409,6 +430,24 @@ Command& Redox::command( } template +void Redox::command( + const std::string& cmd, + const std::function&)>& callback +) { + createCommand(cmd, callback); +} + +template +Command& Redox::command_looping( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after +) { + return createCommand(cmd, callback, repeat, after); +} + +template Command& Redox::command_blocking(const std::string& cmd) { std::condition_variable cv; @@ -416,7 +455,7 @@ Command& Redox::command_blocking(const std::string& cmd) { std::unique_lock lk(m); std::atomic_bool done = {false}; - Command& c = command(cmd, + Command& c = createCommand(cmd, [&cv, &done](Command& cmd_obj) { done = true; cv.notify_one(); diff --git a/test/test.cpp b/test/test.cpp index 6cc4521..bdb7b19 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -9,8 +9,9 @@ namespace { -using namespace redox; using namespace std; +using redox::Redox; +using redox::Command; // ------------------------------------------ // The fixture for testing class Redox. @@ -43,8 +44,8 @@ protected: mutex cmd_waiter_lock; // To make the callback code nicer - template - using Callback = std::function; + template + using Callback = std::function&)>; /** * Helper function that returns a command callback to print out the @@ -53,8 +54,9 @@ protected: template Callback check(const ReplyT& value) { cmd_count++; - return [this, value](const string& cmd, const ReplyT& reply) { - EXPECT_EQ(reply, value); + return [this, value](Command& c) { + EXPECT_TRUE(c.ok()); + if(c.ok()) EXPECT_EQ(c.reply(), value); cmd_count--; cmd_waiter.notify_all(); }; @@ -65,9 +67,9 @@ protected: */ template Callback print(Callback callback) { - return [callback](const string& cmd, const ReplyT& reply) { - cout << "[ASYNC] " << cmd << ": " << reply << endl; - callback(cmd, reply); + return [callback](Command& c) { + if(c.ok()) cout << "[ASYNC] " << c.cmd() << ": " << c.reply() << endl; + callback(c); }; } @@ -90,18 +92,18 @@ protected: }; template - void check_sync(Command* c, const ReplyT& value) { - ASSERT_TRUE(c->ok()); - EXPECT_EQ(c->reply(), value); - c->free(); + void check_sync(Command& c, const ReplyT& value) { + ASSERT_TRUE(c.ok()); + EXPECT_EQ(c.reply(), value); + c.free(); } template - void print_and_check_sync(Command* c, const ReplyT& value) { - ASSERT_TRUE(c->ok()); - EXPECT_EQ(c->reply(), value); - cout << "[SYNC] " << c->cmd_ << ": " << c->reply() << endl; - c->free(); + void print_and_check_sync(Command& c, const ReplyT& value) { + ASSERT_TRUE(c.ok()); + EXPECT_EQ(c.reply(), value); + cout << "[SYNC] " << c.cmd_ << ": " << c.reply() << endl; + c.free(); } }; @@ -132,9 +134,9 @@ TEST_F(RedoxTest, Incr) { } TEST_F(RedoxTest, Delayed) { - Command* c = rdx.command("INCR redox_test:a", check(1), nullptr, 0, 0.1); + Command& c = rdx.command_looping("INCR redox_test:a", check(1), 0, 0.1); this_thread::sleep_for(chrono::milliseconds(150)); - c->cancel(); + c.cancel(); rdx.command("GET redox_test:a", print_and_check(to_string(1))); wait_and_stop(); } @@ -143,14 +145,16 @@ TEST_F(RedoxTest, Loop) { int count = 0; int target_count = 100; double dt = 0.001; - Command* c = rdx.command("INCR redox_test:a", - [this, &count](const string& cmd, const int& reply) { - check(++count)(cmd, reply); - }, nullptr, dt); + Command& cmd = rdx.command_looping("INCR redox_test:a", + [this, &count](Command& c) { + check(++count)(c); + }, + dt + ); double wait_time = dt * (target_count - 0.5); this_thread::sleep_for(std::chrono::duration(wait_time)); - c->cancel(); + cmd.cancel(); rdx.command("GET redox_test:a", print_and_check(to_string(target_count))); wait_and_stop();