Commit b4f96a55f52e5a064ad3722cd58754ca25388763

Authored by Hayk Martirosyan
1 parent 38dffef5

Improve client API, split off Subscriber class

Refactor state management code to use three methods .connect(),
.disconnect(), and .wait(). Start conforming to uniform coding style
already applied to Command class.

Split off subscribe functionality into its own class Subscriber. This is
good because it was introducing unnecessary state and complexity into
the main client. Now, Redox handles publishing like any other command
and the Subscriber receives messages.
CMakeLists.txt
... ... @@ -24,6 +24,7 @@ set(SRC_DIR ${CMAKE_SOURCE_DIR}/src)
24 24 set(SRC_CORE
25 25 ${SRC_DIR}/redox.cpp
26 26 ${SRC_DIR}/command.cpp
  27 + ${SRC_DIR}/subscriber.cpp
27 28 )
28 29  
29 30 set(SRC_LOGGER ${SRC_DIR}/utils/logger.cpp)
... ...
examples/basic.cpp
... ... @@ -13,7 +13,7 @@ int main(int argc, char* argv[]) {
13 13  
14 14 Redox rdx = {"localhost", 6379, nullptr, cout, redox::log::Info}; // Initialize Redox
15 15  
16   - if(!rdx.start()) return 1; // Start the event loop
  16 + if(!rdx.connect()) return 1; // Start the event loop
17 17  
18 18 rdx.del("occupation");
19 19  
... ... @@ -22,6 +22,5 @@ int main(int argc, char* argv[]) {
22 22  
23 23 cout << "key = \"occupation\", value = \"" << rdx.get("occupation") << "\"" << endl;
24 24  
25   - rdx.stop(); // Shut down the event loop
26 25 return 0;
27 26 }
... ...
examples/basic_threaded.cpp
... ... @@ -15,7 +15,7 @@ redox::Redox rdx = {&quot;localhost&quot;, 6379};
15 15  
16 16 int main(int argc, char* argv[]) {
17 17  
18   - if(!rdx.start()) return 1;
  18 + if(!rdx.connect()) return 1;
19 19  
20 20 thread setter([]() {
21 21 for(int i = 0; i < 5000; i++) {
... ... @@ -41,7 +41,5 @@ int main(int argc, char* argv[]) {
41 41 setter.join();
42 42 getter.join();
43 43  
44   - rdx.stop();
45   -
46 44 return 0;
47 45 };
... ...
examples/binary_data.cpp
... ... @@ -23,18 +23,18 @@ std::string random_string(size_t length) {
23 23 int main(int argc, char* argv[]) {
24 24  
25 25 redox::Redox rdx = {"localhost", 6379}; // Initialize Redox
26   - if(!rdx.start()) return 1; // Start the event loop
  26 + if(!rdx.connect()) return 1; // Start the event loop
27 27  
28 28 rdx.del("binary");
29 29  
30 30 string binary_data = random_string(10000);
31 31  
32   - auto& c = rdx.command_blocking<string>("SET binary \"" + binary_data + "\"");
  32 + auto& c = rdx.commandSync<string>("SET binary \"" + binary_data + "\"");
33 33 if(c.ok()) cout << "Reply: " << c.reply() << endl;
34 34 else cerr << "Failed to set key! Status: " << c.status() << endl;
35 35 c.free();
36 36  
37   - auto& c2 = rdx.command_blocking<string>("GET binary");
  37 + auto& c2 = rdx.commandSync<string>("GET binary");
38 38 if(c2.ok()) {
39 39 if(c2.reply() == binary_data) cout << "Binary data matches!" << endl;
40 40 else cerr << "Binary data differs!" << endl;
... ... @@ -42,6 +42,5 @@ int main(int argc, char* argv[]) {
42 42 else cerr << "Failed to get key! Status: " << c2.status() << endl;
43 43 c2.free();
44 44  
45   - rdx.stop(); // Shut down the event loop
46 45 return 0;
47 46 }
... ...
examples/data_types.cpp
... ... @@ -15,11 +15,11 @@ using redox::Command;
15 15 int main(int argc, char* argv[]) {
16 16  
17 17 redox::Redox rdx; // Initialize Redox (default host/port)
18   - if(!rdx.start()) return 1; // Start the event loop
  18 + if(!rdx.connect()) return 1; // Start the event loop
19 19  
20 20 rdx.del("mylist");
21 21  
22   - rdx.command_blocking("LPUSH mylist 1 2 3 4 5 6 7 8 9 10");
  22 + rdx.commandSync("LPUSH mylist 1 2 3 4 5 6 7 8 9 10");
23 23  
24 24 rdx.command<vector<string>>("LRANGE mylist 0 4",
25 25 [](Command<vector<string>>& c){
... ... @@ -46,10 +46,10 @@ int main(int argc, char* argv[]) {
46 46 for (const string& s : c.reply()) cout << s << " ";
47 47 cout << endl;
48 48 }
49   - rdx.stop_signal();
  49 + rdx.disconnect();
50 50 }
51 51 );
52 52  
53   - rdx.block(); // Shut down the event loop
  53 + rdx.wait();
54 54 return 0;
55 55 }
... ...
examples/lpush_benchmark.cpp
... ... @@ -17,7 +17,7 @@ double time_s() {
17 17 int main(int argc, char* argv[]) {
18 18  
19 19 redox::Redox rdx;
20   - if(!rdx.start()) return 1;
  20 + if(!rdx.connect()) return 1;
21 21  
22 22 rdx.del("test");
23 23  
... ... @@ -43,15 +43,12 @@ int main(int argc, char* argv[]) {
43 43 cout << "Total time: " << t2 - t0 << "s" << endl;
44 44 cout << "Result: " << (double)len / (t2-t0) << " commands/s" << endl;
45 45  
46   - rdx.stop_signal();
  46 + rdx.disconnect();
47 47 }
48 48 });
49 49 }
50 50 t1 = time_s();
51 51  
52   - rdx.block();
53   -
54   - cout << "Commands processed: " << rdx.num_commands_processed() << endl;
55   -
  52 + rdx.wait();
56 53 return 0;
57 54 };
... ...
examples/multi-client.cpp
... ... @@ -13,7 +13,7 @@ int main(int argc, char* argv[]) {
13 13  
14 14 redox::Redox rdx1, rdx2, rdx3;
15 15  
16   - if(!rdx1.start() || !rdx2.start() || !rdx3.start()) return 1;
  16 + if(!rdx1.connect() || !rdx2.connect() || !rdx3.connect()) return 1;
17 17  
18 18 rdx1.del("occupation");
19 19  
... ... @@ -22,9 +22,5 @@ int main(int argc, char* argv[]) {
22 22  
23 23 cout << "key = occupation, value = \"" << rdx3.get("occupation") << "\"" << endl;
24 24  
25   - rdx1.stop();
26   - rdx2.stop();
27   - rdx3.stop();
28   -
29 25 return 0;
30 26 }
... ...
examples/pub_sub.cpp
1 1 #include <stdlib.h>
2 2 #include <iostream>
3 3 #include "../src/redox.hpp"
  4 +#include "../src/subscriber.hpp"
4 5  
5 6 using namespace std;
6 7  
7 8 int main(int argc, char *argv[]) {
8 9  
9   - redox::Redox rdx; // Initialize Redox (default host/port)
10   - if (!rdx.start()) return 1; // Start the event loop
  10 + redox::Redox publisher; // Initialize Redox (default host/port)
  11 + if (!publisher.connect()) return 1; // Start the event loop
11 12  
12   - redox::Redox rdx_pub;
13   - if(!rdx_pub.start()) return 1;
  13 + redox::Subscriber subscriber;
  14 + if(!subscriber.connect()) return 1;
14 15  
15 16 auto got_message = [](const string& topic, const string& msg) {
16 17 cout << topic << ": " << msg << endl;
... ... @@ -24,31 +25,30 @@ int main(int argc, char *argv[]) {
24 25 cout << "> Unsubscribed from " << topic << endl;
25 26 };
26 27  
27   - rdx.psubscribe("news", got_message, subscribed, unsubscribed);
28   - rdx.subscribe("sports", got_message, subscribed, unsubscribed);
  28 + subscriber.psubscribe("news", got_message, subscribed, unsubscribed);
  29 + subscriber.subscribe("sports", got_message, subscribed, unsubscribed);
29 30  
30   - this_thread::sleep_for(chrono::milliseconds(20));
31   - for(auto s : rdx.subscribed_topics()) cout << "topic: " << s << endl;
  31 + this_thread::sleep_for(chrono::milliseconds(10));
32 32  
33   - rdx_pub.publish("news", "hello!");
34   - rdx_pub.publish("news", "whatup");
35   - rdx_pub.publish("sports", "yo");
  33 + publisher.publish("news", "one");
  34 + publisher.publish("news", "two", [](const string& topic, const string& msg) {
  35 + cout << "published to " << topic << ": " << msg << endl;
  36 + });
  37 + publisher.publish("sports", "three");
36 38  
37   - this_thread::sleep_for(chrono::seconds(1));
38   - rdx.unsubscribe("sports");
39   - rdx_pub.publish("sports", "yo");
40   - rdx_pub.publish("news", "whatup");
  39 + this_thread::sleep_for(chrono::milliseconds(10));
  40 + subscriber.unsubscribe("sports");
  41 + publisher.publish("sports", "\"UH OH\"");
  42 + publisher.publish("news", "four");
41 43  
42   - this_thread::sleep_for(chrono::milliseconds(1));
43   - rdx.punsubscribe("news");
  44 + this_thread::sleep_for(chrono::milliseconds(10));
  45 + subscriber.punsubscribe("news");
  46 + this_thread::sleep_for(chrono::milliseconds(10));
44 47  
45   - rdx_pub.publish("sports", "yo");
46   - rdx_pub.publish("news", "whatup", [](const string& topic, const string& msg) {
47   - cout << "published to " << topic << ": " << msg << endl;
48   - });
49   - rdx_pub.publish("news", "whatup");
50   - rdx.block();
51   - rdx_pub.block();
  48 + publisher.publish("sports", "\"UH OH\"");
  49 + publisher.publish("news", "\"UH OH\"");
  50 +
  51 + this_thread::sleep_for(chrono::milliseconds(10));
52 52  
53 53 return 0;
54 54 }
... ...
examples/speed_test_async.cpp
... ... @@ -19,9 +19,9 @@ double time_s() {
19 19 int main(int argc, char* argv[]) {
20 20  
21 21 Redox rdx = {"/var/run/redis/redis.sock", nullptr};
22   - if(!rdx.start()) return 1;
  22 + if(!rdx.connect()) return 1;
23 23  
24   - bool status = rdx.command_blocking("SET simple_loop:count 0");
  24 + bool status = rdx.commandSync("SET simple_loop:count 0");
25 25 if(status) {
26 26 cout << "Reset the counter to zero." << endl;
27 27 } else {
... ... @@ -40,10 +40,10 @@ int main(int argc, char* argv[]) {
40 40 double t0 = time_s();
41 41 atomic_int count(0);
42 42  
43   - Command<int>& cmd = rdx.command_looping<int>(
  43 + Command<int>& cmd = rdx.commandLoop<int>(
44 44 cmd_str,
45 45 [&count, &rdx](Command<int>& c) {
46   - if(!c.ok()) {
  46 + if (!c.ok()) {
47 47 cerr << "Bad reply: " << c.status() << endl;
48 48 }
49 49 count++;
... ... @@ -65,6 +65,5 @@ int main(int argc, char* argv[]) {
65 65  
66 66 cout << "Final value of counter: " << final_count << endl;
67 67  
68   - rdx.stop();
69 68 return 0;
70 69 }
... ...
examples/speed_test_async_multi.cpp
... ... @@ -20,7 +20,7 @@ double time_s() {
20 20 int main(int argc, char* argv[]) {
21 21  
22 22 Redox rdx = {"localhost", 6379};
23   - if(!rdx.start()) return 1;
  23 + if(!rdx.connect()) return 1;
24 24  
25 25 if(rdx.set("simple_loop:count", "0")) {
26 26 cout << "Reset the counter to zero." << endl;
... ... @@ -43,15 +43,15 @@ int main(int argc, char* argv[]) {
43 43  
44 44 vector<Command<int>*> commands;
45 45 for(int i = 0; i < parallel; i++) {
46   - commands.push_back(&rdx.command_looping<int>(
47   - cmd_str,
  46 + commands.push_back(&rdx.commandLoop<int>(
  47 + cmd_str,
48 48 [&count, &rdx](Command<int>& c) {
49   - if(!c.ok()) {
  49 + if (!c.ok()) {
50 50 cerr << "Bad reply: " << c.status() << endl;
51 51 }
52 52 count++;
53 53 },
54   - dt
  54 + dt
55 55 ));
56 56 }
57 57  
... ... @@ -65,8 +65,6 @@ int main(int argc, char* argv[]) {
65 65 // Get the final value of the counter
66 66 long final_count = stol(rdx.get("simple_loop:count"));
67 67  
68   - rdx.stop();
69   -
70 68 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
71 69 << "that's " << actual_freq << " commands/s." << endl;
72 70  
... ...
examples/speed_test_pubsub.cpp
1 1 #include <iostream>
2 2 #include "../src/redox.hpp"
  3 +#include "../src/subscriber.hpp"
3 4  
4 5 using namespace std;
5 6 using redox::Redox;
6 7 using redox::Command;
  8 +using redox::Subscriber;
7 9  
8 10 double time_s() {
9 11 unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1);
... ... @@ -13,10 +15,10 @@ double time_s() {
13 15 int main(int argc, char *argv[]) {
14 16  
15 17 Redox rdx_pub;
16   - Redox rdx_sub;
  18 + Subscriber rdx_sub;
17 19  
18   - if(!rdx_pub.start()) return 1;
19   - if(!rdx_sub.start()) return 1;
  20 + if(!rdx_pub.connect()) return 1;
  21 + if(!rdx_sub.connect()) return 1;
20 22  
21 23 atomic_int count(0);
22 24 auto got_message = [&count](const string& topic, const string& msg) {
... ... @@ -41,12 +43,13 @@ int main(int argc, char *argv[]) {
41 43 rdx_pub.publish("speedtest", "hello");
42 44 t1 = time_s();
43 45 }
44   - this_thread::sleep_for(chrono::milliseconds(1000));
45   - rdx_pub.stop();
46   - rdx_sub.stop();
  46 +
  47 + this_thread::sleep_for(chrono::milliseconds(10));
47 48  
48 49 double t = t1 - t0;
49 50 cout << "Total of messages sent in " << t << "s is " << count << endl;
50 51 double msg_per_s = count / t;
51 52 cout << "Messages per second: " << msg_per_s << endl;
  53 +
  54 + return 0;
52 55 }
... ...
examples/speed_test_sync.cpp
... ... @@ -18,9 +18,9 @@ double time_s() {
18 18 int main(int argc, char* argv[]) {
19 19  
20 20 Redox rdx = {"localhost", 6379};
21   - if(!rdx.start()) return 1;
  21 + if(!rdx.connect()) return 1;
22 22  
23   - if(rdx.command_blocking("SET simple_loop:count 0")) {
  23 + if(rdx.commandSync("SET simple_loop:count 0")) {
24 24 cout << "Reset the counter to zero." << endl;
25 25 } else {
26 26 cerr << "Failed to reset counter." << endl;
... ... @@ -37,7 +37,7 @@ int main(int argc, char* argv[]) {
37 37 int count = 0;
38 38  
39 39 while(time_s() < t_end) {
40   - Command<int>& c = rdx.command_blocking<int>(cmd_str);
  40 + Command<int>& c = rdx.commandSync<int>(cmd_str);
41 41 if(!c.ok()) cerr << "Bad reply, code: " << c.status() << endl;
42 42 c.free();
43 43 count++;
... ... @@ -48,8 +48,6 @@ int main(int argc, char* argv[]) {
48 48  
49 49 long final_count = stol(rdx.get("simple_loop:count"));
50 50  
51   - rdx.stop();
52   -
53 51 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
54 52 << "that's " << actual_freq << " commands/s." << endl;
55 53  
... ...
src/command.hpp
... ... @@ -54,7 +54,7 @@ public:
54 54 /**
55 55 * This method returns once this command's callback has been invoked
56 56 * (or would have been invoked if there is none) since the last call
57   - * to block(). If it is the first call, then returns once the callback
  57 + * to wait(). If it is the first call, then returns once the callback
58 58 * is invoked for the first time.
59 59 */
60 60 void wait();
... ...
src/redox.cpp
... ... @@ -3,72 +3,72 @@
3 3 */
4 4  
5 5 #include <signal.h>
6   -#include "redox.hpp"
7 6 #include <string.h>
  7 +#include "redox.hpp"
8 8  
9 9 using namespace std;
10 10  
11 11 namespace redox {
12 12  
13   -void Redox::connected_callback(const redisAsyncContext *ctx, int status) {
  13 +void Redox::connectedCallback(const redisAsyncContext* ctx, int status) {
14 14  
15 15 Redox* rdx = (Redox*) ctx->data;
16 16  
17 17 if (status != REDIS_OK) {
18   - rdx->logger.fatal() << "Could not connect to Redis: " << ctx->errstr;
19   - rdx->connect_state = REDOX_CONNECT_ERROR;
  18 + rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr;
  19 + rdx->connect_state_ = CONNECT_ERROR;
20 20  
21 21 } else {
22 22 // Disable hiredis automatically freeing reply objects
23 23 ctx->c.reader->fn->freeObject = [](void *reply) {};
24   - rdx->connect_state = REDOX_CONNECTED;
25   - rdx->logger.info() << "Connected to Redis.";
  24 + rdx->connect_state_ = CONNECTED;
  25 + rdx->logger_.info() << "Connected to Redis.";
26 26 }
27 27  
28   - rdx->connect_waiter.notify_all();
29   - if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state);
  28 + rdx->connect_waiter_.notify_all();
  29 + if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_);
30 30 }
31 31  
32   -void Redox::disconnected_callback(const redisAsyncContext *ctx, int status) {
  32 +void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) {
33 33  
34 34 Redox* rdx = (Redox*) ctx->data;
35 35  
36 36 if (status != REDIS_OK) {
37   - rdx->logger.error() << "Could not disconnect from Redis: " << ctx->errstr;
38   - rdx->connect_state = REDOX_DISCONNECT_ERROR;
  37 + rdx->logger_.error() << "Could not disconnect from Redis: " << ctx->errstr;
  38 + rdx->connect_state_ = DISCONNECT_ERROR;
39 39 } else {
40   - rdx->logger.info() << "Disconnected from Redis as planned.";
41   - rdx->connect_state = REDOX_DISCONNECTED;
  40 + rdx->logger_.info() << "Disconnected from Redis as planned.";
  41 + rdx->connect_state_ = DISCONNECTED;
42 42 }
43 43  
44   - rdx->stop_signal();
45   - rdx->connect_waiter.notify_all();
46   - if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state);
  44 + rdx->disconnect();
  45 + rdx->connect_waiter_.notify_all();
  46 + if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_);
47 47 }
48 48  
49 49 void Redox::init_ev() {
50 50 signal(SIGPIPE, SIG_IGN);
51   - evloop = ev_loop_new(EVFLAG_AUTO);
52   - ev_set_userdata(evloop, (void*)this); // Back-reference
  51 + evloop_ = ev_loop_new(EVFLAG_AUTO);
  52 + ev_set_userdata(evloop_, (void*)this); // Back-reference
53 53 }
54 54  
55 55 void Redox::init_hiredis() {
56 56  
57   - ctx->data = (void*)this; // Back-reference
  57 + ctx_->data = (void*)this; // Back-reference
58 58  
59   - if (ctx->err) {
60   - logger.error() << "Could not create a hiredis context: " << ctx->errstr;
61   - connect_state = REDOX_CONNECT_ERROR;
62   - connect_waiter.notify_all();
  59 + if (ctx_->err) {
  60 + logger_.error() << "Could not create a hiredis context: " << ctx_->errstr;
  61 + connect_state_ = CONNECT_ERROR;
  62 + connect_waiter_.notify_all();
63 63 return;
64 64 }
65 65  
66 66 // Attach event loop to hiredis
67   - redisLibevAttach(evloop, ctx);
  67 + redisLibevAttach(evloop_, ctx_);
68 68  
69 69 // Set the callbacks to be invoked on server connection/disconnection
70   - redisAsyncSetConnectCallback(ctx, Redox::connected_callback);
71   - redisAsyncSetDisconnectCallback(ctx, Redox::disconnected_callback);
  70 + redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback);
  71 + redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback);
72 72 }
73 73  
74 74 Redox::Redox(
... ... @@ -76,14 +76,14 @@ Redox::Redox(
76 76 function<void(int)> connection_callback,
77 77 ostream& log_stream,
78 78 log::Level log_level
79   -) : host(host), port(port),
80   - logger(log_stream, log_level),
81   - user_connection_callback(connection_callback) {
  79 +) : host_(host), port_(port),
  80 + logger_(log_stream, log_level),
  81 + user_connection_callback_(connection_callback) {
82 82  
83 83 init_ev();
84 84  
85 85 // Connect over TCP
86   - ctx = redisAsyncConnect(host.c_str(), port);
  86 + ctx_ = redisAsyncConnect(host.c_str(), port);
87 87  
88 88 init_hiredis();
89 89 }
... ... @@ -93,13 +93,13 @@ Redox::Redox(
93 93 function<void(int)> connection_callback,
94 94 ostream& log_stream,
95 95 log::Level log_level
96   -) : host(), port(), path(path), logger(log_stream, log_level),
97   - user_connection_callback(connection_callback) {
  96 +) : host_(), port_(), path_(path), logger_(log_stream, log_level),
  97 + user_connection_callback_(connection_callback) {
98 98  
99 99 init_ev();
100 100  
101 101 // Connect over unix sockets
102   - ctx = redisAsyncConnectUnix(path.c_str());
  102 + ctx_ = redisAsyncConnectUnix(path.c_str());
103 103  
104 104 init_hiredis();
105 105 }
... ... @@ -108,142 +108,127 @@ void break_event_loop(struct ev_loop* loop, ev_async* async, int revents) {
108 108 ev_break(loop, EVBREAK_ALL);
109 109 }
110 110  
111   -void Redox::run_event_loop() {
  111 +void Redox::runEventLoop() {
112 112  
113 113 // Events to connect to Redox
114   - ev_run(evloop, EVRUN_NOWAIT);
  114 + ev_run(evloop_, EVRUN_NOWAIT);
115 115  
116 116 // Block until connected to Redis, or error
117   - unique_lock<mutex> ul(connect_lock);
118   - connect_waiter.wait(ul, [this] { return connect_state != REDOX_NOT_YET_CONNECTED; });
  117 + unique_lock<mutex> ul(connect_lock_);
  118 + connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; });
119 119  
120 120 // Handle connection error
121   - if(connect_state != REDOX_CONNECTED) {
122   - logger.warning() << "Did not connect, event loop exiting.";
123   - running_waiter.notify_one();
  121 + if(connect_state_ != CONNECTED) {
  122 + logger_.warning() << "Did not connect, event loop exiting.";
  123 + running_waiter_.notify_one();
124 124 return;
125 125 }
126 126  
127 127 // Set up asynchronous watcher which we signal every
128 128 // time we add a command
129   - ev_async_init(&async_w, process_queued_commands);
130   - ev_async_start(evloop, &async_w);
  129 + ev_async_init(&watcher_command_, proccessQueuedCommands);
  130 + ev_async_start(evloop_, &watcher_command_);
131 131  
132 132 // Set up an async watcher to break the loop
133   - ev_async_init(&async_stop, break_event_loop);
134   - ev_async_start(evloop, &async_stop);
  133 + ev_async_init(&watcher_stop_, break_event_loop);
  134 + ev_async_start(evloop_, &watcher_stop_);
135 135  
136   - running = true;
137   - running_waiter.notify_one();
  136 + running_ = true;
  137 + running_waiter_.notify_one();
138 138  
139 139 // Run the event loop
140   - while (!to_exit) {
  140 + while (!to_exit_) {
141 141 // logger.info() << "Event loop running";
142   - ev_run(evloop, EVRUN_NOWAIT);
  142 + ev_run(evloop_, EVRUN_NOWAIT);
143 143 }
144 144  
145   - logger.info() << "Stop signal detected.";
  145 + logger_.info() << "Stop signal detected. Disconnecting from Redis.";
  146 + if(connect_state_ == CONNECTED) redisAsyncDisconnect(ctx_);
146 147  
147   - // Run a few more times to clear out canceled events
  148 + // Run a few more times to disconnect and clear out canceled events
148 149 for(int i = 0; i < 100; i++) {
149   - ev_run(evloop, EVRUN_NOWAIT);
  150 + ev_run(evloop_, EVRUN_NOWAIT);
150 151 }
151 152  
152   - if(commands_created != commands_deleted) {
153   - logger.error() << "All commands were not freed! "
154   - << commands_deleted << "/" << commands_created;
  153 + if(commands_created_ != commands_deleted_) {
  154 + logger_.error() << "All commands were not freed! "
  155 + << commands_deleted_ << "/" << commands_created_;
155 156 }
156 157  
157   - exited = true;
158   - running = false;
  158 + exited_ = true;
  159 + running_ = false;
159 160  
160 161 // Let go for block_until_stopped method
161   - exit_waiter.notify_one();
  162 + exit_waiter_.notify_one();
162 163  
163   - logger.info() << "Event thread exited.";
  164 + logger_.info() << "Event thread exited.";
164 165 }
165 166  
166   -bool Redox::start() {
  167 +bool Redox::connect() {
167 168  
168   - event_loop_thread = thread([this] { run_event_loop(); });
  169 + event_loop_thread_ = thread([this] { runEventLoop(); });
169 170  
170 171 // Block until connected and running the event loop, or until
171 172 // a connection error happens and the event loop exits
172   - unique_lock<mutex> ul(running_waiter_lock);
173   - running_waiter.wait(ul, [this] {
174   - return running.load() || connect_state == REDOX_CONNECT_ERROR;
  173 + unique_lock<mutex> ul(running_waiter_lock_);
  174 + running_waiter_.wait(ul, [this] {
  175 + return running_.load() || connect_state_ == CONNECT_ERROR;
175 176 });
176 177  
177 178 // Return if succeeded
178   - return connect_state == REDOX_CONNECTED;
179   -}
180   -
181   -void Redox::stop_signal() {
182   - to_exit = true;
183   - logger.debug() << "stop_signal() called, breaking event loop";
184   - ev_async_send(evloop, &async_stop);
185   -}
186   -
187   -void Redox::block() {
188   - unique_lock<mutex> ul(exit_waiter_lock);
189   - exit_waiter.wait(ul, [this] { return exited.load(); });
  179 + return connect_state_ == CONNECTED;
190 180 }
191 181  
192   -void Redox::stop() {
193   - stop_signal();
194   - block();
  182 +void Redox::disconnect() {
  183 + to_exit_ = true;
  184 + logger_.debug() << "disconnect() called, breaking event loop";
  185 + ev_async_send(evloop_, &watcher_stop_);
195 186 }
196 187  
197   -void Redox::disconnect() {
198   - stop_signal();
199   - if(connect_state == REDOX_CONNECTED) {
200   - redisAsyncDisconnect(ctx);
201   - block();
202   - }
  188 +void Redox::wait() {
  189 + unique_lock<mutex> ul(exit_waiter_lock_);
  190 + exit_waiter_.wait(ul, [this] { return exited_.load(); });
203 191 }
204 192  
205 193 Redox::~Redox() {
206 194  
207 195 disconnect();
208 196  
209   - if(event_loop_thread.joinable())
210   - event_loop_thread.join();
  197 + if(event_loop_thread_.joinable())
  198 + event_loop_thread_.join();
211 199  
212   - ev_loop_destroy(evloop);
  200 + ev_loop_destroy(evloop_);
213 201  
214   - logger.info() << "Redox created " << commands_created
215   - << " Commands and freed " << commands_deleted << ".";
  202 + logger_.info() << "Redox created " << commands_created_
  203 + << " Commands and freed " << commands_deleted_ << ".";
216 204 }
217 205  
218 206 template<class ReplyT>
219   -Command<ReplyT>* Redox::find_command(long id) {
  207 +Command<ReplyT>* Redox::findCommand(long id) {
220 208  
221   - lock_guard<mutex> lg(command_map_guard);
  209 + lock_guard<mutex> lg(command_map_guard_);
222 210  
223   - auto& command_map = get_command_map<ReplyT>();
  211 + auto& command_map = getCommandMap<ReplyT>();
224 212 auto it = command_map.find(id);
225 213 if(it == command_map.end()) return nullptr;
226 214 return it->second;
227 215 }
228 216  
229 217 template<class ReplyT>
230   -void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
  218 +void Redox::commandCallback(redisAsyncContext* ctx, void* r, void* privdata) {
231 219  
232 220 Redox* rdx = (Redox*) ctx->data;
233 221 long id = (long)privdata;
234 222 redisReply* reply_obj = (redisReply*) r;
235 223  
236   - Command<ReplyT>* c = rdx->find_command<ReplyT>(id);
  224 + Command<ReplyT>* c = rdx->findCommand<ReplyT>(id);
237 225 if(c == nullptr) {
238   -// rdx->logger.warning() << "Couldn't find Command " << id << " in command_map (command_callback).";
  226 +// rdx->logger.warning() << "Couldn't find Command " << id << " in command_map (commandCallback).";
239 227 freeReplyObject(reply_obj);
240 228 return;
241 229 }
242 230  
243 231 c->processReply(reply_obj);
244   -
245   - // Increment the Redox object command counter
246   - rdx->cmd_count++;
247 232 }
248 233  
249 234 /**
... ... @@ -251,7 +236,7 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
251 236 * true if succeeded, false otherwise.
252 237 */
253 238 template<class ReplyT>
254   -bool Redox::submit_to_server(Command<ReplyT>* c) {
  239 +bool Redox::submitToServer(Command<ReplyT>* c) {
255 240  
256 241 Redox* rdx = c->rdx_;
257 242 c->pending_++;
... ... @@ -270,8 +255,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
270 255  
271 256 string format = c->cmd_.substr(0, first) + "%b";
272 257 string value = c->cmd_.substr(first+1, last-first-1);
273   - if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) {
274   - rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr;
  258 + if (redisAsyncCommand(rdx->ctx_, commandCallback < ReplyT > , (void*) c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) {
  259 + rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr;
275 260 c->reply_status_ = Command<ReplyT>::SEND_ERROR;
276 261 c->invoke();
277 262 return false;
... ... @@ -280,8 +265,8 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
280 265 }
281 266 }
282 267  
283   - if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) {
284   - rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr;
  268 + if (redisAsyncCommand(rdx->ctx_, commandCallback < ReplyT > , (void*) c->id_, c->cmd_.c_str()) != REDIS_OK) {
  269 + rdx->logger_.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx_->errstr;
285 270 c->reply_status_ = Command<ReplyT>::SEND_ERROR;
286 271 c->invoke();
287 272 return false;
... ... @@ -291,15 +276,15 @@ bool Redox::submit_to_server(Command&lt;ReplyT&gt;* c) {
291 276 }
292 277  
293 278 template<class ReplyT>
294   -void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) {
  279 +void Redox::submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents) {
295 280  
296 281 Redox* rdx = (Redox*) ev_userdata(loop);
297 282 long id = (long)timer->data;
298 283  
299   - Command<ReplyT>* c = rdx->find_command<ReplyT>(id);
  284 + Command<ReplyT>* c = rdx->findCommand<ReplyT>(id);
300 285 if(c == nullptr) {
301   - rdx->logger.error() << "Couldn't find Command " << id
302   - << " in command_map (submit_command_callback).";
  286 + rdx->logger_.error() << "Couldn't find Command " << id
  287 + << " in command_map (submitCommandCallback).";
303 288 return;
304 289 }
305 290  
... ... @@ -318,23 +303,23 @@ void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int r
318 303 return;
319 304 }
320 305  
321   - submit_to_server<ReplyT>(c);
  306 + submitToServer<ReplyT>(c);
322 307 }
323 308  
324 309 template<class ReplyT>
325   -bool Redox::process_queued_command(long id) {
  310 +bool Redox::proccessQueuedCommand(long id) {
326 311  
327   - Command<ReplyT>* c = find_command<ReplyT>(id);
  312 + Command<ReplyT>* c = findCommand<ReplyT>(id);
328 313 if(c == nullptr) return false;
329 314  
330 315 if((c->repeat_ == 0) && (c->after_ == 0)) {
331   - submit_to_server<ReplyT>(c);
  316 + submitToServer<ReplyT>(c);
332 317  
333 318 } else {
334 319  
335 320 c->timer_.data = (void*)c->id_;
336   - ev_timer_init(&c->timer_, submit_command_callback<ReplyT>, c->after_, c->repeat_);
337   - ev_timer_start(evloop, &c->timer_);
  321 + ev_timer_init(&c->timer_, submitCommandCallback <ReplyT>, c->after_, c->repeat_);
  322 + ev_timer_start(evloop_, &c->timer_);
338 323  
339 324 c->timer_guard_.unlock();
340 325 }
... ... @@ -342,7 +327,7 @@ bool Redox::process_queued_command(long id) {
342 327 return true;
343 328 }
344 329  
345   -void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int revents) {
  330 +void Redox::proccessQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) {
346 331  
347 332 Redox* rdx = (Redox*) ev_userdata(loop);
348 333  
... ... @@ -353,224 +338,60 @@ void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int r
353 338 long id = rdx->command_queue.front();
354 339 rdx->command_queue.pop();
355 340  
356   - if(rdx->process_queued_command<redisReply*>(id)) {}
357   - else if(rdx->process_queued_command<string>(id)) {}
358   - else if(rdx->process_queued_command<char*>(id)) {}
359   - else if(rdx->process_queued_command<int>(id)) {}
360   - else if(rdx->process_queued_command<long long int>(id)) {}
361   - else if(rdx->process_queued_command<nullptr_t>(id)) {}
362   - else if(rdx->process_queued_command<vector<string>>(id)) {}
363   - else if(rdx->process_queued_command<std::set<string>>(id)) {}
364   - else if(rdx->process_queued_command<unordered_set<string>>(id)) {}
  341 + if(rdx->proccessQueuedCommand<redisReply*>(id)) {}
  342 + else if(rdx->proccessQueuedCommand<string>(id)) {}
  343 + else if(rdx->proccessQueuedCommand<char*>(id)) {}
  344 + else if(rdx->proccessQueuedCommand<int>(id)) {}
  345 + else if(rdx->proccessQueuedCommand<long long int>(id)) {}
  346 + else if(rdx->proccessQueuedCommand<nullptr_t>(id)) {}
  347 + else if(rdx->proccessQueuedCommand<vector<string>>(id)) {}
  348 + else if(rdx->proccessQueuedCommand<std::set<string>>(id)) {}
  349 + else if(rdx->proccessQueuedCommand<unordered_set<string>>(id)) {}
365 350 else throw runtime_error("Command pointer not found in any queue!");
366 351 }
367 352 }
368 353  
369 354 // ---------------------------------
370   -// Pub/Sub methods
371   -// ---------------------------------
372   -
373   -void Redox::subscribe_raw(const string cmd_name, const string topic,
374   - function<void(const string&, const string&)> msg_callback,
375   - function<void(const string&)> sub_callback,
376   - function<void(const string&)> unsub_callback,
377   - function<void(const string&, int)> err_callback
378   -) {
379   -
380   - // Start pubsub mode. No non-sub/unsub commands can be emitted by this client.
381   - pubsub_mode = true;
382   -
383   - command_looping<redisReply*>(cmd_name + " " + topic,
384   - [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command<redisReply*>& c) {
385   -
386   - if(!c.ok()) {
387   - if(err_callback) err_callback(topic, c.status());
388   - return;
389   - }
390   -
391   - redisReply* reply = c.reply();
392   -
393   - // For debugging only
394   -// cout << "------" << endl;
395   -// cout << cmd << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl;
396   -// for(int i = 0; i < reply->elements; i++) {
397   -// redisReply* r = reply->element[i];
398   -// cout << "element " << i << ", reply type = " << r->type << " ";
399   -// if(r->type == REDIS_REPLY_STRING) cout << r->str << endl;
400   -// else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl;
401   -// else cout << "some other type" << endl;
402   -// }
403   -// cout << "------" << endl;
404   -
405   - // TODO cancel this command on unsubscription?
406   -
407   - // If the last entry is an integer, then it is a [p]sub/[p]unsub command
408   - if((reply->type == REDIS_REPLY_ARRAY) &&
409   - (reply->element[reply->elements-1]->type == REDIS_REPLY_INTEGER)) {
410   -
411   - if(!strncmp(reply->element[0]->str, "sub", 3)) {
412   - subscribed_topics_.insert(topic);
413   - if(sub_callback) sub_callback(topic);
414   -
415   - } else if(!strncmp(reply->element[0]->str, "psub", 4)) {
416   - psubscribed_topics_.insert(topic);
417   - if (sub_callback) sub_callback(topic);
418   -
419   - } else if(!strncmp(reply->element[0]->str, "uns", 3)) {
420   - subscribed_topics_.erase(topic);
421   - if (unsub_callback) unsub_callback(topic);
422   -
423   - } else if(!strncmp(reply->element[0]->str, "puns", 4)) {
424   - psubscribed_topics_.erase(topic);
425   - if (unsub_callback) unsub_callback(topic);
426   - }
427   -
428   - else logger.error() << "Unknown pubsub message: " << reply->element[0]->str;
429   - }
430   -
431   - // Message for subscribe
432   - else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) {
433   - char *msg = reply->element[2]->str;
434   - if (msg && msg_callback) msg_callback(topic, reply->element[2]->str);
435   - }
436   -
437   - // Message for psubscribe
438   - else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) {
439   - char *msg = reply->element[2]->str;
440   - if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str);
441   - }
442   -
443   - else logger.error() << "Unknown pubsub message of type " << reply->type;
444   - },
445   - 1e10 // To keep the command around for a few hundred years
446   - );
447   -}
448   -
449   -void Redox::subscribe(const string topic,
450   - function<void(const string&, const string&)> msg_callback,
451   - function<void(const string&)> sub_callback,
452   - function<void(const string&)> unsub_callback,
453   - function<void(const string&, int)> err_callback
454   -) {
455   - if(subscribed_topics_.find(topic) != subscribed_topics_.end()) {
456   - logger.warning() << "Already subscribed to " << topic << "!";
457   - return;
458   - }
459   - subscribe_raw("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
460   -}
461   -
462   -void Redox::psubscribe(const string topic,
463   - function<void(const string&, const string&)> msg_callback,
464   - function<void(const string&)> sub_callback,
465   - function<void(const string&)> unsub_callback,
466   - function<void(const string&, int)> err_callback
467   -) {
468   - if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) {
469   - logger.warning() << "Already psubscribed to " << topic << "!";
470   - return;
471   - }
472   - subscribe_raw("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
473   -}
474   -
475   -void Redox::unsubscribe_raw(const string cmd_name, const string topic,
476   - function<void(const string&, int)> err_callback
477   -) {
478   - command<redisReply*>(cmd_name + " " + topic,
479   - [topic, err_callback](Command<redisReply*>& c) {
480   - if(!c.ok()) {
481   - if (err_callback) err_callback(topic, c.status());
482   - }
483   - }
484   - );
485   -}
486   -
487   -void Redox::unsubscribe(const string topic,
488   - function<void(const string&, int)> err_callback
489   -) {
490   - if(subscribed_topics_.find(topic) == subscribed_topics_.end()) {
491   - logger.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!";
492   - return;
493   - }
494   - unsubscribe_raw("UNSUBSCRIBE", topic, err_callback);
495   -}
496   -
497   -void Redox::punsubscribe(const string topic,
498   - function<void(const string&, int)> err_callback
499   -) {
500   - if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) {
501   - logger.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!";
502   - return;
503   - }
504   - unsubscribe_raw("PUNSUBSCRIBE", topic, err_callback);
505   -}
506   -
507   -void Redox::publish(const string topic, const string msg,
508   - function<void(const string&, const string&)> pub_callback,
509   - function<void(const string&, int)> err_callback
510   -) {
511   - command<redisReply*>("PUBLISH " + topic + " " + msg,
512   - [topic, msg, err_callback, pub_callback](Command<redisReply*>& c) {
513   - if(!c.ok()) {
514   - if(err_callback) err_callback(topic, c.status());
515   - }
516   - if(pub_callback) pub_callback(topic, msg);
517   - }
518   - );
519   -}
520   -
521   -/**
522   -* Throw an exception for any non-pubsub commands.
523   -*/
524   -void Redox::deny_non_pubsub(const string& cmd) {
525   -
526   - string cmd_name = cmd.substr(0, cmd.find(' '));
527   -
528   - // Compare with the command's first 5 characters
529   - if(!cmd_name.compare("SUBSCRIBE") || !cmd_name.compare("UNSUBSCRIBE") ||
530   - !cmd_name.compare("PSUBSCRIBE") || !cmd_name.compare("PUNSUBSCRIBE")) {
531   - } else {
532   - throw runtime_error("In pub/sub mode, this Redox instance can only issue "
533   - "[p]subscribe/[p]unsubscribe commands! Use another instance for other commands.");
534   - }
535   -}
536   -
537   -// ---------------------------------
538 355 // get_command_map specializations
539 356 // ---------------------------------
540 357  
541 358 template<> unordered_map<long, Command<redisReply*>*>&
542   -Redox::get_command_map<redisReply*>() { return commands_redis_reply; }
  359 +Redox::getCommandMap<redisReply*>() { return commands_redis_reply_; }
543 360  
544 361 template<> unordered_map<long, Command<string>*>&
545   -Redox::get_command_map<string>() { return commands_string_r; }
  362 +Redox::getCommandMap<string>() { return commands_string_; }
546 363  
547 364 template<> unordered_map<long, Command<char*>*>&
548   -Redox::get_command_map<char*>() { return commands_char_p; }
  365 +Redox::getCommandMap<char*>() { return commands_char_p_; }
549 366  
550 367 template<> unordered_map<long, Command<int>*>&
551   -Redox::get_command_map<int>() { return commands_int; }
  368 +Redox::getCommandMap<int>() { return commands_int_; }
552 369  
553 370 template<> unordered_map<long, Command<long long int>*>&
554   -Redox::get_command_map<long long int>() { return commands_long_long_int; }
  371 +Redox::getCommandMap<long long int>() { return commands_long_long_int_; }
555 372  
556 373 template<> unordered_map<long, Command<nullptr_t>*>&
557   -Redox::get_command_map<nullptr_t>() { return commands_null; }
  374 +Redox::getCommandMap<nullptr_t>() { return commands_null_; }
558 375  
559 376 template<> unordered_map<long, Command<vector<string>>*>&
560   -Redox::get_command_map<vector<string>>() { return commands_vector_string; }
  377 +Redox::getCommandMap<vector<string>>() { return commands_vector_string_; }
561 378  
562 379 template<> unordered_map<long, Command<set<string>>*>&
563   -Redox::get_command_map<set<string>>() { return commands_set_string; }
  380 +Redox::getCommandMap<set<string>>() { return commands_set_string_; }
564 381  
565 382 template<> unordered_map<long, Command<unordered_set<string>>*>&
566   -Redox::get_command_map<unordered_set<string>>() { return commands_unordered_set_string; }
  383 +Redox::getCommandMap<unordered_set<string>>() { return commands_unordered_set_string_; }
567 384  
568 385 // ----------------------------
569 386 // Helpers
570 387 // ----------------------------
571 388  
572   -bool Redox::command_blocking(const string& cmd) {
573   - auto& c = command_blocking<redisReply*>(cmd);
  389 +void Redox::command(const std::string& cmd) {
  390 + command<redisReply*>(cmd, nullptr);
  391 +}
  392 +
  393 +bool Redox::commandSync(const string& cmd) {
  394 + auto& c = commandSync<redisReply*>(cmd);
574 395 bool succeeded = c.ok();
575 396 c.free();
576 397 return succeeded;
... ... @@ -578,7 +399,7 @@ bool Redox::command_blocking(const string&amp; cmd) {
578 399  
579 400 string Redox::get(const string& key) {
580 401  
581   - Command<char*>& c = command_blocking<char*>("GET " + key);
  402 + Command<char*>& c = commandSync<char*>("GET " + key);
582 403 if(!c.ok()) {
583 404 throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status()));
584 405 }
... ... @@ -588,11 +409,25 @@ string Redox::get(const string&amp; key) {
588 409 };
589 410  
590 411 bool Redox::set(const string& key, const string& value) {
591   - return command_blocking("SET " + key + " " + value);
  412 + return commandSync("SET " + key + " " + value);
592 413 }
593 414  
594 415 bool Redox::del(const string& key) {
595   - return command_blocking("DEL " + key);
  416 + return commandSync("DEL " + key);
  417 +}
  418 +
  419 +void Redox::publish(const string topic, const string msg,
  420 + function<void(const string&, const string&)> pub_callback,
  421 + function<void(const string&, int)> err_callback
  422 +) {
  423 + command<redisReply*>("PUBLISH " + topic + " " + msg,
  424 + [topic, msg, err_callback, pub_callback](Command<redisReply*>& c) {
  425 + if(!c.ok()) {
  426 + if(err_callback) err_callback(topic, c.status());
  427 + }
  428 + if(pub_callback) pub_callback(topic, msg);
  429 + }
  430 + );
596 431 }
597 432  
598 433 } // End namespace redis
... ...
src/redox.hpp
... ... @@ -27,23 +27,25 @@
27 27  
28 28 namespace redox {
29 29  
30   -// Default to a local Redis server
31 30 static const std::string REDIS_DEFAULT_HOST = "localhost";
32 31 static const int REDIS_DEFAULT_PORT = 6379;
33 32  
34   -// Connection status
35   -static const int REDOX_NOT_YET_CONNECTED = 0;
36   -static const int REDOX_CONNECTED = 1;
37   -static const int REDOX_DISCONNECTED = 2;
38   -static const int REDOX_CONNECT_ERROR = 3;
39   -static const int REDOX_DISCONNECT_ERROR = 4;
40   -
  33 +/**
  34 +* Redox intro here.
  35 +*/
41 36 class Redox {
42 37  
43 38 public:
44 39  
  40 + // Connection states
  41 + static const int NOT_YET_CONNECTED = 0;
  42 + static const int CONNECTED = 1;
  43 + static const int DISCONNECTED = 2;
  44 + static const int CONNECT_ERROR = 3;
  45 + static const int DISCONNECT_ERROR = 4;
  46 +
45 47 /**
46   - * Initialize everything, connect over TCP to a Redis server.
  48 + * Initializes everything, connects over TCP to a Redis server.
47 49 */
48 50 Redox(
49 51 const std::string& host = REDIS_DEFAULT_HOST,
... ... @@ -54,7 +56,7 @@ public:
54 56 );
55 57  
56 58 /**
57   - * Initialize everything, connect over unix sockets to a Redis server.
  59 + * Initializes everything, connects over unix sockets to a Redis server.
58 60 */
59 61 Redox(
60 62 const std::string& path,
... ... @@ -62,29 +64,29 @@ public:
62 64 std::ostream& log_stream = std::cout,
63 65 log::Level log_level = log::Info
64 66 );
65   - ~Redox();
66 67  
67 68 /**
68   - * Connect to Redis and start the event loop in a separate thread. Returns
69   - * true if and when everything is ready to go, or false on failure.
  69 + * Disconnects from the Redis server, shuts down the event loop, and cleans up.
  70 + * Internally calls disconnect() and wait().
70 71 */
71   - bool start();
  72 + ~Redox();
72 73  
73 74 /**
74   - * Signal the event loop to stop processing commands and shut down.
  75 + * Connects to Redis and starts an event loop in a separate thread. Returns
  76 + * true once everything is ready, or false on failure.
75 77 */
76   - void stop_signal();
  78 + bool connect();
77 79  
78 80 /**
79   - * Wait for the event loop to exit, then return.
  81 + * Signal the event loop thread to disconnect from Redis and shut down.
80 82 */
81   - void block();
  83 + void disconnect();
82 84  
83 85 /**
84   - * Signal the event loop to stop, wait for all pending commands to be processed,
85   - * and shut everything down. A simple combination of stop_signal() and block().
  86 + * Blocks until the event loop exits and disconnection is complete, then returns.
  87 + * Usually no need to call manually as it is handled in the destructor.
86 88 */
87   - void stop();
  89 + void wait();
88 90  
89 91 /**
90 92 * Asynchronously runs a command and invokes the callback when a reply is
... ... @@ -101,7 +103,7 @@ public:
101 103 /**
102 104 * Asynchronously runs a command and ignores any errors or replies.
103 105 */
104   - void command(const std::string& cmd) { command<redisReply*>(cmd, nullptr); }
  106 + void command(const std::string& cmd);
105 107  
106 108 /**
107 109 * Synchronously runs a command, returning the Command object only once
... ... @@ -109,150 +111,62 @@ public:
109 111 * calling the Command object's .free() method when done with it.
110 112 */
111 113 template<class ReplyT>
112   - Command<ReplyT>& command_blocking(const std::string& cmd);
  114 + Command<ReplyT>& commandSync(const std::string& cmd);
113 115  
114 116 /**
115 117 * Synchronously runs a command, returning only once a reply is received
116   - * or there's an error. The return value is true if the command got a
117   - * successful reply, and false if something went wrong.
  118 + * or there's an error. Returns true on successful reply, false on error.
118 119 */
119   - bool command_blocking(const std::string& cmd);
  120 + bool commandSync(const std::string& cmd);
120 121  
  122 + /**
  123 + * Creates an asynchronous command that is run every [repeat] seconds,
  124 + * with the first one run in [after] seconds. If [repeat] is 0, the
  125 + * command is run only once.
  126 + */
121 127 template<class ReplyT>
122   - Command<ReplyT>& command_looping(
  128 + Command<ReplyT>& commandLoop(
123 129 const std::string& cmd,
124 130 const std::function<void(Command<ReplyT>&)>& callback,
125 131 double repeat,
126 132 double after = 0.0
127 133 );
128 134  
129   - /**
130   - * A wrapper around command() for synchronous use. Waits for a reply, populates it
131   - * into the Command object, and returns when complete. The user can retrieve the
132   - * results from the Command object - ok() will tell you if the call succeeded,
133   - * status() will give the error code, and reply() will return the reply data if
134   - * the call succeeded.
135   - */
136   -// template<class ReplyT>
137   -// Command<ReplyT>& command_blocking(const std::string& cmd);
138   -
139   - /**
140   - * Return the total number of successful commands processed by this Redox instance.
141   - */
142   - long num_commands_processed() { return cmd_count; }
143   -
144   - // Hiredis context, left public to allow low-level access
145   - redisAsyncContext *ctx;
146   -
147   - /**
148   - * If connected, disconnect from the Redis server. Usually not necessary to invoke
149   - * manually, as it is called in the destructor.
150   - */
151   - void disconnect();
152   -
153 135 // ------------------------------------------------
154 136 // Wrapper methods for convenience only
155 137 // ------------------------------------------------
156 138  
157 139 /**
158   - * Non-templated version of command in case you really don't care
159   - * about the reply and just want to send something off.
160   - */
161   -// void command(const std::string& command);
162   -
163   - /**
164   - * Non-templated version of command_blocking in case you really don't
165   - * care about the reply. Returns true if succeeded, false if error.
166   - */
167   -// bool command_blocking(const std::string& command);
168   -
169   - /**
170 140 * Redis GET command wrapper - return the value for the given key, or throw
171   - * an exception if there is an error. Blocking call, of course.
  141 + * an exception if there is an error. Blocking call.
172 142 */
173 143 std::string get(const std::string& key);
174 144  
175 145 /**
176 146 * Redis SET command wrapper - set the value for the given key. Return
177   - * true if succeeded, false if error.
  147 + * true if succeeded, false if error. Blocking call.
178 148 */
179 149 bool set(const std::string& key, const std::string& value);
180 150  
181 151 /**
182 152 * Redis DEL command wrapper - delete the given key. Return true if succeeded,
183   - * false if error.
  153 + * false if error. Blocking call.
184 154 */
185 155 bool del(const std::string& key);
186 156  
187   - // ------------------------------------------------
188   - // Publish/subscribe
189   - // ------------------------------------------------
190   -
191   - // This is activated when subscribe is called. When active,
192   - // all commands other than [P]SUBSCRIBE, [P]UNSUBSCRIBE
193   - // throw exceptions
194   - std::atomic_bool pubsub_mode = {false};
195   -
196   - /**
197   - * Subscribe to a topic.
198   - *
199   - * msg_callback: invoked whenever a message is received.
200   - * sub_callback: invoked when successfully subscribed
201   - * err_callback: invoked on some error state
202   - */
203   - void subscribe(const std::string topic,
204   - std::function<void(const std::string&, const std::string&)> msg_callback,
205   - std::function<void(const std::string&)> sub_callback = nullptr,
206   - std::function<void(const std::string&)> unsub_callback = nullptr,
207   - std::function<void(const std::string&, int)> err_callback = nullptr
208   - );
209   -
210   - /**
211   - * Subscribe to a topic with a pattern.
212   - *
213   - * msg_callback: invoked whenever a message is received.
214   - * sub_callback: invoked when successfully subscribed
215   - * err_callback: invoked on some error state
216   - */
217   - void psubscribe(const std::string topic,
218   - std::function<void(const std::string&, const std::string&)> msg_callback,
219   - std::function<void(const std::string&)> sub_callback = nullptr,
220   - std::function<void(const std::string&)> unsub_callback = nullptr,
221   - std::function<void(const std::string&, int)> err_callback = nullptr
222   - );
223   -
224 157 /**
225 158 * Publish to a topic. All subscribers will be notified.
226 159 *
227 160 * pub_callback: invoked when successfully published
228 161 * err_callback: invoked on some error state
229   - */
230   - void publish(const std::string topic, const std::string msg,
231   - std::function<void(const std::string&, const std::string&)> pub_callback = nullptr,
232   - std::function<void(const std::string&, int)> err_callback = nullptr
233   - );
234   -
235   - /**
236   - * Unsubscribe from a topic.
237 162 *
238   - * err_callback: invoked on some error state
  163 + * // TODO
239 164 */
240   - void unsubscribe(const std::string topic,
241   - std::function<void(const std::string&, int)> err_callback = nullptr
242   - );
243   -
244   - /**
245   - * Unsubscribe from a topic with a pattern.
246   - *
247   - * err_callback: invoked on some error state
248   - */
249   - void punsubscribe(const std::string topic,
250   - std::function<void(const std::string&, int)> err_callback = nullptr
  165 + void publish(const std::string topic, const std::string msg,
  166 + std::function<void(const std::string&, const std::string&)> pub_callback = nullptr,
  167 + std::function<void(const std::string&, int)> err_callback = nullptr
251 168 );
252 169  
253   - const std::set<std::string>& subscribed_topics() { return subscribed_topics_; }
254   - const std::set<std::string>& psubscribed_topics() { return psubscribed_topics_; }
255   -
256 170 // ------------------------------------------------
257 171 // Public only for Command class
258 172 // ------------------------------------------------
... ... @@ -260,20 +174,23 @@ public:
260 174 // Invoked by Command objects when they are completed
261 175 template<class ReplyT>
262 176 void remove_active_command(const long id) {
263   - std::lock_guard<std::mutex> lg1(command_map_guard);
264   - get_command_map<ReplyT>().erase(id);
265   - commands_deleted += 1;
  177 + std::lock_guard<std::mutex> lg1(command_map_guard_);
  178 + getCommandMap<ReplyT>().erase(id);
  179 + commands_deleted_ += 1;
266 180 }
267 181  
  182 + // Hiredis context, left public to allow low-level access
  183 + redisAsyncContext * ctx_;
  184 +
268 185 // Redox server over TCP
269   - const std::string host;
270   - const int port;
  186 + const std::string host_;
  187 + const int port_;
271 188  
272 189 // Redox server over unix
273   - const std::string path;
  190 + const std::string path_;
274 191  
275 192 // Logger
276   - log::Logger logger;
  193 + log::Logger logger_;
277 194  
278 195 private:
279 196  
... ... @@ -291,108 +208,90 @@ private:
291 208 void init_hiredis();
292 209  
293 210 // Manage connection state
294   - std::atomic_int connect_state = {REDOX_NOT_YET_CONNECTED};
295   - std::mutex connect_lock;
296   - std::condition_variable connect_waiter;
  211 + std::atomic_int connect_state_ = {NOT_YET_CONNECTED};
  212 + std::mutex connect_lock_;
  213 + std::condition_variable connect_waiter_;
297 214  
298 215 // User connect/disconnect callbacks
299   - std::function<void(int)> user_connection_callback;
  216 + std::function<void(int)> user_connection_callback_;
300 217  
301 218 // Dynamically allocated libev event loop
302   - struct ev_loop* evloop;
  219 + struct ev_loop* evloop_;
303 220  
304 221 // Asynchronous watchers
305   - ev_async async_w; // For processing commands
306   - ev_async async_stop; // For breaking the loop
307   -
308   - // Number of commands processed
309   - std::atomic_long cmd_count = {0};
  222 + ev_async watcher_command_; // For processing commands
  223 + ev_async watcher_stop_; // For breaking the loop
310 224  
311 225 // Track of Command objects allocated. Also provides unique Command IDs.
312   - std::atomic_long commands_created = {0};
313   - std::atomic_long commands_deleted = {0};
  226 + std::atomic_long commands_created_ = {0};
  227 + std::atomic_long commands_deleted_ = {0};
314 228  
315 229 // Separate thread to have a non-blocking event loop
316   - std::thread event_loop_thread;
  230 + std::thread event_loop_thread_;
317 231  
318 232 // Variable and CV to know when the event loop starts running
319   - std::atomic_bool running = {false};
320   - std::mutex running_waiter_lock;
321   - std::condition_variable running_waiter;
  233 + std::atomic_bool running_ = {false};
  234 + std::mutex running_waiter_lock_;
  235 + std::condition_variable running_waiter_;
322 236  
323 237 // Variable and CV to know when the event loop stops running
324   - std::atomic_bool to_exit = {false}; // Signal to exit
325   - std::atomic_bool exited = {false}; // Event thread exited
326   - std::mutex exit_waiter_lock;
327   - std::condition_variable exit_waiter;
  238 + std::atomic_bool to_exit_ = {false}; // Signal to exit
  239 + std::atomic_bool exited_ = {false}; // Event thread exited
  240 + std::mutex exit_waiter_lock_;
  241 + std::condition_variable exit_waiter_;
328 242  
329 243 // Maps of each Command, fetchable by the unique ID number
330   - std::unordered_map<long, Command<redisReply*>*> commands_redis_reply;
331   - std::unordered_map<long, Command<std::string>*> commands_string_r;
332   - std::unordered_map<long, Command<char*>*> commands_char_p;
333   - std::unordered_map<long, Command<int>*> commands_int;
334   - std::unordered_map<long, Command<long long int>*> commands_long_long_int;
335   - std::unordered_map<long, Command<std::nullptr_t>*> commands_null;
336   - std::unordered_map<long, Command<std::vector<std::string>>*> commands_vector_string;
337   - std::unordered_map<long, Command<std::set<std::string>>*> commands_set_string;
338   - std::unordered_map<long, Command<std::unordered_set<std::string>>*> commands_unordered_set_string;
339   - std::mutex command_map_guard; // Guards access to all of the above
  244 + // In C++14, member variable templates will replace all of these types
  245 + // with a single templated declaration
  246 + // ---------
  247 + // template<class ReplyT>
  248 + // std::unordered_map<long, Command<ReplyT>*> commands_;
  249 + // ---------
  250 + std::unordered_map<long, Command<redisReply*>*> commands_redis_reply_;
  251 + std::unordered_map<long, Command<std::string>*> commands_string_;
  252 + std::unordered_map<long, Command<char*>*> commands_char_p_;
  253 + std::unordered_map<long, Command<int>*> commands_int_;
  254 + std::unordered_map<long, Command<long long int>*> commands_long_long_int_;
  255 + std::unordered_map<long, Command<std::nullptr_t>*> commands_null_;
  256 + std::unordered_map<long, Command<std::vector<std::string>>*> commands_vector_string_;
  257 + std::unordered_map<long, Command<std::set<std::string>>*> commands_set_string_;
  258 + std::unordered_map<long, Command<std::unordered_set<std::string>>*> commands_unordered_set_string_;
  259 + std::mutex command_map_guard_; // Guards access to all of the above
340 260  
341 261 // Return the correct map from the above, based on the template specialization
342 262 template<class ReplyT>
343   - std::unordered_map<long, Command<ReplyT>*>& get_command_map();
  263 + std::unordered_map<long, Command<ReplyT>*>& getCommandMap();
344 264  
345 265 // Return the given Command from the relevant command map, or nullptr if not there
346 266 template<class ReplyT>
347   - Command<ReplyT>* find_command(long id);
  267 + Command<ReplyT>* findCommand(long id);
348 268  
349 269 std::queue<long> command_queue;
350 270 std::mutex queue_guard;
351   - static void process_queued_commands(struct ev_loop* loop, ev_async* async, int revents);
  271 + static void proccessQueuedCommands(struct ev_loop* loop, ev_async* async, int revents);
352 272  
353 273 template<class ReplyT>
354   - bool process_queued_command(long id);
  274 + bool proccessQueuedCommand(long id);
355 275  
356   - void run_event_loop();
  276 + void runEventLoop();
357 277  
358 278 // Callbacks invoked on server connection/disconnection
359   - static void connected_callback(const redisAsyncContext *c, int status);
360   - static void disconnected_callback(const redisAsyncContext *c, int status);
  279 + static void connectedCallback(const redisAsyncContext* c, int status);
  280 + static void disconnectedCallback(const redisAsyncContext* c, int status);
361 281  
362 282 template<class ReplyT>
363   - static void command_callback(redisAsyncContext *ctx, void *r, void *privdata);
  283 + static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata);
364 284  
365 285 template<class ReplyT>
366   - static bool submit_to_server(Command<ReplyT>* c);
  286 + static bool submitToServer(Command<ReplyT>* c);
367 287  
368 288 template<class ReplyT>
369   - static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents);
  289 + static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents);
370 290  
371   - void deny_non_pubsub(const std::string& cmd);
372   -
373   - // Base for subscribe and psubscribe
374   - void subscribe_raw(const std::string cmd_name, const std::string topic,
375   - std::function<void(const std::string&, const std::string&)> msg_callback,
376   - std::function<void(const std::string&)> sub_callback = nullptr,
377   - std::function<void(const std::string&)> unsub_callback = nullptr,
378   - std::function<void(const std::string&, int)> err_callback = nullptr
379   - );
380   -
381   - // Base for unsubscribe and punsubscribe
382   - void unsubscribe_raw(const std::string cmd_name, const std::string topic,
383   - std::function<void(const std::string&, int)> err_callback = nullptr
384   - );
385   -
386   - // Keep track of topics because we can only unsubscribe
387   - // from subscribed topics and punsubscribe from
388   - // psubscribed topics, or hiredis leads to segfaults
389   - std::set<std::string> subscribed_topics_;
390   - std::set<std::string> psubscribed_topics_;
391 291 };
392 292  
393 293 // ---------------------------
394 294  
395   -
396 295 template<class ReplyT>
397 296 Command<ReplyT>& Redox::createCommand(
398 297 const std::string& cmd,
... ... @@ -402,27 +301,22 @@ Command&lt;ReplyT&gt;&amp; Redox::createCommand(
402 301 bool free_memory
403 302 ) {
404 303  
405   - if(!running) {
406   - throw std::runtime_error("[ERROR] Need to start Redox before running commands!");
407   - }
408   -
409   - // Block if pubsub mode
410   - if(pubsub_mode) {
411   - deny_non_pubsub(cmd);
  304 + if(!running_) {
  305 + throw std::runtime_error("[ERROR] Need to connect Redox before running commands!");
412 306 }
413 307  
414   - commands_created += 1;
415   - auto* c = new Command<ReplyT>(this, commands_created, cmd,
416   - callback, repeat, after, free_memory, logger);
  308 + commands_created_ += 1;
  309 + auto* c = new Command<ReplyT>(this, commands_created_, cmd,
  310 + callback, repeat, after, free_memory, logger_);
417 311  
418 312 std::lock_guard<std::mutex> lg(queue_guard);
419   - std::lock_guard<std::mutex> lg2(command_map_guard);
  313 + std::lock_guard<std::mutex> lg2(command_map_guard_);
420 314  
421   - get_command_map<ReplyT>()[c->id_] = c;
  315 + getCommandMap<ReplyT>()[c->id_] = c;
422 316 command_queue.push(c->id_);
423 317  
424 318 // Signal the event loop to process this command
425   - ev_async_send(evloop, &async_w);
  319 + ev_async_send(evloop_, &watcher_command_);
426 320  
427 321 // logger.debug() << "Created Command " << c->id << " at " << c;
428 322  
... ... @@ -438,7 +332,7 @@ void Redox::command(
438 332 }
439 333  
440 334 template<class ReplyT>
441   -Command<ReplyT>& Redox::command_looping(
  335 +Command<ReplyT>& Redox::commandLoop(
442 336 const std::string& cmd,
443 337 const std::function<void(Command<ReplyT>&)>& callback,
444 338 double repeat,
... ... @@ -448,7 +342,7 @@ Command&lt;ReplyT&gt;&amp; Redox::command_looping(
448 342 }
449 343  
450 344 template<class ReplyT>
451   -Command<ReplyT>& Redox::command_blocking(const std::string& cmd) {
  345 +Command<ReplyT>& Redox::commandSync(const std::string& cmd) {
452 346 auto& c = createCommand<ReplyT>(cmd, nullptr, 0, 0, false);
453 347 c.wait();
454 348 return c;
... ...
src/subscriber.cpp 0 โ†’ 100644
  1 +/**
  2 +* Redis C++11 wrapper.
  3 +*/
  4 +
  5 +#include <string.h>
  6 +#include "subscriber.hpp"
  7 +
  8 +using namespace std;
  9 +
  10 +namespace redox {
  11 +
  12 +Subscriber::Subscriber(
  13 + const std::string& host, const int port,
  14 + std::function<void(int)> connection_callback,
  15 + std::ostream& log_stream, log::Level log_level
  16 +) : rdx_(host, port, connection_callback, log_stream, log_level),
  17 + logger_(rdx_.logger_) {}
  18 +
  19 +Subscriber::Subscriber(
  20 + const std::string& path,
  21 + std::function<void(int)> connection_callback,
  22 + std::ostream& log_stream, log::Level log_level
  23 +) : rdx_(path, connection_callback, log_stream, log_level),
  24 + logger_(rdx_.logger_) {}
  25 +
  26 +
  27 +// For debugging only
  28 +void debugReply(Command<redisReply*> c) {
  29 +
  30 + redisReply* reply = c.reply();
  31 +
  32 + cout << "------" << endl;
  33 + cout << c.cmd() << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl;
  34 + for(size_t i = 0; i < reply->elements; i++) {
  35 + redisReply* r = reply->element[i];
  36 + cout << "element " << i << ", reply type = " << r->type << " ";
  37 + if(r->type == REDIS_REPLY_STRING) cout << r->str << endl;
  38 + else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl;
  39 + else cout << "some other type" << endl;
  40 + }
  41 + cout << "------" << endl;
  42 +}
  43 +
  44 +void Subscriber::subscribeBase(const string cmd_name, const string topic,
  45 + function<void(const string&, const string&)> msg_callback,
  46 + function<void(const string&)> sub_callback,
  47 + function<void(const string&)> unsub_callback,
  48 + function<void(const string&, int)> err_callback
  49 +) {
  50 +
  51 + rdx_.commandLoop<redisReply*>(cmd_name + " " + topic,
  52 + [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command<redisReply*>& c) {
  53 +
  54 + if (!c.ok()) {
  55 + if (err_callback) err_callback(topic, c.status());
  56 + return;
  57 + }
  58 +
  59 + redisReply* reply = c.reply();
  60 +
  61 + // TODO cancel this command on unsubscription?
  62 +
  63 + // If the last entry is an integer, then it is a [p]sub/[p]unsub command
  64 + if ((reply->type == REDIS_REPLY_ARRAY) &&
  65 + (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) {
  66 +
  67 + if (!strncmp(reply->element[0]->str, "sub", 3)) {
  68 + subscribed_topics_.insert(topic);
  69 + if (sub_callback) sub_callback(topic);
  70 +
  71 + } else if (!strncmp(reply->element[0]->str, "psub", 4)) {
  72 + psubscribed_topics_.insert(topic);
  73 + if (sub_callback) sub_callback(topic);
  74 +
  75 + } else if (!strncmp(reply->element[0]->str, "uns", 3)) {
  76 + subscribed_topics_.erase(topic);
  77 + if (unsub_callback) unsub_callback(topic);
  78 +
  79 + } else if (!strncmp(reply->element[0]->str, "puns", 4)) {
  80 + psubscribed_topics_.erase(topic);
  81 + if (unsub_callback) unsub_callback(topic);
  82 + }
  83 +
  84 + else logger_.error() << "Unknown pubsub message: " << reply->element[0]->str;
  85 + }
  86 +
  87 + // Message for subscribe
  88 + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) {
  89 + char* msg = reply->element[2]->str;
  90 + if (msg && msg_callback) msg_callback(topic, reply->element[2]->str);
  91 + }
  92 +
  93 + // Message for psubscribe
  94 + else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) {
  95 + char* msg = reply->element[2]->str;
  96 + if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str);
  97 + }
  98 +
  99 + else logger_.error() << "Unknown pubsub message of type " << reply->type;
  100 + },
  101 + 1e10 // To keep the command around for a few hundred years
  102 + );
  103 +}
  104 +
  105 +void Subscriber::subscribe(const string topic,
  106 + function<void(const string&, const string&)> msg_callback,
  107 + function<void(const string&)> sub_callback,
  108 + function<void(const string&)> unsub_callback,
  109 + function<void(const string&, int)> err_callback
  110 +) {
  111 + if(subscribed_topics_.find(topic) != subscribed_topics_.end()) {
  112 + logger_.warning() << "Already subscribed to " << topic << "!";
  113 + return;
  114 + }
  115 + subscribeBase("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
  116 +}
  117 +
  118 +void Subscriber::psubscribe(const string topic,
  119 + function<void(const string&, const string&)> msg_callback,
  120 + function<void(const string&)> sub_callback,
  121 + function<void(const string&)> unsub_callback,
  122 + function<void(const string&, int)> err_callback
  123 +) {
  124 + if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) {
  125 + logger_.warning() << "Already psubscribed to " << topic << "!";
  126 + return;
  127 + }
  128 + subscribeBase("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
  129 +}
  130 +
  131 +void Subscriber::unsubscribeBase(const string cmd_name, const string topic,
  132 + function<void(const string&, int)> err_callback
  133 +) {
  134 + rdx_.command<redisReply*>(cmd_name + " " + topic,
  135 + [topic, err_callback](Command<redisReply*>& c) {
  136 + if(!c.ok()) {
  137 + if (err_callback) err_callback(topic, c.status());
  138 + }
  139 + }
  140 + );
  141 +}
  142 +
  143 +void Subscriber::unsubscribe(const string topic,
  144 + function<void(const string&, int)> err_callback
  145 +) {
  146 + if(subscribed_topics_.find(topic) == subscribed_topics_.end()) {
  147 + logger_.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!";
  148 + return;
  149 + }
  150 + unsubscribeBase("UNSUBSCRIBE", topic, err_callback);
  151 +}
  152 +
  153 +void Subscriber::punsubscribe(const string topic,
  154 + function<void(const string&, int)> err_callback
  155 +) {
  156 + if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) {
  157 + logger_.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!";
  158 + return;
  159 + }
  160 + unsubscribeBase("PUNSUBSCRIBE", topic, err_callback);
  161 +}
  162 +
  163 +} // End namespace
... ...
src/subscriber.hpp 0 โ†’ 100644
  1 +/**
  2 +* Redis C++11 wrapper.
  3 +*/
  4 +
  5 +#pragma once
  6 +
  7 +#include "redox.hpp"
  8 +
  9 +namespace redox {
  10 +
  11 +class Subscriber {
  12 +
  13 +public:
  14 +
  15 + /**
  16 + * Initializes everything, connects over TCP to a Redis server.
  17 + */
  18 + Subscriber(
  19 + const std::string& host = REDIS_DEFAULT_HOST,
  20 + const int port = REDIS_DEFAULT_PORT,
  21 + std::function<void(int)> connection_callback = nullptr,
  22 + std::ostream& log_stream = std::cout,
  23 + log::Level log_level = log::Info
  24 + );
  25 +
  26 + /**
  27 + * Initializes everything, connects over unix sockets to a Redis server.
  28 + */
  29 + Subscriber(
  30 + const std::string& path,
  31 + std::function<void(int)> connection_callback,
  32 + std::ostream& log_stream = std::cout,
  33 + log::Level log_level = log::Info
  34 + );
  35 +
  36 + /**
  37 + * Same as .connect() on a Redox instance.
  38 + */
  39 + bool connect() { return rdx_.connect(); }
  40 +
  41 + /**
  42 + * Same as .disconnect() on a Redox instance.
  43 + */
  44 + void disconnect() { return rdx_.disconnect(); }
  45 +
  46 + /**
  47 + * Same as .wait() on a Redox instance.
  48 + */
  49 + void wait() { return rdx_.wait(); }
  50 +
  51 + /**
  52 + * Subscribe to a topic.
  53 + *
  54 + * msg_callback: invoked whenever a message is received.
  55 + * sub_callback: invoked when successfully subscribed
  56 + * err_callback: invoked on some error state
  57 + */
  58 + void subscribe(const std::string topic,
  59 + std::function<void(const std::string&, const std::string&)> msg_callback,
  60 + std::function<void(const std::string&)> sub_callback = nullptr,
  61 + std::function<void(const std::string&)> unsub_callback = nullptr,
  62 + std::function<void(const std::string&, int)> err_callback = nullptr
  63 + );
  64 +
  65 + /**
  66 + * Subscribe to a topic with a pattern.
  67 + *
  68 + * msg_callback: invoked whenever a message is received.
  69 + * sub_callback: invoked when successfully subscribed
  70 + * err_callback: invoked on some error state
  71 + */
  72 + void psubscribe(const std::string topic,
  73 + std::function<void(const std::string&, const std::string&)> msg_callback,
  74 + std::function<void(const std::string&)> sub_callback = nullptr,
  75 + std::function<void(const std::string&)> unsub_callback = nullptr,
  76 + std::function<void(const std::string&, int)> err_callback = nullptr
  77 + );
  78 +
  79 + /**
  80 + * Unsubscribe from a topic.
  81 + *
  82 + * err_callback: invoked on some error state
  83 + */
  84 + void unsubscribe(const std::string topic,
  85 + std::function<void(const std::string&, int)> err_callback = nullptr
  86 + );
  87 +
  88 + /**
  89 + * Unsubscribe from a topic with a pattern.
  90 + *
  91 + * err_callback: invoked on some error state
  92 + */
  93 + void punsubscribe(const std::string topic,
  94 + std::function<void(const std::string&, int)> err_callback = nullptr
  95 + );
  96 +
  97 + /**
  98 + * Return the topics that were subscribed() to.
  99 + */
  100 + const std::set<std::string>& subscribedTopics() { return subscribed_topics_; }
  101 +
  102 + /**
  103 + * Return the topic patterns that were psubscribed() to.
  104 + */
  105 + const std::set<std::string>& psubscribedTopics() { return psubscribed_topics_; }
  106 +
  107 +private:
  108 +
  109 + // Base for subscribe and psubscribe
  110 + void subscribeBase(const std::string cmd_name, const std::string topic,
  111 + std::function<void(const std::string&, const std::string&)> msg_callback,
  112 + std::function<void(const std::string&)> sub_callback = nullptr,
  113 + std::function<void(const std::string&)> unsub_callback = nullptr,
  114 + std::function<void(const std::string&, int)> err_callback = nullptr
  115 + );
  116 +
  117 + // Base for unsubscribe and punsubscribe
  118 + void unsubscribeBase(const std::string cmd_name, const std::string topic,
  119 + std::function<void(const std::string&, int)> err_callback = nullptr
  120 + );
  121 +
  122 + // Underlying Redis client
  123 + Redox rdx_;
  124 +
  125 + // Keep track of topics because we can only unsubscribe
  126 + // from subscribed topics and punsubscribe from
  127 + // psubscribed topics, or hiredis leads to segfaults
  128 + std::set<std::string> subscribed_topics_;
  129 + std::set<std::string> psubscribed_topics_;
  130 +
  131 + // Reference to rdx_.logger_ for convenience
  132 + log::Logger& logger_;
  133 +};
  134 +
  135 +} // End namespace
... ...
test/test.cpp
... ... @@ -25,18 +25,13 @@ protected:
25 25 RedoxTest() {
26 26  
27 27 // Connect to the server
28   - rdx.start();
  28 + rdx.connect();
29 29  
30 30 // Clear all keys used by the tests here
31 31 rdx.command("DEL redox_test:a");
32 32 }
33 33  
34   - virtual ~RedoxTest() {
35   -
36   - // Block until the event loop exits.
37   - // Each test is responsible for calling the stop_signal()
38   - rdx.block();
39   - }
  34 + virtual ~RedoxTest() { }
40 35  
41 36 // CV and counter to wait for async commands to complete
42 37 atomic_int cmd_count = {0};
... ... @@ -85,10 +80,9 @@ protected:
85 80 * Wait until all async commands that used check() as a callback
86 81 * complete.
87 82 */
88   - void wait_and_stop() {
  83 + void wait_for_replies() {
89 84 unique_lock<mutex> ul(cmd_waiter_lock);
90 85 cmd_waiter.wait(ul, [this] { return (cmd_count == 0); });
91   - rdx.stop_signal();
92 86 };
93 87  
94 88 template<class ReplyT>
... ... @@ -114,14 +108,14 @@ protected:
114 108 TEST_F(RedoxTest, GetSet) {
115 109 rdx.command<string>("SET redox_test:a apple", print_and_check<string>("OK"));
116 110 rdx.command<string>("GET redox_test:a", print_and_check<string>("apple"));
117   - wait_and_stop();
  111 + wait_for_replies();
118 112 }
119 113  
120 114 TEST_F(RedoxTest, Delete) {
121 115 rdx.command<string>("SET redox_test:a apple", print_and_check<string>("OK"));
122 116 rdx.command<int>("DEL redox_test:a", print_and_check(1));
123 117 rdx.command<nullptr_t>("GET redox_test:a", check(nullptr));
124   - wait_and_stop();
  118 + wait_for_replies();
125 119 }
126 120  
127 121 TEST_F(RedoxTest, Incr) {
... ... @@ -130,22 +124,22 @@ TEST_F(RedoxTest, Incr) {
130 124 rdx.command<int>("INCR redox_test:a", check(i+1));
131 125 }
132 126 rdx.command<string>("GET redox_test:a", print_and_check(to_string(count)));
133   - wait_and_stop();
  127 + wait_for_replies();
134 128 }
135 129  
136 130 TEST_F(RedoxTest, Delayed) {
137   - Command<int>& c = rdx.command_looping<int>("INCR redox_test:a", check(1), 0, 0.1);
  131 + Command<int>& c = rdx.commandLoop<int>("INCR redox_test:a", check(1), 0, 0.1);
138 132 this_thread::sleep_for(chrono::milliseconds(150));
139 133 c.cancel();
140 134 rdx.command<string>("GET redox_test:a", print_and_check(to_string(1)));
141   - wait_and_stop();
  135 + wait_for_replies();
142 136 }
143 137  
144 138 TEST_F(RedoxTest, Loop) {
145 139 int count = 0;
146 140 int target_count = 100;
147 141 double dt = 0.001;
148   - Command<int>& cmd = rdx.command_looping<int>("INCR redox_test:a",
  142 + Command<int>& cmd = rdx.commandLoop<int>("INCR redox_test:a",
149 143 [this, &count](Command<int>& c) {
150 144 check(++count)(c);
151 145 },
... ... @@ -157,7 +151,7 @@ TEST_F(RedoxTest, Loop) {
157 151 cmd.cancel();
158 152  
159 153 rdx.command<string>("GET redox_test:a", print_and_check(to_string(target_count)));
160   - wait_and_stop();
  154 + wait_for_replies();
161 155 }
162 156  
163 157 // -------------------------------------------
... ... @@ -165,25 +159,22 @@ TEST_F(RedoxTest, Loop) {
165 159 // -------------------------------------------
166 160  
167 161 TEST_F(RedoxTest, GetSetSync) {
168   - print_and_check_sync<string>(rdx.command_blocking<string>("SET redox_test:a apple"), "OK");
169   - print_and_check_sync<string>(rdx.command_blocking<string>("GET redox_test:a"), "apple");
170   - rdx.stop_signal();
  162 + print_and_check_sync<string>(rdx.commandSync<string>("SET redox_test:a apple"), "OK");
  163 + print_and_check_sync<string>(rdx.commandSync<string>("GET redox_test:a"), "apple");
171 164 }
172 165  
173 166 TEST_F(RedoxTest, DeleteSync) {
174   - print_and_check_sync<string>(rdx.command_blocking<string>("SET redox_test:a apple"), "OK");
175   - print_and_check_sync(rdx.command_blocking<int>("DEL redox_test:a"), 1);
176   - check_sync(rdx.command_blocking<nullptr_t>("GET redox_test:a"), nullptr);
177   - rdx.stop_signal();
  167 + print_and_check_sync<string>(rdx.commandSync<string>("SET redox_test:a apple"), "OK");
  168 + print_and_check_sync(rdx.commandSync<int>("DEL redox_test:a"), 1);
  169 + check_sync(rdx.commandSync<nullptr_t>("GET redox_test:a"), nullptr);
178 170 }
179 171  
180 172 TEST_F(RedoxTest, IncrSync) {
181 173 int count = 100;
182 174 for(int i = 0; i < count; i++) {
183   - check_sync(rdx.command_blocking<int>("INCR redox_test:a"), i+1);
  175 + check_sync(rdx.commandSync<int>("INCR redox_test:a"), i+1);
184 176 }
185   - print_and_check_sync(rdx.command_blocking<string>("GET redox_test:a"), to_string(count));
186   - rdx.stop_signal();
  177 + print_and_check_sync(rdx.commandSync<string>("GET redox_test:a"), to_string(count));
187 178 }
188 179  
189 180 // -------------------------------------------
... ...