Commit 375d845f529b58d37e1c587785efbdedbad57b2c
1 parent
3f65d70a
Conform Command class to code standards
* Member variables with trailing underscore * Methods camelCase * Explicit template instantiation, move definitions to .cpp * Reorder methods to make more sense * Limit public API, comment well
Showing
4 changed files
with
294 additions
and
257 deletions
src/command.cpp
| @@ -9,179 +9,288 @@ | @@ -9,179 +9,288 @@ | ||
| 9 | #include "command.hpp" | 9 | #include "command.hpp" |
| 10 | #include "redox.hpp" | 10 | #include "redox.hpp" |
| 11 | 11 | ||
| 12 | +using namespace std; | ||
| 13 | + | ||
| 12 | namespace redox { | 14 | namespace redox { |
| 13 | 15 | ||
| 14 | template<class ReplyT> | 16 | template<class ReplyT> |
| 15 | -bool Command<ReplyT>::is_error_reply() { | 17 | +Command<ReplyT>::Command( |
| 18 | + Redox* rdx, | ||
| 19 | + long id, | ||
| 20 | + const std::string& cmd, | ||
| 21 | + const std::function<void(const std::string&, const ReplyT&)>& callback, | ||
| 22 | + const std::function<void(const std::string&, int status)>& error_callback, | ||
| 23 | + double repeat, double after, bool free_memory, log::Logger& logger | ||
| 24 | +) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), | ||
| 25 | + success_callback_(callback), error_callback_(error_callback), logger_(logger) { | ||
| 26 | + timer_guard_.lock(); | ||
| 27 | +} | ||
| 28 | + | ||
| 29 | +template<class ReplyT> | ||
| 30 | +void Command<ReplyT>::processReply(redisReply* r) { | ||
| 31 | + | ||
| 32 | + free_guard_.lock(); | ||
| 33 | + | ||
| 34 | + reply_obj_ = r; | ||
| 35 | + handleCallback(); | ||
| 36 | + | ||
| 37 | + pending_--; | ||
| 38 | + | ||
| 39 | + // Allow free() method to free memory | ||
| 40 | + if (!free_memory_) { | ||
| 41 | +// logger.trace() << "Command memory not being freed, free_memory = " << free_memory; | ||
| 42 | + free_guard_.unlock(); | ||
| 43 | + return; | ||
| 44 | + } | ||
| 45 | + | ||
| 46 | + freeReply(); | ||
| 47 | + | ||
| 48 | + // Handle memory if all pending replies have arrived | ||
| 49 | + if (pending_ == 0) { | ||
| 50 | + | ||
| 51 | + // Just free non-repeating commands | ||
| 52 | + if (repeat_ == 0) { | ||
| 53 | + freeCommand(this); | ||
| 54 | + return; | ||
| 55 | + | ||
| 56 | + // Free repeating commands if timer is stopped | ||
| 57 | + } else { | ||
| 58 | + if ((long)(timer_.data) == 0) { | ||
| 59 | + freeCommand(this); | ||
| 60 | + return; | ||
| 61 | + } | ||
| 62 | + } | ||
| 63 | + } | ||
| 64 | + | ||
| 65 | + free_guard_.unlock(); | ||
| 66 | +} | ||
| 67 | + | ||
| 68 | +template<class ReplyT> | ||
| 69 | +void Command<ReplyT>::free() { | ||
| 70 | + | ||
| 71 | + free_guard_.lock(); | ||
| 72 | + freeReply(); | ||
| 73 | + free_guard_.unlock(); | ||
| 16 | 74 | ||
| 17 | - if (reply_obj->type == REDIS_REPLY_ERROR) { | ||
| 18 | - logger.error() << cmd << ": " << reply_obj->str; | 75 | + freeCommand(this); |
| 76 | +} | ||
| 77 | + | ||
| 78 | +template<class ReplyT> | ||
| 79 | +void Command<ReplyT>::freeReply() { | ||
| 80 | + | ||
| 81 | + if (reply_obj_ == nullptr) { | ||
| 82 | + logger_.error() << cmd_ << ": Attempting to double free reply object."; | ||
| 83 | + return; | ||
| 84 | + } | ||
| 85 | + | ||
| 86 | + freeReplyObject(reply_obj_); | ||
| 87 | + reply_obj_ = nullptr; | ||
| 88 | +} | ||
| 89 | + | ||
| 90 | +template<class ReplyT> | ||
| 91 | +void Command<ReplyT>::freeCommand(Command<ReplyT>* c) { | ||
| 92 | + c->rdx_->template remove_active_command<ReplyT>(c->id_); | ||
| 93 | +// logger.debug() << "Deleted Command " << c->id << " at " << c; | ||
| 94 | + delete c; | ||
| 95 | +} | ||
| 96 | + | ||
| 97 | + | ||
| 98 | +template<class ReplyT> | ||
| 99 | +const ReplyT& Command<ReplyT>::reply() { | ||
| 100 | + if (!ok()) { | ||
| 101 | + logger_.warning() << cmd_ << ": Accessing value of reply with status != OK."; | ||
| 102 | + } | ||
| 103 | + return reply_val_; | ||
| 104 | +} | ||
| 105 | + | ||
| 106 | +template<class ReplyT> | ||
| 107 | +bool Command<ReplyT>::isErrorReply() { | ||
| 108 | + | ||
| 109 | + if (reply_obj_->type == REDIS_REPLY_ERROR) { | ||
| 110 | + logger_.error() << cmd_ << ": " << reply_obj_->str; | ||
| 19 | return true; | 111 | return true; |
| 20 | } | 112 | } |
| 21 | return false; | 113 | return false; |
| 22 | } | 114 | } |
| 23 | 115 | ||
| 24 | template<class ReplyT> | 116 | template<class ReplyT> |
| 25 | -bool Command<ReplyT>::is_nil_reply() { | 117 | +bool Command<ReplyT>::isNilReply() { |
| 26 | 118 | ||
| 27 | - if (reply_obj->type == REDIS_REPLY_NIL) { | ||
| 28 | - logger.warning() << cmd << ": Nil reply."; | 119 | + if (reply_obj_->type == REDIS_REPLY_NIL) { |
| 120 | + logger_.warning() << cmd_ << ": Nil reply."; | ||
| 29 | return true; | 121 | return true; |
| 30 | } | 122 | } |
| 31 | return false; | 123 | return false; |
| 32 | } | 124 | } |
| 33 | 125 | ||
| 126 | +// ---------------------------------------------------------------------------- | ||
| 127 | +// Specializations of handleCallback for all data types | ||
| 128 | +// ---------------------------------------------------------------------------- | ||
| 129 | + | ||
| 34 | template<> | 130 | template<> |
| 35 | -void Command<redisReply*>::invoke_callback() { | ||
| 36 | - invoke(reply_obj); | 131 | +void Command<redisReply*>::handleCallback() { |
| 132 | + invokeSuccess(reply_obj_); | ||
| 37 | } | 133 | } |
| 38 | 134 | ||
| 39 | template<> | 135 | template<> |
| 40 | -void Command<std::string>::invoke_callback() { | 136 | +void Command<string>::handleCallback() { |
| 41 | 137 | ||
| 42 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | ||
| 43 | - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); | 138 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 139 | + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); | ||
| 44 | 140 | ||
| 45 | - else if(reply_obj->type != REDIS_REPLY_STRING && reply_obj->type != REDIS_REPLY_STATUS) { | ||
| 46 | - logger.error() << cmd << ": Received non-string reply."; | ||
| 47 | - invoke_error(REDOX_WRONG_TYPE); | 141 | + else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) { |
| 142 | + logger_.error() << cmd_ << ": Received non-string reply."; | ||
| 143 | + invokeError(REDOX_WRONG_TYPE); | ||
| 48 | 144 | ||
| 49 | } else { | 145 | } else { |
| 50 | - std::string s(reply_obj->str, reply_obj->len); | ||
| 51 | - invoke(s); | 146 | + string s(reply_obj_->str, reply_obj_->len); |
| 147 | + invokeSuccess(s); | ||
| 52 | } | 148 | } |
| 53 | } | 149 | } |
| 54 | 150 | ||
| 55 | template<> | 151 | template<> |
| 56 | -void Command<char*>::invoke_callback() { | 152 | +void Command<char*>::handleCallback() { |
| 57 | 153 | ||
| 58 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | ||
| 59 | - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); | 154 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 155 | + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); | ||
| 60 | 156 | ||
| 61 | - else if(reply_obj->type != REDIS_REPLY_STRING && reply_obj->type != REDIS_REPLY_STATUS) { | ||
| 62 | - logger.error() << cmd << ": Received non-string reply."; | ||
| 63 | - invoke_error(REDOX_WRONG_TYPE); | 157 | + else if(reply_obj_->type != REDIS_REPLY_STRING && reply_obj_->type != REDIS_REPLY_STATUS) { |
| 158 | + logger_.error() << cmd_ << ": Received non-string reply."; | ||
| 159 | + invokeError(REDOX_WRONG_TYPE); | ||
| 64 | 160 | ||
| 65 | } else { | 161 | } else { |
| 66 | - invoke(reply_obj->str); | 162 | + invokeSuccess(reply_obj_->str); |
| 67 | } | 163 | } |
| 68 | } | 164 | } |
| 69 | 165 | ||
| 70 | template<> | 166 | template<> |
| 71 | -void Command<int>::invoke_callback() { | 167 | +void Command<int>::handleCallback() { |
| 72 | 168 | ||
| 73 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | ||
| 74 | - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); | 169 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 170 | + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); | ||
| 75 | 171 | ||
| 76 | - else if(reply_obj->type != REDIS_REPLY_INTEGER) { | ||
| 77 | - logger.error() << cmd << ": Received non-integer reply."; | ||
| 78 | - invoke_error(REDOX_WRONG_TYPE); | 172 | + else if(reply_obj_->type != REDIS_REPLY_INTEGER) { |
| 173 | + logger_.error() << cmd_ << ": Received non-integer reply."; | ||
| 174 | + invokeError(REDOX_WRONG_TYPE); | ||
| 79 | 175 | ||
| 80 | } else { | 176 | } else { |
| 81 | - invoke((int) reply_obj->integer); | 177 | + invokeSuccess((int) reply_obj_->integer); |
| 82 | } | 178 | } |
| 83 | } | 179 | } |
| 84 | 180 | ||
| 85 | template<> | 181 | template<> |
| 86 | -void Command<long long int>::invoke_callback() { | 182 | +void Command<long long int>::handleCallback() { |
| 87 | 183 | ||
| 88 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | ||
| 89 | - else if(is_nil_reply()) invoke_error(REDOX_NIL_REPLY); | 184 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 185 | + else if(isNilReply()) invokeError(REDOX_NIL_REPLY); | ||
| 90 | 186 | ||
| 91 | - else if(reply_obj->type != REDIS_REPLY_INTEGER) { | ||
| 92 | - logger.error() << cmd << ": Received non-integer reply."; | ||
| 93 | - invoke_error(REDOX_WRONG_TYPE); | 187 | + else if(reply_obj_->type != REDIS_REPLY_INTEGER) { |
| 188 | + logger_.error() << cmd_ << ": Received non-integer reply."; | ||
| 189 | + invokeError(REDOX_WRONG_TYPE); | ||
| 94 | 190 | ||
| 95 | } else { | 191 | } else { |
| 96 | - invoke(reply_obj->integer); | 192 | + invokeSuccess(reply_obj_->integer); |
| 97 | } | 193 | } |
| 98 | } | 194 | } |
| 99 | 195 | ||
| 100 | template<> | 196 | template<> |
| 101 | -void Command<std::nullptr_t>::invoke_callback() { | 197 | +void Command<nullptr_t>::handleCallback() { |
| 102 | 198 | ||
| 103 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | 199 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 104 | 200 | ||
| 105 | - else if(reply_obj->type != REDIS_REPLY_NIL) { | ||
| 106 | - logger.error() << cmd << ": Received non-nil reply."; | ||
| 107 | - invoke_error(REDOX_WRONG_TYPE); | 201 | + else if(reply_obj_->type != REDIS_REPLY_NIL) { |
| 202 | + logger_.error() << cmd_ << ": Received non-nil reply."; | ||
| 203 | + invokeError(REDOX_WRONG_TYPE); | ||
| 108 | 204 | ||
| 109 | } else { | 205 | } else { |
| 110 | - invoke(nullptr); | 206 | + invokeSuccess(nullptr); |
| 111 | } | 207 | } |
| 112 | } | 208 | } |
| 113 | 209 | ||
| 114 | 210 | ||
| 115 | template<> | 211 | template<> |
| 116 | -void Command<std::vector<std::string>>::invoke_callback() { | 212 | +void Command<vector<string>>::handleCallback() { |
| 117 | 213 | ||
| 118 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | 214 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 119 | 215 | ||
| 120 | - else if(reply_obj->type != REDIS_REPLY_ARRAY) { | ||
| 121 | - logger.error() << cmd << ": Received non-array reply."; | ||
| 122 | - invoke_error(REDOX_WRONG_TYPE); | 216 | + else if(reply_obj_->type != REDIS_REPLY_ARRAY) { |
| 217 | + logger_.error() << cmd_ << ": Received non-array reply."; | ||
| 218 | + invokeError(REDOX_WRONG_TYPE); | ||
| 123 | 219 | ||
| 124 | } else { | 220 | } else { |
| 125 | - std::vector<std::string> v; | ||
| 126 | - size_t count = reply_obj->elements; | 221 | + vector<string> v; |
| 222 | + size_t count = reply_obj_->elements; | ||
| 127 | for(size_t i = 0; i < count; i++) { | 223 | for(size_t i = 0; i < count; i++) { |
| 128 | - redisReply* r = *(reply_obj->element + i); | 224 | + redisReply* r = *(reply_obj_->element + i); |
| 129 | if(r->type != REDIS_REPLY_STRING) { | 225 | if(r->type != REDIS_REPLY_STRING) { |
| 130 | - logger.error() << cmd << ": Received non-array reply."; | ||
| 131 | - invoke_error(REDOX_WRONG_TYPE); | 226 | + logger_.error() << cmd_ << ": Received non-array reply."; |
| 227 | + invokeError(REDOX_WRONG_TYPE); | ||
| 132 | } | 228 | } |
| 133 | v.emplace_back(r->str, r->len); | 229 | v.emplace_back(r->str, r->len); |
| 134 | } | 230 | } |
| 135 | - invoke(v); | 231 | + invokeSuccess(v); |
| 136 | } | 232 | } |
| 137 | } | 233 | } |
| 138 | 234 | ||
| 139 | template<> | 235 | template<> |
| 140 | -void Command<std::unordered_set<std::string>>::invoke_callback() { | 236 | +void Command<unordered_set<string>>::handleCallback() { |
| 141 | 237 | ||
| 142 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | 238 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 143 | 239 | ||
| 144 | - else if(reply_obj->type != REDIS_REPLY_ARRAY) { | ||
| 145 | - logger.error() << cmd << ": Received non-array reply."; | ||
| 146 | - invoke_error(REDOX_WRONG_TYPE); | 240 | + else if(reply_obj_->type != REDIS_REPLY_ARRAY) { |
| 241 | + logger_.error() << cmd_ << ": Received non-array reply."; | ||
| 242 | + invokeError(REDOX_WRONG_TYPE); | ||
| 147 | 243 | ||
| 148 | } else { | 244 | } else { |
| 149 | - std::unordered_set<std::string> v; | ||
| 150 | - size_t count = reply_obj->elements; | 245 | + unordered_set<string> v; |
| 246 | + size_t count = reply_obj_->elements; | ||
| 151 | for(size_t i = 0; i < count; i++) { | 247 | for(size_t i = 0; i < count; i++) { |
| 152 | - redisReply* r = *(reply_obj->element + i); | 248 | + redisReply* r = *(reply_obj_->element + i); |
| 153 | if(r->type != REDIS_REPLY_STRING) { | 249 | if(r->type != REDIS_REPLY_STRING) { |
| 154 | - logger.error() << cmd << ": Received non-array reply."; | ||
| 155 | - invoke_error(REDOX_WRONG_TYPE); | 250 | + logger_.error() << cmd_ << ": Received non-array reply."; |
| 251 | + invokeError(REDOX_WRONG_TYPE); | ||
| 156 | } | 252 | } |
| 157 | v.emplace(r->str, r->len); | 253 | v.emplace(r->str, r->len); |
| 158 | } | 254 | } |
| 159 | - invoke(v); | 255 | + invokeSuccess(v); |
| 160 | } | 256 | } |
| 161 | } | 257 | } |
| 162 | 258 | ||
| 163 | template<> | 259 | template<> |
| 164 | -void Command<std::set<std::string>>::invoke_callback() { | 260 | +void Command<set<string>>::handleCallback() { |
| 165 | 261 | ||
| 166 | - if(is_error_reply()) invoke_error(REDOX_ERROR_REPLY); | 262 | + if(isErrorReply()) invokeError(REDOX_ERROR_REPLY); |
| 167 | 263 | ||
| 168 | - else if(reply_obj->type != REDIS_REPLY_ARRAY) { | ||
| 169 | - logger.error() << cmd << ": Received non-array reply."; | ||
| 170 | - invoke_error(REDOX_WRONG_TYPE); | 264 | + else if(reply_obj_->type != REDIS_REPLY_ARRAY) { |
| 265 | + logger_.error() << cmd_ << ": Received non-array reply."; | ||
| 266 | + invokeError(REDOX_WRONG_TYPE); | ||
| 171 | 267 | ||
| 172 | } else { | 268 | } else { |
| 173 | - std::set<std::string> v; | ||
| 174 | - size_t count = reply_obj->elements; | 269 | + set<string> v; |
| 270 | + size_t count = reply_obj_->elements; | ||
| 175 | for(size_t i = 0; i < count; i++) { | 271 | for(size_t i = 0; i < count; i++) { |
| 176 | - redisReply* r = *(reply_obj->element + i); | 272 | + redisReply* r = *(reply_obj_->element + i); |
| 177 | if(r->type != REDIS_REPLY_STRING) { | 273 | if(r->type != REDIS_REPLY_STRING) { |
| 178 | - logger.error() << cmd << ": Received non-array reply."; | ||
| 179 | - invoke_error(REDOX_WRONG_TYPE); | 274 | + logger_.error() << cmd_ << ": Received non-array reply."; |
| 275 | + invokeError(REDOX_WRONG_TYPE); | ||
| 180 | } | 276 | } |
| 181 | v.emplace(r->str, r->len); | 277 | v.emplace(r->str, r->len); |
| 182 | } | 278 | } |
| 183 | - invoke(v); | 279 | + invokeSuccess(v); |
| 184 | } | 280 | } |
| 185 | } | 281 | } |
| 186 | 282 | ||
| 283 | +// Explicit template instantiation for available types, so that the generated | ||
| 284 | +// library contains them and we can keep the method definitions out of the | ||
| 285 | +// header file. | ||
| 286 | +template class Command<redisReply*>; | ||
| 287 | +template class Command<string>; | ||
| 288 | +template class Command<char*>; | ||
| 289 | +template class Command<int>; | ||
| 290 | +template class Command<long long int>; | ||
| 291 | +template class Command<nullptr_t>; | ||
| 292 | +template class Command<vector<string>>; | ||
| 293 | +template class Command<set<string>>; | ||
| 294 | +template class Command<unordered_set<string>>; | ||
| 295 | + | ||
| 187 | } // End namespace redox | 296 | } // End namespace redox |
src/command.hpp
| @@ -4,7 +4,6 @@ | @@ -4,7 +4,6 @@ | ||
| 4 | 4 | ||
| 5 | #pragma once | 5 | #pragma once |
| 6 | 6 | ||
| 7 | -#include <iostream> | ||
| 8 | #include <string> | 7 | #include <string> |
| 9 | #include <functional> | 8 | #include <functional> |
| 10 | #include <atomic> | 9 | #include <atomic> |
| @@ -30,184 +29,113 @@ class Redox; | @@ -30,184 +29,113 @@ class Redox; | ||
| 30 | template<class ReplyT> | 29 | template<class ReplyT> |
| 31 | class Command { | 30 | class Command { |
| 32 | 31 | ||
| 33 | -friend class Redox; | ||
| 34 | - | ||
| 35 | public: | 32 | public: |
| 36 | - Command( | ||
| 37 | - Redox* rdx, | ||
| 38 | - long id, | ||
| 39 | - const std::string& cmd, | ||
| 40 | - const std::function<void(const std::string&, const ReplyT&)>& callback, | ||
| 41 | - const std::function<void(const std::string&, int status)>& error_callback, | ||
| 42 | - double repeat, double after, | ||
| 43 | - bool free_memory, | ||
| 44 | - log::Logger& logger | ||
| 45 | - ); | ||
| 46 | - | ||
| 47 | - Redox* rdx; | ||
| 48 | - | ||
| 49 | - const long id; | ||
| 50 | - const std::string cmd; | ||
| 51 | - const double repeat; | ||
| 52 | - const double after; | ||
| 53 | - | ||
| 54 | - const bool free_memory; | ||
| 55 | - | ||
| 56 | - redisReply* reply_obj = nullptr; | ||
| 57 | - | ||
| 58 | - std::atomic_int pending = {0}; | ||
| 59 | - | ||
| 60 | - void invoke(const ReplyT& reply); | ||
| 61 | - void invoke_error(int status); | ||
| 62 | - | ||
| 63 | - const ReplyT& reply(); | ||
| 64 | - int status() { return reply_status; }; | ||
| 65 | - bool ok() { return reply_status == REDOX_OK; } | ||
| 66 | - bool is_canceled() { return canceled; } | ||
| 67 | - | ||
| 68 | - void cancel() { canceled = true; } | ||
| 69 | 33 | ||
| 70 | /** | 34 | /** |
| 71 | - * Called by the user to free the redisReply object, when the free_memory | ||
| 72 | - * flag is set to false for a command. | 35 | + * Frees memory allocated by this command. Commands with free_memory = false |
| 36 | + * must be freed by the user. | ||
| 73 | */ | 37 | */ |
| 74 | void free(); | 38 | void free(); |
| 75 | 39 | ||
| 76 | - void process_reply(redisReply* r); | ||
| 77 | - | ||
| 78 | - ev_timer* get_timer() { | ||
| 79 | - std::lock_guard<std::mutex> lg(timer_guard); | ||
| 80 | - return &timer; | ||
| 81 | - } | ||
| 82 | - | ||
| 83 | - static void free_command(Command<ReplyT>* c); | ||
| 84 | - | ||
| 85 | -private: | ||
| 86 | - | ||
| 87 | - const std::function<void(const std::string&, const ReplyT&)> callback; | ||
| 88 | - const std::function<void(const std::string&, int status)> error_callback; | ||
| 89 | - | ||
| 90 | - // Place to store the reply value and status. | ||
| 91 | - // ONLY for blocking commands | ||
| 92 | - ReplyT reply_val; | ||
| 93 | - int reply_status; | ||
| 94 | - | ||
| 95 | - std::atomic_bool canceled = {false}; | ||
| 96 | - | ||
| 97 | - ev_timer timer; | ||
| 98 | - std::mutex timer_guard; | ||
| 99 | - | ||
| 100 | - // Make sure we don't free resources until details taken care of | ||
| 101 | - std::mutex free_guard; | ||
| 102 | - | ||
| 103 | - void free_reply_object(); | ||
| 104 | - | ||
| 105 | - void invoke_callback(); | ||
| 106 | - bool is_error_reply(); | ||
| 107 | - bool is_nil_reply(); | 40 | + /** |
| 41 | + * Cancels a repeating or delayed command. | ||
| 42 | + */ | ||
| 43 | + void cancel() { canceled_ = true; } | ||
| 108 | 44 | ||
| 109 | - log::Logger& logger; | ||
| 110 | -}; | 45 | + /** |
| 46 | + * Returns true if the command has been canceled. | ||
| 47 | + */ | ||
| 48 | + bool canceled() { return canceled_; } | ||
| 111 | 49 | ||
| 112 | -template<class ReplyT> | ||
| 113 | -Command<ReplyT>::Command( | ||
| 114 | - Redox* rdx, | ||
| 115 | - long id, | ||
| 116 | - const std::string& cmd, | ||
| 117 | - const std::function<void(const std::string&, const ReplyT&)>& callback, | ||
| 118 | - const std::function<void(const std::string&, int status)>& error_callback, | ||
| 119 | - double repeat, double after, bool free_memory, log::Logger& logger | ||
| 120 | -) : rdx(rdx), id(id), cmd(cmd), repeat(repeat), after(after), free_memory(free_memory), | ||
| 121 | - callback(callback), error_callback(error_callback), logger(logger) | ||
| 122 | -{ | ||
| 123 | - timer_guard.lock(); | ||
| 124 | -} | 50 | + /** |
| 51 | + * Returns the reply status of this command. | ||
| 52 | + * Use ONLY with command_blocking. | ||
| 53 | + */ | ||
| 54 | + int status() { return reply_status_; }; | ||
| 125 | 55 | ||
| 126 | -template<class ReplyT> | ||
| 127 | -void Command<ReplyT>::process_reply(redisReply* r) { | 56 | + /** |
| 57 | + * Returns true if this command got a successful reply. | ||
| 58 | + * Use ONLY with command_blocking. | ||
| 59 | + */ | ||
| 60 | + bool ok() { return reply_status_ == REDOX_OK; } | ||
| 128 | 61 | ||
| 129 | - free_guard.lock(); | 62 | + /** |
| 63 | + * Returns the reply value, if the reply was successful (ok() == true). | ||
| 64 | + * Use ONLY with command_blocking. | ||
| 65 | + */ | ||
| 66 | + const ReplyT& reply(); | ||
| 130 | 67 | ||
| 131 | - reply_obj = r; | ||
| 132 | - invoke_callback(); | 68 | + // Allow public access to constructed data |
| 69 | + Redox* const rdx_; | ||
| 70 | + const long id_; | ||
| 71 | + const std::string cmd_; | ||
| 72 | + const double repeat_; | ||
| 73 | + const double after_; | ||
| 74 | + const bool free_memory_; | ||
| 133 | 75 | ||
| 134 | - pending--; | 76 | +private: |
| 135 | 77 | ||
| 136 | - // Allow free() method to free memory | ||
| 137 | - if(!free_memory) { | ||
| 138 | -// logger.trace() << "Command memory not being freed, free_memory = " << free_memory; | ||
| 139 | - free_guard.unlock(); | ||
| 140 | - return; | ||
| 141 | - } | 78 | + Command( |
| 79 | + Redox* rdx, | ||
| 80 | + long id, | ||
| 81 | + const std::string& cmd, | ||
| 82 | + const std::function<void(const std::string&, const ReplyT&)>& callback, | ||
| 83 | + const std::function<void(const std::string&, int status)>& error_callback, | ||
| 84 | + double repeat, double after, | ||
| 85 | + bool free_memory, | ||
| 86 | + log::Logger& logger | ||
| 87 | + ); | ||
| 142 | 88 | ||
| 143 | - free_reply_object(); | 89 | + // Handles a new reply from the server |
| 90 | + void processReply(redisReply* r); | ||
| 144 | 91 | ||
| 145 | - // Handle memory if all pending replies have arrived | ||
| 146 | - if(pending == 0) { | 92 | + // Invoke a user callback from the reply object. This method is specialized |
| 93 | + // for each ReplyT of Command. | ||
| 94 | + void handleCallback(); | ||
| 147 | 95 | ||
| 148 | - // Just free non-repeating commands | ||
| 149 | - if (repeat == 0) { | ||
| 150 | - free_command(this); | ||
| 151 | - return; | 96 | + // Directly invoke the user callbacks if the exist |
| 97 | + void invokeSuccess(const ReplyT& reply) { if (success_callback_) success_callback_(cmd_, reply); } | ||
| 98 | + void invokeError(int status) { if (error_callback_) error_callback_(cmd_, status); } | ||
| 152 | 99 | ||
| 153 | - // Free repeating commands if timer is stopped | ||
| 154 | - } else { | ||
| 155 | - if((long)(get_timer()->data) == 0) { | ||
| 156 | - free_command(this); | ||
| 157 | - return; | ||
| 158 | - } | ||
| 159 | - } | ||
| 160 | - } | 100 | + bool isErrorReply(); |
| 101 | + bool isNilReply(); | ||
| 161 | 102 | ||
| 162 | - free_guard.unlock(); | ||
| 163 | -} | 103 | + // Delete the provided Command object and deregister as an active |
| 104 | + // command from its Redox instance. | ||
| 105 | + static void freeCommand(Command<ReplyT>* c); | ||
| 164 | 106 | ||
| 165 | -template<class ReplyT> | ||
| 166 | -void Command< | ||
| 167 | - ReplyT>::invoke(const ReplyT& r) { | ||
| 168 | - if(callback) callback(cmd, r); | ||
| 169 | -} | 107 | + // If needed, free the redisReply |
| 108 | + void freeReply(); | ||
| 170 | 109 | ||
| 171 | -template<class ReplyT> | ||
| 172 | -void Command<ReplyT>::invoke_error(int status) { | ||
| 173 | - if(error_callback) error_callback(cmd, status); | ||
| 174 | -} | 110 | + // The last server reply |
| 111 | + redisReply* reply_obj_ = nullptr; | ||
| 175 | 112 | ||
| 176 | -template<class ReplyT> | ||
| 177 | -void Command<ReplyT>::free_reply_object() { | 113 | + // Callbacks on success and error |
| 114 | + const std::function<void(const std::string&, const ReplyT&)> success_callback_; | ||
| 115 | + const std::function<void(const std::string&, int status)> error_callback_; | ||
| 178 | 116 | ||
| 179 | - if(reply_obj == nullptr) { | ||
| 180 | - logger.error() << cmd << ": Attempting to double free reply object."; | ||
| 181 | - return; | ||
| 182 | - } | 117 | + // Place to store the reply value and status. |
| 118 | + // ONLY for blocking commands | ||
| 119 | + ReplyT reply_val_; | ||
| 120 | + int reply_status_; | ||
| 183 | 121 | ||
| 184 | - freeReplyObject(reply_obj); | ||
| 185 | - reply_obj = nullptr; | ||
| 186 | -} | 122 | + // How many messages sent to server but not received reply |
| 123 | + std::atomic_int pending_ = {0}; | ||
| 187 | 124 | ||
| 188 | -template<class ReplyT> | ||
| 189 | -void Command<ReplyT>::free_command(Command<ReplyT>* c) { | ||
| 190 | - c->rdx->template remove_active_command<ReplyT>(c->id); | ||
| 191 | -// logger.debug() << "Deleted Command " << c->id << " at " << c; | ||
| 192 | - delete c; | ||
| 193 | -} | 125 | + // Whether a repeating or delayed command is canceled |
| 126 | + std::atomic_bool canceled_ = {false}; | ||
| 194 | 127 | ||
| 195 | -template<class ReplyT> | ||
| 196 | -void Command<ReplyT>::free() { | 128 | + // libev timer watcher |
| 129 | + ev_timer timer_; | ||
| 130 | + std::mutex timer_guard_; | ||
| 197 | 131 | ||
| 198 | - free_guard.lock(); | ||
| 199 | - free_reply_object(); | ||
| 200 | - free_guard.unlock(); | 132 | + // Make sure we don't free resources until details taken care of |
| 133 | + std::mutex free_guard_; | ||
| 201 | 134 | ||
| 202 | - free_command(this); | ||
| 203 | -} | 135 | + // Passed on from Redox class |
| 136 | + log::Logger& logger_; | ||
| 204 | 137 | ||
| 205 | -template<class ReplyT> | ||
| 206 | -const ReplyT& Command<ReplyT>::reply() { | ||
| 207 | - if(!ok()) { | ||
| 208 | - logger.warning() << cmd << ": Accessing value of reply with status != OK."; | ||
| 209 | - } | ||
| 210 | - return reply_val; | ||
| 211 | -} | 138 | + friend class Redox; |
| 139 | +}; | ||
| 212 | 140 | ||
| 213 | } // End namespace redis | 141 | } // End namespace redis |
src/redox.cpp
| @@ -240,7 +240,7 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { | @@ -240,7 +240,7 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { | ||
| 240 | return; | 240 | return; |
| 241 | } | 241 | } |
| 242 | 242 | ||
| 243 | - c->process_reply(reply_obj); | 243 | + c->processReply(reply_obj); |
| 244 | 244 | ||
| 245 | // Increment the Redox object command counter | 245 | // Increment the Redox object command counter |
| 246 | rdx->cmd_count++; | 246 | rdx->cmd_count++; |
| @@ -253,35 +253,35 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { | @@ -253,35 +253,35 @@ void Redox::command_callback(redisAsyncContext *ctx, void *r, void *privdata) { | ||
| 253 | template<class ReplyT> | 253 | template<class ReplyT> |
| 254 | bool Redox::submit_to_server(Command<ReplyT>* c) { | 254 | bool Redox::submit_to_server(Command<ReplyT>* c) { |
| 255 | 255 | ||
| 256 | - Redox* rdx = c->rdx; | ||
| 257 | - c->pending++; | 256 | + Redox* rdx = c->rdx_; |
| 257 | + c->pending_++; | ||
| 258 | 258 | ||
| 259 | // Process binary data if trailing quotation. This is a limited implementation | 259 | // Process binary data if trailing quotation. This is a limited implementation |
| 260 | // to allow binary data between the first and the last quotes of the command string, | 260 | // to allow binary data between the first and the last quotes of the command string, |
| 261 | // if the very last character of the command is a quote ('"'). | 261 | // if the very last character of the command is a quote ('"'). |
| 262 | - if(c->cmd[c->cmd.size()-1] == '"') { | 262 | + if(c->cmd_[c->cmd_.size()-1] == '"') { |
| 263 | 263 | ||
| 264 | // Indices of the quotes | 264 | // Indices of the quotes |
| 265 | - size_t first = c->cmd.find('"'); | ||
| 266 | - size_t last = c->cmd.size()-1; | 265 | + size_t first = c->cmd_.find('"'); |
| 266 | + size_t last = c->cmd_.size()-1; | ||
| 267 | 267 | ||
| 268 | // Proceed only if the first and last quotes are different | 268 | // Proceed only if the first and last quotes are different |
| 269 | if(first != last) { | 269 | if(first != last) { |
| 270 | 270 | ||
| 271 | - string format = c->cmd.substr(0, first) + "%b"; | ||
| 272 | - string value = c->cmd.substr(first+1, last-first-1); | ||
| 273 | - if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { | ||
| 274 | - rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr; | ||
| 275 | - c->invoke_error(REDOX_SEND_ERROR); | 271 | + string format = c->cmd_.substr(0, first) + "%b"; |
| 272 | + string value = c->cmd_.substr(first+1, last-first-1); | ||
| 273 | + if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, format.c_str(), value.c_str(), value.size()) != REDIS_OK) { | ||
| 274 | + rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; | ||
| 275 | + c->invokeError(REDOX_SEND_ERROR); | ||
| 276 | return false; | 276 | return false; |
| 277 | } | 277 | } |
| 278 | return true; | 278 | return true; |
| 279 | } | 279 | } |
| 280 | } | 280 | } |
| 281 | 281 | ||
| 282 | - if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id, c->cmd.c_str()) != REDIS_OK) { | ||
| 283 | - rdx->logger.error() << "Could not send \"" << c->cmd << "\": " << rdx->ctx->errstr; | ||
| 284 | - c->invoke_error(REDOX_SEND_ERROR); | 282 | + if (redisAsyncCommand(rdx->ctx, command_callback<ReplyT>, (void*)c->id_, c->cmd_.c_str()) != REDIS_OK) { |
| 283 | + rdx->logger.error() << "Could not send \"" << c->cmd_ << "\": " << rdx->ctx->errstr; | ||
| 284 | + c->invokeError(REDOX_SEND_ERROR); | ||
| 285 | return false; | 285 | return false; |
| 286 | } | 286 | } |
| 287 | 287 | ||
| @@ -301,17 +301,17 @@ void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int r | @@ -301,17 +301,17 @@ void Redox::submit_command_callback(struct ev_loop* loop, ev_timer* timer, int r | ||
| 301 | return; | 301 | return; |
| 302 | } | 302 | } |
| 303 | 303 | ||
| 304 | - if(c->is_canceled()) { | 304 | + if(c->canceled()) { |
| 305 | 305 | ||
| 306 | // logger.info() << "Command " << c << " is completed, stopping event timer."; | 306 | // logger.info() << "Command " << c << " is completed, stopping event timer."; |
| 307 | 307 | ||
| 308 | - c->timer_guard.lock(); | ||
| 309 | - if((c->repeat != 0) || (c->after != 0)) | ||
| 310 | - ev_timer_stop(loop, &c->timer); | ||
| 311 | - c->timer_guard.unlock(); | 308 | + c->timer_guard_.lock(); |
| 309 | + if((c->repeat_ != 0) || (c->after_ != 0)) | ||
| 310 | + ev_timer_stop(loop, &c->timer_); | ||
| 311 | + c->timer_guard_.unlock(); | ||
| 312 | 312 | ||
| 313 | // Mark for memory to be freed when all callbacks are received | 313 | // Mark for memory to be freed when all callbacks are received |
| 314 | - c->timer.data = (void*)0; | 314 | + c->timer_.data = (void*)(long)0; |
| 315 | 315 | ||
| 316 | return; | 316 | return; |
| 317 | } | 317 | } |
| @@ -325,16 +325,16 @@ bool Redox::process_queued_command(long id) { | @@ -325,16 +325,16 @@ bool Redox::process_queued_command(long id) { | ||
| 325 | Command<ReplyT>* c = find_command<ReplyT>(id); | 325 | Command<ReplyT>* c = find_command<ReplyT>(id); |
| 326 | if(c == nullptr) return false; | 326 | if(c == nullptr) return false; |
| 327 | 327 | ||
| 328 | - if((c->repeat == 0) && (c->after == 0)) { | 328 | + if((c->repeat_ == 0) && (c->after_ == 0)) { |
| 329 | submit_to_server<ReplyT>(c); | 329 | submit_to_server<ReplyT>(c); |
| 330 | 330 | ||
| 331 | } else { | 331 | } else { |
| 332 | 332 | ||
| 333 | - c->timer.data = (void*)c->id; | ||
| 334 | - ev_timer_init(&c->timer, submit_command_callback<ReplyT>, c->after, c->repeat); | ||
| 335 | - ev_timer_start(evloop, &c->timer); | 333 | + c->timer_.data = (void*)c->id_; |
| 334 | + ev_timer_init(&c->timer_, submit_command_callback<ReplyT>, c->after_, c->repeat_); | ||
| 335 | + ev_timer_start(evloop, &c->timer_); | ||
| 336 | 336 | ||
| 337 | - c->timer_guard.unlock(); | 337 | + c->timer_guard_.unlock(); |
| 338 | } | 338 | } |
| 339 | 339 | ||
| 340 | return true; | 340 | return true; |
src/redox.hpp
| @@ -398,8 +398,8 @@ Command<ReplyT>* Redox::command( | @@ -398,8 +398,8 @@ Command<ReplyT>* Redox::command( | ||
| 398 | std::lock_guard<std::mutex> lg(queue_guard); | 398 | std::lock_guard<std::mutex> lg(queue_guard); |
| 399 | std::lock_guard<std::mutex> lg2(command_map_guard); | 399 | std::lock_guard<std::mutex> lg2(command_map_guard); |
| 400 | 400 | ||
| 401 | - get_command_map<ReplyT>()[c->id] = c; | ||
| 402 | - command_queue.push(c->id); | 401 | + get_command_map<ReplyT>()[c->id_] = c; |
| 402 | + command_queue.push(c->id_); | ||
| 403 | 403 | ||
| 404 | // Signal the event loop to process this command | 404 | // Signal the event loop to process this command |
| 405 | ev_async_send(evloop, &async_w); | 405 | ev_async_send(evloop, &async_w); |
| @@ -438,8 +438,8 @@ Command<ReplyT>* Redox::command_blocking(const std::string& cmd) { | @@ -438,8 +438,8 @@ Command<ReplyT>* Redox::command_blocking(const std::string& cmd) { | ||
| 438 | ); | 438 | ); |
| 439 | 439 | ||
| 440 | cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); | 440 | cv.wait(lk, [&status] { return status != REDOX_UNINIT; }); |
| 441 | - c->reply_val = val; | ||
| 442 | - c->reply_status = status; | 441 | + c->reply_val_ = val; |
| 442 | + c->reply_status_ = status; | ||
| 443 | 443 | ||
| 444 | return c; | 444 | return c; |
| 445 | } | 445 | } |