Commit c43f27479eb37068f0dd74f7ffec752749b9e4cd

Authored by Hayk Martirosyan
1 parent 375d845f

Overhaul callback system to be a better interface

Now, there is only one callback for command(), and it returns a const
reference to the Command object. The user is responsible for error
checking using c.ok(), c.status(), and getting the reply with c.reply().
This significantly cleans up the library code and the user code.

Greatly refactored the data type specialization code in command.cpp.
CMakeLists.txt
@@ -80,9 +80,6 @@ if (examples) @@ -80,9 +80,6 @@ if (examples)
80 add_executable(data_types examples/data_types.cpp ${SRC_ALL}) 80 add_executable(data_types examples/data_types.cpp ${SRC_ALL})
81 target_link_libraries(data_types ${LIB_REDIS}) 81 target_link_libraries(data_types ${LIB_REDIS})
82 82
83 - add_executable(string_v_char examples/string_vs_charp.cpp ${SRC_ALL})  
84 - target_link_libraries(string_v_char ${LIB_REDIS})  
85 -  
86 add_executable(multi_client examples/multi-client.cpp ${SRC_ALL}) 83 add_executable(multi_client examples/multi-client.cpp ${SRC_ALL})
87 target_link_libraries(multi_client ${LIB_REDIS}) 84 target_link_libraries(multi_client ${LIB_REDIS})
88 85
@@ -92,4 +89,7 @@ if (examples) @@ -92,4 +89,7 @@ if (examples)
92 add_executable(pub_sub examples/pub_sub.cpp ${SRC_ALL}) 89 add_executable(pub_sub examples/pub_sub.cpp ${SRC_ALL})
93 target_link_libraries(pub_sub ${LIB_REDIS}) 90 target_link_libraries(pub_sub ${LIB_REDIS})
94 91
  92 + add_executable(speed_test_pubsub examples/speed_test_pubsub ${SRC_ALL})
  93 + target_link_libraries(speed_test_pubsub ${LIB_REDIS})
  94 +
95 endif() 95 endif()
examples/basic.cpp
@@ -6,10 +6,13 @@ @@ -6,10 +6,13 @@
6 #include "../src/redox.hpp" 6 #include "../src/redox.hpp"
7 7
8 using namespace std; 8 using namespace std;
  9 +using redox::Redox;
  10 +using redox::Command;
9 11
10 int main(int argc, char* argv[]) { 12 int main(int argc, char* argv[]) {
11 13
12 - redox::Redox rdx = {"localhost", 6379}; // Initialize Redox 14 + Redox rdx = {"localhost", 6379, nullptr, cout, redox::log::Info}; // Initialize Redox
  15 +
13 if(!rdx.start()) return 1; // Start the event loop 16 if(!rdx.start()) return 1; // Start the event loop
14 17
15 rdx.del("occupation"); 18 rdx.del("occupation");
@@ -20,4 +23,5 @@ int main(int argc, char* argv[]) { @@ -20,4 +23,5 @@ int main(int argc, char* argv[]) {
20 cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl; 23 cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl;
21 24
22 rdx.stop(); // Shut down the event loop 25 rdx.stop(); // Shut down the event loop
  26 + return 0;
23 } 27 }
examples/basic_threaded.cpp
@@ -8,6 +8,8 @@ @@ -8,6 +8,8 @@
8 #include "../src/redox.hpp" 8 #include "../src/redox.hpp"
9 9
10 using namespace std; 10 using namespace std;
  11 +using redox::Redox;
  12 +using redox::Command;
11 13
12 redox::Redox rdx = {"localhost", 6379}; 14 redox::Redox rdx = {"localhost", 6379};
13 15
@@ -27,8 +29,8 @@ int main(int argc, char* argv[]) { @@ -27,8 +29,8 @@ int main(int argc, char* argv[]) {
27 for(int i = 0; i < 5; i++) { 29 for(int i = 0; i < 5; i++) {
28 rdx.command<string>( 30 rdx.command<string>(
29 "GET counter", 31 "GET counter",
30 - [](const string& cmd, const string& value) {  
31 - cout << cmd << ": " << value << endl; 32 + [](Command<string>& c) {
  33 + if(c.ok()) cout << c.cmd() << ": " << c.reply() << endl;
32 } 34 }
33 ); 35 );
34 this_thread::sleep_for(chrono::milliseconds(1000)); 36 this_thread::sleep_for(chrono::milliseconds(1000));
examples/binary_data.cpp
@@ -8,6 +8,8 @@ @@ -8,6 +8,8 @@
8 #include "../src/redox.hpp" 8 #include "../src/redox.hpp"
9 9
10 using namespace std; 10 using namespace std;
  11 +using redox::Redox;
  12 +using redox::Command;
11 13
12 /** 14 /**
13 * Random string generator. 15 * Random string generator.
@@ -27,18 +29,19 @@ int main(int argc, char* argv[]) { @@ -27,18 +29,19 @@ int main(int argc, char* argv[]) {
27 29
28 string binary_data = random_string(10000); 30 string binary_data = random_string(10000);
29 31
30 - auto c = rdx.command_blocking<string>("SET binary \"" + binary_data + "\"");  
31 - if(c->ok()) cout << "Reply: " << c->reply() << endl;  
32 - else cerr << "Failed to set key! Status: " << c->status() << endl;  
33 - c->free(); 32 + auto& c = rdx.command_blocking<string>("SET binary \"" + binary_data + "\"");
  33 + if(c.ok()) cout << "Reply: " << c.reply() << endl;
  34 + else cerr << "Failed to set key! Status: " << c.status() << endl;
  35 + c.free();
34 36
35 - c = rdx.command_blocking<string>("GET binary");  
36 - if(c->ok()) {  
37 - if(c->reply() == binary_data) cout << "Binary data matches!" << endl; 37 + auto& c2 = rdx.command_blocking<string>("GET binary");
  38 + if(c2.ok()) {
  39 + if(c2.reply() == binary_data) cout << "Binary data matches!" << endl;
38 else cerr << "Binary data differs!" << endl; 40 else cerr << "Binary data differs!" << endl;
39 } 41 }
40 - else cerr << "Failed to get key! Status: " << c->status() << endl;  
41 - c->free(); 42 + else cerr << "Failed to get key! Status: " << c2.status() << endl;
  43 + c2.free();
42 44
43 rdx.stop(); // Shut down the event loop 45 rdx.stop(); // Shut down the event loop
  46 + return 0;
44 } 47 }
examples/data_types.cpp
@@ -9,6 +9,8 @@ @@ -9,6 +9,8 @@
9 #include <vector> 9 #include <vector>
10 10
11 using namespace std; 11 using namespace std;
  12 +using redox::Redox;
  13 +using redox::Command;
12 14
13 int main(int argc, char* argv[]) { 15 int main(int argc, char* argv[]) {
14 16
@@ -20,39 +22,34 @@ int main(int argc, char* argv[]) { @@ -20,39 +22,34 @@ int main(int argc, char* argv[]) {
20 rdx.command_blocking("LPUSH mylist 1 2 3 4 5 6 7 8 9 10"); 22 rdx.command_blocking("LPUSH mylist 1 2 3 4 5 6 7 8 9 10");
21 23
22 rdx.command<vector<string>>("LRANGE mylist 0 4", 24 rdx.command<vector<string>>("LRANGE mylist 0 4",
23 - [](const string& cmd, const vector<string>& reply){ 25 + [](Command<vector<string>>& c){
  26 + if(!c.ok()) return;
24 cout << "Last 5 elements as a vector: "; 27 cout << "Last 5 elements as a vector: ";
25 - for(const string& s : reply) cout << s << " "; 28 + for (const string& s : c.reply()) cout << s << " ";
26 cout << endl; 29 cout << endl;
27 - },  
28 - [](const string& cmd, int status) {  
29 - cerr << "Error with LRANGE: " << status << endl;  
30 } 30 }
31 ); 31 );
32 32
33 rdx.command<unordered_set<string>>("LRANGE mylist 0 4", 33 rdx.command<unordered_set<string>>("LRANGE mylist 0 4",
34 - [](const string& cmd, const unordered_set<string>& reply){  
35 - cout << "Last 5 elements as an unordered set: ";  
36 - for(const string& s : reply) cout << s << " "; 34 + [](Command<unordered_set<string>>& c){
  35 + if(!c.ok()) return;
  36 + cout << "Last 5 elements as a hash: ";
  37 + for (const string& s : c.reply()) cout << s << " ";
37 cout << endl; 38 cout << endl;
38 - },  
39 - [](const string& cmd, int status) {  
40 - cerr << "Error with LRANGE: " << status << endl;  
41 } 39 }
42 ); 40 );
43 41
44 rdx.command<set<string>>("LRANGE mylist 0 4", 42 rdx.command<set<string>>("LRANGE mylist 0 4",
45 - [&rdx](const string& cmd, const set<string>& reply){  
46 - cout << "Last 5 elements as a set: ";  
47 - for(const string& s : reply) cout << s << " ";  
48 - cout << endl;  
49 - rdx.stop_signal();  
50 - },  
51 - [&rdx](const string& cmd, int status) {  
52 - cerr << "Error with LRANGE: " << status << endl; 43 + [&rdx](Command<set<string>>& c) {
  44 + if(c.ok()) {
  45 + cout << "Last 5 elements as a set: ";
  46 + for (const string& s : c.reply()) cout << s << " ";
  47 + cout << endl;
  48 + }
53 rdx.stop_signal(); 49 rdx.stop_signal();
54 } 50 }
55 ); 51 );
56 52
57 rdx.block(); // Shut down the event loop 53 rdx.block(); // Shut down the event loop
  54 + return 0;
58 } 55 }
examples/lpush_benchmark.cpp
@@ -6,6 +6,8 @@ @@ -6,6 +6,8 @@
6 #include "../src/redox.hpp" 6 #include "../src/redox.hpp"
7 7
8 using namespace std; 8 using namespace std;
  9 +using redox::Redox;
  10 +using redox::Command;
9 11
10 double time_s() { 12 double time_s() {
11 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); 13 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
@@ -26,12 +28,14 @@ int main(int argc, char* argv[]) { @@ -26,12 +28,14 @@ int main(int argc, char* argv[]) {
26 atomic_int count = {0}; 28 atomic_int count = {0};
27 29
28 for(int i = 1; i <= len; i++) { 30 for(int i = 1; i <= len; i++) {
29 - rdx.command<int>("lpush test 1", [&t0, &t1, &count, len, &rdx](const string& cmd, int reply) { 31 + rdx.command<int>("lpush test 1", [&t0, &t1, &count, len, &rdx](Command<int>& c) {
  32 +
  33 + if(!c.ok()) return;
30 34
31 count += 1; 35 count += 1;
32 36
33 if(count == len) { 37 if(count == len) {
34 - cout << cmd << ": " << reply << endl; 38 + cout << c.cmd() << ": " << c.reply() << endl;
35 39
36 double t2 = time_s(); 40 double t2 = time_s();
37 cout << "Time to queue async commands: " << t1 - t0 << "s" << endl; 41 cout << "Time to queue async commands: " << t1 - t0 << "s" << endl;
examples/multi-client.cpp
@@ -6,6 +6,8 @@ @@ -6,6 +6,8 @@
6 #include "../src/redox.hpp" 6 #include "../src/redox.hpp"
7 7
8 using namespace std; 8 using namespace std;
  9 +using redox::Redox;
  10 +using redox::Command;
9 11
10 int main(int argc, char* argv[]) { 12 int main(int argc, char* argv[]) {
11 13
@@ -23,4 +25,6 @@ int main(int argc, char* argv[]) { @@ -23,4 +25,6 @@ int main(int argc, char* argv[]) {
23 rdx1.stop(); 25 rdx1.stop();
24 rdx2.stop(); 26 rdx2.stop();
25 rdx3.stop(); 27 rdx3.stop();
  28 +
  29 + return 0;
26 } 30 }
examples/pub_sub.cpp
1 -#include <stdio.h>  
2 #include <stdlib.h> 1 #include <stdlib.h>
3 -#include <string.h>  
4 -#include <signal.h>  
5 -#include "hiredis/hiredis.h"  
6 -#include "hiredis/async.h"  
7 -#include "hiredis/adapters/libev.h"  
8 #include <iostream> 2 #include <iostream>
9 #include "../src/redox.hpp" 3 #include "../src/redox.hpp"
10 4
@@ -55,4 +49,6 @@ int main(int argc, char *argv[]) { @@ -55,4 +49,6 @@ int main(int argc, char *argv[]) {
55 rdx_pub.publish("news", "whatup"); 49 rdx_pub.publish("news", "whatup");
56 rdx.block(); 50 rdx.block();
57 rdx_pub.block(); 51 rdx_pub.block();
  52 +
  53 + return 0;
58 } 54 }
examples/speed_test_async.cpp
@@ -8,7 +8,8 @@ @@ -8,7 +8,8 @@
8 #include "../src/redox.hpp" 8 #include "../src/redox.hpp"
9 9
10 using namespace std; 10 using namespace std;
11 -using namespace redox; 11 +using redox::Redox;
  12 +using redox::Command;
12 13
13 double time_s() { 14 double time_s() {
14 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); 15 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
@@ -17,7 +18,7 @@ double time_s() { @@ -17,7 +18,7 @@ double time_s() {
17 18
18 int main(int argc, char* argv[]) { 19 int main(int argc, char* argv[]) {
19 20
20 - Redox rdx = {"localhost", 6379}; 21 + Redox rdx = {"/var/run/redis/redis.sock", nullptr};
21 if(!rdx.start()) return 1; 22 if(!rdx.start()) return 1;
22 23
23 if(rdx.command_blocking("SET simple_loop:count 0")) { 24 if(rdx.command_blocking("SET simple_loop:count 0")) {
@@ -38,31 +39,36 @@ int main(int argc, char* argv[]) { @@ -38,31 +39,36 @@ int main(int argc, char* argv[]) {
38 double t0 = time_s(); 39 double t0 = time_s();
39 atomic_int count(0); 40 atomic_int count(0);
40 41
41 - Command<int>* c = rdx.command<int>( 42 + Command<int>& cmd = rdx.command<int>(
42 cmd_str, 43 cmd_str,
43 - [&count, &rdx](const string &cmd, const int& value) { count++; },  
44 - [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; }, 44 + [&count, &rdx](Command<int>& c) {
  45 + if(!c.ok()) {
  46 + cerr << "Bad reply: " << c.status() << endl;
  47 + }
  48 + count++;
  49 + },
45 dt 50 dt
46 ); 51 );
47 52
48 // Wait for t time, then stop the command. 53 // Wait for t time, then stop the command.
49 this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); 54 this_thread::sleep_for(chrono::microseconds((int)(t*1e6)));
50 - c->cancel(); 55 + cmd.cancel();
51 56
52 - // Get the final value of the counter  
53 - auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count");  
54 - long final_count = stol(get_cmd->reply());  
55 - get_cmd->free(); 57 + rdx.command<string>("GET simple_loop:count", [&](Command<string>& c) {
  58 + if(!c.ok()) return;
  59 + long final_count = stol(c.reply());
56 60
57 - rdx.stop(); 61 + double t_elapsed = time_s() - t0;
  62 + double actual_freq = (double)count / t_elapsed;
58 63
59 - double t_elapsed = time_s() - t0;  
60 - double actual_freq = (double)count / t_elapsed; 64 + cout << "Sent " << count << " commands in " << t_elapsed << "s, "
  65 + << "that's " << actual_freq << " commands/s." << endl;
61 66
62 - cout << "Sent " << count << " commands in " << t_elapsed << "s, "  
63 - << "that's " << actual_freq << " commands/s." << endl; 67 + cout << "Final value of counter: " << final_count << endl;
64 68
65 - cout << "Final value of counter: " << final_count << endl; 69 + rdx.stop_signal();
  70 + });
66 71
  72 + rdx.block();
67 return 0; 73 return 0;
68 } 74 }
examples/speed_test_async_multi.cpp
@@ -9,7 +9,8 @@ @@ -9,7 +9,8 @@
9 #include "../src/redox.hpp" 9 #include "../src/redox.hpp"
10 10
11 using namespace std; 11 using namespace std;
12 -using namespace redox; 12 +using redox::Redox;
  13 +using redox::Command;
13 14
14 double time_s() { 15 double time_s() {
15 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); 16 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
@@ -42,26 +43,30 @@ int main(int argc, char* argv[]) { @@ -42,26 +43,30 @@ int main(int argc, char* argv[]) {
42 43
43 vector<Command<int>*> commands; 44 vector<Command<int>*> commands;
44 for(int i = 0; i < parallel; i++) { 45 for(int i = 0; i < parallel; i++) {
45 - commands.push_back(rdx.command<int>( 46 + commands.push_back(&rdx.command<int>(
46 cmd_str, 47 cmd_str,
47 - [&count, &rdx](const string &cmd, const int& value) { count++; },  
48 - [](const string& cmd, int status) { cerr << "Bad reply: " << status << endl; }, 48 + [&count, &rdx](Command<int>& c) {
  49 + if(!c.ok()) {
  50 + cerr << "Bad reply: " << c.status() << endl;
  51 + }
  52 + count++;
  53 + },
49 dt 54 dt
50 )); 55 ));
51 } 56 }
52 57
53 // Wait for t time, then stop the command. 58 // Wait for t time, then stop the command.
54 this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); 59 this_thread::sleep_for(chrono::microseconds((int)(t*1e6)));
55 - for(auto c : commands) c->cancel(); 60 + for(auto& c : commands) c->cancel();
  61 +
  62 + double t_elapsed = time_s() - t0;
  63 + double actual_freq = (double)count / t_elapsed;
56 64
57 // Get the final value of the counter 65 // Get the final value of the counter
58 long final_count = stol(rdx.get("simple_loop:count")); 66 long final_count = stol(rdx.get("simple_loop:count"));
59 67
60 rdx.stop(); 68 rdx.stop();
61 69
62 - double t_elapsed = time_s() - t0;  
63 - double actual_freq = (double)count / t_elapsed;  
64 -  
65 cout << "Sent " << count << " commands in " << t_elapsed << "s, " 70 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
66 << "that's " << actual_freq << " commands/s." << endl; 71 << "that's " << actual_freq << " commands/s." << endl;
67 72
examples/speed_test_pubsub.cpp 0 โ†’ 100644
  1 +#include <iostream>
  2 +#include "../src/redox.hpp"
  3 +
  4 +using namespace std;
  5 +using redox::Redox;
  6 +using redox::Command;
  7 +
  8 +double time_s() {
  9 + unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
  10 + return (double)ms / 1e6;
  11 +}
  12 +
  13 +int main(int argc, char *argv[]) {
  14 +
  15 + Redox rdx_pub;
  16 + Redox rdx_sub;
  17 +
  18 + if(!rdx_pub.start()) return 1;
  19 + if(!rdx_sub.start()) return 1;
  20 +
  21 + atomic_int count(0);
  22 + auto got_message = [&count](const string& topic, const string& msg) {
  23 + count += 1;
  24 + };
  25 +
  26 + auto subscribed = [](const string& topic) {
  27 +
  28 + };
  29 +
  30 + auto unsubscribed = [](const string& topic) {
  31 + cout << "> Unsubscribed from " << topic << endl;
  32 + };
  33 +
  34 + rdx_sub.subscribe("speedtest", got_message, subscribed, unsubscribed);
  35 +
  36 + double t0 = time_s();
  37 + double t1 = t0;
  38 + double tspan = 5;
  39 +
  40 + while(t1 - t0 < tspan) {
  41 + rdx_pub.publish("speedtest", "hello");
  42 + t1 = time_s();
  43 + }
  44 + this_thread::sleep_for(chrono::milliseconds(1000));
  45 + rdx_pub.stop();
  46 + rdx_sub.stop();
  47 +
  48 + double t = t1 - t0;
  49 + cout << "Total of messages sent in " << t << "s is " << count << endl;
  50 + double msg_per_s = count / t;
  51 + cout << "Messages per second: " << msg_per_s << endl;
  52 +}
examples/speed_test_sync.cpp
@@ -37,21 +37,19 @@ int main(int argc, char* argv[]) { @@ -37,21 +37,19 @@ int main(int argc, char* argv[]) {
37 int count = 0; 37 int count = 0;
38 38
39 while(time_s() < t_end) { 39 while(time_s() < t_end) {
40 - Command<int>* c = rdx.command_blocking<int>(cmd_str);  
41 - if(!c->ok()) cerr << "Bad reply, code: " << c->status() << endl;  
42 - c->free(); 40 + Command<int>& c = rdx.command_blocking<int>(cmd_str);
  41 + if(!c.ok()) cerr << "Bad reply, code: " << c.status() << endl;
  42 + c.free();
43 count++; 43 count++;
44 } 44 }
45 45
46 - auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count");  
47 - long final_count = stol(get_cmd->reply());  
48 - get_cmd->free();  
49 -  
50 - rdx.stop();  
51 -  
52 double t_elapsed = time_s() - t0; 46 double t_elapsed = time_s() - t0;
53 double actual_freq = (double)count / t_elapsed; 47 double actual_freq = (double)count / t_elapsed;
54 48
  49 + long final_count = stol(rdx.get("simple_loop:count"));
  50 +
  51 + rdx.stop();
  52 +
55 cout << "Sent " << count << " commands in " << t_elapsed << "s, " 53 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
56 << "that's " << actual_freq << " commands/s." << endl; 54 << "that's " << actual_freq << " commands/s." << endl;
57 55
examples/string_vs_charp.cpp deleted
1 -/**  
2 -* Redox test  
3 -* ----------  
4 -* Increment a key on Redis using an asynchronous command on a timer.  
5 -*/  
6 -  
7 -#include <iostream>  
8 -#include "../src/redox.hpp"  
9 -  
10 -using namespace std;  
11 -using namespace redox;  
12 -  
13 -double time_s() {  
14 - unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);  
15 - return (double)ms / 1e6;  
16 -}  
17 -  
18 -int main(int argc, char* argv[]) {  
19 -  
20 - Redox rdx;  
21 - if(!rdx.start()) return 1;  
22 -  
23 - rdx.del("stringtest");  
24 - rdx.set("stringtest", "value");  
25 -  
26 - int count = 1000000;  
27 - double t0 = time_s();  
28 -  
29 - string cmd_str = "GET stringtest";  
30 - for(int i = 0; i < count; i++) {  
31 - rdx.command<string>(  
32 - cmd_str,  
33 - [](const string &cmd, string const& value) {  
34 - value;  
35 - },  
36 - [](const string &cmd, int status) {  
37 - cerr << "Bad reply: " << status << endl;  
38 - }  
39 - );  
40 - }  
41 -  
42 - rdx.stop();  
43 -  
44 - double t_elapsed = time_s() - t0;  
45 -  
46 - cout << "Sent " << count << " commands in " << t_elapsed << "s." << endl;  
47 -  
48 - return 0;  
49 -}  
src/command.cpp
@@ -18,11 +18,10 @@ Command&lt;ReplyT&gt;::Command( @@ -18,11 +18,10 @@ Command&lt;ReplyT&gt;::Command(
18 Redox* rdx, 18 Redox* rdx,
19 long id, 19 long id,
20 const std::string& cmd, 20 const std::string& cmd,
21 - const std::function<void(const std::string&, const ReplyT&)>& callback,  
22 - const std::function<void(const std::string&, int status)>& error_callback, 21 + const std::function<void(Command<ReplyT>&)>& callback,
23 double repeat, double after, bool free_memory, log::Logger& logger 22 double repeat, double after, bool free_memory, log::Logger& logger
24 ) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), 23 ) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory),
25 - success_callback_(callback), error_callback_(error_callback), logger_(logger) { 24 + callback_(callback), logger_(logger) {
26 timer_guard_.lock(); 25 timer_guard_.lock();
27 } 26 }
28 27
@@ -32,7 +31,8 @@ void Command&lt;ReplyT&gt;::processReply(redisReply* r) { @@ -32,7 +31,8 @@ void Command&lt;ReplyT&gt;::processReply(redisReply* r) {
32 free_guard_.lock(); 31 free_guard_.lock();
33 32
34 reply_obj_ = r; 33 reply_obj_ = r;
35 - handleCallback(); 34 + parseReplyObject();
  35 + invoke();
36 36
37 pending_--; 37 pending_--;
38 38
@@ -96,7 +96,7 @@ void Command&lt;ReplyT&gt;::freeCommand(Command&lt;ReplyT&gt;* c) { @@ -96,7 +96,7 @@ void Command&lt;ReplyT&gt;::freeCommand(Command&lt;ReplyT&gt;* c) {
96 96
97 97
98 template<class ReplyT> 98 template<class ReplyT>
99 -const ReplyT& Command<ReplyT>::reply() { 99 +const ReplyT& Command<ReplyT>::reply() const {
100 if (!ok()) { 100 if (!ok()) {
101 logger_.warning() << cmd_ << ": Accessing value of reply with status != OK."; 101 logger_.warning() << cmd_ << ": Accessing value of reply with status != OK.";
102 } 102 }
@@ -104,179 +104,133 @@ const ReplyT&amp; Command&lt;ReplyT&gt;::reply() { @@ -104,179 +104,133 @@ const ReplyT&amp; Command&lt;ReplyT&gt;::reply() {
104 } 104 }
105 105
106 template<class ReplyT> 106 template<class ReplyT>
107 -bool Command<ReplyT>::isErrorReply() { 107 +bool Command<ReplyT>::isExpectedReply(int type) {
  108 +
  109 + if(reply_obj_->type == type) {
  110 + reply_status_ = OK_REPLY;
  111 + return true;
  112 + }
  113 +
  114 + if(checkErrorReply() || checkNilReply()) return false;
  115 +
  116 + logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type
  117 + << ", expected type " << type << ".";
  118 + reply_status_ = WRONG_TYPE;
  119 + return false;
  120 +}
  121 +
  122 +template<class ReplyT>
  123 +bool Command<ReplyT>::isExpectedReply(int typeA, int typeB) {
  124 +
  125 + if((reply_obj_->type == typeA) || (reply_obj_->type == typeB)) {
  126 + reply_status_ = OK_REPLY;
  127 + return true;
  128 + }
  129 +
  130 + if(checkErrorReply() || checkNilReply()) return false;
  131 +
  132 + logger_.error() << cmd_ << ": Received reply of type " << reply_obj_->type
  133 + << ", expected type " << typeA << " or " << typeB << ".";
  134 + reply_status_ = WRONG_TYPE;
  135 + return false;
  136 +}
  137 +
  138 +template<class ReplyT>
  139 +bool Command<ReplyT>::checkErrorReply() {
108 140
109 if (reply_obj_->type == REDIS_REPLY_ERROR) { 141 if (reply_obj_->type == REDIS_REPLY_ERROR) {
110 logger_.error() << cmd_ << ": " << reply_obj_->str; 142 logger_.error() << cmd_ << ": " << reply_obj_->str;
  143 + reply_status_ = ERROR_REPLY;
111 return true; 144 return true;
112 } 145 }
113 return false; 146 return false;
114 } 147 }
115 148
116 template<class ReplyT> 149 template<class ReplyT>
117 -bool Command<ReplyT>::isNilReply() { 150 +bool Command<ReplyT>::checkNilReply() {
118 151
119 if (reply_obj_->type == REDIS_REPLY_NIL) { 152 if (reply_obj_->type == REDIS_REPLY_NIL) {
120 logger_.warning() << cmd_ << ": Nil reply."; 153 logger_.warning() << cmd_ << ": Nil reply.";
  154 + reply_status_ = NIL_REPLY;
121 return true; 155 return true;
122 } 156 }
123 return false; 157 return false;
124 } 158 }
125 159
126 // ---------------------------------------------------------------------------- 160 // ----------------------------------------------------------------------------
127 -// Specializations of handleCallback for all data types 161 +// Specializations of parseReplyObject for all expected return types
128 // ---------------------------------------------------------------------------- 162 // ----------------------------------------------------------------------------
129 163
130 template<> 164 template<>
131 -void Command<redisReply*>::handleCallback() {  
132 - invokeSuccess(reply_obj_); 165 +void Command<redisReply*>::parseReplyObject() {
  166 + reply_val_ = reply_obj_;
133 } 167 }
134 168
135 template<> 169 template<>
136 -void Command<string>::handleCallback() {  
137 -  
138 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
139 - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); 170 +void Command<string>::parseReplyObject() {
140 171
141 - else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) {  
142 - logger_.error() << cmd_ << ": Received non-string reply.";  
143 - invokeError(REDOX_WRONG_TYPE);  
144 -  
145 - } else {  
146 - string s(reply_obj_->str, reply_obj_->len);  
147 - invokeSuccess(s);  
148 - } 172 + if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return;
  173 + reply_val_ = {reply_obj_->str, static_cast<size_t>(reply_obj_->len)};
149 } 174 }
150 175
151 template<> 176 template<>
152 -void Command<char*>::handleCallback() { 177 +void Command<char*>::parseReplyObject() {
153 178
154 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
155 - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); 179 + if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return;
  180 + reply_val_ = reply_obj_->str;
  181 +}
156 182
157 - else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) {  
158 - logger_.error() << cmd_ << ": Received non-string reply.";  
159 - invokeError(REDOX_WRONG_TYPE); 183 +template<>
  184 +void Command<int>::parseReplyObject() {
160 185
161 - } else {  
162 - invokeSuccess(reply_obj_->str);  
163 - } 186 + if(!isExpectedReply(REDIS_REPLY_INTEGER)) return;
  187 + reply_val_ = (int) reply_obj_->integer;
164 } 188 }
165 189
166 template<> 190 template<>
167 -void Command<int>::handleCallback() { 191 +void Command<long long int>::parseReplyObject() {
168 192
169 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
170 - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); 193 + if(!isExpectedReply(REDIS_REPLY_INTEGER)) return;
  194 + reply_val_ = reply_obj_->integer;
  195 +}
171 196
172 - else if(reply_obj_->type != REDIS_REPLY_INTEGER) {  
173 - logger_.error() << cmd_ << ": Received non-integer reply.";  
174 - invokeError(REDOX_WRONG_TYPE); 197 +template<>
  198 +void Command<nullptr_t>::parseReplyObject() {
175 199
176 - } else {  
177 - invokeSuccess((int) reply_obj_->integer);  
178 - } 200 + if(!isExpectedReply(REDIS_REPLY_NIL)) return;
  201 + reply_val_ = nullptr;
179 } 202 }
180 203
181 template<> 204 template<>
182 -void Command<long long int>::handleCallback() { 205 +void Command<vector<string>>::parseReplyObject() {
183 206
184 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
185 - else if(isNilReply()) invokeError(REDOX_NIL_REPLY); 207 + if(!isExpectedReply(REDIS_REPLY_ARRAY)) return;
186 208
187 - else if(reply_obj_->type != REDIS_REPLY_INTEGER) {  
188 - logger_.error() << cmd_ << ": Received non-integer reply.";  
189 - invokeError(REDOX_WRONG_TYPE);  
190 -  
191 - } else {  
192 - invokeSuccess(reply_obj_->integer); 209 + for(size_t i = 0; i < reply_obj_->elements; i++) {
  210 + redisReply* r = *(reply_obj_->element + i);
  211 + reply_val_.emplace_back(r->str, r->len);
193 } 212 }
194 } 213 }
195 214
196 template<> 215 template<>
197 -void Command<nullptr_t>::handleCallback() {  
198 -  
199 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); 216 +void Command<unordered_set<string>>::parseReplyObject() {
200 217
201 - else if(reply_obj_->type != REDIS_REPLY_NIL) {  
202 - logger_.error() << cmd_ << ": Received non-nil reply.";  
203 - invokeError(REDOX_WRONG_TYPE); 218 + if(!isExpectedReply(REDIS_REPLY_ARRAY)) return;
204 219
205 - } else {  
206 - invokeSuccess(nullptr); 220 + for(size_t i = 0; i < reply_obj_->elements; i++) {
  221 + redisReply* r = *(reply_obj_->element + i);
  222 + reply_val_.emplace(r->str, r->len);
207 } 223 }
208 } 224 }
209 225
210 -  
211 template<> 226 template<>
212 -void Command<vector<string>>::handleCallback() {  
213 -  
214 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
215 -  
216 - else if(reply_obj_->type != REDIS_REPLY_ARRAY) {  
217 - logger_.error() << cmd_ << ": Received non-array reply.";  
218 - invokeError(REDOX_WRONG_TYPE);  
219 -  
220 - } else {  
221 - vector<string> v;  
222 - size_t count = reply_obj_->elements;  
223 - for(size_t i = 0; i < count; i++) {  
224 - redisReply* r = *(reply_obj_->element + i);  
225 - if(r->type != REDIS_REPLY_STRING) {  
226 - logger_.error() << cmd_ << ": Received non-array reply.";  
227 - invokeError(REDOX_WRONG_TYPE);  
228 - }  
229 - v.emplace_back(r->str, r->len);  
230 - }  
231 - invokeSuccess(v);  
232 - }  
233 -} 227 +void Command<set<string>>::parseReplyObject() {
234 228
235 -template<>  
236 -void Command<unordered_set<string>>::handleCallback() {  
237 -  
238 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
239 -  
240 - else if(reply_obj_->type != REDIS_REPLY_ARRAY) {  
241 - logger_.error() << cmd_ << ": Received non-array reply.";  
242 - invokeError(REDOX_WRONG_TYPE);  
243 -  
244 - } else {  
245 - unordered_set<string> v;  
246 - size_t count = reply_obj_->elements;  
247 - for(size_t i = 0; i < count; i++) {  
248 - redisReply* r = *(reply_obj_->element + i);  
249 - if(r->type != REDIS_REPLY_STRING) {  
250 - logger_.error() << cmd_ << ": Received non-array reply.";  
251 - invokeError(REDOX_WRONG_TYPE);  
252 - }  
253 - v.emplace(r->str, r->len);  
254 - }  
255 - invokeSuccess(v);  
256 - }  
257 -} 229 + if(!isExpectedReply(REDIS_REPLY_ARRAY)) return;
258 230
259 -template<>  
260 -void Command<set<string>>::handleCallback() {  
261 -  
262 - if(isErrorReply()) invokeError(REDOX_ERROR_REPLY);  
263 -  
264 - else if(reply_obj_->type != REDIS_REPLY_ARRAY) {  
265 - logger_.error() << cmd_ << ": Received non-array reply.";  
266 - invokeError(REDOX_WRONG_TYPE);  
267 -  
268 - } else {  
269 - set<string> v;  
270 - size_t count = reply_obj_->elements;  
271 - for(size_t i = 0; i < count; i++) {  
272 - redisReply* r = *(reply_obj_->element + i);  
273 - if(r->type != REDIS_REPLY_STRING) {  
274 - logger_.error() << cmd_ << ": Received non-array reply.";  
275 - invokeError(REDOX_WRONG_TYPE);  
276 - }  
277 - v.emplace(r->str, r->len);  
278 - }  
279 - invokeSuccess(v); 231 + for(size_t i = 0; i < reply_obj_->elements; i++) {
  232 + redisReply* r = *(reply_obj_->element + i);
  233 + reply_val_.emplace(r->str, r->len);
280 } 234 }
281 } 235 }
282 236
src/command.hpp
@@ -16,21 +16,33 @@ @@ -16,21 +16,33 @@
16 16
17 namespace redox { 17 namespace redox {
18 18
19 -static const int REDOX_UNINIT = -1;  
20 -static const int REDOX_OK = 0;  
21 -static const int REDOX_SEND_ERROR = 1;  
22 -static const int REDOX_WRONG_TYPE = 2;  
23 -static const int REDOX_NIL_REPLY = 3;  
24 -static const int REDOX_ERROR_REPLY = 4;  
25 -static const int REDOX_TIMEOUT = 5;  
26 -  
27 class Redox; 19 class Redox;
  20 +//class Command;
  21 +
  22 +//template <typename ReplyT>
  23 +//using CallbackT = std::function<void(Command<ReplyT>&)>;
28 24
  25 +/**
  26 +* The Command class represents a single command string to be sent to
  27 +* a Redis server, for both synchronous and asynchronous usage. It manages
  28 +* all of the state relevant to a single command string. A Command can also
  29 +* represent a deferred or looping command, in which case the success or
  30 +* error callbacks are invoked more than once.
  31 +*/
29 template<class ReplyT> 32 template<class ReplyT>
30 class Command { 33 class Command {
31 34
32 public: 35 public:
33 36
  37 + // Reply codes
  38 + static const int NO_REPLY = -1; // No reply yet
  39 + static const int OK_REPLY = 0; // Successful reply of the expected type
  40 + static const int NIL_REPLY = 1; // Got a nil reply
  41 + static const int ERROR_REPLY = 2; // Got an error reply
  42 + static const int SEND_ERROR = 3; // Could not send to server
  43 + static const int WRONG_TYPE = 4; // Got reply, but it was not the expected type
  44 + static const int TIMEOUT = 5; // No reply, timed out
  45 +
34 /** 46 /**
35 * Frees memory allocated by this command. Commands with free_memory = false 47 * Frees memory allocated by this command. Commands with free_memory = false
36 * must be freed by the user. 48 * must be freed by the user.
@@ -45,25 +57,27 @@ public: @@ -45,25 +57,27 @@ public:
45 /** 57 /**
46 * Returns true if the command has been canceled. 58 * Returns true if the command has been canceled.
47 */ 59 */
48 - bool canceled() { return canceled_; } 60 + bool canceled() const { return canceled_; }
49 61
50 /** 62 /**
51 * Returns the reply status of this command. 63 * Returns the reply status of this command.
52 * Use ONLY with command_blocking. 64 * Use ONLY with command_blocking.
53 */ 65 */
54 - int status() { return reply_status_; }; 66 + int status() const { return reply_status_; };
55 67
56 /** 68 /**
57 * Returns true if this command got a successful reply. 69 * Returns true if this command got a successful reply.
58 * Use ONLY with command_blocking. 70 * Use ONLY with command_blocking.
59 */ 71 */
60 - bool ok() { return reply_status_ == REDOX_OK; } 72 + bool ok() const { return reply_status_ == OK_REPLY; }
61 73
62 /** 74 /**
63 * Returns the reply value, if the reply was successful (ok() == true). 75 * Returns the reply value, if the reply was successful (ok() == true).
64 * Use ONLY with command_blocking. 76 * Use ONLY with command_blocking.
65 */ 77 */
66 - const ReplyT& reply(); 78 + const ReplyT& reply() const;
  79 +
  80 + const std::string& cmd() const { return cmd_; };
67 81
68 // Allow public access to constructed data 82 // Allow public access to constructed data
69 Redox* const rdx_; 83 Redox* const rdx_;
@@ -79,8 +93,7 @@ private: @@ -79,8 +93,7 @@ private:
79 Redox* rdx, 93 Redox* rdx,
80 long id, 94 long id,
81 const std::string& cmd, 95 const std::string& cmd,
82 - const std::function<void(const std::string&, const ReplyT&)>& callback,  
83 - const std::function<void(const std::string&, int status)>& error_callback, 96 + const std::function<void(Command<ReplyT>&)>& callback,
84 double repeat, double after, 97 double repeat, double after,
85 bool free_memory, 98 bool free_memory,
86 log::Logger& logger 99 log::Logger& logger
@@ -91,14 +104,15 @@ private: @@ -91,14 +104,15 @@ private:
91 104
92 // Invoke a user callback from the reply object. This method is specialized 105 // Invoke a user callback from the reply object. This method is specialized
93 // for each ReplyT of Command. 106 // for each ReplyT of Command.
94 - void handleCallback(); 107 + void parseReplyObject();
95 108
96 - // Directly invoke the user callbacks if the exist  
97 - void invokeSuccess(const ReplyT& reply) { if (success_callback_) success_callback_(cmd_, reply); }  
98 - void invokeError(int status) { if (error_callback_) error_callback_(cmd_, status); } 109 + // Directly invoke the user callback if it exists
  110 + void invoke() { if(callback_) callback_(*this); }
99 111
100 - bool isErrorReply();  
101 - bool isNilReply(); 112 + bool checkErrorReply();
  113 + bool checkNilReply();
  114 + bool isExpectedReply(int type);
  115 + bool isExpectedReply(int typeA, int typeB);
102 116
103 // Delete the provided Command object and deregister as an active 117 // Delete the provided Command object and deregister as an active
104 // command from its Redox instance. 118 // command from its Redox instance.
@@ -110,9 +124,8 @@ private: @@ -110,9 +124,8 @@ private:
110 // The last server reply 124 // The last server reply
111 redisReply* reply_obj_ = nullptr; 125 redisReply* reply_obj_ = nullptr;
112 126
113 - // Callbacks on success and error  
114 - const std::function<void(const std::string&, const ReplyT&)> success_callback_;  
115 - const std::function<void(const std::string&, int status)> error_callback_; 127 + // User callback
  128 + const std::function<void(Command<ReplyT>&)> callback_;
116 129
117 // Place to store the reply value and status. 130 // Place to store the reply value and status.
118 // ONLY for blocking commands 131 // ONLY for blocking commands
src/redox.cpp
@@ -272,7 +272,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) { @@ -272,7 +272,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
272 string value = c->cmd_.substr(first+1, last-first-1); 272 string value = c->cmd_.substr(first+1, last-first-1);
273 if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { 273 if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) {
274 rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; 274 rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr;
275 - c->invokeError(REDOX_SEND_ERROR); 275 + c->reply_status_ = Command<ReplyT>::SEND_ERROR;
  276 + c->invoke();
276 return false; 277 return false;
277 } 278 }
278 return true; 279 return true;
@@ -281,7 +282,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) { @@ -281,7 +282,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
281 282
282 if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) { 283 if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) {
283 rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; 284 rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr;
284 - c->invokeError(REDOX_SEND_ERROR); 285 + c->reply_status_ = Command<ReplyT>::SEND_ERROR;
  286 + c->invoke();
285 return false; 287 return false;
286 } 288 }
287 289
@@ -379,7 +381,14 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, @@ -379,7 +381,14 @@ void Redox::subscribe_raw(const string cmd_name, const string topic,
379 pubsub_mode = true; 381 pubsub_mode = true;
380 382
381 command<redisReply*>(cmd_name + " " + topic, 383 command<redisReply*>(cmd_name + " " + topic,
382 - [this, topic, msg_callback, sub_callback, unsub_callback](const string &cmd, redisReply* const& reply) { 384 + [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command<redisReply*>& c) {
  385 +
  386 + if(!c.ok()) {
  387 + if(err_callback) err_callback(topic, c.status());
  388 + return;
  389 + }
  390 +
  391 + redisReply* reply = c.reply();
383 392
384 // For debugging only 393 // For debugging only
385 // cout << "------" << endl; 394 // cout << "------" << endl;
@@ -398,19 +407,19 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, @@ -398,19 +407,19 @@ void Redox::subscribe_raw(const string cmd_name, const string topic,
398 (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) { 407 (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) {
399 408
400 if(!strncmp(reply->element[0]->str, "sub", 3)) { 409 if(!strncmp(reply->element[0]->str, "sub", 3)) {
401 - sub_queue.insert(topic); 410 + subscribed_topics_.insert(topic);
402 if(sub_callback) sub_callback(topic); 411 if(sub_callback) sub_callback(topic);
403 412
404 } else if(!strncmp(reply->element[0]->str, "psub", 4)) { 413 } else if(!strncmp(reply->element[0]->str, "psub", 4)) {
405 - psub_queue.insert(topic); 414 + psubscribed_topics_.insert(topic);
406 if (sub_callback) sub_callback(topic); 415 if (sub_callback) sub_callback(topic);
407 416
408 } else if(!strncmp(reply->element[0]->str, "uns", 3)) { 417 } else if(!strncmp(reply->element[0]->str, "uns", 3)) {
409 - sub_queue.erase(topic); 418 + subscribed_topics_.erase(topic);
410 if (unsub_callback) unsub_callback(topic); 419 if (unsub_callback) unsub_callback(topic);
411 420
412 } else if(!strncmp(reply->element[0]->str, "puns", 4)) { 421 } else if(!strncmp(reply->element[0]->str, "puns", 4)) {
413 - psub_queue.erase(topic); 422 + psubscribed_topics_.erase(topic);
414 if (unsub_callback) unsub_callback(topic); 423 if (unsub_callback) unsub_callback(topic);
415 } 424 }
416 425
@@ -431,9 +440,6 @@ void Redox::subscribe_raw(const string cmd_name, const string topic, @@ -431,9 +440,6 @@ void Redox::subscribe_raw(const string cmd_name, const string topic,
431 440
432 else logger.error() << "Unknown pubsub message of type " << reply->type; 441 else logger.error() << "Unknown pubsub message of type " << reply->type;
433 }, 442 },
434 - [topic, err_callback](const string &cmd, int status) {  
435 - if(err_callback) err_callback(topic, status);  
436 - },  
437 1e10 // To keep the command around for a few hundred years 443 1e10 // To keep the command around for a few hundred years
438 ); 444 );
439 } 445 }
@@ -444,7 +450,7 @@ void Redox::subscribe(const string topic, @@ -444,7 +450,7 @@ void Redox::subscribe(const string topic,
444 function<void(const string&)> unsub_callback, 450 function<void(const string&)> unsub_callback,
445 function<void(const string&, int)> err_callback 451 function<void(const string&, int)> err_callback
446 ) { 452 ) {
447 - if(sub_queue.find(topic) != sub_queue.end()) { 453 + if(subscribed_topics_.find(topic) != subscribed_topics_.end()) {
448 logger.warning() << "Already subscribed to " << topic << "!"; 454 logger.warning() << "Already subscribed to " << topic << "!";
449 return; 455 return;
450 } 456 }
@@ -457,7 +463,7 @@ void Redox::psubscribe(const string topic, @@ -457,7 +463,7 @@ void Redox::psubscribe(const string topic,
457 function<void(const string&)> unsub_callback, 463 function<void(const string&)> unsub_callback,
458 function<void(const string&, int)> err_callback 464 function<void(const string&, int)> err_callback
459 ) { 465 ) {
460 - if(psub_queue.find(topic) != psub_queue.end()) { 466 + if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) {
461 logger.warning() << "Already psubscribed to " << topic << "!"; 467 logger.warning() << "Already psubscribed to " << topic << "!";
462 return; 468 return;
463 } 469 }
@@ -468,9 +474,10 @@ void Redox::unsubscribe_raw(const string cmd_name, const string topic, @@ -468,9 +474,10 @@ void Redox::unsubscribe_raw(const string cmd_name, const string topic,
468 function<void(const string&, int)> err_callback 474 function<void(const string&, int)> err_callback
469 ) { 475 ) {
470 command<redisReply*>(cmd_name + " " + topic, 476 command<redisReply*>(cmd_name + " " + topic,
471 - nullptr,  
472 - [topic, err_callback](const string& cmd, int status) {  
473 - if(err_callback) err_callback(topic, status); 477 + [topic, err_callback](Command<redisReply*>& c) {
  478 + if(!c.ok()) {
  479 + if (err_callback) err_callback(topic, c.status());
  480 + }
474 } 481 }
475 ); 482 );
476 } 483 }
@@ -478,7 +485,7 @@ void Redox::unsubscribe_raw(const string cmd_name, const string topic, @@ -478,7 +485,7 @@ void Redox::unsubscribe_raw(const string cmd_name, const string topic,
478 void Redox::unsubscribe(const string topic, 485 void Redox::unsubscribe(const string topic,
479 function<void(const string&, int)> err_callback 486 function<void(const string&, int)> err_callback
480 ) { 487 ) {
481 - if(sub_queue.find(topic) == sub_queue.end()) { 488 + if(subscribed_topics_.find(topic) == subscribed_topics_.end()) {
482 logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; 489 logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!";
483 return; 490 return;
484 } 491 }
@@ -488,7 +495,7 @@ void Redox::unsubscribe(const string topic, @@ -488,7 +495,7 @@ void Redox::unsubscribe(const string topic,
488 void Redox::punsubscribe(const string topic, 495 void Redox::punsubscribe(const string topic,
489 function<void(const string&, int)> err_callback 496 function<void(const string&, int)> err_callback
490 ) { 497 ) {
491 - if(psub_queue.find(topic) == psub_queue.end()) { 498 + if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) {
492 logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; 499 logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!";
493 return; 500 return;
494 } 501 }
@@ -500,11 +507,11 @@ void Redox::publish(const string topic, const string msg, @@ -500,11 +507,11 @@ void Redox::publish(const string topic, const string msg,
500 function<void(const string&, int)> err_callback 507 function<void(const string&, int)> err_callback
501 ) { 508 ) {
502 command<redisReply*>("PUBLISH " + topic + " " + msg, 509 command<redisReply*>("PUBLISH " + topic + " " + msg,
503 - [topic, msg, pub_callback](const string& command, redisReply* const& reply) { 510 + [topic, msg, err_callback, pub_callback](Command<redisReply*>& c) {
  511 + if(!c.ok()) {
  512 + if(err_callback) err_callback(topic, c.status());
  513 + }
504 if(pub_callback) pub_callback(topic, msg); 514 if(pub_callback) pub_callback(topic, msg);
505 - },  
506 - [topic, err_callback](const string& command, int status) {  
507 - if(err_callback) err_callback(topic, status);  
508 } 515 }
509 ); 516 );
510 } 517 }
@@ -565,20 +572,20 @@ void Redox::command(const string&amp; cmd) { @@ -565,20 +572,20 @@ void Redox::command(const string&amp; cmd) {
565 } 572 }
566 573
567 bool Redox::command_blocking(const string& cmd) { 574 bool Redox::command_blocking(const string& cmd) {
568 - Command<redisReply*>* c = command_blocking<redisReply*>(cmd);  
569 - bool succeeded = c->ok();  
570 - c->free(); 575 + Command<redisReply*>& c = command_blocking<redisReply*>(cmd);
  576 + bool succeeded = c.ok();
  577 + c.free();
571 return succeeded; 578 return succeeded;
572 } 579 }
573 580
574 string Redox::get(const string& key) { 581 string Redox::get(const string& key) {
575 582
576 - auto c = command_blocking<char*>("GET " + key);  
577 - if(!c->ok()) {  
578 - throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c->status())); 583 + Command<char*>& c = command_blocking<char*>("GET " + key);
  584 + if(!c.ok()) {
  585 + throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status()));
579 } 586 }
580 - string reply = c->reply();  
581 - c->free(); 587 + string reply = c.reply();
  588 + c.free();
582 return reply; 589 return reply;
583 }; 590 };
584 591
src/redox.hpp
@@ -105,15 +105,15 @@ public: @@ -105,15 +105,15 @@ public:
105 * user is responsible for calling free() on the Command object. 105 * user is responsible for calling free() on the Command object.
106 */ 106 */
107 template<class ReplyT> 107 template<class ReplyT>
108 - Command<ReplyT>* command( 108 + Command<ReplyT>& command(
109 const std::string& cmd, 109 const std::string& cmd,
110 - const std::function<void(const std::string&, const ReplyT&)>& callback = nullptr,  
111 - const std::function<void(const std::string&, int status)>& error_callback = nullptr, 110 + const std::function<void(Command<ReplyT>&)>& callback = nullptr,
112 double repeat = 0.0, 111 double repeat = 0.0,
113 double after = 0.0, 112 double after = 0.0,
114 bool free_memory = true 113 bool free_memory = true
115 ); 114 );
116 115
  116 +
117 /** 117 /**
118 * A wrapper around command() for synchronous use. Waits for a reply, populates it 118 * A wrapper around command() for synchronous use. Waits for a reply, populates it
119 * into the Command object, and returns when complete. The user can retrieve the 119 * into the Command object, and returns when complete. The user can retrieve the
@@ -122,7 +122,7 @@ public: @@ -122,7 +122,7 @@ public:
122 * the call succeeded. 122 * the call succeeded.
123 */ 123 */
124 template<class ReplyT> 124 template<class ReplyT>
125 - Command<ReplyT>* command_blocking(const std::string& cmd); 125 + Command<ReplyT>& command_blocking(const std::string& cmd);
126 126
127 /** 127 /**
128 * Return the total number of successful commands processed by this Redox instance. 128 * Return the total number of successful commands processed by this Redox instance.
@@ -238,8 +238,8 @@ public: @@ -238,8 +238,8 @@ public:
238 std::function<void(const std::string&, int)> err_callback = nullptr 238 std::function<void(const std::string&, int)> err_callback = nullptr
239 ); 239 );
240 240
241 - const std::set<std::string>& subscribed_topics() { return sub_queue; }  
242 - const std::set<std::string>& psubscribed_topics() { return psub_queue; } 241 + const std::set<std::string>& subscribed_topics() { return subscribed_topics_; }
  242 + const std::set<std::string>& psubscribed_topics() { return psubscribed_topics_; }
243 243
244 // ------------------------------------------------ 244 // ------------------------------------------------
245 // Public only for Command class 245 // Public only for Command class
@@ -365,18 +365,17 @@ private: @@ -365,18 +365,17 @@ private:
365 // Keep track of topics because we can only unsubscribe 365 // Keep track of topics because we can only unsubscribe
366 // from subscribed topics and punsubscribe from 366 // from subscribed topics and punsubscribe from
367 // psubscribed topics, or hiredis leads to segfaults 367 // psubscribed topics, or hiredis leads to segfaults
368 - std::set<std::string> sub_queue;  
369 - std::set<std::string> psub_queue; 368 + std::set<std::string> subscribed_topics_;
  369 + std::set<std::string> psubscribed_topics_;
370 }; 370 };
371 371
372 // --------------------------- 372 // ---------------------------
373 373
374 374
375 template<class ReplyT> 375 template<class ReplyT>
376 -Command<ReplyT>* Redox::command( 376 +Command<ReplyT>& Redox::command(
377 const std::string& cmd, 377 const std::string& cmd,
378 - const std::function<void(const std::string&, const ReplyT&)>& callback,  
379 - const std::function<void(const std::string&, int status)>& error_callback, 378 + const std::function<void(Command<ReplyT>&)>& callback,
380 double repeat, 379 double repeat,
381 double after, 380 double after,
382 bool free_memory 381 bool free_memory
@@ -393,7 +392,7 @@ Command&lt;ReplyT&gt;* Redox::command( @@ -393,7 +392,7 @@ Command&lt;ReplyT&gt;* Redox::command(
393 392
394 commands_created += 1; 393 commands_created += 1;
395 auto* c = new Command<ReplyT>(this, commands_created, cmd, 394 auto* c = new Command<ReplyT>(this, commands_created, cmd,
396 - callback, error_callback, repeat, after, free_memory, logger); 395 + callback, repeat, after, free_memory, logger);
397 396
398 std::lock_guard<std::mutex> lg(queue_guard); 397 std::lock_guard<std::mutex> lg(queue_guard);
399 std::lock_guard<std::mutex> lg2(command_map_guard); 398 std::lock_guard<std::mutex> lg2(command_map_guard);
@@ -406,41 +405,26 @@ Command&lt;ReplyT&gt;* Redox::command( @@ -406,41 +405,26 @@ Command&lt;ReplyT&gt;* Redox::command(
406 405
407 // logger.debug() << "Created Command " << c->id << " at " << c; 406 // logger.debug() << "Created Command " << c->id << " at " << c;
408 407
409 - return c; 408 + return *c;
410 } 409 }
411 410
412 template<class ReplyT> 411 template<class ReplyT>
413 -Command<ReplyT>* Redox::command_blocking(const std::string& cmd) {  
414 -  
415 - ReplyT val;  
416 - std::atomic_int status(REDOX_UNINIT); 412 +Command<ReplyT>& Redox::command_blocking(const std::string& cmd) {
417 413
418 std::condition_variable cv; 414 std::condition_variable cv;
419 std::mutex m; 415 std::mutex m;
420 -  
421 std::unique_lock<std::mutex> lk(m); 416 std::unique_lock<std::mutex> lk(m);
  417 + std::atomic_bool done = {false};
422 418
423 - Command<ReplyT>* c = command<ReplyT>(cmd,  
424 - [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) {  
425 - std::unique_lock<std::mutex> ul(m);  
426 - val = reply;  
427 - status = REDOX_OK;  
428 - ul.unlock();  
429 - cv.notify_one();  
430 - },  
431 - [&status, &m, &cv](const std::string& cmd_str, int error) {  
432 - std::unique_lock<std::mutex> ul(m);  
433 - status = error;  
434 - ul.unlock(); 419 + Command<ReplyT>& c = command<ReplyT>(cmd,
  420 + [&cv, &done](Command<ReplyT>& cmd_obj) {
  421 + done = true;
435 cv.notify_one(); 422 cv.notify_one();
436 }, 423 },
437 0, 0, false // No repeats, don't free memory 424 0, 0, false // No repeats, don't free memory
438 ); 425 );
439 426
440 - cv.wait(lk, [&status] { return status != REDOX_UNINIT; });  
441 - c->reply_val_ = val;  
442 - c->reply_status_ = status;  
443 - 427 + cv.wait(lk, [&done]() { return done.load(); });
444 return c; 428 return c;
445 } 429 }
446 430
test/test.cpp
@@ -100,7 +100,7 @@ protected: @@ -100,7 +100,7 @@ protected:
100 void print_and_check_sync(Command<ReplyT>* c, const ReplyT& value) { 100 void print_and_check_sync(Command<ReplyT>* c, const ReplyT& value) {
101 ASSERT_TRUE(c->ok()); 101 ASSERT_TRUE(c->ok());
102 EXPECT_EQ(c->reply(), value); 102 EXPECT_EQ(c->reply(), value);
103 - cout << "[SYNC] " << c->cmd << ": " << c->reply() << endl; 103 + cout << "[SYNC] " << c->cmd_ << ": " << c->reply() << endl;
104 c->free(); 104 c->free();
105 } 105 }
106 }; 106 };