/** * Redis C++11 wrapper. */ #include #include "redox.hpp" using namespace std; namespace redox { void Redox::connected_callback(const redisAsyncContext *ctx, int status) { Redox* rdx = (Redox*) ctx->data; if (status != REDIS_OK) { cerr << "[ERROR] Connecting to Redis: " << ctx->errstr << endl; rdx->connect_state = REDOX_CONNECT_ERROR; } else { // Disable hiredis automatically freeing reply objects ctx->c.reader->fn->freeObject = [](void *reply) {}; rdx->connect_state = REDOX_CONNECTED; cout << "[INFO] Connected to Redis." << endl; } rdx->connect_waiter.notify_all(); if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state); } void Redox::disconnected_callback(const redisAsyncContext *ctx, int status) { Redox* rdx = (Redox*) ctx->data; if (status != REDIS_OK) { cerr << "[ERROR] Disconnecting from Redis: " << ctx->errstr << endl; rdx->connect_state = REDOX_DISCONNECT_ERROR; } else { cout << "[INFO] Disconnected from Redis as planned." << endl; rdx->connect_state = REDOX_DISCONNECTED; } rdx->stop_signal(); rdx->connect_waiter.notify_all(); if(rdx->user_connection_callback) rdx->user_connection_callback(rdx->connect_state); } Redox::Redox( const string& host, const int port, std::function connection_callback ) : host(host), port(port), user_connection_callback(connection_callback) { // libev setup signal(SIGPIPE, SIG_IGN); evloop = ev_loop_new(EVFLAG_AUTO); ev_set_userdata(evloop, (void*)this); // Back-reference // Create a redisAsyncContext ctx = redisAsyncConnect(host.c_str(), port); ctx->data = (void*)this; // Back-reference if (ctx->err) { cout << "[ERROR] Could not create a hiredis context: " << ctx->errstr << endl; connect_state = REDOX_CONNECT_ERROR; connect_waiter.notify_all(); return; } // Attach event loop to hiredis redisLibevAttach(evloop, ctx); // Set the callbacks to be invoked on server connection/disconnection redisAsyncSetConnectCallback(ctx, Redox::connected_callback); redisAsyncSetDisconnectCallback(ctx, Redox::disconnected_callback); } void Redox::run_event_loop() { // Events to connect to Redox ev_run(evloop, EVRUN_NOWAIT); // Block until connected to Redis, or error unique_lock ul(connect_lock); connect_waiter.wait(ul, [this] { return connect_state != REDOX_NOT_YET_CONNECTED; }); // Handle connection error if(connect_state != REDOX_CONNECTED) { cout << "[INFO] Did not connect, event loop exiting." << endl; running_waiter.notify_one(); return; } // Set up asynchronous watcher which we signal every // time we add a command ev_async_init(&async_w, process_queued_commands); ev_async_start(evloop, &async_w); running = true; running_waiter.notify_one(); // Run the event loop while (!to_exit) { ev_run(evloop, EVRUN_NOWAIT); } cout << "[INFO] Stop signal detected." << endl; // Run a few more times to clear out canceled events for(int i = 0; i < 100; i++) { ev_run(evloop, EVRUN_NOWAIT); } if(commands_created != commands_deleted) { cerr << "[ERROR] All commands were not freed! " << commands_deleted << "/" << commands_created << endl; } exited = true; running = false; // Let go for block_until_stopped method exit_waiter.notify_one(); cout << "[INFO] Event thread exited." << endl; } bool Redox::start() { event_loop_thread = thread([this] { run_event_loop(); }); // Block until connected and running the event loop, or until // a connection error happens and the event loop exits unique_lock ul(running_waiter_lock); running_waiter.wait(ul, [this] { return running.load() || connect_state == REDOX_CONNECT_ERROR; }); // Return if succeeded return connect_state == REDOX_CONNECTED; } void Redox::stop_signal() { to_exit = true; ev_break(evloop, EVBREAK_ALL); } void Redox::block() { unique_lock ul(exit_waiter_lock); exit_waiter.wait(ul, [this] { return exited.load(); }); } void Redox::stop() { stop_signal(); block(); } void Redox::disconnect() { stop_signal(); if(connect_state == REDOX_CONNECTED) { redisAsyncDisconnect(ctx); block(); } } Redox::~Redox() { disconnect(); if(event_loop_thread.joinable()) event_loop_thread.join(); ev_loop_destroy(evloop); std::cout << "[INFO] Redox created " << commands_created << " Commands and freed " << commands_deleted << "." << std::endl; } template Command* Redox::find_command(long id) { lock_guard lg(command_map_guard); auto& command_map = get_command_map(); auto it = command_map.find(id); if(it == command_map.end()) return nullptr; return it->second; } template void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { Redox* rdx = (Redox*) ctx->data; long id = (long)privdata; redisReply* reply_obj = (redisReply*) r; Command* c = rdx->find_command(id); if(c == nullptr) { // cout << "[WARNING] Couldn't find Command " << id << " in command_map (command_callback)." << endl; freeReplyObject(reply_obj); return; } c->reply_obj = reply_obj; c->process_reply(); // Increment the Redox object command counter rdx->cmd_count++; } /** * Submit an asynchronous command to the Redox server. Return * true if succeeded, false otherwise. */ template bool Redox::submit_to_server(Command* c) { c->pending++; if (redisAsyncCommand(c->rdx->ctx, command_callback, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Could not send \"" << c->cmd << "\": " << c->rdx->ctx->errstr << endl; c->invoke_error(REDOX_SEND_ERROR); return false; } return true; } template void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); long id = (long)timer->data; Command* c = rdx->find_command(id); if(c == nullptr) { cout << "[ERROR] Couldn't find Command " << id << " in command_map (submit_command_callback)." << endl; return; } if(c->is_canceled()) { // cout << "[INFO] Command " << c << " is completed, stopping event timer." << endl; c->timer_guard.lock(); if((c->repeat != 0) || (c->after != 0)) ev_timer_stop(loop, &c->timer); c->timer_guard.unlock(); // Mark for memory to be freed when all callbacks are received c->timer.data = (void*)0; return; } submit_to_server(c); } template bool Redox::process_queued_command(long id) { Command* c = find_command(id); if(c == nullptr) return false; if((c->repeat == 0) && (c->after == 0)) { submit_to_server(c); } else { c->timer.data = (void*)c->id; ev_timer_init(&c->timer, submit_command_callback, c->after, c->repeat); ev_timer_start(evloop, &c->timer); c->timer_guard.unlock(); } return true; } void Redox::process_queued_commands(struct ev_loop* loop, ev_async* async, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); lock_guard lg(rdx->queue_guard); while(!rdx->command_queue.empty()) { long id = rdx->command_queue.front(); rdx->command_queue.pop(); if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command(id)) {} else if(rdx->process_queued_command>(id)) {} else throw runtime_error("[FATAL] Command pointer not found in any queue!"); } } // ---------------------------- template<> unordered_map*>& Redox::get_command_map() { // cout << "redis reply command map at " << &commands_redis_reply << endl; return commands_redis_reply; } template<> unordered_map*>& Redox::get_command_map() { // cout << "string command map at " << &commands_string_r << endl; return commands_string_r; } template<> unordered_map*>& Redox::get_command_map() { // cout << "char* command map at " << &commands_char_p << endl; return commands_char_p; } template<> unordered_map*>& Redox::get_command_map() { // cout << "int command map at " << &commands_int << " has size: " << commands_int.size() << endl; return commands_int; } template<> unordered_map*>& Redox::get_command_map() { // cout << "long long int command map at " << &commands_long_long_int << endl; return commands_long_long_int; } template<> unordered_map*>& Redox::get_command_map() { return commands_null; } template<> unordered_map>*>& Redox::get_command_map>() { return commands_vector_string; } // ---------------------------- // Helpers // ---------------------------- void Redox::command(const string& cmd) { command(cmd); } bool Redox::command_blocking(const string& cmd) { Command* c = command_blocking(cmd); bool succeeded = (c->status() == REDOX_OK); c->free(); return succeeded; } string Redox::get(const string& key) { auto c = command_blocking("GET " + key); if(!c->ok()) { throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c->status())); } string reply = c->reply(); c->free(); return reply; }; bool Redox::set(const std::string& key, const std::string& value) { return command_blocking("SET " + key + " " + value); } bool Redox::del(const std::string& key) { return command_blocking("DEL " + key); } } // End namespace redis