diff --git a/CMakeLists.txt b/CMakeLists.txt index ce08a59..b782441 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 2.8.4) -project(redisx) +project(redox) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -g -fno-omit-frame-pointer") @@ -12,7 +12,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall -O3") set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) set(SRC_CORE - ${SRC_DIR}/redisx.cpp + ${SRC_DIR}/redox.cpp ) set(SRC_ALL ${SRC_CORE}) diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp index dab032d..8a3a9ce 100644 --- a/examples/simple_loop.cpp +++ b/examples/simple_loop.cpp @@ -3,10 +3,10 @@ */ #include -#include "../src/redisx.hpp" +#include "../src/redox.hpp" using namespace std; -using namespace redisx; +using namespace redox; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -15,7 +15,7 @@ double time_s() { int main(int argc, char* argv[]) { - Redis rdx = {"localhost", 6379}; + Redox rdx = {"localhost", 6379}; rdx.run(); // // Command* del_cmd = rdx.command_blocking("DEL simple_loop:count"); diff --git a/examples/simple_sync_loop.cpp b/examples/simple_sync_loop.cpp index 9b26231..595c785 100644 --- a/examples/simple_sync_loop.cpp +++ b/examples/simple_sync_loop.cpp @@ -3,10 +3,10 @@ */ #include -#include "../src/redisx.hpp" +#include "../src/redox.hpp" using namespace std; -using namespace redisx; +using namespace redox; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -15,7 +15,7 @@ double time_s() { int main(int argc, char* argv[]) { - Redis rdx = {"localhost", 6379}; + Redox rdx = {"localhost", 6379}; rdx.run(); if(rdx.command_blocking("DEL simple_loop:count")) cout << "Deleted simple_loop:count" << endl; diff --git a/src/command.hpp b/src/command.hpp index 87be8c0..7710d48 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -12,7 +12,7 @@ #include #include -namespace redisx { +namespace redox { static const int REDISX_UNINIT = -1; static const int REDISX_OK = 0; @@ -25,7 +25,7 @@ static const int REDISX_TIMEOUT = 5; template class Command { -friend class Redis; +friend class Redox; public: Command( diff --git a/src/redisx.cpp b/src/redox.cpp index 4cc171d..aa60bcc 100644 --- a/src/redisx.cpp +++ b/src/redox.cpp @@ -1,48 +1,46 @@ /** -* Redis C++11 wrapper. +* Redox C++11 wrapper. */ #include #include -//#include -#include "redisx.hpp" +#include "redox.hpp" using namespace std; -namespace redisx { +namespace redox { // Global mutex to manage waiting for connected state // TODO get rid of this as the only global variable? mutex connected_lock; -/** -* Dummy function given to hiredis to use for freeing reply -* objects, so the memory can be managed here instead. -*/ -void dummy_free_reply(void *reply) {} - -void connected(const redisAsyncContext *c, int status) { +void Redox::connected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl; return; } - c->c.reader->fn->freeObject = dummy_free_reply; + // Disable hiredis automatically freeing reply objects + c->c.reader->fn->freeObject = [](void* reply) {}; + cout << "Connected to Redis." << endl; connected_lock.unlock(); } -void disconnected(const redisAsyncContext *c, int status) { +void Redox::disconnected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { cerr << "[ERROR] Disconnecting from Redis: " << c->errstr << endl; return; } + + // Re-enable hiredis automatically freeing reply objects c->c.reader->fn->freeObject = freeReplyObject; + cout << "Disconnected from Redis." << endl; connected_lock.lock(); } -Redis::Redis(const string& host, const int port) +Redox::Redox(const string& host, const int port) : host(host), port(port), cmd_count(0), to_exit(false) { lock_guard lg(queue_guard); @@ -57,18 +55,18 @@ Redis::Redis(const string& host, const int port) } redisLibevAttach(EV_DEFAULT_ c); - redisAsyncSetConnectCallback(c, connected); - redisAsyncSetDisconnectCallback(c, disconnected); + redisAsyncSetConnectCallback(c, Redox::connected); + redisAsyncSetDisconnectCallback(c, Redox::disconnected); } -Redis::~Redis() { +Redox::~Redox() { redisAsyncDisconnect(c); stop(); } -void Redis::run_blocking() { +void Redox::run_blocking() { - // Events to connect to Redis + // Events to connect to Redox ev_run(EV_DEFAULT_ EVRUN_NOWAIT); lock_guard lg(connected_lock); @@ -85,29 +83,57 @@ void Redis::run_blocking() { exit_waiter.notify_one(); } -void Redis::run() { +void Redox::run() { event_loop_thread = thread([this] { run_blocking(); }); event_loop_thread.detach(); } -void Redis::stop() { +void Redox::stop() { to_exit = true; } -void Redis::block() { +void Redox::block() { unique_lock ul(exit_waiter_lock); exit_waiter.wait(ul, [this]() { return to_exit.load(); }); } +template +void invoke_callback( + Command* cmd_obj, + redisReply* reply +); + +template +void Redox::command_callback(redisAsyncContext *c, void *r, void *privdata) { + + auto *cmd_obj = (Command *) privdata; + cmd_obj->reply_obj = (redisReply *) r; + + if (cmd_obj->reply_obj->type == REDIS_REPLY_ERROR) { + std::cerr << "[ERROR redisx.hpp:121] " << cmd_obj->cmd << ": " << cmd_obj->reply_obj->str << std::endl; + cmd_obj->invoke_error(REDISX_ERROR_REPLY); + + } else if(cmd_obj->reply_obj->type == REDIS_REPLY_NIL) { + std::cerr << "[WARNING] " << cmd_obj->cmd << ": Nil reply." << std::endl; + cmd_obj->invoke_error(REDISX_NIL_REPLY); + + } else { + invoke_callback(cmd_obj, cmd_obj->reply_obj); + } + + // Free the reply object unless told not to + if(cmd_obj->free_memory) cmd_obj->free_reply_object(); +} + /** -* Submit an asynchronous command to the Redis server. Return +* Submit an asynchronous command to the Redox server. Return * true if succeeded, false otherwise. */ template bool submit_to_server(Command* cmd_obj) { cmd_obj->pending++; - if (redisAsyncCommand(cmd_obj->c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { + if (redisAsyncCommand(cmd_obj->c, Redox::command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Could not send \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; cmd_obj->invoke_error(REDISX_SEND_ERROR); return false; @@ -129,7 +155,7 @@ void submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) } template -bool Redis::process_queued_command(void* cmd_ptr) { +bool Redox::process_queued_command(void* cmd_ptr) { auto& command_map = get_command_map(); @@ -153,7 +179,7 @@ bool Redis::process_queued_command(void* cmd_ptr) { return true; } -void Redis::process_queued_commands() { +void Redox::process_queued_commands() { lock_guard lg(queue_guard); @@ -172,20 +198,24 @@ void Redis::process_queued_commands() { } } -long Redis::num_commands_processed() { +long Redox::num_commands_processed() { lock_guard lg(queue_guard); return cmd_count; } // ---------------------------- -template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } +template<> unordered_map*>& +Redox::get_command_map() { return commands_redis_reply; } + template<> void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); } -template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } +template<> unordered_map*>& +Redox::get_command_map() { return commands_string_r; } + template<> void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { @@ -198,7 +228,9 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke(s); } -template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } +template<> unordered_map*>& +Redox::get_command_map() { return commands_char_p; } + template<> void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { @@ -209,7 +241,9 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply->str); } -template<> unordered_map*>& Redis::get_command_map() { return commands_int; } +template<> unordered_map*>& +Redox::get_command_map() { return commands_int; } + template<> void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { @@ -220,7 +254,9 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke((int)reply->integer); } -template<> unordered_map*>& Redis::get_command_map() { return commands_long_long_int; } +template<> unordered_map*>& +Redox::get_command_map() { return commands_long_long_int; } + template<> void invoke_callback(Command* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { @@ -235,11 +271,11 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { // Helpers // ---------------------------- -void Redis::command(const string& cmd) { +void Redox::command(const string& cmd) { command(cmd, NULL); } -bool Redis::command_blocking(const string& cmd) { +bool Redox::command_blocking(const string& cmd) { Command* c = command_blocking(cmd); bool succeeded = (c->status() == REDISX_OK); c->free(); diff --git a/src/redisx.hpp b/src/redox.hpp index d17a308..879dcf0 100644 --- a/src/redisx.hpp +++ b/src/redox.hpp @@ -1,5 +1,5 @@ /** -* Redis C++11 wrapper. +* Redox C++11 wrapper. */ #pragma once @@ -22,14 +22,14 @@ #include "command.hpp" -namespace redisx { +namespace redox { -class Redis { +class Redox { public: - Redis(const std::string& host, const int port); - ~Redis(); + Redox(const std::string& host, const int port); + ~Redox(); void run(); void run_blocking(); @@ -57,13 +57,19 @@ public: long num_commands_processed(); + template + static void command_callback(redisAsyncContext *c, void *r, void *privdata); + + static void connected(const redisAsyncContext *c, int status); + static void disconnected(const redisAsyncContext *c, int status); + // void publish(std::string channel, std::string msg); // void subscribe(std::string channel, std::function callback); // void unsubscribe(std::string channel); private: - // Redis server + // Redox server std::string host; int port; @@ -98,35 +104,7 @@ private: // --------------------------- template -void invoke_callback( - Command* cmd_obj, - redisReply* reply -); - -template -void command_callback(redisAsyncContext *c, void *r, void *privdata) { - - auto *cmd_obj = (Command *) privdata; - cmd_obj->reply_obj = (redisReply *) r; - - if (cmd_obj->reply_obj->type == REDIS_REPLY_ERROR) { - std::cerr << "[ERROR redisx.hpp:121] " << cmd_obj->cmd << ": " << cmd_obj->reply_obj->str << std::endl; - cmd_obj->invoke_error(REDISX_ERROR_REPLY); - - } else if(cmd_obj->reply_obj->type == REDIS_REPLY_NIL) { - std::cerr << "[WARNING] " << cmd_obj->cmd << ": Nil reply." << std::endl; - cmd_obj->invoke_error(REDISX_NIL_REPLY); - - } else { - invoke_callback(cmd_obj, cmd_obj->reply_obj); - } - - // Free the reply object unless told not to - if(cmd_obj->free_memory) cmd_obj->free_reply_object(); -} - -template -Command* Redis::command( +Command* Redox::command( const std::string& cmd, const std::function& callback, const std::function& error_callback, @@ -142,7 +120,7 @@ Command* Redis::command( } template -bool Redis::cancel(Command* cmd_obj) { +bool Redox::cancel(Command* cmd_obj) { if(cmd_obj == NULL) { std::cerr << "[ERROR] Canceling null command." << std::endl; @@ -161,7 +139,7 @@ bool Redis::cancel(Command* cmd_obj) { } template -Command* Redis::command_blocking(const std::string& cmd) { +Command* Redox::command_blocking(const std::string& cmd) { ReplyT val; std::atomic_int status(REDISX_UNINIT);