Commit 1a47846bd71a2d20663f6ebfafd46fb869f33766
1 parent
1a01c9dd
Commenting, cleanup, privatizing methods/variables
Thoroughly cleaned up redox.hpp to expose a minimal public API, and commented those well. Generally cleaned up a bunch of stuff. Added wrapper function DEL, which could be useful.
Showing
11 changed files
with
173 additions
and
130 deletions
CMakeLists.txt
| 1 | 1 | cmake_minimum_required(VERSION 2.8.4) |
| 2 | 2 | project(redox) |
| 3 | 3 | |
| 4 | -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") | |
| 5 | -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") | |
| 4 | +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") | |
| 5 | +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") | |
| 6 | 6 | # set(CMAKE_VERBOSE_MAKEFILE ON) |
| 7 | 7 | |
| 8 | 8 | # --------------------------------------------------------- |
| ... | ... | @@ -32,20 +32,17 @@ set(LIB_ALL ${LIB_REDIS}) |
| 32 | 32 | add_executable(basic examples/basic.cpp ${SRC_ALL}) |
| 33 | 33 | target_link_libraries(basic ${LIB_REDIS}) |
| 34 | 34 | |
| 35 | -#add_executable(progressive examples/progressive.cpp ${SRC_ALL}) | |
| 36 | -#target_link_libraries(progressive ${LIB_REDIS}) | |
| 37 | - | |
| 38 | 35 | add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) |
| 39 | 36 | target_link_libraries(basic_threaded ${LIB_REDIS}) |
| 40 | 37 | |
| 41 | 38 | #add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) |
| 42 | 39 | #target_link_libraries(lpush_benchmark ${LIB_REDIS}) |
| 43 | 40 | |
| 44 | -#add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) | |
| 45 | -#target_link_libraries(speed_test_async ${LIB_REDIS}) | |
| 41 | +add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) | |
| 42 | +target_link_libraries(speed_test_async ${LIB_REDIS}) | |
| 46 | 43 | |
| 47 | -#add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) | |
| 48 | -#target_link_libraries(speed_test_sync ${LIB_REDIS}) | |
| 44 | +add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) | |
| 45 | +target_link_libraries(speed_test_sync ${LIB_REDIS}) | |
| 49 | 46 | |
| 50 | 47 | add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) |
| 51 | 48 | target_link_libraries(speed_test_async_multi ${LIB_REDIS}) | ... | ... |
examples/basic.cpp
| ... | ... | @@ -9,13 +9,13 @@ using namespace std; |
| 9 | 9 | |
| 10 | 10 | int main(int argc, char* argv[]) { |
| 11 | 11 | |
| 12 | - redox::Redox rdx = {"localhost", 6379}; | |
| 13 | - rdx.run(); | |
| 12 | + redox::Redox rdx; // Initialize Redox (default host/port) | |
| 13 | + rdx.start(); // Start the event loop | |
| 14 | 14 | |
| 15 | - if(!rdx.set("alaska", "rules")) | |
| 15 | + if(!rdx.set("alaska", "rules")) // Set a key, check if succeeded | |
| 16 | 16 | cerr << "Failed to set key!" << endl; |
| 17 | 17 | |
| 18 | - cout << "alaska: " << rdx.get("alaska") << endl; | |
| 18 | + cout << "key = alaska, value = " << rdx.get("alaska") << endl; | |
| 19 | 19 | |
| 20 | - rdx.stop(); | |
| 20 | + rdx.stop(); // Shut down the event loop | |
| 21 | 21 | } | ... | ... |
examples/basic_threaded.cpp
examples/lpush_benchmark.cpp
| 1 | 1 | /** |
| 2 | -* Basic asynchronous calls using redisx. | |
| 2 | +* Basic asynchronous calls using redox. | |
| 3 | 3 | */ |
| 4 | 4 | |
| 5 | 5 | #include <iostream> |
| 6 | -#include "../src/redisx.hpp" | |
| 6 | +#include "../src/redox.hpp" | |
| 7 | 7 | |
| 8 | 8 | using namespace std; |
| 9 | 9 | |
| ... | ... | @@ -14,8 +14,8 @@ unsigned long time_ms() { |
| 14 | 14 | |
| 15 | 15 | int main(int argc, char* argv[]) { |
| 16 | 16 | |
| 17 | - redisx::Redis rdx = {"localhost", 6379}; | |
| 18 | - rdx.run(); | |
| 17 | + redox::Redox rdx = {"localhost", 6379}; | |
| 18 | + rdx.start(); | |
| 19 | 19 | |
| 20 | 20 | // TODO wait for this somehow |
| 21 | 21 | rdx.command("DEL test"); | ... | ... |
examples/progressive.cpp deleted
| 1 | -/** | |
| 2 | -* Basic asynchronous calls using redisx. | |
| 3 | -*/ | |
| 4 | - | |
| 5 | -#include <iostream> | |
| 6 | -#include "../src/redisx.hpp" | |
| 7 | - | |
| 8 | -using namespace std; | |
| 9 | - | |
| 10 | -redisx::Redis rdx = {"localhost", 6379}; | |
| 11 | - | |
| 12 | -void print_key(const string& key) { | |
| 13 | - rdx.command<const string&>("GET " + key, [key](const string& cmd, const string& value) { | |
| 14 | - cout << "[GET] " << key << ": \"" << value << '\"' << endl; | |
| 15 | - }); | |
| 16 | -} | |
| 17 | - | |
| 18 | -void set_key(const string& key, const string& value) { | |
| 19 | - string cmd_str = "SET " + key + " " + value; | |
| 20 | - rdx.command<const string&>(cmd_str, [key, value](const string& cmd, const string& reply) { | |
| 21 | - cout << "[SET] " << key << ": \"" << value << '\"' << endl; | |
| 22 | - }); | |
| 23 | -} | |
| 24 | - | |
| 25 | -int main(int argc, char* argv[]) { | |
| 26 | - | |
| 27 | - set_key("name", "Bob"); | |
| 28 | - print_key("name"); | |
| 29 | - set_key("name", "Steve"); | |
| 30 | - print_key("name"); | |
| 31 | - | |
| 32 | - rdx.run_blocking(); | |
| 33 | - return 0; | |
| 34 | -}; |
examples/speed_test_async.cpp
| ... | ... | @@ -18,7 +18,7 @@ double time_s() { |
| 18 | 18 | int main(int argc, char* argv[]) { |
| 19 | 19 | |
| 20 | 20 | Redox rdx = {"localhost", 6379}; |
| 21 | - rdx.run(); | |
| 21 | + rdx.start(); | |
| 22 | 22 | |
| 23 | 23 | if(rdx.command_blocking("SET simple_loop:count 0")) { |
| 24 | 24 | cout << "Reset the counter to zero." << endl; |
| ... | ... | @@ -47,7 +47,7 @@ int main(int argc, char* argv[]) { |
| 47 | 47 | |
| 48 | 48 | // Wait for t time, then stop the command. |
| 49 | 49 | this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); |
| 50 | - rdx.cancel(c); | |
| 50 | + c->cancel(); | |
| 51 | 51 | |
| 52 | 52 | // Get the final value of the counter |
| 53 | 53 | auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count"); | ... | ... |
examples/speed_test_async_multi.cpp
| ... | ... | @@ -19,7 +19,7 @@ double time_s() { |
| 19 | 19 | int main(int argc, char* argv[]) { |
| 20 | 20 | |
| 21 | 21 | Redox rdx = {"localhost", 6379}; |
| 22 | - rdx.run(); | |
| 22 | + rdx.start(); | |
| 23 | 23 | |
| 24 | 24 | if(rdx.command_blocking("SET simple_loop:count 0")) { |
| 25 | 25 | cout << "Reset the counter to zero." << endl; |
| ... | ... | @@ -52,7 +52,7 @@ int main(int argc, char* argv[]) { |
| 52 | 52 | |
| 53 | 53 | // Wait for t time, then stop the command. |
| 54 | 54 | this_thread::sleep_for(chrono::microseconds((int)(t*1e6))); |
| 55 | - for(auto c : commands) rdx.cancel(c); | |
| 55 | + for(auto c : commands) c->cancel(); | |
| 56 | 56 | |
| 57 | 57 | // Get the final value of the counter |
| 58 | 58 | auto get_cmd = rdx.command_blocking<string>("GET simple_loop:count"); | ... | ... |
examples/speed_test_sync.cpp
| ... | ... | @@ -18,7 +18,7 @@ double time_s() { |
| 18 | 18 | int main(int argc, char* argv[]) { |
| 19 | 19 | |
| 20 | 20 | Redox rdx = {"localhost", 6379}; |
| 21 | - rdx.run(); | |
| 21 | + rdx.start(); | |
| 22 | 22 | |
| 23 | 23 | if(rdx.command_blocking("SET simple_loop:count 0")) { |
| 24 | 24 | cout << "Reset the counter to zero." << endl; |
| ... | ... | @@ -38,7 +38,7 @@ int main(int argc, char* argv[]) { |
| 38 | 38 | |
| 39 | 39 | while(time_s() < t_end) { |
| 40 | 40 | Command<int>* c = rdx.command_blocking<int>(cmd_str); |
| 41 | - if(c->status() != REDOX_OK) cerr << "Bad reply, code: " << c->status() << endl; | |
| 41 | + if(!c->ok()) cerr << "Bad reply, code: " << c->status() << endl; | |
| 42 | 42 | c->free(); |
| 43 | 43 | count++; |
| 44 | 44 | } | ... | ... |
src/command.hpp
| ... | ... | @@ -65,7 +65,9 @@ public: |
| 65 | 65 | const ReplyT& reply(); |
| 66 | 66 | int status() { return reply_status; }; |
| 67 | 67 | bool ok() { return reply_status == REDOX_OK; } |
| 68 | - bool is_completed() { return completed; } | |
| 68 | + bool is_canceled() { return canceled; } | |
| 69 | + | |
| 70 | + void cancel() { canceled = true; } | |
| 69 | 71 | |
| 70 | 72 | /** |
| 71 | 73 | * Called by the user to free the redisReply object, when the free_memory |
| ... | ... | @@ -92,7 +94,7 @@ private: |
| 92 | 94 | ReplyT reply_val; |
| 93 | 95 | int reply_status; |
| 94 | 96 | |
| 95 | - std::atomic_bool completed = {false}; | |
| 97 | + std::atomic_bool canceled = {false}; | |
| 96 | 98 | |
| 97 | 99 | ev_timer timer; |
| 98 | 100 | std::mutex timer_guard; |
| ... | ... | @@ -140,18 +142,6 @@ void Command<ReplyT>::process_reply() { |
| 140 | 142 | |
| 141 | 143 | free_reply_object(); |
| 142 | 144 | |
| 143 | -// // Free memory when all pending callbacks are received | |
| 144 | -// if((repeat != 0) && (pending == 0) && ((long)(get_timer()->data) == 0)) { | |
| 145 | -// std::cout << "Freeing command, timer stopped and pending is 0." << std::endl; | |
| 146 | -// free_command(this); | |
| 147 | -// } | |
| 148 | -// | |
| 149 | -// if((pending == 0) && (repeat == 0)) { | |
| 150 | -// free_command(this); | |
| 151 | -// } else { | |
| 152 | -// free_guard.unlock(); | |
| 153 | -// } | |
| 154 | - | |
| 155 | 145 | // Handle memory if all pending replies have arrived |
| 156 | 146 | if(pending == 0) { |
| 157 | 147 | ... | ... |
src/redox.cpp
| ... | ... | @@ -26,6 +26,7 @@ void Redox::connected(const redisAsyncContext *ctx, int status) { |
| 26 | 26 | } |
| 27 | 27 | |
| 28 | 28 | void Redox::disconnected(const redisAsyncContext *ctx, int status) { |
| 29 | + | |
| 29 | 30 | if (status != REDIS_OK) { |
| 30 | 31 | cerr << "[ERROR] Disconnecting from Redis: " << ctx->errstr << endl; |
| 31 | 32 | return; |
| ... | ... | @@ -43,24 +44,29 @@ void Redox::disconnected(const redisAsyncContext *ctx, int status) { |
| 43 | 44 | Redox::Redox(const string& host, const int port) |
| 44 | 45 | : host(host), port(port) { |
| 45 | 46 | |
| 46 | - lock_guard<mutex> lg(queue_guard); | |
| 47 | - | |
| 47 | + // Required by libev | |
| 48 | 48 | signal(SIGPIPE, SIG_IGN); |
| 49 | 49 | |
| 50 | + // Create a redisAsyncContext | |
| 50 | 51 | ctx = redisAsyncConnect(host.c_str(), port); |
| 51 | 52 | if (ctx->err) { |
| 52 | 53 | printf("Error: %s\n", ctx->errstr); |
| 53 | 54 | return; |
| 54 | 55 | } |
| 55 | 56 | |
| 57 | + // Create a new event loop and attach it to hiredis | |
| 56 | 58 | evloop = ev_loop_new(EVFLAG_AUTO); |
| 57 | - ev_set_userdata(evloop, (void*)this); | |
| 58 | - | |
| 59 | 59 | redisLibevAttach(evloop, ctx); |
| 60 | + | |
| 61 | + // Set the callbacks to be invoked on server connection/disconnection | |
| 60 | 62 | redisAsyncSetConnectCallback(ctx, Redox::connected); |
| 61 | 63 | redisAsyncSetDisconnectCallback(ctx, Redox::disconnected); |
| 62 | 64 | |
| 65 | + // Set back references to this Redox object (for use in callbacks) | |
| 66 | + ev_set_userdata(evloop, (void*)this); | |
| 63 | 67 | ctx->data = (void*)this; |
| 68 | + | |
| 69 | + // Lock this mutex until the connected callback is invoked | |
| 64 | 70 | connected_lock.lock(); |
| 65 | 71 | } |
| 66 | 72 | |
| ... | ... | @@ -79,7 +85,7 @@ Redox::~Redox() { |
| 79 | 85 | << " Commands and freed " << commands_deleted << "." << std::endl; |
| 80 | 86 | } |
| 81 | 87 | |
| 82 | -void Redox::run_blocking() { | |
| 88 | +void Redox::run_event_loop() { | |
| 83 | 89 | |
| 84 | 90 | // Events to connect to Redox |
| 85 | 91 | ev_run(evloop, EVRUN_NOWAIT); |
| ... | ... | @@ -118,9 +124,9 @@ void Redox::run_blocking() { |
| 118 | 124 | cout << "[INFO] Event thread exited." << endl; |
| 119 | 125 | } |
| 120 | 126 | |
| 121 | -void Redox::run() { | |
| 127 | +void Redox::start() { | |
| 122 | 128 | |
| 123 | - event_loop_thread = thread([this] { run_blocking(); }); | |
| 129 | + event_loop_thread = thread([this] { run_event_loop(); }); | |
| 124 | 130 | |
| 125 | 131 | // Block until connected and running the event loop |
| 126 | 132 | unique_lock<mutex> ul(running_waiter_lock); |
| ... | ... | @@ -153,7 +159,7 @@ Command<ReplyT>* Redox::find_command(long id) { |
| 153 | 159 | } |
| 154 | 160 | |
| 155 | 161 | template<class ReplyT> |
| 156 | -void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { | |
| 162 | +void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { | |
| 157 | 163 | |
| 158 | 164 | Redox* rdx = (Redox*) ctx->data; |
| 159 | 165 | long id = (long)privdata; |
| ... | ... | @@ -170,7 +176,7 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { |
| 170 | 176 | c->process_reply(); |
| 171 | 177 | |
| 172 | 178 | // Increment the Redox object command counter |
| 173 | - rdx->incr_cmd_count(); | |
| 179 | + rdx->cmd_count++; | |
| 174 | 180 | } |
| 175 | 181 | |
| 176 | 182 | /** |
| ... | ... | @@ -178,7 +184,7 @@ void command_callback(redisAsyncContext *ctx, void *r, void *privdata) { |
| 178 | 184 | * true if succeeded, false otherwise. |
| 179 | 185 | */ |
| 180 | 186 | template<class ReplyT> |
| 181 | -bool submit_to_server(Command<ReplyT>* c) { | |
| 187 | +bool Redox::submit_to_server(Command<ReplyT>* c) { | |
| 182 | 188 | c->pending++; |
| 183 | 189 | if (redisAsyncCommand(c->rdx->ctx, command_callback<ReplyT>, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { |
| 184 | 190 | cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; |
| ... | ... | @@ -189,7 +195,7 @@ bool submit_to_server(Command<ReplyT>* c) { |
| 189 | 195 | } |
| 190 | 196 | |
| 191 | 197 | template<class ReplyT> |
| 192 | -void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { | |
| 198 | +void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { | |
| 193 | 199 | |
| 194 | 200 | Redox* rdx = (Redox*) ev_userdata(loop); |
| 195 | 201 | long id = (long)timer->data; |
| ... | ... | @@ -201,7 +207,7 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) |
| 201 | 207 | return; |
| 202 | 208 | } |
| 203 | 209 | |
| 204 | - if(c->is_completed()) { | |
| 210 | + if(c->is_canceled()) { | |
| 205 | 211 | |
| 206 | 212 | // cout << "[INFO] Command " << c << " is completed, stopping event timer." << endl; |
| 207 | 213 | |
| ... | ... | @@ -227,10 +233,10 @@ bool Redox::process_queued_command(long id) { |
| 227 | 233 | |
| 228 | 234 | if((c->repeat == 0) && (c->after == 0)) { |
| 229 | 235 | submit_to_server<ReplyT>(c); |
| 236 | + | |
| 230 | 237 | } else { |
| 231 | 238 | |
| 232 | 239 | c->timer.data = (void*)c->id; |
| 233 | - | |
| 234 | 240 | ev_timer_init(&c->timer, submit_command_callback<ReplyT>, c->after, c->repeat); |
| 235 | 241 | ev_timer_start(evloop, &c->timer); |
| 236 | 242 | |
| ... | ... | @@ -320,4 +326,8 @@ bool Redox::set(const std::string& key, const std::string& value) { |
| 320 | 326 | return command_blocking("SET " + key + " " + value); |
| 321 | 327 | } |
| 322 | 328 | |
| 329 | +bool Redox::del(const std::string& key) { | |
| 330 | + return command_blocking("DEL " + key); | |
| 331 | +} | |
| 332 | + | |
| 323 | 333 | } // End namespace redis | ... | ... |
src/redox.hpp
| ... | ... | @@ -25,53 +25,128 @@ |
| 25 | 25 | |
| 26 | 26 | namespace redox { |
| 27 | 27 | |
| 28 | +// Default to a local Redis server | |
| 29 | +static const std::string REDIS_DEFAULT_HOST = "localhost"; | |
| 30 | +static const int REDIS_DEFAULT_PORT = 6379; | |
| 31 | + | |
| 28 | 32 | class Redox { |
| 29 | 33 | |
| 30 | 34 | public: |
| 31 | 35 | |
| 32 | - Redox(const std::string& host, const int port); | |
| 36 | + /** | |
| 37 | + * Initialize everything, connect over TCP to a Redis server. | |
| 38 | + */ | |
| 39 | + Redox(const std::string& host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT); | |
| 33 | 40 | ~Redox(); |
| 34 | 41 | |
| 35 | - redisAsyncContext *ctx; | |
| 36 | - | |
| 37 | - void run(); | |
| 42 | + /** | |
| 43 | + * Connect to Redis and start the event loop in a separate thread. Returns | |
| 44 | + * once everything is ready to go. | |
| 45 | + */ | |
| 46 | + void start(); | |
| 38 | 47 | |
| 48 | + /** | |
| 49 | + * Signal the event loop to stop processing commands and shut down. | |
| 50 | + */ | |
| 39 | 51 | void stop_signal(); |
| 52 | + | |
| 53 | + /** | |
| 54 | + * Wait for the event loop to exit, then return. | |
| 55 | + */ | |
| 40 | 56 | void block(); |
| 57 | + | |
| 58 | + /** | |
| 59 | + * Signal the event loop to stop, wait for all pending commands to be processed, | |
| 60 | + * and shut everything down. A simple combination of stop_signal() and block(). | |
| 61 | + */ | |
| 41 | 62 | void stop(); |
| 42 | 63 | |
| 64 | + /** | |
| 65 | + * Create an asynchronous Redis command to be executed. Return a pointer to a | |
| 66 | + * Command object that represents this command. If the command succeeded, the | |
| 67 | + * callback is invoked with a reference to the reply. If something went wrong, | |
| 68 | + * the error_callback is invoked with an error_code. One of the two is guaranteed | |
| 69 | + * to be invoked. The method is templated by the expected data type of the reply, | |
| 70 | + * and can be one of {redisReply*, string, char*, int, long long int, nullptr_t}. | |
| 71 | + * | |
| 72 | + * cmd: The command to be run. | |
| 73 | + * callback: A function invoked on a successful reply from the server. | |
| 74 | + * error_callback: A function invoked on some error state. | |
| 75 | + * repeat: If non-zero, executes the command continuously at the given rate | |
| 76 | + * in seconds, until cancel() is called on the Command object. | |
| 77 | + * after: If non-zero, executes the command after the given delay in seconds. | |
| 78 | + * free_memory: If true (default), Redox automatically frees the Command object and | |
| 79 | + * reply from the server after a callback is invoked. If false, the | |
| 80 | + * user is responsible for calling free() on the Command object. | |
| 81 | + */ | |
| 43 | 82 | template<class ReplyT> |
| 44 | 83 | Command<ReplyT>* command( |
| 45 | 84 | const std::string& cmd, |
| 46 | - const std::function<void(const std::string&, const ReplyT&)>& callback = NULL, | |
| 47 | - const std::function<void(const std::string&, int status)>& error_callback = NULL, | |
| 85 | + const std::function<void(const std::string&, const ReplyT&)>& callback = nullptr, | |
| 86 | + const std::function<void(const std::string&, int status)>& error_callback = nullptr, | |
| 48 | 87 | double repeat = 0.0, |
| 49 | 88 | double after = 0.0, |
| 50 | 89 | bool free_memory = true |
| 51 | 90 | ); |
| 52 | 91 | |
| 53 | - template<class ReplyT> | |
| 54 | - bool cancel(Command<ReplyT>* c); | |
| 55 | - | |
| 92 | + /** | |
| 93 | + * A wrapper around command() for synchronous use. Waits for a reply, populates it | |
| 94 | + * into the Command object, and returns when complete. The user can retrieve the | |
| 95 | + * results from the Command object - ok() will tell you if the call succeeded, | |
| 96 | + * status() will give the error code, and reply() will return the reply data if | |
| 97 | + * the call succeeded. | |
| 98 | + */ | |
| 56 | 99 | template<class ReplyT> |
| 57 | 100 | Command<ReplyT>* command_blocking(const std::string& cmd); |
| 58 | 101 | |
| 102 | + /** | |
| 103 | + * Return the total number of successful commands processed by this Redox instance. | |
| 104 | + */ | |
| 105 | + long num_commands_processed() { return cmd_count; } | |
| 106 | + | |
| 107 | + // Hiredis context, left public to allow low-level access | |
| 108 | + redisAsyncContext *ctx; | |
| 109 | + | |
| 110 | + // ------------------------------------------------ | |
| 111 | + // Wrapper methods for convenience only | |
| 112 | + // ------------------------------------------------ | |
| 113 | + | |
| 114 | + /** | |
| 115 | + * Non-templated version of command in case you really don't care | |
| 116 | + * about the reply and just want to send something off. | |
| 117 | + */ | |
| 59 | 118 | void command(const std::string& command); |
| 119 | + | |
| 120 | + /** | |
| 121 | + * Non-templated version of command_blocking in case you really don't | |
| 122 | + * care about the reply. Returns true if succeeded, false if error. | |
| 123 | + */ | |
| 60 | 124 | bool command_blocking(const std::string& command); |
| 61 | 125 | |
| 62 | - void incr_cmd_count() { cmd_count++; } | |
| 63 | - long num_commands_processed() { return cmd_count; } | |
| 126 | + /** | |
| 127 | + * Redis GET command wrapper - return the value for the given key, or throw | |
| 128 | + * an exception if there is an error. Blocking call, of course. | |
| 129 | + */ | |
| 130 | + std::string get(const std::string& key); | |
| 64 | 131 | |
| 65 | - static void connected(const redisAsyncContext *c, int status); | |
| 66 | - static void disconnected(const redisAsyncContext *c, int status); | |
| 132 | + /** | |
| 133 | + * Redis SET command wrapper - set the value for the given key. Return | |
| 134 | + * true if succeeded, false if error. | |
| 135 | + */ | |
| 136 | + bool set(const std::string& key, const std::string& value); | |
| 137 | + | |
| 138 | + /** | |
| 139 | + * Redis DEL command wrapper - delete the given key. Return true if succeeded, | |
| 140 | + * false if error. | |
| 141 | + */ | |
| 142 | + bool del(const std::string& key); | |
| 67 | 143 | |
| 144 | + // TODO pub/sub | |
| 68 | 145 | // void publish(std::string channel, std::string msg); |
| 69 | 146 | // void subscribe(std::string channel, std::function<void(std::string channel, std::string msg)> callback); |
| 70 | 147 | // void unsubscribe(std::string channel); |
| 71 | 148 | |
| 72 | - std::atomic_long commands_created = {0}; | |
| 73 | - std::atomic_long commands_deleted = {0}; | |
| 74 | - | |
| 149 | + // Invoked by Command objects when they are completed | |
| 75 | 150 | template<class ReplyT> |
| 76 | 151 | void remove_active_command(const long id) { |
| 77 | 152 | std::lock_guard<std::mutex> lg1(command_map_guard); |
| ... | ... | @@ -79,16 +154,6 @@ public: |
| 79 | 154 | commands_deleted += 1; |
| 80 | 155 | } |
| 81 | 156 | |
| 82 | - template<class ReplyT> | |
| 83 | - Command<ReplyT>* find_command(long id); | |
| 84 | - | |
| 85 | - template<class ReplyT> | |
| 86 | - std::unordered_map<long, Command<ReplyT>*>& get_command_map(); | |
| 87 | - | |
| 88 | - // Helpers | |
| 89 | - std::string get(const std::string& key); | |
| 90 | - bool set(const std::string& key, const std::string& value); | |
| 91 | - | |
| 92 | 157 | private: |
| 93 | 158 | |
| 94 | 159 | // Redox server |
| ... | ... | @@ -104,24 +169,40 @@ private: |
| 104 | 169 | // Number of commands processed |
| 105 | 170 | std::atomic_long cmd_count = {0}; |
| 106 | 171 | |
| 172 | + // Track of Command objects allocated. Also provides unique Command IDs. | |
| 173 | + std::atomic_long commands_created = {0}; | |
| 174 | + std::atomic_long commands_deleted = {0}; | |
| 175 | + | |
| 176 | + // Separate thread to have a non-blocking event loop | |
| 177 | + std::thread event_loop_thread; | |
| 178 | + | |
| 179 | + // Variable and CV to know when the event loop starts running | |
| 107 | 180 | std::atomic_bool running = {false}; |
| 108 | 181 | std::mutex running_waiter_lock; |
| 109 | 182 | std::condition_variable running_waiter; |
| 110 | 183 | |
| 184 | + // Variable and CV to know when the event loop stops running | |
| 111 | 185 | std::atomic_bool to_exit = {false}; // Signal to exit |
| 112 | 186 | std::atomic_bool exited = {false}; // Event thread exited |
| 113 | 187 | std::mutex exit_waiter_lock; |
| 114 | 188 | std::condition_variable exit_waiter; |
| 115 | 189 | |
| 116 | - std::thread event_loop_thread; | |
| 117 | - | |
| 190 | + // Maps of each Command, fetchable by the unique ID number | |
| 118 | 191 | std::unordered_map<long, Command<redisReply*>*> commands_redis_reply; |
| 119 | 192 | std::unordered_map<long, Command<std::string>*> commands_string_r; |
| 120 | 193 | std::unordered_map<long, Command<char*>*> commands_char_p; |
| 121 | 194 | std::unordered_map<long, Command<int>*> commands_int; |
| 122 | 195 | std::unordered_map<long, Command<long long int>*> commands_long_long_int; |
| 123 | 196 | std::unordered_map<long, Command<std::nullptr_t>*> commands_null; |
| 124 | - std::mutex command_map_guard; | |
| 197 | + std::mutex command_map_guard; // Guards access to all of the above | |
| 198 | + | |
| 199 | + // Return the correct map from the above, based on the template specialization | |
| 200 | + template<class ReplyT> | |
| 201 | + std::unordered_map<long, Command<ReplyT>*>& get_command_map(); | |
| 202 | + | |
| 203 | + // Return the given Command from the relevant command map, or nullptr if not there | |
| 204 | + template<class ReplyT> | |
| 205 | + Command<ReplyT>* find_command(long id); | |
| 125 | 206 | |
| 126 | 207 | std::queue<long> command_queue; |
| 127 | 208 | std::mutex queue_guard; |
| ... | ... | @@ -130,7 +211,20 @@ private: |
| 130 | 211 | template<class ReplyT> |
| 131 | 212 | bool process_queued_command(long id); |
| 132 | 213 | |
| 133 | - void run_blocking(); | |
| 214 | + void run_event_loop(); | |
| 215 | + | |
| 216 | + // Callbacks invoked on server connection/disconnection | |
| 217 | + static void connected(const redisAsyncContext *c, int status); | |
| 218 | + static void disconnected(const redisAsyncContext *c, int status); | |
| 219 | + | |
| 220 | + template<class ReplyT> | |
| 221 | + static void command_callback(redisAsyncContext *ctx, void *r, void *privdata); | |
| 222 | + | |
| 223 | + template<class ReplyT> | |
| 224 | + static bool submit_to_server(Command<ReplyT>* c); | |
| 225 | + | |
| 226 | + template<class ReplyT> | |
| 227 | + static void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents); | |
| 134 | 228 | }; |
| 135 | 229 | |
| 136 | 230 | // --------------------------- |
| ... | ... | @@ -165,20 +259,6 @@ Command<ReplyT>* Redox::command( |
| 165 | 259 | } |
| 166 | 260 | |
| 167 | 261 | template<class ReplyT> |
| 168 | -bool Redox::cancel(Command<ReplyT>* c) { | |
| 169 | - | |
| 170 | - if(c == nullptr) { | |
| 171 | - std::cerr << "[ERROR] Canceling null command." << std::endl; | |
| 172 | - return false; | |
| 173 | - } | |
| 174 | - | |
| 175 | -// std::cout << "[INFO] Canceling command " << c->id << " at " << c << std::endl; | |
| 176 | - c->completed = true; | |
| 177 | - | |
| 178 | - return true; | |
| 179 | -} | |
| 180 | - | |
| 181 | -template<class ReplyT> | |
| 182 | 262 | Command<ReplyT>* Redox::command_blocking(const std::string& cmd) { |
| 183 | 263 | |
| 184 | 264 | ReplyT val; | ... | ... |