diff --git a/examples/simple_loop.cpp b/examples/simple_loop.cpp index 107f0d6..5263f75 100644 --- a/examples/simple_loop.cpp +++ b/examples/simple_loop.cpp @@ -6,6 +6,7 @@ #include "../src/redisx.hpp" using namespace std; +using namespace redisx; double time_s() { unsigned long ms = chrono::system_clock::now().time_since_epoch() / chrono::microseconds(1); @@ -14,14 +15,22 @@ double time_s() { int main(int argc, char* argv[]) { - redisx::Redis rdx = {"localhost", 6379}; + Redis rdx = {"localhost", 6379}; rdx.run(); - rdx.command_blocking("DEL simple_loop:count"); - rdx.command_blocking("SET simple_loop:count 0"); + Command* del_cmd = rdx.command_blocking("DEL simple_loop:count"); + cout << "deleted key, reply: " << del_cmd->reply() << endl; + del_cmd->free(); - cout << "At the start, simple_loop:count = " - << rdx.command_blocking("GET simple_loop:count") << endl; + Command* set_cmd = rdx.command_blocking("SET simple_loop:count 0"); + cout << "set key, reply: " << set_cmd->reply() << endl; + set_cmd->free(); + + Command* count_cmd = rdx.command_blocking("GET simple_loop:count"); + if(count_cmd->status() == REDISX_OK) { + cout << "At the start, simple_loop:count = " << count_cmd->reply() << endl; + } + count_cmd->free(); string cmd_str = "INCR simple_loop:count"; @@ -33,7 +42,7 @@ int main(int argc, char* argv[]) { << "s for " << t << "s..." << endl; atomic_int count(0); - redisx::Command* c = rdx.command( + Command* c = rdx.command( cmd_str, [&count](const string &cmd, const int& value) { count++; }, NULL, @@ -46,7 +55,7 @@ int main(int argc, char* argv[]) { rdx.cancel(c); cout << "At the end, simple_loop:count = " - << rdx.command_blocking("GET simple_loop:count") << endl; + << rdx.command_blocking("GET simple_loop:count")->reply() << endl; rdx.stop(); diff --git a/src/command.hpp b/src/command.hpp index 8e809f1..1f8bbd4 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -14,6 +14,14 @@ namespace redisx { +static const int REDISX_UNINIT = -1; +static const int REDISX_OK = 0; +static const int REDISX_SEND_ERROR = 1; +static const int REDISX_WRONG_TYPE = 2; +static const int REDISX_NIL_REPLY = 3; +static const int REDISX_ERROR_REPLY = 4; +static const int REDISX_TIMEOUT = 5; + template class Command { @@ -21,28 +29,48 @@ friend class Redis; public: Command( - redisAsyncContext* c, - const std::string& cmd, - const std::function& callback, - const std::function& error_callback, - double repeat, double after + redisAsyncContext* c, + const std::string& cmd, + const std::function& callback, + const std::function& error_callback, + double repeat, double after, + bool free_memory ); const std::string cmd; const double repeat; const double after; + const bool free_memory; + redisAsyncContext* c; + redisReply* reply_obj; std::atomic_int pending; void invoke(const ReplyT& reply); void invoke_error(int status); + const ReplyT& reply(); + int status() { return reply_status; }; + + /** + * Called by the user to free the redisReply object, when the free_memory + * flag is set to false for a command. + */ + void free(); + + void free_reply_object(); + private: const std::function callback; - const std::function& error_callback; + const std::function error_callback; + + // Place to store the reply value and status. + // ONLY for blocking commands + ReplyT reply_val; + int reply_status; std::atomic_bool completed; @@ -61,33 +89,70 @@ Command::Command( const std::string& cmd, const std::function& callback, const std::function& error_callback, - double repeat, double after -) : cmd(cmd), repeat(repeat), after(after), c(c), pending(0), - callback(callback), error_callback(error_callback), completed(false) + double repeat, double after, bool free_memory +) : cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), c(c), reply_obj(NULL), + pending(0), callback(callback), error_callback(error_callback), completed(false) { timer_guard.lock(); } template -void Command::invoke(const ReplyT& reply) { +void Command::invoke(const ReplyT& r) { + + if(callback) callback(cmd, r); - if(callback != NULL) callback(cmd, reply); pending--; - if((pending == 0) && (completed || (repeat == 0))) { -// std::cout << "invoking success, reply: " << reply << std::endl; -// std::cout << "Freeing cmd " << cmd << " in success invoke" << std::endl; + if(!free_memory) return; + if(pending != 0) return; + if(completed || (repeat == 0)) { +// std::cout << cmd << ": suicide!" << std::endl; delete this; } } template void Command::invoke_error(int status) { - if(error_callback != NULL) error_callback(cmd, status); + + if(error_callback) error_callback(cmd, status); + pending--; - if((pending == 0) && (completed || (repeat == 0))) { -// std::cout << "Freeing cmd " << cmd << " in error invoke" << std::endl; + if(!free_memory) return; + if(pending != 0) return; + if(completed || (repeat == 0)) { +// std::cout << cmd << ": suicide!" << std::endl; delete this; } } +template +void Command::free_reply_object() { + + if(reply_obj == NULL) { + std::cerr << "[ERROR] " << cmd << ": Attempting to double free reply object!" << std::endl; + return; + } + + freeReplyObject(reply_obj); + reply_obj = NULL; +} + +template +void Command::free() { + + free_reply_object(); + + // Commit suicide +// std::cout << cmd << ": suicide, by calling free()!" << std::endl; + delete this; +} + +template +const ReplyT& Command::reply() { + if(reply_status != REDISX_OK) { + std::cout << "[WARNING] " << cmd + << ": Accessing value of reply with status != OK." << std::endl; + } + return reply_val; +} + } // End namespace redis diff --git a/src/redisx.cpp b/src/redisx.cpp index 6d8d016..49bd2fe 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -17,11 +17,19 @@ std::unordered_map Redis::timer_callbacks; // Global mutex to manage waiting for connected state mutex connected_lock; +/** +* Dummy function given to hiredis to use for freeing reply +* objects, so the memory can be managed here instead. +*/ +void dummy_free_reply(void *reply) {} + void connected(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { cerr << "[ERROR] Connecting to Redis: " << c->errstr << endl; return; } + + c->c.reader->fn->freeObject = dummy_free_reply; cout << "Connected to Redis." << endl; connected_lock.unlock(); } @@ -31,6 +39,7 @@ void disconnected(const redisAsyncContext *c, int status) { cerr << "[ERROR] Disconnecting from Redis: " << c->errstr << endl; return; } + c->c.reader->fn->freeObject = freeReplyObject; cout << "Disconnected from Redis." << endl; connected_lock.lock(); } @@ -101,7 +110,7 @@ template bool submit_to_server(Command* cmd_obj) { cmd_obj->pending++; if (redisAsyncCommand(cmd_obj->c, command_callback, (void*)cmd_obj, cmd_obj->cmd.c_str()) != REDIS_OK) { - cerr << "[ERROR] Async command \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; + cerr << "[ERROR] Could not send \"" << cmd_obj->cmd << "\": " << cmd_obj->c->errstr << endl; cmd_obj->invoke_error(REDISX_SEND_ERROR); return false; } @@ -134,6 +143,9 @@ bool Redis::process_queued_command(void* cmd_ptr) { } else { // TODO manage memory somehow cmd_obj->timer = new ev_timer(); + + // TODO use cmd_obj->timer->data instead of timer callbacks!!!!! + timer_callbacks[cmd_obj->timer] = (void*)cmd_obj; ev_timer_init(cmd_obj->timer, submit_command_callback, cmd_obj->after, cmd_obj->repeat); ev_timer_start(EV_DEFAULT_ cmd_obj->timer); @@ -175,12 +187,6 @@ template<> void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); } -template<> redisReply* Redis::copy_reply(const redisReply*& reply) { - // TODO get rid of this it is dumb. - auto* copy = new redisReply; - *copy = *reply; - return copy; -} template<> unordered_map*>& Redis::get_command_map() { return commands_string_r; } template<> @@ -194,7 +200,6 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { string s = reply->str; cmd_obj->invoke(s); } -template<> string Redis::copy_reply(const string& reply) { return reply; } template<> unordered_map*>& Redis::get_command_map() { return commands_char_p; } template<> @@ -206,13 +211,6 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { } cmd_obj->invoke(reply->str); } -template<> char* Redis::copy_reply(const char*& reply) { - // Here, reply MUST be null terminated! - size_t len = strlen(reply); - auto* copy = new char[len+1]; - strcpy(copy, reply); - return copy; -} template<> unordered_map*>& Redis::get_command_map() { return commands_int; } template<> @@ -236,9 +234,6 @@ void invoke_callback(Command* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply->integer); } - - - // ---------------------------- // Helpers // ---------------------------- @@ -251,35 +246,4 @@ void Redis::command_blocking(const string& cmd) { command_blocking(cmd); } -//void Redis::get(const char* key, function callback) { -// string cmd = string("GET ") + key; -// command(cmd.c_str(), callback); -//} -// -//void Redis::set(const char* key, const char* value) { -// string cmd = string("SET ") + key + " " + value; -// command(cmd.c_str(), [](const string& command, const char* reply) { -// if(strcmp(reply, "OK")) -// cerr << "[ERROR] " << command << ": SET failed with reply " << reply << endl; -// }); -//} -// -//void Redis::set(const char* key, const char* value, function callback) { -// string cmd = string("SET ") + key + " " + value; -// command(cmd.c_str(), callback); -//} -// -//void Redis::del(const char* key) { -// string cmd = string("DEL ") + key; -// command(cmd.c_str(), [](const string& command, long long int num_deleted) { -// if(num_deleted != 1) -// cerr << "[ERROR] " << command << ": Deleted " << num_deleted << " keys." << endl; -// }); -//} -// -//void Redis::del(const char* key, function callback) { -// string cmd = string("DEL ") + key; -// command(cmd.c_str(), callback); -//} - } // End namespace redis diff --git a/src/redisx.hpp b/src/redisx.hpp index 10e820b..3281568 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -22,14 +22,6 @@ #include "command.hpp" -static const int REDISX_UNINIT = -1; -static const int REDISX_OK = 0; -static const int REDISX_SEND_ERROR = 1; -static const int REDISX_WRONG_TYPE = 2; -static const int REDISX_NIL_REPLY = 3; -static const int REDISX_ERROR_REPLY = 4; -static const int REDISX_TIMEOUT = 5; - namespace redisx { class Redis { @@ -46,11 +38,12 @@ public: template Command* command( - const std::string& cmd, - const std::function& callback = NULL, - const std::function& error_callback = NULL, - double repeat = 0.0, - double after = 0.0 + const std::string& cmd, + const std::function& callback = NULL, + const std::function& error_callback = NULL, + double repeat = 0.0, + double after = 0.0, + bool free_memory = true ); template @@ -59,20 +52,12 @@ public: void command(const std::string& command); template - ReplyT command_blocking(const std::string& cmd); + Command* command_blocking(const std::string& cmd); void command_blocking(const std::string& command); long num_commands_processed(); -// void get(const char* key, std::function callback); -// -// void set(const char* key, const char* value); -// void set(const char* key, const char* value, std::function callback); -// -// void del(const char* key); -// void del(const char* key, std::function callback); - // void publish(std::string channel, std::string msg); // void subscribe(std::string channel, std::function callback); // void unsubscribe(std::string channel); @@ -113,9 +98,6 @@ private: template bool process_queued_command(void* cmd_ptr); - - template - ReplyT copy_reply(const ReplyT& reply); }; // --------------------------- @@ -129,34 +111,36 @@ void invoke_callback( template void command_callback(redisAsyncContext *c, void *r, void *privdata) { - redisReply *reply = (redisReply *) r; auto *cmd_obj = (Command *) privdata; + cmd_obj->reply_obj = (redisReply *) r; - if (reply->type == REDIS_REPLY_ERROR) { - std::cerr << "[ERROR] " << cmd_obj->cmd << ": " << reply->str << std::endl; + if (cmd_obj->reply_obj->type == REDIS_REPLY_ERROR) { + std::cerr << "[ERROR redisx.hpp:121] " << cmd_obj->cmd << ": " << cmd_obj->reply_obj->str << std::endl; cmd_obj->invoke_error(REDISX_ERROR_REPLY); - return; - } - if(reply->type == REDIS_REPLY_NIL) { + } else if(cmd_obj->reply_obj->type == REDIS_REPLY_NIL) { std::cerr << "[WARNING] " << cmd_obj->cmd << ": Nil reply." << std::endl; cmd_obj->invoke_error(REDISX_NIL_REPLY); - return; + + } else { + invoke_callback(cmd_obj, cmd_obj->reply_obj); } - invoke_callback(cmd_obj, reply); + // Free the reply object unless told not to + if(cmd_obj->free_memory) cmd_obj->free_reply_object(); } template Command* Redis::command( - const std::string& cmd, - const std::function& callback, - const std::function& error_callback, - double repeat, - double after + const std::string& cmd, + const std::function& callback, + const std::function& error_callback, + double repeat, + double after, + bool free_memory ) { std::lock_guard lg(queue_guard); - auto* cmd_obj = new Command(c, cmd, callback, error_callback, repeat, after); + auto* cmd_obj = new Command(c, cmd, callback, error_callback, repeat, after, free_memory); get_command_map()[(void*)cmd_obj] = cmd_obj; command_queue.push((void*)cmd_obj); return cmd_obj; @@ -183,43 +167,39 @@ bool Redis::cancel(Command* cmd_obj) { } template -ReplyT Redis::command_blocking(const std::string& cmd) { - std::mutex m; - std::condition_variable cv; +Command* Redis::command_blocking(const std::string& cmd) { + ReplyT val; std::atomic_int status(REDISX_UNINIT); - // There is a memory issue here, because after the callback returns - // all memory is cleared. - // TODO right now just don't use char* or redisReply* for blocking - // Later, maybe specialize a function to copy the pointer types to - // the heap + std::condition_variable cv; + std::mutex m; + + std::unique_lock lk(m); - command(cmd, - [&cv, &val, &status](const std::string& cmd_str, const ReplyT& reply) { - std::cout << "cmd: " << cmd_str << std::endl; - std::cout << "invoking success, reply: " << reply << std::endl; + Command* cmd_obj = command(cmd, + [&val, &status, &m, &cv](const std::string& cmd_str, const ReplyT& reply) { + std::unique_lock lk(m); val = reply; - std::cout << "invoking success, reply copied: " << val << std::endl; status = REDISX_OK; + lk.unlock(); cv.notify_one(); }, - [&cv, &status](const std::string& cmd_str, int error) { + [&status, &m, &cv](const std::string& cmd_str, int error) { + std::unique_lock lk(m); status = error; + lk.unlock(); cv.notify_one(); - } + }, + 0, 0, false // No repeats, don't free memory ); - std::unique_lock ul(m); - cv.wait(ul, [&status] { return status != REDISX_UNINIT; }); + cv.wait(lk, [&status] { return status != REDISX_UNINIT; }); - std::cout << "invoking success, after wait: " << val << std::endl; - std::cout << "response: " << status << std::endl; - if(status == REDISX_OK) return val; - else if(status == REDISX_NIL_REPLY) return val; - else throw std::runtime_error( - "[ERROR] " + cmd + ": redisx error code " + std::to_string(status.load()) - ); + cmd_obj->reply_val = val; + cmd_obj->reply_status = status; + + return cmd_obj; } } // End namespace redis