Commit 46752c1e4b5915762fda4263387cb480211efc35

Authored by Hayk Martirosyan
1 parent 0d5e3bdf

num_commands_processed() counts repeated commands too

Created a pointer Command.rdx (Redox*) to allow commands
to increment Redox.cmd_count, and simplify a couple of
other things.

Did a little bit of moving and renaming context objects,
so the context is always .ctx and commands can always
be called c.
README.md
1 1 redox
2 2 ======
3 3  
4   -Asynchronous, minimalistic, and wicked fast C++11 bindings for Redis
  4 +High-level, asynchronous, and wicked fast C++11 bindings for Redis
5 5  
6 6 Work in progress, details coming soon.
... ...
examples/simple_loop.cpp
1 1 /**
2   -* Basic asynchronous calls using redisx.
  2 +* Basic asynchronous calls using redox.
3 3 */
4 4  
5 5 #include <iostream>
... ... @@ -69,5 +69,6 @@ int main(int argc, char* argv[]) {
69 69 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
70 70 << "that's " << actual_freq << " commands/s." << endl;
71 71  
  72 + cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl;
72 73 return 0;
73 74 }
... ...
examples/simple_sync_loop.cpp
1 1 /**
2   -* Basic asynchronous calls using redisx.
  2 +* Basic synchronous calls using redox.
3 3 */
4 4  
5 5 #include <iostream>
... ... @@ -44,5 +44,6 @@ int main(int argc, char* argv[]) {
44 44 cout << "Sent " << count << " commands in " << t_elapsed << "s, "
45 45 << "that's " << actual_freq << " commands/s." << endl;
46 46  
  47 + cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl;
47 48 return 0;
48 49 }
... ...
src/command.hpp
... ... @@ -23,6 +23,8 @@ static const int REDOX_NIL_REPLY = 3;
23 23 static const int REDOX_ERROR_REPLY = 4;
24 24 static const int REDOX_TIMEOUT = 5;
25 25  
  26 +class Redox;
  27 +
26 28 template<class ReplyT>
27 29 class Command {
28 30  
... ... @@ -30,7 +32,7 @@ friend class Redox;
30 32  
31 33 public:
32 34 Command(
33   - redisAsyncContext* c,
  35 + Redox* rdx,
34 36 const std::string& cmd,
35 37 const std::function<void(const std::string&, const ReplyT&)>& callback,
36 38 const std::function<void(const std::string&, int status)>& error_callback,
... ... @@ -38,13 +40,14 @@ public:
38 40 bool free_memory
39 41 );
40 42  
  43 + Redox* rdx;
  44 +
41 45 const std::string cmd;
42 46 const double repeat;
43 47 const double after;
44 48  
45 49 const bool free_memory;
46 50  
47   - redisAsyncContext* c;
48 51 redisReply* reply_obj;
49 52  
50 53 std::atomic_int pending;
... ... @@ -92,26 +95,29 @@ private:
92 95  
93 96 template<class ReplyT>
94 97 Command<ReplyT>::Command(
95   - redisAsyncContext* c,
  98 + Redox* rdx,
96 99 const std::string& cmd,
97 100 const std::function<void(const std::string&, const ReplyT&)>& callback,
98 101 const std::function<void(const std::string&, int status)>& error_callback,
99 102 double repeat, double after, bool free_memory
100   -) : cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), c(c), reply_obj(NULL),
  103 +) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), reply_obj(NULL),
101 104 pending(0), callback(callback), error_callback(error_callback), completed(false)
102 105 {
103 106 timer_guard.lock();
104 107 }
105 108  
106 109 template<class ReplyT>
107   -void Command<ReplyT>::command_callback(redisAsyncContext *c, void *r, void *privdata) {
  110 +void Command<ReplyT>::command_callback(redisAsyncContext *ctx, void *r, void *privdata) {
108 111  
109   - auto *cmd_obj = (Command<ReplyT> *) privdata;
110   - cmd_obj->reply_obj = (redisReply *) r;
111   - cmd_obj->invoke_callback();
  112 + auto *c = (Command<ReplyT> *) privdata;
  113 + c->reply_obj = (redisReply *) r;
  114 + c->invoke_callback();
112 115  
113 116 // Free the reply object unless told not to
114   - if(cmd_obj->free_memory) cmd_obj->free_reply_object();
  117 + if(c->free_memory) c->free_reply_object();
  118 +
  119 + // Increment the Redox object command counter
  120 + c->rdx->incr_cmd_count();
115 121 }
116 122  
117 123 template<class ReplyT>
... ...
src/redox.cpp
1 1 /**
2   -* Redox C++11 wrapper.
  2 +* Redis C++11 wrapper.
3 3 */
4 4  
5 5 #include <signal.h>
... ... @@ -48,19 +48,19 @@ Redox::Redox(const string&amp; host, const int port)
48 48  
49 49 signal(SIGPIPE, SIG_IGN);
50 50  
51   - c = redisAsyncConnect(host.c_str(), port);
52   - if (c->err) {
53   - printf("Error: %s\n", c->errstr);
  51 + ctx = redisAsyncConnect(host.c_str(), port);
  52 + if (ctx->err) {
  53 + printf("Error: %s\n", ctx->errstr);
54 54 return;
55 55 }
56 56  
57   - redisLibevAttach(EV_DEFAULT_ c);
58   - redisAsyncSetConnectCallback(c, Redox::connected);
59   - redisAsyncSetDisconnectCallback(c, Redox::disconnected);
  57 + redisLibevAttach(EV_DEFAULT_ ctx);
  58 + redisAsyncSetConnectCallback(ctx, Redox::connected);
  59 + redisAsyncSetDisconnectCallback(ctx, Redox::disconnected);
60 60 }
61 61  
62 62 Redox::~Redox() {
63   - redisAsyncDisconnect(c);
  63 + redisAsyncDisconnect(ctx);
64 64 stop();
65 65 }
66 66  
... ... @@ -103,11 +103,11 @@ void Redox::block() {
103 103 * true if succeeded, false otherwise.
104 104 */
105 105 template<class ReplyT>
106   -bool submit_to_server(Command<ReplyT>* cmd_obj) {
107   - cmd_obj->pending++;
108   - if (redisAsyncCommand(cmd_obj->c, cmd_obj->command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) {
109   - cerr << "[ERROR] Could not send \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl;
110   - cmd_obj->invoke_error(REDOX_SEND_ERROR);
  106 +bool submit_to_server(Command<ReplyT>* c) {
  107 + c->pending++;
  108 + if (redisAsyncCommand(c->rdx->ctx, c->command_callback, (void*)c, c->cmd.c_str()) != REDIS_OK) {
  109 + cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl;
  110 + c->invoke_error(REDOX_SEND_ERROR);
111 111 return false;
112 112 }
113 113 return true;
... ... @@ -122,30 +122,30 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents)
122 122 return;
123 123 }
124 124  
125   - auto cmd_obj = (Command<ReplyT>*)timer->data;
126   - submit_to_server<ReplyT>(cmd_obj);
  125 + auto c = (Command<ReplyT>*)timer->data;
  126 + submit_to_server<ReplyT>(c);
127 127 }
128 128  
129 129 template<class ReplyT>
130   -bool Redox::process_queued_command(void* cmd_ptr) {
  130 +bool Redox::process_queued_command(void* c_ptr) {
131 131  
132 132 auto& command_map = get_command_map<ReplyT>();
133 133  
134   - auto it = command_map.find(cmd_ptr);
  134 + auto it = command_map.find(c_ptr);
135 135 if(it == command_map.end()) return false;
136   - Command<ReplyT>* cmd_obj = it->second;
137   - command_map.erase(cmd_ptr);
  136 + Command<ReplyT>* c = it->second;
  137 + command_map.erase(c_ptr);
138 138  
139   - if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) {
140   - submit_to_server<ReplyT>(cmd_obj);
  139 + if((c->repeat == 0) && (c->after == 0)) {
  140 + submit_to_server<ReplyT>(c);
141 141 } else {
142 142  
143   - cmd_obj->timer.data = (void*)cmd_obj;
  143 + c->timer.data = (void*)c;
144 144  
145   - ev_timer_init(&cmd_obj->timer, submit_command_callback<ReplyT>, cmd_obj->after, cmd_obj->repeat);
146   - ev_timer_start(EV_DEFAULT_ &cmd_obj->timer);
  145 + ev_timer_init(&c->timer, submit_command_callback<ReplyT>, c->after, c->repeat);
  146 + ev_timer_start(EV_DEFAULT_ &c->timer);
147 147  
148   - cmd_obj->timer_guard.unlock();
  148 + c->timer_guard.unlock();
149 149 }
150 150  
151 151 return true;
... ... @@ -157,25 +157,19 @@ void Redox::process_queued_commands() {
157 157  
158 158 while(!command_queue.empty()) {
159 159  
160   - void* cmd_ptr = command_queue.front();
161   - if(process_queued_command<redisReply*>(cmd_ptr)) {}
162   - else if(process_queued_command<string>(cmd_ptr)) {}
163   - else if(process_queued_command<char*>(cmd_ptr)) {}
164   - else if(process_queued_command<int>(cmd_ptr)) {}
165   - else if(process_queued_command<long long int>(cmd_ptr)) {}
166   - else if(process_queued_command<nullptr_t>(cmd_ptr)) {}
  160 + void* c_ptr = command_queue.front();
  161 + if(process_queued_command<redisReply*>(c_ptr)) {}
  162 + else if(process_queued_command<string>(c_ptr)) {}
  163 + else if(process_queued_command<char*>(c_ptr)) {}
  164 + else if(process_queued_command<int>(c_ptr)) {}
  165 + else if(process_queued_command<long long int>(c_ptr)) {}
  166 + else if(process_queued_command<nullptr_t>(c_ptr)) {}
167 167 else throw runtime_error("[FATAL] Command pointer not found in any queue!");
168 168  
169 169 command_queue.pop();
170   - cmd_count++;
171 170 }
172 171 }
173 172  
174   -long Redox::num_commands_processed() {
175   - lock_guard<mutex> lg(queue_guard);
176   - return cmd_count;
177   -}
178   -
179 173 // ----------------------------
180 174  
181 175 template<> unordered_map<void*, Command<redisReply*>*>&
... ...
src/redox.hpp
1 1 /**
2   -* Redox C++11 wrapper.
  2 +* Redis C++11 wrapper.
3 3 */
4 4  
5 5 #pragma once
... ... @@ -31,6 +31,8 @@ public:
31 31 Redox(const std::string& host, const int port);
32 32 ~Redox();
33 33  
  34 + redisAsyncContext *ctx;
  35 +
34 36 void run();
35 37 void run_blocking();
36 38 void stop();
... ... @@ -47,7 +49,7 @@ public:
47 49 );
48 50  
49 51 template<class ReplyT>
50   - bool cancel(Command<ReplyT>* cmd_obj);
  52 + bool cancel(Command<ReplyT>* c);
51 53  
52 54 template<class ReplyT>
53 55 Command<ReplyT>* command_blocking(const std::string& cmd);
... ... @@ -55,10 +57,8 @@ public:
55 57 void command(const std::string& command);
56 58 bool command_blocking(const std::string& command);
57 59  
58   - long num_commands_processed();
59   -
60   - template<class ReplyT>
61   - static void command_callback(redisAsyncContext *c, void *r, void *privdata);
  60 + void incr_cmd_count() { cmd_count++; }
  61 + long num_commands_processed() { return cmd_count; }
62 62  
63 63 static void connected(const redisAsyncContext *c, int status);
64 64 static void disconnected(const redisAsyncContext *c, int status);
... ... @@ -74,9 +74,7 @@ private:
74 74 int port;
75 75  
76 76 // Number of commands processed
77   - long cmd_count;
78   -
79   - redisAsyncContext *c;
  77 + std::atomic_long cmd_count;
80 78  
81 79 std::atomic_bool to_exit;
82 80 std::mutex exit_waiter_lock;
... ... @@ -114,27 +112,27 @@ Command&lt;ReplyT&gt;* Redox::command(
114 112 bool free_memory
115 113 ) {
116 114 std::lock_guard<std::mutex> lg(queue_guard);
117   - auto* cmd_obj = new Command<ReplyT>(c, cmd, callback, error_callback, repeat, after, free_memory);
118   - get_command_map<ReplyT>()[(void*)cmd_obj] = cmd_obj;
119   - command_queue.push((void*)cmd_obj);
120   - return cmd_obj;
  115 + auto* c = new Command<ReplyT>(this, cmd, callback, error_callback, repeat, after, free_memory);
  116 + get_command_map<ReplyT>()[(void*)c] = c;
  117 + command_queue.push((void*)c);
  118 + return c;
121 119 }
122 120  
123 121 template<class ReplyT>
124   -bool Redox::cancel(Command<ReplyT>* cmd_obj) {
  122 +bool Redox::cancel(Command<ReplyT>* c) {
125 123  
126   - if(cmd_obj == NULL) {
  124 + if(c == NULL) {
127 125 std::cerr << "[ERROR] Canceling null command." << std::endl;
128 126 return false;
129 127 }
130 128  
131   - cmd_obj->timer.data = NULL;
  129 + c->timer.data = NULL;
132 130  
133   - std::lock_guard<std::mutex> lg(cmd_obj->timer_guard);
134   - if((cmd_obj->repeat != 0) || (cmd_obj->after != 0))
135   - ev_timer_stop(EV_DEFAULT_ &cmd_obj->timer);
  131 + std::lock_guard<std::mutex> lg(c->timer_guard);
  132 + if((c->repeat != 0) || (c->after != 0))
  133 + ev_timer_stop(EV_DEFAULT_ &c->timer);
136 134  
137   - cmd_obj->completed = true;
  135 + c->completed = true;
138 136  
139 137 return true;
140 138 }
... ... @@ -150,7 +148,7 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) {
150 148  
151 149 std::unique_lock<std::mutex> lk(m);
152 150  
153   - Command<ReplyT>* cmd_obj = command<ReplyT>(cmd,
  151 + Command<ReplyT>* c = command<ReplyT>(cmd,
154 152 [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) {
155 153 std::unique_lock<std::mutex> ul(m);
156 154 val = reply;
... ... @@ -170,10 +168,10 @@ Command&lt;ReplyT&gt;* Redox::command_blocking(const std::string&amp; cmd) {
170 168 // Wait until a callback is invoked
171 169 cv.wait(lk, [&status] { return status != REDOX_UNINIT; });
172 170  
173   - cmd_obj->reply_val = val;
174   - cmd_obj->reply_status = status;
  171 + c->reply_val = val;
  172 + c->reply_status = status;
175 173  
176   - return cmd_obj;
  174 + return c;
177 175 }
178 176  
179 177 } // End namespace redis
... ...