diff --git a/src/command.cpp b/src/command.cpp index 1841730..ad62bc5 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -9,179 +9,288 @@ #include "command.hpp" #include "redox.hpp" +using namespace std; + namespace redox { template -bool Command::is_error_reply() { +Command::Command( + Redox* rdx, + long id, + const std::string& cmd, + const std::function& callback, + const std::function& error_callback, + double repeat, double after, bool free_memory, log::Logger& logger +) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), + success_callback_(callback), error_callback_(error_callback), logger_(logger) { + timer_guard_.lock(); +} + +template +void Command::processReply(redisReply* r) { + + free_guard_.lock(); + + reply_obj_ = r; + handleCallback(); + + pending_--; + + // Allow free() method to free memory + if (!free_memory_) { +// logger.trace() << "Command memory not being freed, free_memory = " << free_memory; + free_guard_.unlock(); + return; + } + + freeReply(); + + // Handle memory if all pending replies have arrived + if (pending_ == 0) { + + // Just free non-repeating commands + if (repeat_ == 0) { + freeCommand(this); + return; + + // Free repeating commands if timer is stopped + } else { + if ((long)(timer_.data) == 0) { + freeCommand(this); + return; + } + } + } + + free_guard_.unlock(); +} + +template +void Command::free() { + + free_guard_.lock(); + freeReply(); + free_guard_.unlock(); - if (reply_obj->type == REDIS_REPLY_ERROR) { - logger.error() << cmd << ": " << reply_obj->str; + freeCommand(this); +} + +template +void Command::freeReply() { + + if (reply_obj_ == nullptr) { + logger_.error() << cmd_ << ": Attempting to double free reply object."; + return; + } + + freeReplyObject(reply_obj_); + reply_obj_ = nullptr; +} + +template +void Command::freeCommand(Command* c) { + c->rdx_->template remove_active_command(c->id_); +// logger.debug() << "Deleted Command " << c->id << " at " << c; + delete c; +} + + +template +const ReplyT& Command::reply() { + if (!ok()) { + logger_.warning() << cmd_ << ": Accessing value of reply with status != OK."; + } + return reply_val_; +} + +template +bool Command::isErrorReply() { + + if (reply_obj_->type == REDIS_REPLY_ERROR) { + logger_.error() << cmd_ << ": " << reply_obj_->str; return true; } return false; } template -bool Command::is_nil_reply() { +bool Command::isNilReply() { - if (reply_obj->type == REDIS_REPLY_NIL) { - logger.warning() << cmd << ": Nil reply."; + if (reply_obj_->type == REDIS_REPLY_NIL) { + logger_.warning() << cmd_ << ": Nil reply."; return true; } return false; } +// ---------------------------------------------------------------------------- +// Specializations of handleCallback for all data types +// ---------------------------------------------------------------------------- + template<> -void Command::invoke_callback() { - invoke(reply_obj); +void Command::handleCallback() { + invokeSuccess(reply_obj_); } template<> -void Command::invoke_callback() { +void Command::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); - else if(reply_obj->type != REDIS_REPLY_STRING && reply_obj->type != REDIS_REPLY_STATUS) { - logger.error() << cmd << ": Received non-string reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) { + logger_.error() << cmd_ << ": Received non-string reply."; + invokeError(REDOX_WRONG_TYPE); } else { - std::string s(reply_obj->str, reply_obj->len); - invoke(s); + string s(reply_obj_->str, reply_obj_->len); + invokeSuccess(s); } } template<> -void Command::invoke_callback() { +void Command::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); - else if(reply_obj->type != REDIS_REPLY_STRING && reply_obj->type != REDIS_REPLY_STATUS) { - logger.error() << cmd << ": Received non-string reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) { + logger_.error() << cmd_ << ": Received non-string reply."; + invokeError(REDOX_WRONG_TYPE); } else { - invoke(reply_obj->str); + invokeSuccess(reply_obj_->str); } } template<> -void Command::invoke_callback() { +void Command::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); - else if(reply_obj->type != REDIS_REPLY_INTEGER) { - logger.error() << cmd << ": Received non-integer reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_INTEGER) { + logger_.error() << cmd_ << ": Received non-integer reply."; + invokeError(REDOX_WRONG_TYPE); } else { - invoke((int) reply_obj->integer); + invokeSuccess((int) reply_obj_->integer); } } template<> -void Command::invoke_callback() { +void Command::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); - else if(reply_obj->type != REDIS_REPLY_INTEGER) { - logger.error() << cmd << ": Received non-integer reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_INTEGER) { + logger_.error() << cmd_ << ": Received non-integer reply."; + invokeError(REDOX_WRONG_TYPE); } else { - invoke(reply_obj->integer); + invokeSuccess(reply_obj_->integer); } } template<> -void Command::invoke_callback() { +void Command::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(reply_obj->type != REDIS_REPLY_NIL) { - logger.error() << cmd << ": Received non-nil reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_NIL) { + logger_.error() << cmd_ << ": Received non-nil reply."; + invokeError(REDOX_WRONG_TYPE); } else { - invoke(nullptr); + invokeSuccess(nullptr); } } template<> -void Command>::invoke_callback() { +void Command>::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(reply_obj->type != REDIS_REPLY_ARRAY) { - logger.error() << cmd << ": Received non-array reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_ARRAY) { + logger_.error() << cmd_ << ": Received non-array reply."; + invokeError(REDOX_WRONG_TYPE); } else { - std::vector v; - size_t count = reply_obj->elements; + vector v; + size_t count = reply_obj_->elements; for(size_t i = 0; i < count; i++) { - redisReply* r = *(reply_obj->element + i); + redisReply* r = *(reply_obj_->element + i); if(r->type != REDIS_REPLY_STRING) { - logger.error() << cmd << ": Received non-array reply."; - invoke_error(REDOX_WRONG_TYPE); + logger_.error() << cmd_ << ": Received non-array reply."; + invokeError(REDOX_WRONG_TYPE); } v.emplace_back(r->str, r->len); } - invoke(v); + invokeSuccess(v); } } template<> -void Command>::invoke_callback() { +void Command>::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(reply_obj->type != REDIS_REPLY_ARRAY) { - logger.error() << cmd << ": Received non-array reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_ARRAY) { + logger_.error() << cmd_ << ": Received non-array reply."; + invokeError(REDOX_WRONG_TYPE); } else { - std::unordered_set v; - size_t count = reply_obj->elements; + unordered_set v; + size_t count = reply_obj_->elements; for(size_t i = 0; i < count; i++) { - redisReply* r = *(reply_obj->element + i); + redisReply* r = *(reply_obj_->element + i); if(r->type != REDIS_REPLY_STRING) { - logger.error() << cmd << ": Received non-array reply."; - invoke_error(REDOX_WRONG_TYPE); + logger_.error() << cmd_ << ": Received non-array reply."; + invokeError(REDOX_WRONG_TYPE); } v.emplace(r->str, r->len); } - invoke(v); + invokeSuccess(v); } } template<> -void Command>::invoke_callback() { +void Command>::handleCallback() { - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); - else if(reply_obj->type != REDIS_REPLY_ARRAY) { - logger.error() << cmd << ": Received non-array reply."; - invoke_error(REDOX_WRONG_TYPE); + else if(reply_obj_->type != REDIS_REPLY_ARRAY) { + logger_.error() << cmd_ << ": Received non-array reply."; + invokeError(REDOX_WRONG_TYPE); } else { - std::set v; - size_t count = reply_obj->elements; + set v; + size_t count = reply_obj_->elements; for(size_t i = 0; i < count; i++) { - redisReply* r = *(reply_obj->element + i); + redisReply* r = *(reply_obj_->element + i); if(r->type != REDIS_REPLY_STRING) { - logger.error() << cmd << ": Received non-array reply."; - invoke_error(REDOX_WRONG_TYPE); + logger_.error() << cmd_ << ": Received non-array reply."; + invokeError(REDOX_WRONG_TYPE); } v.emplace(r->str, r->len); } - invoke(v); + invokeSuccess(v); } } +// Explicit template instantiation for available types, so that the generated +// library contains them and we can keep the method definitions out of the +// header file. +template class Command; +template class Command; +template class Command; +template class Command; +template class Command; +template class Command; +template class Command>; +template class Command>; +template class Command>; + } // End namespace redox diff --git a/src/command.hpp b/src/command.hpp index 0bb75cc..a49a07e 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -4,7 +4,6 @@ #pragma once -#include #include #include #include @@ -30,184 +29,113 @@ class Redox; template class Command { -friend class Redox; - public: - Command( - Redox* rdx, - long id, - const std::string& cmd, - const std::function& callback, - const std::function& error_callback, - double repeat, double after, - bool free_memory, - log::Logger& logger - ); - - Redox* rdx; - - const long id; - const std::string cmd; - const double repeat; - const double after; - - const bool free_memory; - - redisReply* reply_obj = nullptr; - - std::atomic_int pending = {0}; - - void invoke(const ReplyT& reply); - void invoke_error(int status); - - const ReplyT& reply(); - int status() { return reply_status; }; - bool ok() { return reply_status == REDOX_OK; } - bool is_canceled() { return canceled; } - - void cancel() { canceled = true; } /** - * Called by the user to free the redisReply object, when the free_memory - * flag is set to false for a command. + * Frees memory allocated by this command. Commands with free_memory = false + * must be freed by the user. */ void free(); - void process_reply(redisReply* r); - - ev_timer* get_timer() { - std::lock_guard lg(timer_guard); - return &timer; - } - - static void free_command(Command* c); - -private: - - const std::function callback; - const std::function error_callback; - - // Place to store the reply value and status. - // ONLY for blocking commands - ReplyT reply_val; - int reply_status; - - std::atomic_bool canceled = {false}; - - ev_timer timer; - std::mutex timer_guard; - - // Make sure we don't free resources until details taken care of - std::mutex free_guard; - - void free_reply_object(); - - void invoke_callback(); - bool is_error_reply(); - bool is_nil_reply(); + /** + * Cancels a repeating or delayed command. + */ + void cancel() { canceled_ = true; } - log::Logger& logger; -}; + /** + * Returns true if the command has been canceled. + */ + bool canceled() { return canceled_; } -template -Command::Command( - Redox* rdx, - long id, - const std::string& cmd, - const std::function& callback, - const std::function& error_callback, - double repeat, double after, bool free_memory, log::Logger& logger -) : rdx(rdx), id(id), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), - callback(callback), error_callback(error_callback), logger(logger) -{ - timer_guard.lock(); -} + /** + * Returns the reply status of this command. + * Use ONLY with command_blocking. + */ + int status() { return reply_status_; }; -template -void Command::process_reply(redisReply* r) { + /** + * Returns true if this command got a successful reply. + * Use ONLY with command_blocking. + */ + bool ok() { return reply_status_ == REDOX_OK; } - free_guard.lock(); + /** + * Returns the reply value, if the reply was successful (ok() == true). + * Use ONLY with command_blocking. + */ + const ReplyT& reply(); - reply_obj = r; - invoke_callback(); + // Allow public access to constructed data + Redox* const rdx_; + const long id_; + const std::string cmd_; + const double repeat_; + const double after_; + const bool free_memory_; - pending--; +private: - // Allow free() method to free memory - if(!free_memory) { -// logger.trace() << "Command memory not being freed, free_memory = " << free_memory; - free_guard.unlock(); - return; - } + Command( + Redox* rdx, + long id, + const std::string& cmd, + const std::function& callback, + const std::function& error_callback, + double repeat, double after, + bool free_memory, + log::Logger& logger + ); - free_reply_object(); + // Handles a new reply from the server + void processReply(redisReply* r); - // Handle memory if all pending replies have arrived - if(pending == 0) { + // Invoke a user callback from the reply object. This method is specialized + // for each ReplyT of Command. + void handleCallback(); - // Just free non-repeating commands - if (repeat == 0) { - free_command(this); - return; + // Directly invoke the user callbacks if the exist + void invokeSuccess(const ReplyT& reply) { if (success_callback_) success_callback_(cmd_, reply); } + void invokeError(int status) { if (error_callback_) error_callback_(cmd_, status); } - // Free repeating commands if timer is stopped - } else { - if((long)(get_timer()->data) == 0) { - free_command(this); - return; - } - } - } + bool isErrorReply(); + bool isNilReply(); - free_guard.unlock(); -} + // Delete the provided Command object and deregister as an active + // command from its Redox instance. + static void freeCommand(Command* c); -template -void Command< - ReplyT>::invoke(const ReplyT& r) { - if(callback) callback(cmd, r); -} + // If needed, free the redisReply + void freeReply(); -template -void Command::invoke_error(int status) { - if(error_callback) error_callback(cmd, status); -} + // The last server reply + redisReply* reply_obj_ = nullptr; -template -void Command::free_reply_object() { + // Callbacks on success and error + const std::function success_callback_; + const std::function error_callback_; - if(reply_obj == nullptr) { - logger.error() << cmd << ": Attempting to double free reply object."; - return; - } + // Place to store the reply value and status. + // ONLY for blocking commands + ReplyT reply_val_; + int reply_status_; - freeReplyObject(reply_obj); - reply_obj = nullptr; -} + // How many messages sent to server but not received reply + std::atomic_int pending_ = {0}; -template -void Command::free_command(Command* c) { - c->rdx->template remove_active_command(c->id); -// logger.debug() << "Deleted Command " << c->id << " at " << c; - delete c; -} + // Whether a repeating or delayed command is canceled + std::atomic_bool canceled_ = {false}; -template -void Command::free() { + // libev timer watcher + ev_timer timer_; + std::mutex timer_guard_; - free_guard.lock(); - free_reply_object(); - free_guard.unlock(); + // Make sure we don't free resources until details taken care of + std::mutex free_guard_; - free_command(this); -} + // Passed on from Redox class + log::Logger& logger_; -template -const ReplyT& Command::reply() { - if(!ok()) { - logger.warning() << cmd << ": Accessing value of reply with status != OK."; - } - return reply_val; -} + friend class Redox; +}; } // End namespace redis diff --git a/src/redox.cpp b/src/redox.cpp index f7b9669..3d26f3a 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -240,7 +240,7 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { return; } - c->process_reply(reply_obj); + c->processReply(reply_obj); // Increment the Redox object command counter rdx->cmd_count++; @@ -253,35 +253,35 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { template bool Redox::submit_to_server(Command* c) { - Redox* rdx = c->rdx; - c->pending++; + Redox* rdx = c->rdx_; + c->pending_++; // Process binary data if trailing quotation. This is a limited implementation // to allow binary data between the first and the last quotes of the command string, // if the very last character of the command is a quote ('"'). - if(c->cmd[c->cmd.size()-1] == '"') { + if(c->cmd_[c->cmd_.size()-1] == '"') { // Indices of the quotes - size_t first = c->cmd.find('"'); - size_t last = c->cmd.size()-1; + size_t first = c->cmd_.find('"'); + size_t last = c->cmd_.size()-1; // Proceed only if the first and last quotes are different if(first != last) { - string format = c->cmd.substr(0, first) + "%b"; - string value = c->cmd.substr(first+1, last-first-1); - if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { - rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr; - c->invoke_error(REDOX_SEND_ERROR); + string format = c->cmd_.substr(0, first) + "%b"; + string value = c->cmd_.substr(first+1, last-first-1); + if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { + rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; + c->invokeError(REDOX_SEND_ERROR); return false; } return true; } } - if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { - rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr; - c->invoke_error(REDOX_SEND_ERROR); + if (redisAsyncCommand(rdx->ctx, command_callback, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) { + rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; + c->invokeError(REDOX_SEND_ERROR); return false; } @@ -301,17 +301,17 @@ void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int r return; } - if(c->is_canceled()) { + if(c->canceled()) { // logger.info() << "Command " << c << " is completed, stopping event timer."; - c->timer_guard.lock(); - if((c->repeat != 0) || (c->after != 0)) - ev_timer_stop(loop, &c->timer); - c->timer_guard.unlock(); + c->timer_guard_.lock(); + if((c->repeat_ != 0) || (c->after_ != 0)) + ev_timer_stop(loop, &c->timer_); + c->timer_guard_.unlock(); // Mark for memory to be freed when all callbacks are received - c->timer.data = (void*)0; + c->timer_.data = (void*)(long)0; return; } @@ -325,16 +325,16 @@ bool Redox::process_queued_command(long id) { Command* c = find_command(id); if(c == nullptr) return false; - if((c->repeat == 0) && (c->after == 0)) { + if((c->repeat_ == 0) && (c->after_ == 0)) { submit_to_server(c); } else { - c->timer.data = (void*)c->id; - ev_timer_init(&c->timer, submit_command_callback, c->after, c->repeat); - ev_timer_start(evloop, &c->timer); + c->timer_.data = (void*)c->id_; + ev_timer_init(&c->timer_, submit_command_callback, c->after_, c->repeat_); + ev_timer_start(evloop, &c->timer_); - c->timer_guard.unlock(); + c->timer_guard_.unlock(); } return true; diff --git a/src/redox.hpp b/src/redox.hpp index fadabc7..3b99bf9 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -398,8 +398,8 @@ Command* Redox::command( std::lock_guard lg(queue_guard); std::lock_guard lg2(command_map_guard); - get_command_map()[c->id] = c; - command_queue.push(c->id); + get_command_map()[c->id_] = c; + command_queue.push(c->id_); // Signal the event loop to process this command ev_async_send(evloop, &async_w); @@ -438,8 +438,8 @@ Command* Redox::command_blocking(const std::string& cmd) { ); cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); - c->reply_val = val; - c->reply_status = status; + c->reply_val_ = val; + c->reply_status_ = status; return c; }