Commit 55c56d875c6a3e0a1680df7b0f69962b7a69a547

Authored by Hayk Martirosyan
1 parent 3d0b073e

Move from string command to vector of strings

This enables full support for binary data, with no tokenization needed
and all the assumptions that come with that. All of the core methods are
changed to accept vector<string>& instead of string&. Ported all
examples and tutorial as well.
README.md
... ... @@ -18,7 +18,8 @@ details so you can move on to the interesting part of your project.
18 18 * Automatic pipelining, even for synchronous calls from separate threads
19 19 * Low-level access when needed
20 20 * Accessible and robust error handling
21   - * Logs to any ostream at a user-controlled log level
  21 + * Configurable logging level and output to any ostream
  22 + * Full support for binary data (keys and values)
22 23 * Fast - developed for robotics applications
23 24 * 100% clean Valgrind reports
24 25  
... ... @@ -76,10 +77,12 @@ a reply is received from the server.
76 77  
77 78 #### Asynchronous commands
78 79 In a high-performance application, we don't want to wait for a reply, but instead
79   -do other work.
80   -The `command` method accepts a Redis command and a callback to be invoked when a reply is received.
81   -
82   - rdx.command<string>("GET hello", [](Command<string>& c) {
  80 +do other work. At the core of Redox is a generic asynchronous API for executing
  81 +any Redis command and providing a reply callback. The `command` method accepts a
  82 +Redis command in the form of an STL vector of strings, and a callback to be invoked
  83 +when a reply is received or if there is an error.
  84 +
  85 + rdx.command<string>({"GET", "hello"}, [](Command<string>& c) {
83 86 if(c.ok()) {
84 87 cout << "Hello, async " << c.reply() << endl;
85 88 } else {
... ... @@ -91,7 +94,8 @@ This statement tells redox to run the command `GET hello`. The `&lt;string&gt;` templa
91 94 parameter means that we want the reply to be put into a string and that we expect
92 95 the server to respond with something that can be put into a string. The full list
93 96 of reply types is listed in this document and covers convenient access to anything
94   -returned from the Redis protocol.
  97 +returned from the Redis protocol. The input vector can contain arbitrary binary
  98 +data.
95 99  
96 100 The second argument is a callback function that accepts a reference to a Command object
97 101 of the requested reply type. The Command object contains the reply and any error
... ... @@ -108,17 +112,17 @@ Here is a simple example of running `GET hello` asynchronously ten times:
108 112  
109 113 // Block until connected, localhost by default
110 114 if(!rdx.connect()) return 1;
111   -
  115 +
112 116 auto got_reply = [](Command<string>& c) {
113 117 if(!c.ok()) return;
114 118 cout << c.cmd() << ": " << c.reply() << endl;
115 119 };
116   -
117   - for(int i = 0; i < 10; i++) rdx.command<string>("GET hello", got_reply);
118   -
  120 +
  121 + for(int i = 0; i < 10; i++) rdx.command<string>({"GET", "hello"}, got_reply);
  122 +
119 123 // Do useful work
120 124 this_thread::sleep_for(chrono::milliseconds(10));
121   -
  125 +
122 126 rdx.disconnect(); // Block until disconnected
123 127  
124 128 The `.command()` method returns immediately, so this program doesn't wait for a reply
... ... @@ -136,7 +140,7 @@ shut down after we get all replies, we could do something like this:
136 140 if(count == total) rdx.stop(); // Signal to shut down
137 141 };
138 142  
139   - for(int i = 0; i < total; i++) rdx.command<string>("GET hello", got_reply);
  143 + for(int i = 0; i < total; i++) rdx.command<string>({"GET", "hello"}, got_reply);
140 144  
141 145 // Do useful work
142 146  
... ... @@ -157,13 +161,13 @@ between synchronous commands in different threads. The `commandSync` method prov
157 161 a similar API to `command`, but instead of a callback returns a Command object when
158 162 a reply is received.
159 163  
160   - Command<string>& c = rdx.commandSync<string>("GET hello");
  164 + Command<string>& c = rdx.commandSync<string>({"GET", "hello"});
161 165 if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl;
162 166 c.free();
163 167  
164 168 When using synchronous commands, the user is responsible for freeing the memory of
165   -the Command object by calling `c.free()`. The `c.cmd()` method just returns the
166   -command string (`GET hello` in this case).
  169 +the Command object by calling `c.free()`. The `c.cmd()` method just returns a string
  170 +representation of the command (`GET hello` in this case).
167 171  
168 172 #### Looping and delayed commands
169 173 We often want to run commands on regular invervals. Redox provides the `commandLoop`
... ... @@ -173,7 +177,7 @@ commands in a loop, because it only creates a single Command object.
173 177 to repeat the command. It then runs the command on the given interval until the user
174 178 calls `c.free()`.
175 179  
176   - Command<string>& cmd = rdx.commandLoop<string>("GET hello", [](Command<string>& c) {
  180 + Command<string>& cmd = rdx.commandLoop<string>({"GET", "hello"}, [](Command<string>& c) {
177 181 if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl;
178 182 }, 0.1);
179 183  
... ... @@ -185,7 +189,7 @@ Finally, `commandDelayed` runs a command after a specified delay (in seconds). I
185 189 not return a command object, because the memory is automatically freed after the callback
186 190 is invoked.
187 191  
188   - rdx.commandDelayed<string>("GET hello", [](Command<string>& c) {
  192 + rdx.commandDelayed<string>({"GET", "hello"}, [](Command<string>& c) {
189 193 if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl;
190 194 }, 1);
191 195 this_thread::sleep_for(chrono::seconds(2));
... ... @@ -219,6 +223,12 @@ receives messages and provides subscribe/unsubscribe and psubscribe/punsubscribe
219 223  
220 224 sub.disconnect(); rdx.disconnect();
221 225  
  226 +#### strToVec and vecToStr
  227 +Redox provides helper methods to convert between a string command and
  228 +a vector of strings as needed by its API. `rdx.strToVec("GET foo")`
  229 +will return an `std::vector<std::string>` containing `GET` and `foo`
  230 +as entries. `rdx.vecToStr({"GET", "foo"})` will return the string `GET foo`.
  231 +
222 232 ## Reply types
223 233 These the available template parameters in redox and the Redis
224 234 [return types](http://redis.io/topics/protocol) they can hold.
... ...
examples/basic_threaded.cpp
... ... @@ -19,7 +19,7 @@ int main(int argc, char* argv[]) {
19 19  
20 20 thread setter([]() {
21 21 for(int i = 0; i < 5000; i++) {
22   - rdx.command<int>("INCR counter");
  22 + rdx.command<int>({"INCR", "counter"});
23 23 this_thread::sleep_for(chrono::milliseconds(1));
24 24 }
25 25 cout << "Setter thread exiting." << endl;
... ... @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) {
28 28 thread getter([]() {
29 29 for(int i = 0; i < 5; i++) {
30 30 rdx.command<string>(
31   - "GET counter",
  31 + {"GET", "counter"},
32 32 [](Command<string>& c) {
33 33 if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl;
34 34 }
... ...
examples/binary_data.cpp
... ... @@ -25,16 +25,17 @@ int main(int argc, char* argv[]) {
25 25 redox::Redox rdx; // Initialize Redox
26 26 if(!rdx.connect("localhost", 6379)) return 1; // Start the event loop
27 27  
28   - rdx.del("binary");
29   -
  28 + string binary_key = random_string(100);
30 29 string binary_data = random_string(10000);
31 30  
32   - auto& c = rdx.commandSync<string>("SET binary \"" + binary_data + "\"");
  31 + rdx.del(binary_key);
  32 +
  33 + auto& c = rdx.commandSync<string>({"SET", binary_key, binary_data});
33 34 if(c.ok()) cout << "Reply: " << c.reply() << endl;
34 35 else cerr << "Failed to set key! Status: " << c.status() << endl;
35 36 c.free();
36 37  
37   - auto& c2 = rdx.commandSync<string>("GET binary");
  38 + auto& c2 = rdx.commandSync<string>({"GET", binary_key});
38 39 if(c2.ok()) {
39 40 if(c2.reply() == binary_data) cout << "Binary data matches!" << endl;
40 41 else cerr << "Binary data differs!" << endl;
... ...
examples/data_types.cpp
... ... @@ -3,10 +3,10 @@
3 3 */
4 4  
5 5 #include <iostream>
6   -#include "redox.hpp"
7 6 #include <set>
8 7 #include <unordered_set>
9 8 #include <vector>
  9 +#include "redox.hpp"
10 10  
11 11 using namespace std;
12 12 using redox::Redox;
... ... @@ -19,9 +19,9 @@ int main(int argc, char* argv[]) {
19 19  
20 20 rdx.del("mylist");
21 21  
22   - rdx.commandSync("LPUSH mylist 1 2 3 4 5 6 7 8 9 10");
  22 + rdx.commandSync(rdx.strToVec("LPUSH mylist 1 2 3 4 5 6 7 8 9 10"));
23 23  
24   - rdx.command<vector<string>>("LRANGE mylist 0 4",
  24 + rdx.command<vector<string>>({"LRANGE", "mylist", "0", "4"},
25 25 [](Command<vector<string>>& c){
26 26 if(!c.ok()) return;
27 27 cout << "Last 5 elements as a vector: ";
... ... @@ -30,7 +30,7 @@ int main(int argc, char* argv[]) {
30 30 }
31 31 );
32 32  
33   - rdx.command<unordered_set<string>>("LRANGE mylist 0 4",
  33 + rdx.command<unordered_set<string>>(rdx.strToVec("LRANGE mylist 0 4"),
34 34 [](Command<unordered_set<string>>& c){
35 35 if(!c.ok()) return;
36 36 cout << "Last 5 elements as a hash: ";
... ... @@ -39,7 +39,7 @@ int main(int argc, char* argv[]) {
39 39 }
40 40 );
41 41  
42   - rdx.command<set<string>>("LRANGE mylist 0 4",
  42 + rdx.command<set<string>>(rdx.strToVec("LRANGE mylist 0 4"),
43 43 [&rdx](Command<set<string>>& c) {
44 44 if(c.ok()) {
45 45 cout << "Last 5 elements as a set: ";
... ...
examples/lpush_benchmark.cpp
... ... @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) {
28 28 atomic_int count = {0};
29 29  
30 30 for(int i = 1; i <= len; i++) {
31   - rdx.command<int>("lpush test 1", [&t0, &t1, &count, len, &rdx](Command<int>& c) {
  31 + rdx.command<int>({"lpush", "test", "1"}, [&t0, &t1, &count, len, &rdx](Command<int>& c) {
32 32  
33 33 if(!c.ok()) return;
34 34  
... ...
examples/speed_test_async.cpp
... ... @@ -19,29 +19,28 @@ double time_s() {
19 19 int main(int argc, char* argv[]) {
20 20  
21 21 Redox rdx;
22   - if(!rdx.connect_unix("/var/run/redis/redis.sock")) return 1;
  22 + if(!rdx.connectUnix("/var/run/redis/redis.sock")) return 1;
23 23  
24   - bool status = rdx.commandSync("SET simple_loop:count 0");
25   - if(status) {
  24 + if(rdx.set("simple_loop:count", "0")) {
26 25 cout << "Reset the counter to zero." << endl;
27 26 } else {
28 27 cerr << "Failed to reset counter." << endl;
29 28 return 1;
30 29 }
31 30  
32   - string cmd_str = "INCR simple_loop:count";
  31 + vector<string> cmd_vec = {"INCR", "simple_loop:count"};
33 32 double freq = 400000; // Hz
34 33 double dt = 1 / freq; // s
35 34 double t = 5; // s
36 35  
37   - cout << "Sending \"" << cmd_str << "\" asynchronously every "
  36 + cout << "Sending \"" << rdx.vecToStr(cmd_vec) << "\" asynchronously every "
38 37 << dt << "s for " << t << "s..." << endl;
39 38  
40 39 double t0 = time_s();
41 40 atomic_int count(0);
42 41  
43 42 Command<int>& cmd = rdx.commandLoop<int>(
44   - cmd_str,
  43 + cmd_vec,
45 44 [&count, &rdx](Command<int>& c) {
46 45 if (!c.ok()) {
47 46 cerr << "Bad reply: " << c.status() << endl;
... ... @@ -67,4 +66,4 @@ int main(int argc, char* argv[]) {
67 66  
68 67 rdx.disconnect();
69 68 return 0;
70   -}
  69 +};
... ...
examples/speed_test_async_multi.cpp
... ... @@ -19,7 +19,7 @@ double time_s() {
19 19  
20 20 int main(int argc, char* argv[]) {
21 21  
22   - Redox rdx;
  22 + Redox rdx = {cout, redox::log::Debug};
23 23 if(!rdx.connect("localhost", 6379)) return 1;
24 24  
25 25 if(rdx.set("simple_loop:count", "0")) {
... ... @@ -29,13 +29,13 @@ int main(int argc, char* argv[]) {
29 29 return 1;
30 30 }
31 31  
32   - string cmd_str = "INCR simple_loop:count";
  32 + vector<string> cmd_vec = {"INCR", "simple_loop:count"};
33 33 double freq = 10000; // Hz
34 34 double dt = 1 / freq; // s
35 35 double t = 5; // s
36 36 int parallel = 100;
37 37  
38   - cout << "Sending \"" << cmd_str << "\" asynchronously every "
  38 + cout << "Sending \"" << rdx.vecToStr(cmd_vec) << "\" asynchronously every "
39 39 << dt << "s for " << t << "s..." << endl;
40 40  
41 41 double t0 = time_s();
... ... @@ -44,7 +44,7 @@ int main(int argc, char* argv[]) {
44 44 vector<Command<int>*> commands;
45 45 for(int i = 0; i < parallel; i++) {
46 46 commands.push_back(&rdx.commandLoop<int>(
47   - cmd_str,
  47 + cmd_vec,
48 48 [&count, &rdx](Command<int>& c) {
49 49 if (!c.ok()) {
50 50 cerr << "Bad reply: " << c.status() << endl;
... ... @@ -72,4 +72,4 @@ int main(int argc, char* argv[]) {
72 72  
73 73 rdx.disconnect();
74 74 return 0;
75   -}
  75 +};
... ...
examples/speed_test_sync.cpp
... ... @@ -20,24 +20,23 @@ int main(int argc, char* argv[]) {
20 20 Redox rdx;
21 21 if(!rdx.connect("localhost", 6379)) return 1;
22 22  
23   - if(rdx.commandSync("SET simple_loop:count 0")) {
  23 + if(rdx.commandSync({"SET", "simple_loop:count", "0"})) {
24 24 cout << "Reset the counter to zero." << endl;
25 25 } else {
26 26 cerr << "Failed to reset counter." << endl;
27 27 return 1;
28 28 }
29 29  
30   - string cmd_str = "INCR simple_loop:count";
31 30 double t = 5; // s
32 31  
33   - cout << "Sending \"" << cmd_str << "\" synchronously for " << t << "s..." << endl;
  32 + cout << "Sending \"" << "INCR simple_loop:count" << "\" synchronously for " << t << "s..." << endl;
34 33  
35 34 double t0 = time_s();
36 35 double t_end = t0 + t;
37 36 int count = 0;
38 37  
39 38 while(time_s() < t_end) {
40   - Command<int>& c = rdx.commandSync<int>(cmd_str);
  39 + Command<int>& c = rdx.commandSync<int>({"INCR", "simple_loop:count"});
41 40 if(!c.ok()) cerr << "Bad reply, code: " << c.status() << endl;
42 41 c.free();
43 42 count++;
... ...
include/redox/client.hpp
... ... @@ -55,11 +55,12 @@ class Redox {
55 55 public:
56 56  
57 57 // Connection states
58   - static const int NOT_YET_CONNECTED = 0;
59   - static const int CONNECTED = 1;
60   - static const int DISCONNECTED = 2;
61   - static const int CONNECT_ERROR = 3;
62   - static const int DISCONNECT_ERROR = 4;
  58 + static const int NOT_YET_CONNECTED = 0; // Starting state
  59 + static const int CONNECTED = 1; // Successfully connected
  60 + static const int DISCONNECTED = 2; // Successfully disconnected
  61 + static const int CONNECT_ERROR = 3; // Error connecting
  62 + static const int DISCONNECT_ERROR = 4; // Disconnected on error
  63 + static const int INIT_ERROR = 5; // Failed to init data structures
63 64  
64 65 // ------------------------------------------------
65 66 // Core public API
... ... @@ -92,7 +93,7 @@ public:
92 93 * Connects to Redis over a unix socket and starts an event loop in a separate
93 94 * thread. Returns true once everything is ready, or false on failure.
94 95 */
95   - bool connect_unix(
  96 + bool connectUnix(
96 97 const std::string& path = REDIS_DEFAULT_PATH,
97 98 std::function<void(int)> connection_callback = nullptr);
98 99  
... ... @@ -119,30 +120,32 @@ public:
119 120 * exactly once. The Command object is provided to the callback, and the
120 121 * memory for it is automatically freed when the callback returns.
121 122 */
  123 +
122 124 template<class ReplyT>
123 125 void command(
124   - const std::string& cmd,
  126 + const std::vector<std::string>& cmd,
125 127 const std::function<void(Command<ReplyT>&)>& callback = nullptr
126 128 );
127 129  
128 130 /**
129 131 * Asynchronously runs a command and ignores any errors or replies.
130 132 */
131   - void command(const std::string& cmd);
  133 + void command(const std::vector<std::string>& cmd);
132 134  
133 135 /**
134 136 * Synchronously runs a command, returning the Command object only once
135 137 * a reply is received or there is an error. The user is responsible for
136 138 * calling .free() on the returned Command object.
137 139 */
  140 +
138 141 template<class ReplyT>
139   - Command<ReplyT>& commandSync(const std::string& cmd);
  142 + Command<ReplyT>& commandSync(const std::vector<std::string>& cmd);
140 143  
141 144 /**
142 145 * Synchronously runs a command, returning only once a reply is received
143 146 * or there's an error. Returns true on successful reply, false on error.
144 147 */
145   - bool commandSync(const std::string& cmd);
  148 + bool commandSync(const std::vector<std::string>& cmd);
146 149  
147 150 /**
148 151 * Creates an asynchronous command that is run every [repeat] seconds,
... ... @@ -150,9 +153,10 @@ public:
150 153 * command is run only once. The user is responsible for calling .free()
151 154 * on the returned Command object.
152 155 */
  156 +
153 157 template<class ReplyT>
154 158 Command<ReplyT>& commandLoop(
155   - const std::string& cmd,
  159 + const std::vector<std::string>& cmd,
156 160 const std::function<void(Command<ReplyT>&)>& callback,
157 161 double repeat,
158 162 double after = 0.0
... ... @@ -164,15 +168,32 @@ public:
164 168 * or error, and the Command object memory is automatically freed
165 169 * after the callback returns.
166 170 */
  171 +
167 172 template<class ReplyT>
168 173 void commandDelayed(
169   - const std::string& cmd,
  174 + const std::vector<std::string>& cmd,
170 175 const std::function<void(Command<ReplyT>&)>& callback,
171 176 double after
172 177 );
173 178  
174 179 // ------------------------------------------------
175   - // Wrapper methods for convenience only
  180 + // Utility methods
  181 + // ------------------------------------------------
  182 +
  183 + /**
  184 + * Given a vector of strings, returns a string of the concatenated elements, separated
  185 + * by the delimiter. Useful for printing out a command string from a vector.
  186 + */
  187 + std::string vecToStr(const std::vector<std::string>& vec, const char delimiter = ' ');
  188 +
  189 + /**
  190 + * Given a command string, returns a vector of strings by splitting the input by
  191 + * the delimiter. Useful for turning a string input into a command.
  192 + */
  193 + std::vector<std::string> strToVec(const std::string& s, const char delimiter = ' ');
  194 +
  195 + // ------------------------------------------------
  196 + // Command wrapper methods for convenience only
176 197 // ------------------------------------------------
177 198  
178 199 /**
... ... @@ -225,9 +246,12 @@ private:
225 246  
226 247 // One stop shop for creating commands. The base of all public
227 248 // methods that run commands.
  249 +
  250 + // One stop shop for creating commands. The base of all public
  251 + // methods that run commands.
228 252 template<class ReplyT>
229 253 Command<ReplyT>& createCommand(
230   - const std::string& cmd,
  254 + const std::vector<std::string>& cmd,
231 255 const std::function<void(Command<ReplyT>&)>& callback = nullptr,
232 256 double repeat = 0.0,
233 257 double after = 0.0,
... ... @@ -236,8 +260,8 @@ private:
236 260  
237 261 // Setup code for the constructors
238 262 // Return true on success, false on failure
239   - bool init_ev();
240   - bool init_hiredis();
  263 + bool initEv();
  264 + bool initHiredis();
241 265  
242 266 // Callbacks invoked on server connection/disconnection
243 267 static void connectedCallback(const redisAsyncContext* c, int status);
... ... @@ -380,7 +404,7 @@ private:
380 404  
381 405 template<class ReplyT>
382 406 Command<ReplyT>& Redox::createCommand(
383   - const std::string& cmd,
  407 + const std::vector<std::string>& cmd,
384 408 const std::function<void(Command<ReplyT>&)>& callback,
385 409 double repeat,
386 410 double after,
... ... @@ -411,7 +435,7 @@ Command&lt;ReplyT&gt;&amp; Redox::createCommand(
411 435  
412 436 template<class ReplyT>
413 437 void Redox::command(
414   - const std::string& cmd,
  438 + const std::vector<std::string>& cmd,
415 439 const std::function<void(Command<ReplyT>&)>& callback
416 440 ) {
417 441 createCommand(cmd, callback);
... ... @@ -419,7 +443,7 @@ void Redox::command(
419 443  
420 444 template<class ReplyT>
421 445 Command<ReplyT>& Redox::commandLoop(
422   - const std::string& cmd,
  446 + const std::vector<std::string>& cmd,
423 447 const std::function<void(Command<ReplyT>&)>& callback,
424 448 double repeat,
425 449 double after
... ... @@ -429,7 +453,7 @@ Command&lt;ReplyT&gt;&amp; Redox::commandLoop(
429 453  
430 454 template<class ReplyT>
431 455 void Redox::commandDelayed(
432   - const std::string& cmd,
  456 + const std::vector<std::string>& cmd,
433 457 const std::function<void(Command<ReplyT>&)>& callback,
434 458 double after
435 459 ) {
... ... @@ -437,7 +461,7 @@ void Redox::commandDelayed(
437 461 }
438 462  
439 463 template<class ReplyT>
440   -Command<ReplyT>& Redox::commandSync(const std::string& cmd) {
  464 +Command<ReplyT>& Redox::commandSync(const std::vector<std::string>& cmd) {
441 465 auto& c = createCommand<ReplyT>(cmd, nullptr, 0, 0, false);
442 466 c.wait();
443 467 return c;
... ...
include/redox/command.hpp
... ... @@ -89,12 +89,12 @@ public:
89 89 /**
90 90 * Returns the command string represented by this object.
91 91 */
92   - const std::string& cmd() const { return cmd_; };
  92 + std::string cmd() const;
93 93  
94 94 // Allow public access to constructed data
95 95 Redox* const rdx_;
96 96 const long id_;
97   - const std::string cmd_;
  97 + const std::vector<std::string> cmd_;
98 98 const double repeat_;
99 99 const double after_;
100 100 const bool free_memory_;
... ... @@ -104,7 +104,7 @@ private:
104 104 Command(
105 105 Redox* rdx,
106 106 long id,
107   - const std::string& cmd,
  107 + const std::vector<std::string>& cmd,
108 108 const std::function<void(Command<ReplyT>&)>& callback,
109 109 double repeat, double after,
110 110 bool free_memory,
... ...
include/redox/subscriber.hpp
... ... @@ -52,12 +52,12 @@ public:
52 52 }
53 53  
54 54 /**
55   - * Same as .connect_unix() on a Redox instance.
  55 + * Same as .connectUnix() on a Redox instance.
56 56 */
57 57 bool connect_unix(
58 58 const std::string& path = REDIS_DEFAULT_PATH,
59 59 std::function<void(int)> connection_callback = nullptr) {
60   - return rdx_.connect_unix(path, connection_callback);
  60 + return rdx_.connectUnix(path, connection_callback);
61 61 }
62 62  
63 63 /**
... ...
src/client.cpp
... ... @@ -19,6 +19,7 @@
19 19 */
20 20  
21 21 #include <signal.h>
  22 +#include <algorithm>
22 23 #include "client.hpp"
23 24  
24 25 using namespace std;
... ... @@ -28,7 +29,7 @@ namespace redox {
28 29 Redox::Redox(
29 30 ostream& log_stream,
30 31 log::Level log_level
31   -) : logger_(log_stream, log_level) {}
  32 +) : logger_(log_stream, log_level), evloop_(nullptr) {}
32 33  
33 34 bool Redox::connect(
34 35 const std::string& host, const int port,
... ... @@ -39,12 +40,12 @@ bool Redox::connect(
39 40 port_ = port;
40 41 user_connection_callback_ = connection_callback;
41 42  
42   - if(!init_ev()) return false;
  43 + if(!initEv()) return false;
43 44  
44 45 // Connect over TCP
45 46 ctx_ = redisAsyncConnect(host.c_str(), port);
46 47  
47   - if(!init_hiredis()) return false;
  48 + if(!initHiredis()) return false;
48 49  
49 50 event_loop_thread_ = thread([this] { runEventLoop(); });
50 51  
... ... @@ -59,7 +60,7 @@ bool Redox::connect(
59 60 return connect_state_ == CONNECTED;
60 61 }
61 62  
62   -bool Redox::connect_unix(
  63 +bool Redox::connectUnix(
63 64 const std::string& path,
64 65 std::function<void(int)> connection_callback
65 66 ) {
... ... @@ -67,12 +68,12 @@ bool Redox::connect_unix(
67 68 path_ = path;
68 69 user_connection_callback_ = connection_callback;
69 70  
70   - if(!init_ev()) return false;
  71 + if(!initEv()) return false;
71 72  
72 73 // Connect over unix sockets
73 74 ctx_ = redisAsyncConnectUnix(path.c_str());
74 75  
75   - if(!init_hiredis()) return false;
  76 + if(!initHiredis()) return false;
76 77  
77 78 event_loop_thread_ = thread([this] { runEventLoop(); });
78 79  
... ... @@ -106,10 +107,11 @@ void Redox::wait() {
106 107 Redox::~Redox() {
107 108  
108 109 // Bring down the event loop
109   - stop();
  110 + if(running_ == true) { stop(); }
110 111  
111 112 if(event_loop_thread_.joinable()) event_loop_thread_.join();
112   - ev_loop_destroy(evloop_);
  113 +
  114 + if(evloop_ != nullptr) ev_loop_destroy(evloop_);
113 115 }
114 116  
115 117 void Redox::connectedCallback(const redisAsyncContext* ctx, int status) {
... ... @@ -148,30 +150,52 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) {
148 150 if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_);
149 151 }
150 152  
151   -bool Redox::init_ev() {
  153 +bool Redox::initEv() {
152 154 signal(SIGPIPE, SIG_IGN);
153 155 evloop_ = ev_loop_new(EVFLAG_AUTO);
  156 + if(evloop_ == nullptr) {
  157 + logger_.fatal() << "Could not create a libev event loop.";
  158 + connect_state_ = INIT_ERROR;
  159 + connect_waiter_.notify_all();
  160 + return false;
  161 + }
154 162 ev_set_userdata(evloop_, (void*)this); // Back-reference
155 163 return true;
156 164 }
157 165  
158   -bool Redox::init_hiredis() {
  166 +bool Redox::initHiredis() {
159 167  
160 168 ctx_->data = (void*)this; // Back-reference
161 169  
162 170 if (ctx_->err) {
163   - logger_.error() << "Could not create a hiredis context: " << ctx_->errstr;
164   - connect_state_ = CONNECT_ERROR;
  171 + logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr;
  172 + connect_state_ = INIT_ERROR;
165 173 connect_waiter_.notify_all();
166 174 return false;
167 175 }
168 176  
169 177 // Attach event loop to hiredis
170   - redisLibevAttach(evloop_, ctx_);
  178 + if(redisLibevAttach(evloop_, ctx_) != REDIS_OK) {
  179 + logger_.fatal() << "Could not attach libev event loop to hiredis.";
  180 + connect_state_ = INIT_ERROR;
  181 + connect_waiter_.notify_all();
  182 + return false;
  183 + }
171 184  
172 185 // Set the callbacks to be invoked on server connection/disconnection
173   - redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback);
174   - redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback);
  186 + if(redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) {
  187 + logger_.fatal() << "Could not attach connect callback to hiredis.";
  188 + connect_state_ = INIT_ERROR;
  189 + connect_waiter_.notify_all();
  190 + return false;
  191 + }
  192 +
  193 + if(redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) {
  194 + logger_.fatal() << "Could not attach disconnect callback to hiredis.";
  195 + connect_state_ = INIT_ERROR;
  196 + connect_waiter_.notify_all();
  197 + return false;
  198 + }
175 199  
176 200 return true;
177 201 }
... ... @@ -249,7 +273,6 @@ void Redox::runEventLoop() {
249 273 logger_.info() << "Event thread exited.";
250 274 }
251 275  
252   -
253 276 template<class ReplyT>
254 277 Command<ReplyT>* Redox::findCommand(long id) {
255 278  
... ... @@ -284,32 +307,21 @@ bool Redox::submitToServer(Command&lt;ReplyT&gt;* c) {
284 307 Redox* rdx = c->rdx_;
285 308 c->pending_++;
286 309  
287   - // Process binary data if trailing quotation. This is a limited implementation
288   - // to allow binary data between the first and the last quotes of the command string,
289   - // if the very last character of the command is a quote ('"').
290   - if(c->cmd_[c->cmd_.size()-1] == '"') {
291   -
292   - // Indices of the quotes
293   - size_t first = c->cmd_.find('"');
294   - size_t last = c->cmd_.size()-1;
295   -
296   - // Proceed only if the first and last quotes are different
297   - if(first != last) {
298   -
299   - string format = c->cmd_.substr(0, first) + "%b";
300   - string value = c->cmd_.substr(first+1, last-first-1);
301   - if (redisAsyncCommand(rdx->ctx_, commandCallback<ReplyT>, (void*) c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) {
302   - rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr;
303   - c->reply_status_ = Command<ReplyT>::SEND_ERROR;
304   - c->invoke();
305   - return false;
306   - }
307   - return true;
308   - }
309   - }
310   -
311   - if (redisAsyncCommand(rdx->ctx_, commandCallback<ReplyT>, (void*) c->id_, c->cmd_.c_str()) != REDIS_OK) {
312   - rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr;
  310 + // Construct a char** from the vector
  311 + vector<const char*> argv;
  312 + transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argv),
  313 + [](const string& s){ return s.c_str(); }
  314 + );
  315 +
  316 + // Construct a size_t* of string lengths from the vector
  317 + vector<size_t> argvlen;
  318 + transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argvlen),
  319 + [](const string& s) { return s.size(); }
  320 + );
  321 +
  322 + if(redisAsyncCommandArgv(rdx->ctx_, commandCallback<ReplyT>, (void*) c->id_,
  323 + argv.size(), &argv[0], &argvlen[0]) != REDIS_OK) {
  324 + rdx->logger_.error() << "Could not send \"" << c->cmd() << "\": " << rdx->ctx_->errstr;
313 325 c->reply_status_ = Command<ReplyT>::SEND_ERROR;
314 326 c->invoke();
315 327 return false;
... ... @@ -498,11 +510,31 @@ Redox::getCommandMap&lt;unordered_set&lt;string&gt;&gt;() { return commands_unordered_set_st
498 510 // Helpers
499 511 // ----------------------------
500 512  
501   -void Redox::command(const std::string& cmd) {
  513 +string Redox::vecToStr(const vector<string>& vec, const char delimiter) {
  514 + string str;
  515 + for(size_t i = 0; i < vec.size() - 1; i++)
  516 + str += vec[i] + delimiter;
  517 + str += vec[vec.size()-1];
  518 + return str;
  519 +}
  520 +
  521 +vector<string> Redox::strToVec(const string& s, const char delimiter) {
  522 + vector<string> vec;
  523 + size_t last = 0;
  524 + size_t next = 0;
  525 + while ((next = s.find(delimiter, last)) != string::npos) {
  526 + vec.push_back(s.substr(last, next-last));
  527 + last = next + 1;
  528 + }
  529 + vec.push_back(s.substr(last));
  530 + return vec;
  531 +}
  532 +
  533 +void Redox::command(const std::vector<std::string>& cmd) {
502 534 command<redisReply*>(cmd, nullptr);
503 535 }
504 536  
505   -bool Redox::commandSync(const string& cmd) {
  537 +bool Redox::commandSync(const std::vector<std::string>& cmd) {
506 538 auto& c = commandSync<redisReply*>(cmd);
507 539 bool succeeded = c.ok();
508 540 c.free();
... ... @@ -511,7 +543,7 @@ bool Redox::commandSync(const string&amp; cmd) {
511 543  
512 544 string Redox::get(const string& key) {
513 545  
514   - Command<char*>& c = commandSync<char*>("GET \"" + key + '"');
  546 + Command<char*>& c = commandSync<char*>({"GET", key});
515 547 if(!c.ok()) {
516 548 throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status()));
517 549 }
... ... @@ -521,15 +553,15 @@ string Redox::get(const string&amp; key) {
521 553 };
522 554  
523 555 bool Redox::set(const string& key, const string& value) {
524   - return commandSync("SET " + key + " \"" + value + '"');
  556 + return commandSync({"SET", key, value});
525 557 }
526 558  
527 559 bool Redox::del(const string& key) {
528   - return commandSync("DEL \"" + key + '"');
  560 + return commandSync({"DEL", key});
529 561 }
530 562  
531 563 void Redox::publish(const string& topic, const string& msg) {
532   - command<redisReply*>("PUBLISH " + topic + " \"" + msg + '"');
  564 + command<redisReply*>({"PUBLISH", topic, msg});
533 565 }
534 566  
535 567 } // End namespace redis
... ...
src/command.cpp
... ... @@ -33,7 +33,7 @@ template&lt;class ReplyT&gt;
33 33 Command<ReplyT>::Command(
34 34 Redox* rdx,
35 35 long id,
36   - const std::string& cmd,
  36 + const std::vector<std::string>& cmd,
37 37 const std::function<void(Command<ReplyT>&)>& callback,
38 38 double repeat, double after, bool free_memory, log::Logger& logger
39 39 ) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory),
... ... @@ -113,12 +113,17 @@ template&lt;class ReplyT&gt;
113 113 ReplyT Command<ReplyT>::reply() {
114 114 std::lock_guard<std::mutex> lg(reply_guard_);
115 115 if (!ok()) {
116   - logger_.warning() << cmd_ << ": Accessing reply value while status != OK.";
  116 + logger_.warning() << cmd() << ": Accessing reply value while status != OK.";
117 117 }
118 118 return reply_val_;
119 119 }
120 120  
121 121 template<class ReplyT>
  122 +std::string Command<ReplyT>::cmd() const {
  123 + return rdx_->vecToStr(cmd_);
  124 +};
  125 +
  126 +template<class ReplyT>
122 127 bool Command<ReplyT>::isExpectedReply(int type) {
123 128  
124 129 if(reply_obj_->type == type) {
... ... @@ -128,7 +133,7 @@ bool Command&lt;ReplyT&gt;::isExpectedReply(int type) {
128 133  
129 134 if(checkErrorReply() || checkNilReply()) return false;
130 135  
131   - logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type
  136 + logger_.error() << cmd() << ": Received reply of type " << reply_obj_->type
132 137 << ", expected type " << type << ".";
133 138 reply_status_ = WRONG_TYPE;
134 139 return false;
... ... @@ -144,7 +149,7 @@ bool Command&lt;ReplyT&gt;::isExpectedReply(int typeA, int typeB) {
144 149  
145 150 if(checkErrorReply() || checkNilReply()) return false;
146 151  
147   - logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type
  152 + logger_.error() << cmd() << ": Received reply of type " << reply_obj_->type
148 153 << ", expected type " << typeA << " or " << typeB << ".";
149 154 reply_status_ = WRONG_TYPE;
150 155 return false;
... ... @@ -154,7 +159,7 @@ template&lt;class ReplyT&gt;
154 159 bool Command<ReplyT>::checkErrorReply() {
155 160  
156 161 if (reply_obj_->type == REDIS_REPLY_ERROR) {
157   - logger_.error() << cmd_ << ": " << reply_obj_->str;
  162 + logger_.error() << cmd() << ": " << reply_obj_->str;
158 163 reply_status_ = ERROR_REPLY;
159 164 return true;
160 165 }
... ... @@ -165,7 +170,7 @@ template&lt;class ReplyT&gt;
165 170 bool Command<ReplyT>::checkNilReply() {
166 171  
167 172 if (reply_obj_->type == REDIS_REPLY_NIL) {
168   - logger_.warning() << cmd_ << ": Nil reply.";
  173 + logger_.warning() << cmd() << ": Nil reply.";
169 174 reply_status_ = NIL_REPLY;
170 175 return true;
171 176 }
... ...
src/subscriber.cpp
... ... @@ -99,7 +99,7 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic,
99 99 function<void(const string&, int)> err_callback
100 100 ) {
101 101  
102   - Command<redisReply*>& sub_cmd = rdx_.commandLoop<redisReply*>(cmd_name + " " + topic,
  102 + Command<redisReply*>& sub_cmd = rdx_.commandLoop<redisReply*>({cmd_name, topic},
103 103 [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command<redisReply*>& c) {
104 104  
105 105 if (!c.ok()) {
... ... @@ -191,7 +191,7 @@ void Subscriber::psubscribe(const string topic,
191 191 void Subscriber::unsubscribeBase(const string cmd_name, const string topic,
192 192 function<void(const string&, int)> err_callback
193 193 ) {
194   - rdx_.command<redisReply*>(cmd_name + " " + topic,
  194 + rdx_.command<redisReply*>({cmd_name, topic},
195 195 [topic, err_callback](Command<redisReply*>& c) {
196 196 if(!c.ok()) {
197 197 if (err_callback) err_callback(topic, c.status());
... ...
test/test.cpp
... ... @@ -44,7 +44,7 @@ protected:
44 44 rdx.connect("localhost", 6379);
45 45  
46 46 // Clear all keys used by the tests here
47   - rdx.command("DEL redox_test:a");
  47 + rdx.command({"DEL", "redox_test:a"});
48 48 }
49 49  
50 50 virtual ~RedoxTest() { }
... ... @@ -113,7 +113,7 @@ protected:
113 113 void print_and_check_sync(Command<ReplyT>& c, const ReplyT& value) {
114 114 ASSERT_TRUE(c.ok());
115 115 EXPECT_EQ(c.reply(), value);
116   - cout << "[SYNC] " << c.cmd_ << ": " << c.reply() << endl;
  116 + cout << "[SYNC] " << c.cmd() << ": " << c.reply() << endl;
117 117 c.free();
118 118 }
119 119 };
... ... @@ -123,31 +123,31 @@ protected:
123 123 // -------------------------------------------
124 124  
125 125 TEST_F(RedoxTest, GetSet) {
126   - rdx.command<string>("SET redox_test:a apple", print_and_check<string>("OK"));
127   - rdx.command<string>("GET redox_test:a", print_and_check<string>("apple"));
  126 + rdx.command<string>({"SET", "redox_test:a", "apple"}, print_and_check<string>("OK"));
  127 + rdx.command<string>({"GET", "redox_test:a"}, print_and_check<string>("apple"));
128 128 wait_for_replies();
129 129 }
130 130  
131 131 TEST_F(RedoxTest, Delete) {
132   - rdx.command<string>("SET redox_test:a apple", print_and_check<string>("OK"));
133   - rdx.command<int>("DEL redox_test:a", print_and_check(1));
134   - rdx.command<nullptr_t>("GET redox_test:a", check(nullptr));
  132 + rdx.command<string>({"SET", "redox_test:a", "apple"}, print_and_check<string>("OK"));
  133 + rdx.command<int>({"DEL", "redox_test:a"}, print_and_check(1));
  134 + rdx.command<nullptr_t>({"GET", "redox_test:a"}, check(nullptr));
135 135 wait_for_replies();
136 136 }
137 137  
138 138 TEST_F(RedoxTest, Incr) {
139 139 int count = 100;
140 140 for(int i = 0; i < count; i++) {
141   - rdx.command<int>("INCR redox_test:a", check(i+1));
  141 + rdx.command<int>({"INCR", "redox_test:a"}, check(i+1));
142 142 }
143   - rdx.command<string>("GET redox_test:a", print_and_check(to_string(count)));
  143 + rdx.command<string>({"GET", "redox_test:a"}, print_and_check(to_string(count)));
144 144 wait_for_replies();
145 145 }
146 146  
147 147 TEST_F(RedoxTest, Delayed) {
148   - rdx.commandDelayed<int>("INCR redox_test:a", check(1), 0.1);
  148 + rdx.commandDelayed<int>({"INCR", "redox_test:a"}, check(1), 0.1);
149 149 this_thread::sleep_for(chrono::milliseconds(150));
150   - rdx.command<string>("GET redox_test:a", print_and_check(to_string(1)));
  150 + rdx.command<string>({"GET", "redox_test:a"}, print_and_check(to_string(1)));
151 151 wait_for_replies();
152 152 }
153 153  
... ... @@ -155,7 +155,7 @@ TEST_F(RedoxTest, Loop) {
155 155 int count = 0;
156 156 int target_count = 20;
157 157 double dt = 0.005;
158   - Command<int>& cmd = rdx.commandLoop<int>("INCR redox_test:a",
  158 + Command<int>& cmd = rdx.commandLoop<int>({"INCR", "redox_test:a"},
159 159 [this, &count](Command<int>& c) {
160 160 check(++count)(c);
161 161 },
... ... @@ -166,7 +166,7 @@ TEST_F(RedoxTest, Loop) {
166 166 this_thread::sleep_for(std::chrono::duration<double>(wait_time));
167 167 cmd.free();
168 168  
169   - rdx.command<string>("GET redox_test:a", print_and_check(to_string(target_count)));
  169 + rdx.command<string>({"GET", "redox_test:a"}, print_and_check(to_string(target_count)));
170 170 wait_for_replies();
171 171 }
172 172  
... ... @@ -175,24 +175,24 @@ TEST_F(RedoxTest, Loop) {
175 175 // -------------------------------------------
176 176  
177 177 TEST_F(RedoxTest, GetSetSync) {
178   - print_and_check_sync<string>(rdx.commandSync<string>("SET redox_test:a apple"), "OK");
179   - print_and_check_sync<string>(rdx.commandSync<string>("GET redox_test:a"), "apple");
  178 + print_and_check_sync<string>(rdx.commandSync<string>({"SET", "redox_test:a", "apple"}), "OK");
  179 + print_and_check_sync<string>(rdx.commandSync<string>({"GET", "redox_test:a"}), "apple");
180 180 rdx.disconnect();
181 181 }
182 182  
183 183 TEST_F(RedoxTest, DeleteSync) {
184   - print_and_check_sync<string>(rdx.commandSync<string>("SET redox_test:a apple"), "OK");
185   - print_and_check_sync(rdx.commandSync<int>("DEL redox_test:a"), 1);
186   - check_sync(rdx.commandSync<nullptr_t>("GET redox_test:a"), nullptr);
  184 + print_and_check_sync<string>(rdx.commandSync<string>({"SET", "redox_test:a", "apple"}), "OK");
  185 + print_and_check_sync(rdx.commandSync<int>({"DEL", "redox_test:a"}), 1);
  186 + check_sync(rdx.commandSync<nullptr_t>({"GET", "redox_test:a"}), nullptr);
187 187 rdx.disconnect();
188 188 }
189 189  
190 190 TEST_F(RedoxTest, IncrSync) {
191 191 int count = 100;
192 192 for(int i = 0; i < count; i++) {
193   - check_sync(rdx.commandSync<int>("INCR redox_test:a"), i+1);
  193 + check_sync(rdx.commandSync<int>({"INCR", "redox_test:a"}), i+1);
194 194 }
195   - print_and_check_sync(rdx.commandSync<string>("GET redox_test:a"), to_string(count));
  195 + print_and_check_sync(rdx.commandSync<string>({"GET", "redox_test:a"}), to_string(count));
196 196 rdx.disconnect();
197 197 }
198 198  
... ...