/** * Redox - A modern, asynchronous, and wicked fast C++11 client for Redis * * https://github.com/hmartiro/redox * * Copyright 2015 - Hayk Martirosyan * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include "client.hpp" using namespace std; namespace redox { Redox::Redox( ostream& log_stream, log::Level log_level ) : logger_(log_stream, log_level), evloop_(nullptr) {} bool Redox::connect( const std::string& host, const int port, std::function connection_callback ) { host_ = host; port_ = port; user_connection_callback_ = connection_callback; if(!initEv()) return false; // Connect over TCP ctx_ = redisAsyncConnect(host.c_str(), port); if(!initHiredis()) return false; event_loop_thread_ = thread([this] { runEventLoop(); }); // Block until connected and running the event loop, or until // a connection error happens and the event loop exits unique_lock ul(running_waiter_lock_); running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); // Return if succeeded return connect_state_ == CONNECTED; } bool Redox::connectUnix( const std::string& path, std::function connection_callback ) { path_ = path; user_connection_callback_ = connection_callback; if(!initEv()) return false; // Connect over unix sockets ctx_ = redisAsyncConnectUnix(path.c_str()); if(!initHiredis()) return false; event_loop_thread_ = thread([this] { runEventLoop(); }); // Block until connected and running the event loop, or until // a connection error happens and the event loop exits unique_lock ul(running_waiter_lock_); running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); // Return if succeeded return connect_state_ == CONNECTED; } void Redox::disconnect() { stop(); wait(); } void Redox::stop() { to_exit_ = true; logger_.debug() << "stop() called, breaking event loop"; ev_async_send(evloop_, &watcher_stop_); } void Redox::wait() { unique_lock ul(exit_waiter_lock_); exit_waiter_.wait(ul, [this] { return exited_.load(); }); } Redox::~Redox() { // Bring down the event loop if(running_ == true) { stop(); } if(event_loop_thread_.joinable()) event_loop_thread_.join(); if(evloop_ != nullptr) ev_loop_destroy(evloop_); } void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { Redox* rdx = (Redox*) ctx->data; if (status != REDIS_OK) { rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; rdx->connect_state_ = CONNECT_ERROR; } else { // Disable hiredis automatically freeing reply objects ctx->c.reader->fn->freeObject = [](void *reply) {}; rdx->connect_state_ = CONNECTED; rdx->logger_.info() << "Connected to Redis."; } rdx->connect_waiter_.notify_all(); if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { Redox* rdx = (Redox*) ctx->data; if (status != REDIS_OK) { rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; rdx->connect_state_ = DISCONNECT_ERROR; } else { rdx->logger_.info() << "Disconnected from Redis as planned."; rdx->connect_state_ = DISCONNECTED; } rdx->stop(); rdx->connect_waiter_.notify_all(); if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); } bool Redox::initEv() { signal(SIGPIPE, SIG_IGN); evloop_ = ev_loop_new(EVFLAG_AUTO); if(evloop_ == nullptr) { logger_.fatal() << "Could not create a libev event loop."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } ev_set_userdata(evloop_, (void*)this); // Back-reference return true; } bool Redox::initHiredis() { ctx_->data = (void*)this; // Back-reference if (ctx_->err) { logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } // Attach event loop to hiredis if(redisLibevAttach(evloop_, ctx_) != REDIS_OK) { logger_.fatal() << "Could not attach libev event loop to hiredis."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } // Set the callbacks to be invoked on server connection/disconnection if(redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach connect callback to hiredis."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } if(redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach disconnect callback to hiredis."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } return true; } void breakEventLoop(struct ev_loop* loop, ev_async* async, int revents) { ev_break(loop, EVBREAK_ALL); } void Redox::runEventLoop() { // Events to connect to Redox ev_run(evloop_, EVRUN_NOWAIT); // Block until connected to Redis, or error unique_lock ul(connect_lock_); connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); // Handle connection error if(connect_state_ != CONNECTED) { logger_.warning() << "Did not connect, event loop exiting."; running_waiter_.notify_one(); return; } // Set up asynchronous watcher which we signal every // time we add a command ev_async_init(&watcher_command_, processQueuedCommands); ev_async_start(evloop_, &watcher_command_); // Set up an async watcher to break the loop ev_async_init(&watcher_stop_, breakEventLoop); ev_async_start(evloop_, &watcher_stop_); // Set up an async watcher which we signal every time // we want a command freed ev_async_init(&watcher_free_, freeQueuedCommands); ev_async_start(evloop_, &watcher_free_); running_ = true; running_waiter_.notify_one(); // Run the event loop // TODO this hogs resources, but ev_run(evloop_) without // the manual loop is slower. Maybe use a CV to run sparsely // unless there are commands to process? while (!to_exit_) { ev_run(evloop_, EVRUN_NOWAIT); } logger_.info() << "Stop signal detected. Closing down event loop."; // Signal event loop to free all commands freeAllCommands(); // Wait to receive server replies for clean hiredis disconnect this_thread::sleep_for(chrono::milliseconds(10)); ev_run(evloop_, EVRUN_NOWAIT); if(connect_state_ == CONNECTED) redisAsyncDisconnect(ctx_); // Run once more to disconnect ev_run(evloop_, EVRUN_NOWAIT); if(commands_created_ != commands_deleted_) { logger_.error() << "All commands were not freed! " << commands_deleted_ << "/" << commands_created_; } exited_ = true; running_ = false; // Let go for block_until_stopped method exit_waiter_.notify_one(); logger_.info() << "Event thread exited."; } template Command* Redox::findCommand(long id) { lock_guard lg(command_map_guard_); auto& command_map = getCommandMap(); auto it = command_map.find(id); if(it == command_map.end()) return nullptr; return it->second; } template void Redox::commandCallback(redisAsyncContext* ctx, void* r, void* privdata) { Redox* rdx = (Redox*) ctx->data; long id = (long)privdata; redisReply* reply_obj = (redisReply*) r; Command* c = rdx->findCommand(id); if(c == nullptr) { // rdx->logger.warning() << "Couldn't find Command " << id << " in command_map (commandCallback)."; freeReplyObject(reply_obj); return; } c->processReply(reply_obj); } template bool Redox::submitToServer(Command* c) { Redox* rdx = c->rdx_; c->pending_++; // Construct a char** from the vector vector argv; transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argv), [](const string& s){ return s.c_str(); } ); // Construct a size_t* of string lengths from the vector vector argvlen; transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argvlen), [](const string& s) { return s.size(); } ); if(redisAsyncCommandArgv(rdx->ctx_, commandCallback, (void*) c->id_, argv.size(), &argv[0], &argvlen[0]) != REDIS_OK) { rdx->logger_.error() << "Could not send \"" << c->cmd() << "\": " << rdx->ctx_->errstr; c->reply_status_ = Command::SEND_ERROR; c->invoke(); return false; } return true; } template void Redox::submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); long id = (long)timer->data; Command* c = rdx->findCommand(id); if(c == nullptr) { rdx->logger_.error() << "Couldn't find Command " << id << " in command_map (submitCommandCallback)."; return; } submitToServer(c); } template bool Redox::processQueuedCommand(long id) { Command* c = findCommand(id); if(c == nullptr) return false; if((c->repeat_ == 0) && (c->after_ == 0)) { submitToServer(c); } else { c->timer_.data = (void*)c->id_; ev_timer_init(&c->timer_, submitCommandCallback, c->after_, c->repeat_); ev_timer_start(evloop_, &c->timer_); c->timer_guard_.unlock(); } return true; } void Redox::processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); lock_guard lg(rdx->queue_guard_); while(!rdx->command_queue_.empty()) { long id = rdx->command_queue_.front(); rdx->command_queue_.pop(); if(rdx->processQueuedCommand(id)) {} else if(rdx->processQueuedCommand(id)) {} else if(rdx->processQueuedCommand(id)) {} else if(rdx->processQueuedCommand(id)) {} else if(rdx->processQueuedCommand(id)) {} else if(rdx->processQueuedCommand(id)) {} else if(rdx->processQueuedCommand>(id)) {} else if(rdx->processQueuedCommand>(id)) {} else if(rdx->processQueuedCommand>(id)) {} else throw runtime_error("Command pointer not found in any queue!"); } } void Redox::freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) { Redox* rdx = (Redox*) ev_userdata(loop); lock_guard lg(rdx->free_queue_guard_); while(!rdx->commands_to_free_.empty()) { long id = rdx->commands_to_free_.front(); rdx->commands_to_free_.pop(); if(rdx->freeQueuedCommand(id)) {} else if(rdx->freeQueuedCommand(id)) {} else if(rdx->freeQueuedCommand(id)) {} else if(rdx->freeQueuedCommand(id)) {} else if(rdx->freeQueuedCommand(id)) {} else if(rdx->freeQueuedCommand(id)) {} else if(rdx->freeQueuedCommand>(id)) {} else if(rdx->freeQueuedCommand>(id)) {} else if(rdx->freeQueuedCommand>(id)) {} else {} } } template bool Redox::freeQueuedCommand(long id) { Command* c = findCommand(id); if(c == nullptr) return false; c->freeReply(); // Stop the libev timer if this is a repeating command if((c->repeat_ != 0) || (c->after_ != 0)) { lock_guard lg(c->timer_guard_); ev_timer_stop(c->rdx_->evloop_, &c->timer_); } deregisterCommand(c->id_); delete c; return true; } long Redox::freeAllCommands() { return freeAllCommandsOfType() + freeAllCommandsOfType() + freeAllCommandsOfType() + freeAllCommandsOfType() + freeAllCommandsOfType() + freeAllCommandsOfType() + freeAllCommandsOfType>() + freeAllCommandsOfType>() + freeAllCommandsOfType>(); } template long Redox::freeAllCommandsOfType() { lock_guard lg(free_queue_guard_); lock_guard lg2(queue_guard_); auto& command_map = getCommandMap(); long len = command_map.size(); for(auto& pair : command_map) { Command* c = pair.second; c->freeReply(); // Stop the libev timer if this is a repeating command if((c->repeat_ != 0) || (c->after_ != 0)) { lock_guard lg3(c->timer_guard_); ev_timer_stop(c->rdx_->evloop_, &c->timer_); } delete c; } command_map.clear(); commands_deleted_ += len; return len; } // --------------------------------- // get_command_map specializations // --------------------------------- template<> unordered_map*>& Redox::getCommandMap() { return commands_redis_reply_; } template<> unordered_map*>& Redox::getCommandMap() { return commands_string_; } template<> unordered_map*>& Redox::getCommandMap() { return commands_char_p_; } template<> unordered_map*>& Redox::getCommandMap() { return commands_int_; } template<> unordered_map*>& Redox::getCommandMap() { return commands_long_long_int_; } template<> unordered_map*>& Redox::getCommandMap() { return commands_null_; } template<> unordered_map>*>& Redox::getCommandMap>() { return commands_vector_string_; } template<> unordered_map>*>& Redox::getCommandMap>() { return commands_set_string_; } template<> unordered_map>*>& Redox::getCommandMap>() { return commands_unordered_set_string_; } // ---------------------------- // Helpers // ---------------------------- string Redox::vecToStr(const vector& vec, const char delimiter) { string str; for(size_t i = 0; i < vec.size() - 1; i++) str += vec[i] + delimiter; str += vec[vec.size()-1]; return str; } vector Redox::strToVec(const string& s, const char delimiter) { vector vec; size_t last = 0; size_t next = 0; while ((next = s.find(delimiter, last)) != string::npos) { vec.push_back(s.substr(last, next-last)); last = next + 1; } vec.push_back(s.substr(last)); return vec; } void Redox::command(const std::vector& cmd) { command(cmd, nullptr); } bool Redox::commandSync(const std::vector& cmd) { auto& c = commandSync(cmd); bool succeeded = c.ok(); c.free(); return succeeded; } string Redox::get(const string& key) { Command& c = commandSync({"GET", key}); if(!c.ok()) { throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status())); } string reply = c.reply(); c.free(); return reply; }; bool Redox::set(const string& key, const string& value) { return commandSync({"SET", key, value}); } bool Redox::del(const string& key) { return commandSync({"DEL", key}); } void Redox::publish(const string& topic, const string& msg) { command({"PUBLISH", topic, msg}); } } // End namespace redis