Commit 589eb3fc18505a95a92a78a3cbbfd6e3474f0efe

Authored by Hayk Martirosyan
1 parent c43f2747

Simplified API for .command()

Split up command into command, command_blocking, and command_looping.
The original command no longer returns anything, which clarifies how it
should be used in most cases. command_looping and command_blocking both
return a command object.

Also added a waiting CV to the Command object, though it is not used in
command_blocking yet.
examples/speed_test_async.cpp
... ... @@ -21,7 +21,8 @@ int main(int argc, char* argv[]) {
21 21 Redox rdx = {"/var/run/redis/redis.sock", nullptr};
22 22 if(!rdx.start()) return 1;
23 23  
24   - if(rdx.command_blocking("SET simple_loop:count 0")) {
  24 + bool status = rdx.command_blocking("SET simple_loop:count 0");
  25 + if(status) {
25 26 cout << "Reset the counter to zero." << endl;
26 27 } else {
27 28 cerr << "Failed to reset counter." << endl;
... ... @@ -39,7 +40,7 @@ int main(int argc, char* argv[]) {
39 40 double t0 = time_s();
40 41 atomic_int count(0);
41 42  
42   - Command<int>& cmd = rdx.command<int>(
  43 + Command<int>& cmd = rdx.command_looping<int>(
43 44 cmd_str,
44 45 [&count, &rdx](Command<int>& c) {
45 46 if(!c.ok()) {
... ... @@ -54,21 +55,17 @@ int main(int argc, char* argv[]) {
54 55 this_thread::sleep_for(chrono::microseconds((int)(t*1e6)));
55 56 cmd.cancel();
56 57  
57   - rdx.command<string>("GET simple_loop:count", [&](Command<string>& c) {
58   - if(!c.ok()) return;
59   - long final_count = stol(c.reply());
  58 + long final_count = stol(rdx.get("simple_loop:count"));
60 59  
61   - double t_elapsed = time_s() - t0;
62   - double actual_freq = (double)count / t_elapsed;
63 60  
64   - cout << "Sent " << count << " commands in " << t_elapsed << "s, "
65   - << "that's " << actual_freq << " commands/s." << endl;
  61 + double t_elapsed = time_s() - t0;
  62 + double actual_freq = (double)count / t_elapsed;
66 63  
67   - cout << "Final value of counter: " << final_count << endl;
  64 + cout << "Sent " << count << " commands in " << t_elapsed << "s, "
  65 + << "that's " << actual_freq << " commands/s." << endl;
68 66  
69   - rdx.stop_signal();
70   - });
  67 + cout << "Final value of counter: " << final_count << endl;
71 68  
72   - rdx.block();
  69 + rdx.stop();
73 70 return 0;
74 71 }
... ...
examples/speed_test_async_multi.cpp
... ... @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) {
43 43  
44 44 vector<Command<int>*> commands;
45 45 for(int i = 0; i < parallel; i++) {
46   - commands.push_back(&rdx.command<int>(
  46 + commands.push_back(&rdx.command_looping<int>(
47 47 cmd_str,
48 48 [&count, &rdx](Command<int>& c) {
49 49 if(!c.ok()) {
... ...
src/command.cpp
... ... @@ -26,6 +26,17 @@ Command&lt;ReplyT&gt;::Command(
26 26 }
27 27  
28 28 template<class ReplyT>
  29 +Command<ReplyT>& Command<ReplyT>::block() {
  30 + std::unique_lock<std::mutex> lk(blocker_lock_);
  31 + blocker_.wait(lk, [this]() {
  32 + logger_.info() << "checking blocker: " << blocking_done_;
  33 + return blocking_done_.load(); });
  34 + logger_.info() << "returning from block";
  35 + blocking_done_ = {false};
  36 + return *this;
  37 +}
  38 +
  39 +template<class ReplyT>
29 40 void Command<ReplyT>::processReply(redisReply* r) {
30 41  
31 42 free_guard_.lock();
... ... @@ -33,9 +44,13 @@ void Command&lt;ReplyT&gt;::processReply(redisReply* r) {
33 44 reply_obj_ = r;
34 45 parseReplyObject();
35 46 invoke();
36   -
  47 +// logger_.info() << "reply status " << reply_status_;
37 48 pending_--;
38 49  
  50 + blocking_done_ = true;
  51 +// logger_.info() << "notifying blocker";
  52 + blocker_.notify_all();
  53 +
39 54 // Allow free() method to free memory
40 55 if (!free_memory_) {
41 56 // logger.trace() << "Command memory not being freed, free_memory = " << free_memory;
... ... @@ -163,19 +178,18 @@ bool Command&lt;ReplyT&gt;::checkNilReply() {
163 178  
164 179 template<>
165 180 void Command<redisReply*>::parseReplyObject() {
  181 + if(!checkErrorReply()) reply_status_ = OK_REPLY;
166 182 reply_val_ = reply_obj_;
167 183 }
168 184  
169 185 template<>
170 186 void Command<string>::parseReplyObject() {
171   -
172 187 if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return;
173 188 reply_val_ = {reply_obj_->str, static_cast<size_t>(reply_obj_->len)};
174 189 }
175 190  
176 191 template<>
177 192 void Command<char*>::parseReplyObject() {
178   -
179 193 if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return;
180 194 reply_val_ = reply_obj_->str;
181 195 }
... ...
src/command.hpp
... ... @@ -8,6 +8,7 @@
8 8 #include <functional>
9 9 #include <atomic>
10 10 #include <mutex>
  11 +#include <condition_variable>
11 12  
12 13 #include <hiredis/adapters/libev.h>
13 14 #include <hiredis/async.h>
... ... @@ -17,10 +18,6 @@
17 18 namespace redox {
18 19  
19 20 class Redox;
20   -//class Command;
21   -
22   -//template <typename ReplyT>
23   -//using CallbackT = std::function<void(Command<ReplyT>&)>;
24 21  
25 22 /**
26 23 * The Command class represents a single command string to be sent to
... ... @@ -55,25 +52,30 @@ public:
55 52 void cancel() { canceled_ = true; }
56 53  
57 54 /**
  55 + * This method returns once this command's callback has been invoked
  56 + * (or would have been invoked if there is none) since the last call
  57 + * to block(). If it is the first call, then returns once the callback
  58 + * is invoked for the first time.
  59 + */
  60 + Command<ReplyT>& block();
  61 +
  62 + /**
58 63 * Returns true if the command has been canceled.
59 64 */
60 65 bool canceled() const { return canceled_; }
61 66  
62 67 /**
63 68 * Returns the reply status of this command.
64   - * Use ONLY with command_blocking.
65 69 */
66   - int status() const { return reply_status_; };
  70 + int status() const { return reply_status_; }
67 71  
68 72 /**
69 73 * Returns true if this command got a successful reply.
70   - * Use ONLY with command_blocking.
71 74 */
72 75 bool ok() const { return reply_status_ == OK_REPLY; }
73 76  
74 77 /**
75 78 * Returns the reply value, if the reply was successful (ok() == true).
76   - * Use ONLY with command_blocking.
77 79 */
78 80 const ReplyT& reply() const;
79 81  
... ... @@ -145,6 +147,11 @@ private:
145 147 // Make sure we don't free resources until details taken care of
146 148 std::mutex free_guard_;
147 149  
  150 + // For synchronous use
  151 + std::condition_variable blocker_;
  152 + std::mutex blocker_lock_;
  153 + std::atomic_bool blocking_done_ = {false};
  154 +
148 155 // Passed on from Redox class
149 156 log::Logger& logger_;
150 157  
... ...
src/redox.cpp
... ... @@ -380,7 +380,7 @@ void Redox::subscribe_raw(const string cmd_name, const string topic,
380 380 // Start pubsub mode. No non-sub/unsub commands can be emitted by this client.
381 381 pubsub_mode = true;
382 382  
383   - command<redisReply*>(cmd_name + " " + topic,
  383 + command_looping<redisReply*>(cmd_name + " " + topic,
384 384 [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command<redisReply*>& c) {
385 385  
386 386 if(!c.ok()) {
... ... @@ -402,6 +402,8 @@ void Redox::subscribe_raw(const string cmd_name, const string topic,
402 402 // }
403 403 // cout << "------" << endl;
404 404  
  405 + // TODO cancel this command on unsubscription?
  406 +
405 407 // If the last entry is an integer, then it is a [p]sub/[p]unsub command
406 408 if((reply->type == REDIS_REPLY_ARRAY) &&
407 409 (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) {
... ... @@ -567,12 +569,8 @@ Redox::get_command_map&lt;unordered_set&lt;string&gt;&gt;() { return commands_unordered_set_
567 569 // Helpers
568 570 // ----------------------------
569 571  
570   -void Redox::command(const string& cmd) {
571   - command<redisReply*>(cmd);
572   -}
573   -
574 572 bool Redox::command_blocking(const string& cmd) {
575   - Command<redisReply*>& c = command_blocking<redisReply*>(cmd);
  573 + auto& c = command_blocking<redisReply*>(cmd);
576 574 bool succeeded = c.ok();
577 575 c.free();
578 576 return succeeded;
... ...
src/redox.hpp
... ... @@ -87,32 +87,44 @@ public:
87 87 void stop();
88 88  
89 89 /**
90   - * Create an asynchronous Redis command to be executed. Return a pointer to a
91   - * Command object that represents this command. If the command succeeded, the
92   - * callback is invoked with a reference to the reply. If something went wrong,
93   - * the error_callback is invoked with an error_code. One of the two is guaranteed
94   - * to be invoked. The method is templated by the expected data type of the reply,
95   - * and can be one of {redisReply*, string, char*, int, long long int, nullptr_t}.
96   - *
97   - * cmd: The command to be run.
98   - * callback: A function invoked on a successful reply from the server.
99   - * error_callback: A function invoked on some error state.
100   - * repeat: If non-zero, executes the command continuously at the given rate
101   - * in seconds, until cancel() is called on the Command object.
102   - * after: If non-zero, executes the command after the given delay in seconds.
103   - * free_memory: If true (default), Redox automatically frees the Command object and
104   - * reply from the server after a callback is invoked. If false, the
105   - * user is responsible for calling free() on the Command object.
  90 + * Asynchronously runs a command and invokes the callback when a reply is
  91 + * received or there is an error. The callback is guaranteed to be invoked
  92 + * exactly once. The Command object is provided to the callback, and the
  93 + * memory for it is automatically freed when the callback returns.
106 94 */
107 95 template<class ReplyT>
108   - Command<ReplyT>& command(
109   - const std::string& cmd,
110   - const std::function<void(Command<ReplyT>&)>& callback = nullptr,
111   - double repeat = 0.0,
112   - double after = 0.0,
113   - bool free_memory = true
  96 + void command(
  97 + const std::string& cmd,
  98 + const std::function<void(Command<ReplyT>&)>& callback = nullptr
114 99 );
115 100  
  101 + /**
  102 + * Asynchronously runs a command and ignores any errors or replies.
  103 + */
  104 + void command(const std::string& cmd) { command<redisReply*>(cmd, nullptr); }
  105 +
  106 + /**
  107 + * Synchronously runs a command, returning the Command object only once
  108 + * a reply is received or there is an error. The user is responsible for
  109 + * calling the Command object's .free() method when done with it.
  110 + */
  111 + template<class ReplyT>
  112 + Command<ReplyT>& command_blocking(const std::string& cmd);
  113 +
  114 + /**
  115 + * Synchronously runs a command, returning only once a reply is received
  116 + * or there's an error. The return value is true if the command got a
  117 + * successful reply, and false if something went wrong.
  118 + */
  119 + bool command_blocking(const std::string& cmd);
  120 +
  121 + template<class ReplyT>
  122 + Command<ReplyT>& command_looping(
  123 + const std::string& cmd,
  124 + const std::function<void(Command<ReplyT>&)>& callback,
  125 + double repeat,
  126 + double after = 0.0
  127 + );
116 128  
117 129 /**
118 130 * A wrapper around command() for synchronous use. Waits for a reply, populates it
... ... @@ -121,8 +133,8 @@ public:
121 133 * status() will give the error code, and reply() will return the reply data if
122 134 * the call succeeded.
123 135 */
124   - template<class ReplyT>
125   - Command<ReplyT>& command_blocking(const std::string& cmd);
  136 +// template<class ReplyT>
  137 +// Command<ReplyT>& command_blocking(const std::string& cmd);
126 138  
127 139 /**
128 140 * Return the total number of successful commands processed by this Redox instance.
... ... @@ -146,13 +158,13 @@ public:
146 158 * Non-templated version of command in case you really don't care
147 159 * about the reply and just want to send something off.
148 160 */
149   - void command(const std::string& command);
  161 +// void command(const std::string& command);
150 162  
151 163 /**
152 164 * Non-templated version of command_blocking in case you really don't
153 165 * care about the reply. Returns true if succeeded, false if error.
154 166 */
155   - bool command_blocking(const std::string& command);
  167 +// bool command_blocking(const std::string& command);
156 168  
157 169 /**
158 170 * Redis GET command wrapper - return the value for the given key, or throw
... ... @@ -265,6 +277,15 @@ public:
265 277  
266 278 private:
267 279  
  280 + template<class ReplyT>
  281 + Command<ReplyT>& createCommand(
  282 + const std::string& cmd,
  283 + const std::function<void(Command<ReplyT>&)>& callback = nullptr,
  284 + double repeat = 0.0,
  285 + double after = 0.0,
  286 + bool free_memory = true
  287 + );
  288 +
268 289 // Setup code for the constructors
269 290 void init_ev();
270 291 void init_hiredis();
... ... @@ -373,7 +394,7 @@ private:
373 394  
374 395  
375 396 template<class ReplyT>
376   -Command<ReplyT>& Redox::command(
  397 +Command<ReplyT>& Redox::createCommand(
377 398 const std::string& cmd,
378 399 const std::function<void(Command<ReplyT>&)>& callback,
379 400 double repeat,
... ... @@ -409,6 +430,24 @@ Command&lt;ReplyT&gt;&amp; Redox::command(
409 430 }
410 431  
411 432 template<class ReplyT>
  433 +void Redox::command(
  434 + const std::string& cmd,
  435 + const std::function<void(Command<ReplyT>&)>& callback
  436 +) {
  437 + createCommand(cmd, callback);
  438 +}
  439 +
  440 +template<class ReplyT>
  441 +Command<ReplyT>& Redox::command_looping(
  442 + const std::string& cmd,
  443 + const std::function<void(Command<ReplyT>&)>& callback,
  444 + double repeat,
  445 + double after
  446 +) {
  447 + return createCommand(cmd, callback, repeat, after);
  448 +}
  449 +
  450 +template<class ReplyT>
412 451 Command<ReplyT>& Redox::command_blocking(const std::string& cmd) {
413 452  
414 453 std::condition_variable cv;
... ... @@ -416,7 +455,7 @@ Command&lt;ReplyT&gt;&amp; Redox::command_blocking(const std::string&amp; cmd) {
416 455 std::unique_lock<std::mutex> lk(m);
417 456 std::atomic_bool done = {false};
418 457  
419   - Command<ReplyT>& c = command<ReplyT>(cmd,
  458 + Command<ReplyT>& c = createCommand<ReplyT>(cmd,
420 459 [&cv, &done](Command<ReplyT>& cmd_obj) {
421 460 done = true;
422 461 cv.notify_one();
... ...
test/test.cpp
... ... @@ -9,8 +9,9 @@
9 9  
10 10 namespace {
11 11  
12   -using namespace redox;
13 12 using namespace std;
  13 +using redox::Redox;
  14 +using redox::Command;
14 15  
15 16 // ------------------------------------------
16 17 // The fixture for testing class Redox.
... ... @@ -43,8 +44,8 @@ protected:
43 44 mutex cmd_waiter_lock;
44 45  
45 46 // To make the callback code nicer
46   - template <class ReplyT>
47   - using Callback = std::function<void(const std::string&, const ReplyT&)>;
  47 + template<class ReplyT>
  48 + using Callback = std::function<void(Command<ReplyT>&)>;
48 49  
49 50 /**
50 51 * Helper function that returns a command callback to print out the
... ... @@ -53,8 +54,9 @@ protected:
53 54 template<class ReplyT>
54 55 Callback<ReplyT> check(const ReplyT& value) {
55 56 cmd_count++;
56   - return [this, value](const string& cmd, const ReplyT& reply) {
57   - EXPECT_EQ(reply, value);
  57 + return [this, value](Command<ReplyT>& c) {
  58 + EXPECT_TRUE(c.ok());
  59 + if(c.ok()) EXPECT_EQ(c.reply(), value);
58 60 cmd_count--;
59 61 cmd_waiter.notify_all();
60 62 };
... ... @@ -65,9 +67,9 @@ protected:
65 67 */
66 68 template<class ReplyT>
67 69 Callback<ReplyT> print(Callback<ReplyT> callback) {
68   - return [callback](const string& cmd, const ReplyT& reply) {
69   - cout << "[ASYNC] " << cmd << ": " << reply << endl;
70   - callback(cmd, reply);
  70 + return [callback](Command<ReplyT>& c) {
  71 + if(c.ok()) cout << "[ASYNC] " << c.cmd() << ": " << c.reply() << endl;
  72 + callback(c);
71 73 };
72 74 }
73 75  
... ... @@ -90,18 +92,18 @@ protected:
90 92 };
91 93  
92 94 template<class ReplyT>
93   - void check_sync(Command<ReplyT>* c, const ReplyT& value) {
94   - ASSERT_TRUE(c->ok());
95   - EXPECT_EQ(c->reply(), value);
96   - c->free();
  95 + void check_sync(Command<ReplyT>& c, const ReplyT& value) {
  96 + ASSERT_TRUE(c.ok());
  97 + EXPECT_EQ(c.reply(), value);
  98 + c.free();
97 99 }
98 100  
99 101 template<class ReplyT>
100   - void print_and_check_sync(Command<ReplyT>* c, const ReplyT& value) {
101   - ASSERT_TRUE(c->ok());
102   - EXPECT_EQ(c->reply(), value);
103   - cout << "[SYNC] " << c->cmd_ << ": " << c->reply() << endl;
104   - c->free();
  102 + void print_and_check_sync(Command<ReplyT>& c, const ReplyT& value) {
  103 + ASSERT_TRUE(c.ok());
  104 + EXPECT_EQ(c.reply(), value);
  105 + cout << "[SYNC] " << c.cmd_ << ": " << c.reply() << endl;
  106 + c.free();
105 107 }
106 108 };
107 109  
... ... @@ -132,9 +134,9 @@ TEST_F(RedoxTest, Incr) {
132 134 }
133 135  
134 136 TEST_F(RedoxTest, Delayed) {
135   - Command<int>* c = rdx.command<int>("INCR redox_test:a", check(1), nullptr, 0, 0.1);
  137 + Command<int>& c = rdx.command_looping<int>("INCR redox_test:a", check(1), 0, 0.1);
136 138 this_thread::sleep_for(chrono::milliseconds(150));
137   - c->cancel();
  139 + c.cancel();
138 140 rdx.command<string>("GET redox_test:a", print_and_check(to_string(1)));
139 141 wait_and_stop();
140 142 }
... ... @@ -143,14 +145,16 @@ TEST_F(RedoxTest, Loop) {
143 145 int count = 0;
144 146 int target_count = 100;
145 147 double dt = 0.001;
146   - Command<int>* c = rdx.command<int>("INCR redox_test:a",
147   - [this, &count](const string& cmd, const int& reply) {
148   - check(++count)(cmd, reply);
149   - }, nullptr, dt);
  148 + Command<int>& cmd = rdx.command_looping<int>("INCR redox_test:a",
  149 + [this, &count](Command<int>& c) {
  150 + check(++count)(c);
  151 + },
  152 + dt
  153 + );
150 154  
151 155 double wait_time = dt * (target_count - 0.5);
152 156 this_thread::sleep_for(std::chrono::duration<double>(wait_time));
153   - c->cancel();
  157 + cmd.cancel();
154 158  
155 159 rdx.command<string>("GET redox_test:a", print_and_check(to_string(target_count)));
156 160 wait_and_stop();
... ...