diff --git a/README.md b/README.md index fa39cd9..5124e73 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ redox ====== -Asynchronous, minimalistic, and wicked fast C++11 bindings for Redis +High-level, asynchronous, and wicked fast C++11 bindings for Redis Work in progress, details coming soon. diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp index 9c2523d..4d44700 100644 --- a/examples/simple_loop.cpp +++ b/examples/simple_loop.cpp @@ -1,5 +1,5 @@ /** -* Basic asynchronous calls using redisx. +* Basic asynchronous calls using redox. */ #include @@ -69,5 +69,6 @@ int main(int argc, char* argv[]) { cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; + cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl; return 0; } diff --git a/examples/simple_sync_loop.cpp b/examples/simple_sync_loop.cpp index 5a08b8a..98bb63a 100644 --- a/examples/simple_sync_loop.cpp +++ b/examples/simple_sync_loop.cpp @@ -1,5 +1,5 @@ /** -* Basic asynchronous calls using redisx. +* Basic synchronous calls using redox. */ #include @@ -44,5 +44,6 @@ int main(int argc, char* argv[]) { cout << "Sent " << count << " commands in " << t_elapsed << "s, " << "that's " << actual_freq << " commands/s." << endl; + cout << "rdx.num_commands_processed() = " << rdx.num_commands_processed() << endl; return 0; } diff --git a/src/command.hpp b/src/command.hpp index 36fb9d0..97ab117 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -23,6 +23,8 @@ static const int REDOX_NIL_REPLY = 3; static const int REDOX_ERROR_REPLY = 4; static const int REDOX_TIMEOUT = 5; +class Redox; + template class Command { @@ -30,7 +32,7 @@ friend class Redox; public: Command( - redisAsyncContext* c, + Redox* rdx, const std::string& cmd, const std::function& callback, const std::function& error_callback, @@ -38,13 +40,14 @@ public: bool free_memory ); + Redox* rdx; + const std::string cmd; const double repeat; const double after; const bool free_memory; - redisAsyncContext* c; redisReply* reply_obj; std::atomic_int pending; @@ -92,26 +95,29 @@ private: template Command::Command( - redisAsyncContext* c, + Redox* rdx, const std::string& cmd, const std::function& callback, const std::function& error_callback, double repeat, double after, bool free_memory -) : cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), c(c), reply_obj(NULL), +) : rdx(rdx), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), reply_obj(NULL), pending(0), callback(callback), error_callback(error_callback), completed(false) { timer_guard.lock(); } template -void Command::command_callback(redisAsyncContext *c, void *r, void *privdata) { +void Command::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { - auto *cmd_obj = (Command *) privdata; - cmd_obj->reply_obj = (redisReply *) r; - cmd_obj->invoke_callback(); + auto *c = (Command *) privdata; + c->reply_obj = (redisReply *) r; + c->invoke_callback(); // Free the reply object unless told not to - if(cmd_obj->free_memory) cmd_obj->free_reply_object(); + if(c->free_memory) c->free_reply_object(); + + // Increment the Redox object command counter + c->rdx->incr_cmd_count(); } template diff --git a/src/redox.cpp b/src/redox.cpp index 5994cdc..fa8f5f2 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -1,5 +1,5 @@ /** -* Redox C++11 wrapper. +* Redis C++11 wrapper. */ #include @@ -48,19 +48,19 @@ Redox::Redox(const string& host, const int port) signal(SIGPIPE, SIG_IGN); - c = redisAsyncConnect(host.c_str(), port); - if (c->err) { - printf("Error: %s\n", c->errstr); + ctx = redisAsyncConnect(host.c_str(), port); + if (ctx->err) { + printf("Error: %s\n", ctx->errstr); return; } - redisLibevAttach(EV_DEFAULT_ c); - redisAsyncSetConnectCallback(c, Redox::connected); - redisAsyncSetDisconnectCallback(c, Redox::disconnected); + redisLibevAttach(EV_DEFAULT_ ctx); + redisAsyncSetConnectCallback(ctx, Redox::connected); + redisAsyncSetDisconnectCallback(ctx, Redox::disconnected); } Redox::~Redox() { - redisAsyncDisconnect(c); + redisAsyncDisconnect(ctx); stop(); } @@ -103,11 +103,11 @@ void Redox::block() { * true if succeeded, false otherwise. */ template -bool submit_to_server(Command* cmd_obj) { - cmd_obj->pending++; - if (redisAsyncCommand(cmd_obj->c, cmd_obj->command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { - cerr << "[ERROR] Could not send \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; - cmd_obj->invoke_error(REDOX_SEND_ERROR); +bool submit_to_server(Command* c) { + c->pending++; + if (redisAsyncCommand(c->rdx->ctx, c->command_callback, (void*)c, c->cmd.c_str()) != REDIS_OK) { + cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; + c->invoke_error(REDOX_SEND_ERROR); return false; } return true; @@ -122,30 +122,30 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) return; } - auto cmd_obj = (Command*)timer->data; - submit_to_server(cmd_obj); + auto c = (Command*)timer->data; + submit_to_server(c); } template -bool Redox::process_queued_command(void* cmd_ptr) { +bool Redox::process_queued_command(void* c_ptr) { auto& command_map = get_command_map(); - auto it = command_map.find(cmd_ptr); + auto it = command_map.find(c_ptr); if(it == command_map.end()) return false; - Command* cmd_obj = it->second; - command_map.erase(cmd_ptr); + Command* c = it->second; + command_map.erase(c_ptr); - if((cmd_obj->repeat == 0) && (cmd_obj->after == 0)) { - submit_to_server(cmd_obj); + if((c->repeat == 0) && (c->after == 0)) { + submit_to_server(c); } else { - cmd_obj->timer.data = (void*)cmd_obj; + c->timer.data = (void*)c; - ev_timer_init(&cmd_obj->timer, submit_command_callback, cmd_obj->after, cmd_obj->repeat); - ev_timer_start(EV_DEFAULT_ &cmd_obj->timer); + ev_timer_init(&c->timer, submit_command_callback, c->after, c->repeat); + ev_timer_start(EV_DEFAULT_ &c->timer); - cmd_obj->timer_guard.unlock(); + c->timer_guard.unlock(); } return true; @@ -157,25 +157,19 @@ void Redox::process_queued_commands() { while(!command_queue.empty()) { - void* cmd_ptr = command_queue.front(); - if(process_queued_command(cmd_ptr)) {} - else if(process_queued_command(cmd_ptr)) {} - else if(process_queued_command(cmd_ptr)) {} - else if(process_queued_command(cmd_ptr)) {} - else if(process_queued_command(cmd_ptr)) {} - else if(process_queued_command(cmd_ptr)) {} + void* c_ptr = command_queue.front(); + if(process_queued_command(c_ptr)) {} + else if(process_queued_command(c_ptr)) {} + else if(process_queued_command(c_ptr)) {} + else if(process_queued_command(c_ptr)) {} + else if(process_queued_command(c_ptr)) {} + else if(process_queued_command(c_ptr)) {} else throw runtime_error("[FATAL] Command pointer not found in any queue!"); command_queue.pop(); - cmd_count++; } } -long Redox::num_commands_processed() { - lock_guard lg(queue_guard); - return cmd_count; -} - // ---------------------------- template<> unordered_map*>& diff --git a/src/redox.hpp b/src/redox.hpp index 03c00bc..aef5d66 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -1,5 +1,5 @@ /** -* Redox C++11 wrapper. +* Redis C++11 wrapper. */ #pragma once @@ -31,6 +31,8 @@ public: Redox(const std::string& host, const int port); ~Redox(); + redisAsyncContext *ctx; + void run(); void run_blocking(); void stop(); @@ -47,7 +49,7 @@ public: ); template - bool cancel(Command* cmd_obj); + bool cancel(Command* c); template Command* command_blocking(const std::string& cmd); @@ -55,10 +57,8 @@ public: void command(const std::string& command); bool command_blocking(const std::string& command); - long num_commands_processed(); - - template - static void command_callback(redisAsyncContext *c, void *r, void *privdata); + void incr_cmd_count() { cmd_count++; } + long num_commands_processed() { return cmd_count; } static void connected(const redisAsyncContext *c, int status); static void disconnected(const redisAsyncContext *c, int status); @@ -74,9 +74,7 @@ private: int port; // Number of commands processed - long cmd_count; - - redisAsyncContext *c; + std::atomic_long cmd_count; std::atomic_bool to_exit; std::mutex exit_waiter_lock; @@ -114,27 +112,27 @@ Command* Redox::command( bool free_memory ) { std::lock_guard lg(queue_guard); - auto* cmd_obj = new Command(c, cmd, callback, error_callback, repeat, after, free_memory); - get_command_map()[(void*)cmd_obj] = cmd_obj; - command_queue.push((void*)cmd_obj); - return cmd_obj; + auto* c = new Command(this, cmd, callback, error_callback, repeat, after, free_memory); + get_command_map()[(void*)c] = c; + command_queue.push((void*)c); + return c; } template -bool Redox::cancel(Command* cmd_obj) { +bool Redox::cancel(Command* c) { - if(cmd_obj == NULL) { + if(c == NULL) { std::cerr << "[ERROR] Canceling null command." << std::endl; return false; } - cmd_obj->timer.data = NULL; + c->timer.data = NULL; - std::lock_guard lg(cmd_obj->timer_guard); - if((cmd_obj->repeat != 0) || (cmd_obj->after != 0)) - ev_timer_stop(EV_DEFAULT_ &cmd_obj->timer); + std::lock_guard lg(c->timer_guard); + if((c->repeat != 0) || (c->after != 0)) + ev_timer_stop(EV_DEFAULT_ &c->timer); - cmd_obj->completed = true; + c->completed = true; return true; } @@ -150,7 +148,7 @@ Command* Redox::command_blocking(const std::string& cmd) { std::unique_lock lk(m); - Command* cmd_obj = command(cmd, + Command* c = command(cmd, [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { std::unique_lock ul(m); val = reply; @@ -170,10 +168,10 @@ Command* Redox::command_blocking(const std::string& cmd) { // Wait until a callback is invoked cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); - cmd_obj->reply_val = val; - cmd_obj->reply_status = status; + c->reply_val = val; + c->reply_status = status; - return cmd_obj; + return c; } } // End namespace redis