/** * Redis C++11 wrapper. */ #include #include #include #include #include #include #include #include "redisx.hpp" using namespace std; namespace redisx { mutex connected_lock; void connected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl; return; } cout << "Connected to Redis." << endl; connected_lock.unlock(); } void disconnected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { cerr << "[ERROR] Disconnecting from Redis: " << c->errstr << endl; return; } cout << "Disconnected from Redis." << endl; connected_lock.lock(); } Redis::Redis(const string& host, const int port) : host(host), port(port), io_ops(0), to_exit(false) { lock_guard lg(queue_guard); connected_lock.lock(); signal(SIGPIPE, SIG_IGN); c = redisAsyncConnect(host.c_str(), port); if (c->err) { printf("Error: %s\n", c->errstr); return; } redisLibevAttach(EV_DEFAULT_ c); redisAsyncSetConnectCallback(c, connected); redisAsyncSetDisconnectCallback(c, disconnected); } Redis::~Redis() { redisAsyncDisconnect(c); stop(); } void Redis::run_blocking() { // Events to connect to Redis ev_run(EV_DEFAULT_ EVRUN_NOWAIT); lock_guard lg(connected_lock); // Continuously create events and handle them while (!to_exit) { process_queued_commands(); ev_run(EV_DEFAULT_ EVRUN_NOWAIT); } // Handle exit events ev_run(EV_DEFAULT_ EVRUN_NOWAIT); } void Redis::run() { event_loop_thread = thread([this] { run_blocking(); }); event_loop_thread.detach(); } void Redis::stop() { to_exit = true; } template bool Redis::submit_to_server(const CommandAsync* cmd_obj) { if (redisAsyncCommand(c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << c->errstr << endl; delete cmd_obj; return false; } return true; } template bool Redis::process_queued_command(void* cmd_ptr) { auto& command_map = get_command_map(); auto it = command_map.find(cmd_ptr); if(it == command_map.end()) return false; CommandAsync* cmd_obj = it->second; command_map.erase(cmd_ptr); submit_to_server(cmd_obj); return true; } void Redis::process_queued_commands() { lock_guard lg(queue_guard); while(!command_queue.empty()) { void* cmd_ptr = command_queue.front(); if(process_queued_command(cmd_ptr)) {} else if(process_queued_command(cmd_ptr)) {} else if(process_queued_command(cmd_ptr)) {} else if(process_queued_command(cmd_ptr)) {} else if(process_queued_command(cmd_ptr)) {} else throw runtime_error("[FATAL] Command pointer not found in any queue!"); command_queue.pop(); } } // ---------------------------- template<> unordered_map*>& Redis::get_command_map() { return commands_redis_reply; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); } template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; return; } cmd_obj->invoke(reply->str); } template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_STRING && reply->type != REDIS_REPLY_STATUS) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-string reply." << endl; return; } cmd_obj->invoke(reply->str); } template<> unordered_map*>& Redis::get_command_map() { return commands_int; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; return; } cmd_obj->invoke((int)reply->integer); } template<> unordered_map*>& Redis::get_command_map() { return commands_long_long_int; } template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { if(reply->type != REDIS_REPLY_INTEGER) { cerr << "[ERROR] " << cmd_obj->cmd << ": Received non-integer reply." << endl; return; } cmd_obj->invoke(reply->integer); } // ---------------------------- // Helpers // ---------------------------- void Redis::command(const char* cmd) { command(cmd, NULL); } //void Redis::get(const char* key, function callback) { // string cmd = string("GET ") + key; // command(cmd.c_str(), callback); //} // //void Redis::set(const char* key, const char* value) { // string cmd = string("SET ") + key + " " + value; // command(cmd.c_str(), [](const string& command, const char* reply) { // if(strcmp(reply, "OK")) // cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl; // }); //} // //void Redis::set(const char* key, const char* value, function callback) { // string cmd = string("SET ") + key + " " + value; // command(cmd.c_str(), callback); //} // //void Redis::del(const char* key) { // string cmd = string("DEL ") + key; // command(cmd.c_str(), [](const string& command, long long int num_deleted) { // if(num_deleted != 1) // cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl; // }); //} // //void Redis::del(const char* key, function callback) { // string cmd = string("DEL ") + key; // command(cmd.c_str(), callback); //} } // End namespace redis