diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..89a2a4a --- /dev/null +++ b/.clang-format @@ -0,0 +1,65 @@ +--- +Language: Cpp +# BasedOnStyle: LLVM +AccessModifierOffset: -2 +AlignAfterOpenBracket: true +AlignEscapedNewlinesLeft: false +AlignOperands: true +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AllowShortFunctionsOnASingleLine: All +AlwaysBreakAfterDefinitionReturnType: false +AlwaysBreakTemplateDeclarations: false +AlwaysBreakBeforeMultilineStrings: false +BreakBeforeBinaryOperators: None +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BinPackParameters: true +BinPackArguments: true +ColumnLimit: 100 +ConstructorInitializerAllOnOneLineOrOnePerLine: false +ConstructorInitializerIndentWidth: 4 +DerivePointerAlignment: false +ExperimentalAutoDetectBinPacking: false +IndentCaseLabels: false +IndentWrappedFunctionNames: false +IndentFunctionDeclarationAfterType: false +MaxEmptyLinesToKeep: 1 +KeepEmptyLinesAtTheStartOfBlocks: true +NamespaceIndentation: None +ObjCBlockIndentWidth: 2 +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakString: 1000 +PenaltyBreakFirstLessLess: 120 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 60 +PointerAlignment: Right +SpacesBeforeTrailingComments: 1 +Cpp11BracedListStyle: true +Standard: Cpp11 +IndentWidth: 2 +TabWidth: 8 +UseTab: Never +BreakBeforeBraces: Attach +SpacesInParentheses: false +SpacesInSquareBrackets: false +SpacesInAngles: false +SpaceInEmptyParentheses: false +SpacesInCStyleCastParentheses: false +SpaceAfterCStyleCast: false +SpacesInContainerLiterals: true +SpaceBeforeAssignmentOperators: true +ContinuationIndentWidth: 4 +CommentPragmas: '^ IWYU pragma:' +ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] +SpaceBeforeParens: ControlStatements +DisableFormat: false +... + diff --git a/include/redox/client.hpp b/include/redox/client.hpp index 713d980..05c6969 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -54,14 +54,13 @@ static const std::string REDIS_DEFAULT_PATH = "/var/run/redis/redis.sock"; class Redox { public: - // Connection states static const int NOT_YET_CONNECTED = 0; // Starting state - static const int CONNECTED = 1; // Successfully connected - static const int DISCONNECTED = 2; // Successfully disconnected - static const int CONNECT_ERROR = 3; // Error connecting - static const int DISCONNECT_ERROR = 4; // Disconnected on error - static const int INIT_ERROR = 5; // Failed to init data structures + static const int CONNECTED = 1; // Successfully connected + static const int DISCONNECTED = 2; // Successfully disconnected + static const int CONNECT_ERROR = 3; // Error connecting + static const int DISCONNECT_ERROR = 4; // Disconnected on error + static const int INIT_ERROR = 5; // Failed to init data structures // ------------------------------------------------ // Core public API @@ -70,10 +69,7 @@ public: /** * Constructor. Optionally specify a log stream and a log level. */ - Redox( - std::ostream& log_stream = std::cout, - log::Level log_level = log::Warning - ); + Redox(std::ostream &log_stream = std::cout, log::Level log_level = log::Warning); /** * Disconnects from the Redis server, shuts down the event loop, and cleans up. @@ -89,7 +85,7 @@ public: * is off. * * Implementation note: When enabled, the event thread calls libev's ev_run in a - * loop with the EVRUN_NOWAIT flag. + * loop with the EVRUN_NOWAIT flag. */ void noWait(bool state); @@ -97,18 +93,15 @@ public: * Connects to Redis over TCP and starts an event loop in a separate thread. Returns * true once everything is ready, or false on failure. */ - bool connect( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr); + bool connect(const std::string &host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr); /** * Connects to Redis over a unix socket and starts an event loop in a separate * thread. Returns true once everything is ready, or false on failure. */ - bool connectUnix( - const std::string& path = REDIS_DEFAULT_PATH, - std::function connection_callback = nullptr); + bool connectUnix(const std::string &path = REDIS_DEFAULT_PATH, + std::function connection_callback = nullptr); /** * Disconnect from Redis, shut down the event loop, then return. A simple @@ -134,16 +127,14 @@ public: * memory for it is automatically freed when the callback returns. */ - template - void command( - const std::vector& cmd, - const std::function&)>& callback = nullptr - ); + template + void command(const std::vector &cmd, + const std::function &)> &callback = nullptr); /** * Asynchronously runs a command and ignores any errors or replies. */ - void command(const std::vector& cmd); + void command(const std::vector &cmd); /** * Synchronously runs a command, returning the Command object only once @@ -151,14 +142,13 @@ public: * calling .free() on the returned Command object. */ - template - Command& commandSync(const std::vector& cmd); + template Command &commandSync(const std::vector &cmd); /** * Synchronously runs a command, returning only once a reply is received * or there's an error. Returns true on successful reply, false on error. */ - bool commandSync(const std::vector& cmd); + bool commandSync(const std::vector &cmd); /** * Creates an asynchronous command that is run every [repeat] seconds, @@ -167,13 +157,10 @@ public: * on the returned Command object. */ - template - Command& commandLoop( - const std::vector& cmd, - const std::function&)>& callback, - double repeat, - double after = 0.0 - ); + template + Command &commandLoop(const std::vector &cmd, + const std::function &)> &callback, + double repeat, double after = 0.0); /** * Creates an asynchronous command that is run once after a given @@ -182,12 +169,9 @@ public: * after the callback returns. */ - template - void commandDelayed( - const std::vector& cmd, - const std::function&)>& callback, - double after - ); + template + void commandDelayed(const std::vector &cmd, + const std::function &)> &callback, double after); // ------------------------------------------------ // Utility methods @@ -197,13 +181,13 @@ public: * Given a vector of strings, returns a string of the concatenated elements, separated * by the delimiter. Useful for printing out a command string from a vector. */ - static std::string vecToStr(const std::vector& vec, const char delimiter = ' '); + static std::string vecToStr(const std::vector &vec, const char delimiter = ' '); /** * Given a command string, returns a vector of strings by splitting the input by * the delimiter. Useful for turning a string input into a command. */ - static std::vector strToVec(const std::string& s, const char delimiter = ' '); + static std::vector strToVec(const std::string &s, const char delimiter = ' '); // ------------------------------------------------ // Command wrapper methods for convenience only @@ -213,32 +197,32 @@ public: * Redis GET command wrapper - return the value for the given key, or throw * an exception if there is an error. Blocking call. */ - std::string get(const std::string& key); + std::string get(const std::string &key); /** * Redis SET command wrapper - set the value for the given key. Return * true if succeeded, false if error. Blocking call. */ - bool set(const std::string& key, const std::string& value); + bool set(const std::string &key, const std::string &value); /** * Redis DEL command wrapper - delete the given key. Return true if succeeded, * false if error. Blocking call. */ - bool del(const std::string& key); + bool del(const std::string &key); /** * Redis PUBLISH command wrapper - publish the given message to all subscribers. * Non-blocking call. */ - void publish(const std::string& topic, const std::string& msg); + void publish(const std::string &topic, const std::string &msg); // ------------------------------------------------ // Public members // ------------------------------------------------ // Hiredis context, left public to allow low-level access - redisAsyncContext * ctx_; + redisAsyncContext *ctx_; // TODO make these private // Redox server over TCP @@ -252,7 +236,6 @@ public: log::Logger logger_; private: - // ------------------------------------------------ // Private methods // ------------------------------------------------ @@ -262,14 +245,10 @@ private: // One stop shop for creating commands. The base of all public // methods that run commands. - template - Command& createCommand( - const std::vector& cmd, - const std::function&)>& callback = nullptr, - double repeat = 0.0, - double after = 0.0, - bool free_memory = true - ); + template + Command &createCommand(const std::vector &cmd, + const std::function &)> &callback = nullptr, + double repeat = 0.0, double after = 0.0, bool free_memory = true); // Setup code for the constructors // Return true on success, false on failure @@ -277,54 +256,48 @@ private: bool initHiredis(); // Callbacks invoked on server connection/disconnection - static void connectedCallback(const redisAsyncContext* c, int status); - static void disconnectedCallback(const redisAsyncContext* c, int status); + static void connectedCallback(const redisAsyncContext *c, int status); + static void disconnectedCallback(const redisAsyncContext *c, int status); // Main event loop, run in a separate thread void runEventLoop(); // Return the command map corresponding to the templated reply type - template - std::unordered_map*>& getCommandMap(); + template std::unordered_map *> &getCommandMap(); // Return the given Command from the relevant command map, or nullptr if not there - template - Command* findCommand(long id); + template Command *findCommand(long id); // Send all commands in the command queue to the server - static void processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); + static void processQueuedCommands(struct ev_loop *loop, ev_async *async, int revents); // Process the command with the given ID. Return true if the command had the // templated type, and false if it was not in the command map of that type. - template - bool processQueuedCommand(long id); + template bool processQueuedCommand(long id); // Callback given to libev for a Command's timer watcher, to be processed in // a deferred or looping state - template - static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents); + template + static void submitCommandCallback(struct ev_loop *loop, ev_timer *timer, int revents); // Submit an asynchronous command to the Redox server. Return // true if succeeded, false otherwise. - template - static bool submitToServer(Command* c); + template static bool submitToServer(Command *c); // Callback given to hiredis to invoke when a reply is received - template - static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata); + template + static void commandCallback(redisAsyncContext *ctx, void *r, void *privdata); // Free all commands in the commands_to_free_ queue - static void freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); + static void freeQueuedCommands(struct ev_loop *loop, ev_async *async, int revents); // Free the command with the given ID. Return true if the command had the templated // type, and false if it was not in the command map of that type. - template - bool freeQueuedCommand(long id); + template bool freeQueuedCommand(long id); // Invoked by Command objects when they are completed. Removes them // from the command map. - template - void deregisterCommand(const long id) { + template void deregisterCommand(const long id) { std::lock_guard lg1(command_map_guard_); getCommandMap().erase(id); commands_deleted_ += 1; @@ -334,8 +307,7 @@ private: long freeAllCommands(); // Helper function for freeAllCommands to access a specific command map - template - long freeAllCommandsOfType(); + template long freeAllCommandsOfType(); // ------------------------------------------------ // Private members @@ -350,15 +322,15 @@ private: std::function user_connection_callback_; // Dynamically allocated libev event loop - struct ev_loop* evloop_; + struct ev_loop *evloop_; // No-wait mode for high-performance std::atomic_bool nowait_ = {false}; // Asynchronous watchers ev_async watcher_command_; // For processing commands - ev_async watcher_stop_; // For breaking the loop - ev_async watcher_free_; // For freeing commands + ev_async watcher_stop_; // For breaking the loop + ev_async watcher_free_; // For freeing commands // Track of Command objects allocated. Also provides unique Command IDs. std::atomic_long commands_created_ = {0}; @@ -374,7 +346,7 @@ private: // Variable and CV to know when the event loop stops running std::atomic_bool to_exit_ = {false}; // Signal to exit - std::atomic_bool exited_ = {false}; // Event thread exited + std::atomic_bool exited_ = {false}; // Event thread exited std::mutex exit_waiter_lock_; std::condition_variable exit_waiter_; @@ -385,15 +357,16 @@ private: // template // std::unordered_map*> commands_; // --------- - std::unordered_map*> commands_redis_reply_; - std::unordered_map*> commands_string_; - std::unordered_map*> commands_char_p_; - std::unordered_map*> commands_int_; - std::unordered_map*> commands_long_long_int_; - std::unordered_map*> commands_null_; - std::unordered_map>*> commands_vector_string_; - std::unordered_map>*> commands_set_string_; - std::unordered_map>*> commands_unordered_set_string_; + std::unordered_map *> commands_redis_reply_; + std::unordered_map *> commands_string_; + std::unordered_map *> commands_char_p_; + std::unordered_map *> commands_int_; + std::unordered_map *> commands_long_long_int_; + std::unordered_map *> commands_null_; + std::unordered_map> *> commands_vector_string_; + std::unordered_map> *> commands_set_string_; + std::unordered_map> *> + commands_unordered_set_string_; std::mutex command_map_guard_; // Guards access to all of the above // Command IDs pending to be sent to the server @@ -406,36 +379,28 @@ private: // Commands use this method to deregister themselves from Redox, // give it access to private members - template - friend void Command::free(); + template friend void Command::free(); // Access to call disconnectedCallback - template - friend void Command::processReply(redisReply* r); + template friend void Command::processReply(redisReply *r); }; // ------------------------------------------------ // Implementation of templated methods // ------------------------------------------------ -template -Command& Redox::createCommand( - const std::vector& cmd, - const std::function&)>& callback, - double repeat, - double after, - bool free_memory -) { +template +Command &Redox::createCommand(const std::vector &cmd, + const std::function &)> &callback, + double repeat, double after, bool free_memory) { - if(!running_) { + if (!running_) { throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); } commands_created_ += 1; - auto* c = new Command( - this, commands_created_, cmd, - callback, repeat, after, free_memory, logger_ - ); + auto *c = new Command(this, commands_created_, cmd, callback, repeat, after, free_memory, + logger_); std::lock_guard lg(queue_guard_); std::lock_guard lg2(command_map_guard_); @@ -449,36 +414,27 @@ Command& Redox::createCommand( return *c; } -template -void Redox::command( - const std::vector& cmd, - const std::function&)>& callback -) { +template +void Redox::command(const std::vector &cmd, + const std::function &)> &callback) { createCommand(cmd, callback); } -template -Command& Redox::commandLoop( - const std::vector& cmd, - const std::function&)>& callback, - double repeat, - double after -) { +template +Command &Redox::commandLoop(const std::vector &cmd, + const std::function &)> &callback, + double repeat, double after) { return createCommand(cmd, callback, repeat, after, false); } -template -void Redox::commandDelayed( - const std::vector& cmd, - const std::function&)>& callback, - double after -) { +template +void Redox::commandDelayed(const std::vector &cmd, + const std::function &)> &callback, double after) { createCommand(cmd, callback, 0, after, true); } -template -Command& Redox::commandSync(const std::vector& cmd) { - auto& c = createCommand(cmd, nullptr, 0, 0, false); +template Command &Redox::commandSync(const std::vector &cmd) { + auto &c = createCommand(cmd, nullptr, 0, 0, false); c.wait(); return c; } diff --git a/include/redox/command.hpp b/include/redox/command.hpp index 522c46b..be7f30a 100644 --- a/include/redox/command.hpp +++ b/include/redox/command.hpp @@ -42,19 +42,17 @@ class Redox; * represent a deferred or looping command, in which case the success or * error callbacks are invoked more than once. */ -template -class Command { +template class Command { public: - // Reply codes - static const int NO_REPLY = -1; // No reply yet - static const int OK_REPLY = 0; // Successful reply of the expected type - static const int NIL_REPLY = 1; // Got a nil reply + static const int NO_REPLY = -1; // No reply yet + static const int OK_REPLY = 0; // Successful reply of the expected type + static const int NIL_REPLY = 1; // Got a nil reply static const int ERROR_REPLY = 2; // Got an error reply - static const int SEND_ERROR = 3; // Could not send to server - static const int WRONG_TYPE = 4; // Got reply, but it was not the expected type - static const int TIMEOUT = 5; // No reply, timed out + static const int SEND_ERROR = 3; // Could not send to server + static const int WRONG_TYPE = 4; // Got reply, but it was not the expected type + static const int TIMEOUT = 5; // No reply, timed out /** * Returns the reply status of this command. @@ -94,7 +92,7 @@ public: std::string cmd() const; // Allow public access to constructed data - Redox* const rdx_; + Redox *const rdx_; const long id_; const std::vector cmd_; const double repeat_; @@ -102,26 +100,22 @@ public: const bool free_memory_; private: - - Command( - Redox* rdx, - long id, - const std::vector& cmd, - const std::function&)>& callback, - double repeat, double after, - bool free_memory, - log::Logger& logger - ); + Command(Redox *rdx, long id, const std::vector &cmd, + const std::function &)> &callback, double repeat, double after, + bool free_memory, log::Logger &logger); // Handles a new reply from the server - void processReply(redisReply* r); + void processReply(redisReply *r); // Invoke a user callback from the reply object. This method is specialized // for each ReplyT of Command. void parseReplyObject(); // Directly invoke the user callback if it exists - void invoke() { if(callback_) callback_(*this); } + void invoke() { + if (callback_) + callback_(*this); + } bool checkErrorReply(); bool checkNilReply(); @@ -132,10 +126,10 @@ private: void freeReply(); // The last server reply - redisReply* reply_obj_ = nullptr; + redisReply *reply_obj_ = nullptr; // User callback - const std::function&)> callback_; + const std::function &)> callback_; // Place to store the reply value and status. ReplyT reply_val_; @@ -161,13 +155,13 @@ private: std::atomic_bool waiting_done_ = {false}; // Passed on from Redox class - log::Logger& logger_; + log::Logger &logger_; // Explicitly delete copy constructor and assignment operator, // Command objects should never be copied because they hold // state with a network resource. - Command(const Command&) = delete; - Command& operator=(const Command&) = delete; + Command(const Command &) = delete; + Command &operator=(const Command &) = delete; friend class Redox; }; diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp index 80351c6..9635ec7 100644 --- a/include/redox/subscriber.hpp +++ b/include/redox/subscriber.hpp @@ -27,14 +27,10 @@ namespace redox { class Subscriber { public: - /** * Constructor. Same as Redox. */ - Subscriber( - std::ostream& log_stream = std::cout, - log::Level log_level = log::Warning - ); + Subscriber(std::ostream &log_stream = std::cout, log::Level log_level = log::Warning); /** * Cleans up. @@ -49,19 +45,16 @@ public: /** * Same as .connect() on a Redox instance. */ - bool connect( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr) { + bool connect(const std::string &host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr) { return rdx_.connect(host, port, connection_callback); } /** * Same as .connectUnix() on a Redox instance. */ - bool connectUnix( - const std::string& path = REDIS_DEFAULT_PATH, - std::function connection_callback = nullptr) { + bool connectUnix(const std::string &path = REDIS_DEFAULT_PATH, + std::function connection_callback = nullptr) { return rdx_.connectUnix(path, connection_callback); } @@ -88,11 +81,10 @@ public: * err_callback: invoked on some error state */ void subscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr); /** * Subscribe to a topic with a pattern. @@ -102,11 +94,10 @@ public: * err_callback: invoked on some error state */ void psubscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr); /** * Unsubscribe from a topic. @@ -114,8 +105,7 @@ public: * err_callback: invoked on some error state */ void unsubscribe(const std::string topic, - std::function err_callback = nullptr - ); + std::function err_callback = nullptr); /** * Unsubscribe from a topic with a pattern. @@ -123,8 +113,7 @@ public: * err_callback: invoked on some error state */ void punsubscribe(const std::string topic, - std::function err_callback = nullptr - ); + std::function err_callback = nullptr); /** * Return the topics that are subscribed() to. @@ -143,19 +132,16 @@ public: } private: - // Base for subscribe and psubscribe void subscribeBase(const std::string cmd_name, const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr); // Base for unsubscribe and punsubscribe void unsubscribeBase(const std::string cmd_name, const std::string topic, - std::function err_callback = nullptr - ); + std::function err_callback = nullptr); // Underlying Redis client Redox rdx_; @@ -170,10 +156,10 @@ private: std::mutex psubscribed_topics_guard_; // Set of persisting commands, so that we can cancel them - std::set*> commands_; + std::set *> commands_; // Reference to rdx_.logger_ for convenience - log::Logger& logger_; + log::Logger &logger_; // CVs to wait for unsubscriptions std::condition_variable cv_unsub_; diff --git a/src/client.cpp b/src/client.cpp index 0b1a901..7d652d0 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -26,63 +26,56 @@ using namespace std; namespace redox { -Redox::Redox( - ostream& log_stream, - log::Level log_level -) : logger_(log_stream, log_level), evloop_(nullptr) {} +Redox::Redox(ostream &log_stream, log::Level log_level) + : logger_(log_stream, log_level), evloop_(nullptr) {} -bool Redox::connect( - const std::string& host, const int port, - std::function connection_callback -) { +bool Redox::connect(const string &host, const int port, + function connection_callback) { host_ = host; port_ = port; user_connection_callback_ = connection_callback; - if(!initEv()) return false; + if (!initEv()) + return false; // Connect over TCP ctx_ = redisAsyncConnect(host.c_str(), port); - if(!initHiredis()) return false; + if (!initHiredis()) + return false; event_loop_thread_ = thread([this] { runEventLoop(); }); // Block until connected and running the event loop, or until // a connection error happens and the event loop exits unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { - return running_.load() || connect_state_ == CONNECT_ERROR; - }); + running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); // Return if succeeded return connect_state_ == CONNECTED; } -bool Redox::connectUnix( - const std::string& path, - std::function connection_callback -) { +bool Redox::connectUnix(const string &path, function connection_callback) { path_ = path; user_connection_callback_ = connection_callback; - if(!initEv()) return false; + if (!initEv()) + return false; // Connect over unix sockets ctx_ = redisAsyncConnectUnix(path.c_str()); - if(!initHiredis()) return false; + if (!initHiredis()) + return false; event_loop_thread_ = thread([this] { runEventLoop(); }); // Block until connected and running the event loop, or until // a connection error happens and the event loop exits unique_lock ul(running_waiter_lock_); - running_waiter_.wait(ul, [this] { - return running_.load() || connect_state_ == CONNECT_ERROR; - }); + running_waiter_.wait(ul, [this] { return running_.load() || connect_state_ == CONNECT_ERROR; }); // Return if succeeded return connect_state_ == CONNECTED; @@ -107,16 +100,20 @@ void Redox::wait() { Redox::~Redox() { // Bring down the event loop - if(running_ == true) { stop(); } + if (running_ == true) { + stop(); + } - if(event_loop_thread_.joinable()) event_loop_thread_.join(); + if (event_loop_thread_.joinable()) + event_loop_thread_.join(); - if(evloop_ != nullptr) ev_loop_destroy(evloop_); + if (evloop_ != nullptr) + ev_loop_destroy(evloop_); } -void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { +void Redox::connectedCallback(const redisAsyncContext *ctx, int status) { - Redox* rdx = (Redox*) ctx->data; + Redox *rdx = (Redox *)ctx->data; if (status != REDIS_OK) { rdx->logger_.fatal() << "Could not connect to Redis: " << ctx->errstr; @@ -131,12 +128,13 @@ void Redox::connectedCallback(const redisAsyncContext* ctx, int status) { } rdx->connect_waiter_.notify_all(); - if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); + if (rdx->user_connection_callback_) + rdx->user_connection_callback_(rdx->connect_state_); } -void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { +void Redox::disconnectedCallback(const redisAsyncContext *ctx, int status) { - Redox* rdx = (Redox*) ctx->data; + Redox *rdx = (Redox *)ctx->data; if (status != REDIS_OK) { rdx->logger_.error() << "Disconnected from Redis on error: " << ctx->errstr; @@ -148,25 +146,26 @@ void Redox::disconnectedCallback(const redisAsyncContext* ctx, int status) { rdx->stop(); rdx->connect_waiter_.notify_all(); - if(rdx->user_connection_callback_) rdx->user_connection_callback_(rdx->connect_state_); + if (rdx->user_connection_callback_) + rdx->user_connection_callback_(rdx->connect_state_); } bool Redox::initEv() { signal(SIGPIPE, SIG_IGN); evloop_ = ev_loop_new(EVFLAG_AUTO); - if(evloop_ == nullptr) { + if (evloop_ == nullptr) { logger_.fatal() << "Could not create a libev event loop."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } - ev_set_userdata(evloop_, (void*)this); // Back-reference + ev_set_userdata(evloop_, (void *)this); // Back-reference return true; } bool Redox::initHiredis() { - ctx_->data = (void*)this; // Back-reference + ctx_->data = (void *)this; // Back-reference if (ctx_->err) { logger_.fatal() << "Could not create a hiredis context: " << ctx_->errstr; @@ -176,7 +175,7 @@ bool Redox::initHiredis() { } // Attach event loop to hiredis - if(redisLibevAttach(evloop_, ctx_) != REDIS_OK) { + if (redisLibevAttach(evloop_, ctx_) != REDIS_OK) { logger_.fatal() << "Could not attach libev event loop to hiredis."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); @@ -184,14 +183,14 @@ bool Redox::initHiredis() { } // Set the callbacks to be invoked on server connection/disconnection - if(redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { + if (redisAsyncSetConnectCallback(ctx_, Redox::connectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach connect callback to hiredis."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); return false; } - if(redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { + if (redisAsyncSetDisconnectCallback(ctx_, Redox::disconnectedCallback) != REDIS_OK) { logger_.fatal() << "Could not attach disconnect callback to hiredis."; connect_state_ = INIT_ERROR; connect_waiter_.notify_all(); @@ -202,12 +201,14 @@ bool Redox::initHiredis() { } void Redox::noWait(bool state) { - if(state) logger_.info() << "No-wait mode enabled."; - else logger_.info() << "No-wait mode disabled."; + if (state) + logger_.info() << "No-wait mode enabled."; + else + logger_.info() << "No-wait mode disabled."; nowait_ = state; } -void breakEventLoop(struct ev_loop* loop, ev_async* async, int revents) { +void breakEventLoop(struct ev_loop *loop, ev_async *async, int revents) { ev_break(loop, EVBREAK_ALL); } @@ -222,7 +223,7 @@ void Redox::runEventLoop() { connect_waiter_.wait(ul, [this] { return connect_state_ != NOT_YET_CONNECTED; }); // Handle connection error - if(connect_state_ != CONNECTED) { + if (connect_state_ != CONNECTED) { logger_.warning() << "Did not connect, event loop exiting."; exited_ = true; running_ = false; @@ -250,7 +251,7 @@ void Redox::runEventLoop() { // Run the event loop, using NOWAIT if enabled for maximum // throughput by avoiding any sleeping while (!to_exit_) { - if(nowait_) { + if (nowait_) { ev_run(evloop_, EVRUN_NOWAIT); } else { ev_run(evloop_); @@ -266,14 +267,15 @@ void Redox::runEventLoop() { this_thread::sleep_for(chrono::milliseconds(10)); ev_run(evloop_, EVRUN_NOWAIT); - if(connect_state_ == CONNECTED) redisAsyncDisconnect(ctx_); + if (connect_state_ == CONNECTED) + redisAsyncDisconnect(ctx_); // Run once more to disconnect ev_run(evloop_, EVRUN_NOWAIT); - if(commands_created_ != commands_deleted_) { - logger_.error() << "All commands were not freed! " - << commands_deleted_ << "/" << commands_created_; + if (commands_created_ != commands_deleted_) { + logger_.error() << "All commands were not freed! " << commands_deleted_ << "/" + << commands_created_; } exited_ = true; @@ -285,27 +287,26 @@ void Redox::runEventLoop() { logger_.info() << "Event thread exited."; } -template -Command* Redox::findCommand(long id) { +template Command *Redox::findCommand(long id) { lock_guard lg(command_map_guard_); - auto& command_map = getCommandMap(); + auto &command_map = getCommandMap(); auto it = command_map.find(id); - if(it == command_map.end()) return nullptr; + if (it == command_map.end()) + return nullptr; return it->second; } -template -void Redox::commandCallback(redisAsyncContext* ctx, void* r, void* privdata) { +template +void Redox::commandCallback(redisAsyncContext *ctx, void *r, void *privdata) { - Redox* rdx = (Redox*) ctx->data; + Redox *rdx = (Redox *)ctx->data; long id = (long)privdata; - redisReply* reply_obj = (redisReply*) r; + redisReply *reply_obj = (redisReply *)r; - Command* c = rdx->findCommand(id); - if(c == nullptr) { -// rdx->logger.warning() << "Couldn't find Command " << id << " in command_map (commandCallback)."; + Command *c = rdx->findCommand(id); + if (c == nullptr) { freeReplyObject(reply_obj); return; } @@ -313,26 +314,23 @@ void Redox::commandCallback(redisAsyncContext* ctx, void* r, void* privdata) { c->processReply(reply_obj); } -template -bool Redox::submitToServer(Command* c) { +template bool Redox::submitToServer(Command *c) { - Redox* rdx = c->rdx_; + Redox *rdx = c->rdx_; c->pending_++; // Construct a char** from the vector - vector argv; + vector argv; transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argv), - [](const string& s){ return s.c_str(); } - ); + [](const string &s) { return s.c_str(); }); // Construct a size_t* of string lengths from the vector vector argvlen; transform(c->cmd_.begin(), c->cmd_.end(), back_inserter(argvlen), - [](const string& s) { return s.size(); } - ); + [](const string &s) { return s.size(); }); - if(redisAsyncCommandArgv(rdx->ctx_, commandCallback, (void*) c->id_, - argv.size(), &argv[0], &argvlen[0]) != REDIS_OK) { + if (redisAsyncCommandArgv(rdx->ctx_, commandCallback, (void *)c->id_, argv.size(), + &argv[0], &argvlen[0]) != REDIS_OK) { rdx->logger_.error() << "Could not send \"" << c->cmd() << "\": " << rdx->ctx_->errstr; c->reply_status_ = Command::SEND_ERROR; c->invoke(); @@ -342,34 +340,34 @@ bool Redox::submitToServer(Command* c) { return true; } -template -void Redox::submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents) { +template +void Redox::submitCommandCallback(struct ev_loop *loop, ev_timer *timer, int revents) { - Redox* rdx = (Redox*) ev_userdata(loop); + Redox *rdx = (Redox *)ev_userdata(loop); long id = (long)timer->data; - Command* c = rdx->findCommand(id); - if(c == nullptr) { + Command *c = rdx->findCommand(id); + if (c == nullptr) { rdx->logger_.error() << "Couldn't find Command " << id - << " in command_map (submitCommandCallback)."; + << " in command_map (submitCommandCallback)."; return; } submitToServer(c); } -template -bool Redox::processQueuedCommand(long id) { +template bool Redox::processQueuedCommand(long id) { - Command* c = findCommand(id); - if(c == nullptr) return false; + Command *c = findCommand(id); + if (c == nullptr) + return false; - if((c->repeat_ == 0) && (c->after_ == 0)) { + if ((c->repeat_ == 0) && (c->after_ == 0)) { submitToServer(c); } else { - c->timer_.data = (void*)c->id_; + c->timer_.data = (void *)c->id_; ev_timer_init(&c->timer_, submitCommandCallback, c->after_, c->repeat_); ev_timer_start(evloop_, &c->timer_); @@ -379,62 +377,64 @@ bool Redox::processQueuedCommand(long id) { return true; } -void Redox::processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) { +void Redox::processQueuedCommands(struct ev_loop *loop, ev_async *async, int revents) { - Redox* rdx = (Redox*) ev_userdata(loop); + Redox *rdx = (Redox *)ev_userdata(loop); lock_guard lg(rdx->queue_guard_); - while(!rdx->command_queue_.empty()) { + while (!rdx->command_queue_.empty()) { long id = rdx->command_queue_.front(); rdx->command_queue_.pop(); - if(rdx->processQueuedCommand(id)) {} - else if(rdx->processQueuedCommand(id)) {} - else if(rdx->processQueuedCommand(id)) {} - else if(rdx->processQueuedCommand(id)) {} - else if(rdx->processQueuedCommand(id)) {} - else if(rdx->processQueuedCommand(id)) {} - else if(rdx->processQueuedCommand>(id)) {} - else if(rdx->processQueuedCommand>(id)) {} - else if(rdx->processQueuedCommand>(id)) {} - else throw runtime_error("Command pointer not found in any queue!"); + if (rdx->processQueuedCommand(id)) { + } else if (rdx->processQueuedCommand(id)) { + } else if (rdx->processQueuedCommand(id)) { + } else if (rdx->processQueuedCommand(id)) { + } else if (rdx->processQueuedCommand(id)) { + } else if (rdx->processQueuedCommand(id)) { + } else if (rdx->processQueuedCommand>(id)) { + } else if (rdx->processQueuedCommand>(id)) { + } else if (rdx->processQueuedCommand>(id)) { + } else + throw runtime_error("Command pointer not found in any queue!"); } } -void Redox::freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) { +void Redox::freeQueuedCommands(struct ev_loop *loop, ev_async *async, int revents) { - Redox* rdx = (Redox*) ev_userdata(loop); + Redox *rdx = (Redox *)ev_userdata(loop); lock_guard lg(rdx->free_queue_guard_); - while(!rdx->commands_to_free_.empty()) { + while (!rdx->commands_to_free_.empty()) { long id = rdx->commands_to_free_.front(); rdx->commands_to_free_.pop(); - if(rdx->freeQueuedCommand(id)) {} - else if(rdx->freeQueuedCommand(id)) {} - else if(rdx->freeQueuedCommand(id)) {} - else if(rdx->freeQueuedCommand(id)) {} - else if(rdx->freeQueuedCommand(id)) {} - else if(rdx->freeQueuedCommand(id)) {} - else if(rdx->freeQueuedCommand>(id)) {} - else if(rdx->freeQueuedCommand>(id)) {} - else if(rdx->freeQueuedCommand>(id)) {} - else {} + if (rdx->freeQueuedCommand(id)) { + } else if (rdx->freeQueuedCommand(id)) { + } else if (rdx->freeQueuedCommand(id)) { + } else if (rdx->freeQueuedCommand(id)) { + } else if (rdx->freeQueuedCommand(id)) { + } else if (rdx->freeQueuedCommand(id)) { + } else if (rdx->freeQueuedCommand>(id)) { + } else if (rdx->freeQueuedCommand>(id)) { + } else if (rdx->freeQueuedCommand>(id)) { + } else { + } } } -template -bool Redox::freeQueuedCommand(long id) { - Command* c = findCommand(id); - if(c == nullptr) return false; +template bool Redox::freeQueuedCommand(long id) { + Command *c = findCommand(id); + if (c == nullptr) + return false; c->freeReply(); // Stop the libev timer if this is a repeating command - if((c->repeat_ != 0) || (c->after_ != 0)) { + if ((c->repeat_ != 0) || (c->after_ != 0)) { lock_guard lg(c->timer_guard_); ev_timer_stop(c->rdx_->evloop_, &c->timer_); } @@ -447,33 +447,28 @@ bool Redox::freeQueuedCommand(long id) { } long Redox::freeAllCommands() { - return freeAllCommandsOfType() + - freeAllCommandsOfType() + - freeAllCommandsOfType() + - freeAllCommandsOfType() + - freeAllCommandsOfType() + - freeAllCommandsOfType() + - freeAllCommandsOfType>() + - freeAllCommandsOfType>() + - freeAllCommandsOfType>(); + return freeAllCommandsOfType() + freeAllCommandsOfType() + + freeAllCommandsOfType() + freeAllCommandsOfType() + + freeAllCommandsOfType() + freeAllCommandsOfType() + + freeAllCommandsOfType>() + freeAllCommandsOfType>() + + freeAllCommandsOfType>(); } -template -long Redox::freeAllCommandsOfType() { +template long Redox::freeAllCommandsOfType() { lock_guard lg(free_queue_guard_); lock_guard lg2(queue_guard_); - auto& command_map = getCommandMap(); + auto &command_map = getCommandMap(); long len = command_map.size(); - for(auto& pair : command_map) { - Command* c = pair.second; + for (auto &pair : command_map) { + Command *c = pair.second; c->freeReply(); // Stop the libev timer if this is a repeating command - if((c->repeat_ != 0) || (c->after_ != 0)) { + if ((c->repeat_ != 0) || (c->after_ != 0)) { lock_guard lg3(c->timer_guard_); ev_timer_stop(c->rdx_->evloop_, &c->timer_); } @@ -491,89 +486,95 @@ long Redox::freeAllCommandsOfType() { // get_command_map specializations // --------------------------------- -template<> unordered_map*>& -Redox::getCommandMap() { return commands_redis_reply_; } +template <> unordered_map *> &Redox::getCommandMap() { + return commands_redis_reply_; +} -template<> unordered_map*>& -Redox::getCommandMap() { return commands_string_; } +template <> unordered_map *> &Redox::getCommandMap() { + return commands_string_; +} -template<> unordered_map*>& -Redox::getCommandMap() { return commands_char_p_; } +template <> unordered_map *> &Redox::getCommandMap() { + return commands_char_p_; +} -template<> unordered_map*>& -Redox::getCommandMap() { return commands_int_; } +template <> unordered_map *> &Redox::getCommandMap() { + return commands_int_; +} -template<> unordered_map*>& -Redox::getCommandMap() { return commands_long_long_int_; } +template <> unordered_map *> &Redox::getCommandMap() { + return commands_long_long_int_; +} -template<> unordered_map*>& -Redox::getCommandMap() { return commands_null_; } +template <> unordered_map *> &Redox::getCommandMap() { + return commands_null_; +} -template<> unordered_map>*>& -Redox::getCommandMap>() { return commands_vector_string_; } +template <> unordered_map> *> &Redox::getCommandMap>() { + return commands_vector_string_; +} -template<> unordered_map>*>& -Redox::getCommandMap>() { return commands_set_string_; } +template <> unordered_map> *> &Redox::getCommandMap>() { + return commands_set_string_; +} -template<> unordered_map>*>& -Redox::getCommandMap>() { return commands_unordered_set_string_; } +template <> +unordered_map> *> & +Redox::getCommandMap>() { + return commands_unordered_set_string_; +} // ---------------------------- // Helpers // ---------------------------- -string Redox::vecToStr(const vector& vec, const char delimiter) { +string Redox::vecToStr(const vector &vec, const char delimiter) { string str; - for(size_t i = 0; i < vec.size() - 1; i++) + for (size_t i = 0; i < vec.size() - 1; i++) str += vec[i] + delimiter; - str += vec[vec.size()-1]; + str += vec[vec.size() - 1]; return str; } -vector Redox::strToVec(const string& s, const char delimiter) { +vector Redox::strToVec(const string &s, const char delimiter) { vector vec; size_t last = 0; size_t next = 0; while ((next = s.find(delimiter, last)) != string::npos) { - vec.push_back(s.substr(last, next-last)); + vec.push_back(s.substr(last, next - last)); last = next + 1; } vec.push_back(s.substr(last)); return vec; } -void Redox::command(const std::vector& cmd) { - command(cmd, nullptr); -} +void Redox::command(const vector &cmd) { command(cmd, nullptr); } -bool Redox::commandSync(const std::vector& cmd) { - auto& c = commandSync(cmd); +bool Redox::commandSync(const vector &cmd) { + auto &c = commandSync(cmd); bool succeeded = c.ok(); c.free(); return succeeded; } -string Redox::get(const string& key) { +string Redox::get(const string &key) { - Command& c = commandSync({"GET", key}); - if(!c.ok()) { - throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + to_string(c.status())); + Command &c = commandSync({"GET", key}); + if (!c.ok()) { + throw runtime_error("[FATAL] Error getting key " + key + ": Status code " + + to_string(c.status())); } string reply = c.reply(); c.free(); return reply; }; -bool Redox::set(const string& key, const string& value) { - return commandSync({"SET", key, value}); -} +bool Redox::set(const string &key, const string &value) { return commandSync({"SET", key, value}); } -bool Redox::del(const string& key) { - return commandSync({"DEL", key}); -} +bool Redox::del(const string &key) { return commandSync({"DEL", key}); } -void Redox::publish(const string& topic, const string& msg) { - command({"PUBLISH", topic, msg}); +void Redox::publish(const string &topic, const string &msg) { + command({"PUBLISH", topic, msg}); } } // End namespace redis diff --git a/src/command.cpp b/src/command.cpp index 3baeb60..d7d39a1 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -29,32 +29,27 @@ using namespace std; namespace redox { -template -Command::Command( - Redox* rdx, - long id, - const std::vector& cmd, - const std::function&)>& callback, - double repeat, double after, bool free_memory, log::Logger& logger -) : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), - callback_(callback), last_error_(), logger_(logger) { +template +Command::Command(Redox *rdx, long id, const vector &cmd, + const function &)> &callback, double repeat, + double after, bool free_memory, log::Logger &logger) + : rdx_(rdx), id_(id), cmd_(cmd), repeat_(repeat), after_(after), free_memory_(free_memory), + callback_(callback), last_error_(), logger_(logger) { timer_guard_.lock(); } -template -void Command::wait() { - std::unique_lock lk(waiter_lock_); +template void Command::wait() { + unique_lock lk(waiter_lock_); waiter_.wait(lk, [this]() { return waiting_done_.load(); }); waiting_done_ = {false}; } -template -void Command::processReply(redisReply* r) { +template void Command::processReply(redisReply *r) { last_error_.clear(); reply_obj_ = r; - if(reply_obj_ == nullptr) { + if (reply_obj_ == nullptr) { reply_status_ = ERROR_REPLY; last_error_ = "Received null redisReply* from hiredis."; logger_.error() << last_error_; @@ -73,34 +68,35 @@ void Command::processReply(redisReply* r) { waiter_.notify_all(); // Always free the reply object for repeating commands - if(repeat_ > 0) { + if (repeat_ > 0) { freeReply(); } else { // User calls .free() - if (!free_memory_) return; + if (!free_memory_) + return; // Free non-repeating commands automatically // once we receive expected replies - if(pending_ == 0) free(); + if (pending_ == 0) + free(); } } // This is the only method in Command that has // access to private members of Redox -template -void Command::free() { +template void Command::free() { lock_guard lg(rdx_->free_queue_guard_); rdx_->commands_to_free_.push(id_); ev_async_send(rdx_->evloop_, &rdx_->watcher_free_); } -template -void Command::freeReply() { +template void Command::freeReply() { - if (reply_obj_ == nullptr) return; + if (reply_obj_ == nullptr) + return; freeReplyObject(reply_obj_); reply_obj_ = nullptr; @@ -111,60 +107,55 @@ void Command::freeReply() { * to make sure we don't return a reply while it is being * modified. */ -template -ReplyT Command::reply() { - std::lock_guard lg(reply_guard_); +template ReplyT Command::reply() { + lock_guard lg(reply_guard_); if (!ok()) { logger_.warning() << cmd() << ": Accessing reply value while status != OK."; } return reply_val_; } -template -std::string Command::cmd() const { - return rdx_->vecToStr(cmd_); -} +template string Command::cmd() const { return rdx_->vecToStr(cmd_); } -template -bool Command::isExpectedReply(int type) { +template bool Command::isExpectedReply(int type) { - if(reply_obj_->type == type) { + if (reply_obj_->type == type) { reply_status_ = OK_REPLY; return true; } - if(checkErrorReply() || checkNilReply()) return false; + if (checkErrorReply() || checkNilReply()) + return false; - std::stringstream errorMessage; - errorMessage << "Received reply of type " << reply_obj_->type - << ", expected type " << type << "."; + stringstream errorMessage; + errorMessage << "Received reply of type " << reply_obj_->type << ", expected type " << type + << "."; last_error_ = errorMessage.str(); logger_.error() << cmd() << ": " << last_error_; reply_status_ = WRONG_TYPE; return false; } -template -bool Command::isExpectedReply(int typeA, int typeB) { +template bool Command::isExpectedReply(int typeA, int typeB) { - if((reply_obj_->type == typeA) || (reply_obj_->type == typeB)) { + if ((reply_obj_->type == typeA) || (reply_obj_->type == typeB)) { reply_status_ = OK_REPLY; return true; } - if(checkErrorReply() || checkNilReply()) return false; + if (checkErrorReply() || checkNilReply()) + return false; - std::stringstream errorMessage; - errorMessage << "Received reply of type " << reply_obj_->type - << ", expected type " << typeA << " or " << typeB << "."; + stringstream errorMessage; + errorMessage << "Received reply of type " << reply_obj_->type << ", expected type " << typeA + << " or " << typeB << "."; last_error_ = errorMessage.str(); logger_.error() << cmd() << ": " << last_error_; reply_status_ = WRONG_TYPE; return false; } -template -bool Command::checkErrorReply() { +template bool Command::checkErrorReply() { if (reply_obj_->type == REDIS_REPLY_ERROR) { if (reply_obj_->str != 0) { @@ -178,8 +169,7 @@ bool Command::checkErrorReply() { return false; } -template -bool Command::checkNilReply() { +template bool Command::checkNilReply() { if (reply_obj_->type == REDIS_REPLY_NIL) { logger_.warning() << cmd() << ": Nil reply."; @@ -193,74 +183,74 @@ bool Command::checkNilReply() { // Specializations of parseReplyObject for all expected return types // ---------------------------------------------------------------------------- -template<> -void Command::parseReplyObject() { - if(!checkErrorReply()) reply_status_ = OK_REPLY; +template <> void Command::parseReplyObject() { + if (!checkErrorReply()) + reply_status_ = OK_REPLY; reply_val_ = reply_obj_; } -template<> -void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return; +template <> void Command::parseReplyObject() { + if (!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) + return; reply_val_ = {reply_obj_->str, static_cast(reply_obj_->len)}; } -template<> -void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) return; +template <> void Command::parseReplyObject() { + if (!isExpectedReply(REDIS_REPLY_STRING, REDIS_REPLY_STATUS)) + return; reply_val_ = reply_obj_->str; } -template<> -void Command::parseReplyObject() { +template <> void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_INTEGER)) return; - reply_val_ = (int) reply_obj_->integer; + if (!isExpectedReply(REDIS_REPLY_INTEGER)) + return; + reply_val_ = (int)reply_obj_->integer; } -template<> -void Command::parseReplyObject() { +template <> void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_INTEGER)) return; + if (!isExpectedReply(REDIS_REPLY_INTEGER)) + return; reply_val_ = reply_obj_->integer; } -template<> -void Command::parseReplyObject() { +template <> void Command::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_NIL)) return; + if (!isExpectedReply(REDIS_REPLY_NIL)) + return; reply_val_ = nullptr; } -template<> -void Command>::parseReplyObject() { +template <> void Command>::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_ARRAY)) return; + if (!isExpectedReply(REDIS_REPLY_ARRAY)) + return; - for(size_t i = 0; i < reply_obj_->elements; i++) { - redisReply* r = *(reply_obj_->element + i); + for (size_t i = 0; i < reply_obj_->elements; i++) { + redisReply *r = *(reply_obj_->element + i); reply_val_.emplace_back(r->str, r->len); } } -template<> -void Command>::parseReplyObject() { +template <> void Command>::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_ARRAY)) return; + if (!isExpectedReply(REDIS_REPLY_ARRAY)) + return; - for(size_t i = 0; i < reply_obj_->elements; i++) { - redisReply* r = *(reply_obj_->element + i); + for (size_t i = 0; i < reply_obj_->elements; i++) { + redisReply *r = *(reply_obj_->element + i); reply_val_.emplace(r->str, r->len); } } -template<> -void Command>::parseReplyObject() { +template <> void Command>::parseReplyObject() { - if(!isExpectedReply(REDIS_REPLY_ARRAY)) return; + if (!isExpectedReply(REDIS_REPLY_ARRAY)) + return; - for(size_t i = 0; i < reply_obj_->elements; i++) { - redisReply* r = *(reply_obj_->element + i); + for (size_t i = 0; i < reply_obj_->elements; i++) { + redisReply *r = *(reply_obj_->element + i); reply_val_.emplace(r->str, r->len); } } @@ -268,9 +258,9 @@ void Command>::parseReplyObject() { // Explicit template instantiation for available types, so that the generated // library contains them and we can keep the method definitions out of the // header file. -template class Command; +template class Command; template class Command; -template class Command; +template class Command; template class Command; template class Command; template class Command; diff --git a/src/subscriber.cpp b/src/subscriber.cpp index bf1d730..c650d58 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -25,9 +25,8 @@ using namespace std; namespace redox { -Subscriber::Subscriber( - std::ostream& log_stream, log::Level log_level -) : rdx_(log_stream, log_level), logger_(rdx_.logger_) {} +Subscriber::Subscriber(ostream &log_stream, log::Level log_level) + : rdx_(log_stream, log_level), logger_(rdx_.logger_) {} Subscriber::~Subscriber() {} @@ -36,9 +35,7 @@ void Subscriber::disconnect() { wait(); } -void Subscriber::wait() { - rdx_.wait(); -} +void Subscriber::wait() { rdx_.wait(); } // This is a fairly awkward way of shutting down, where // we pause to wait for subscriptions to happen, and then @@ -46,114 +43,127 @@ void Subscriber::wait() { // The reason is because hiredis goes into // a segfault in freeReplyObject() under redisAsyncDisconnect() // if we don't do this first. -// TODO look at hiredis, ask them what causes the error +// TODO(hayk): look at hiredis, ask them what causes the error void Subscriber::stop() { this_thread::sleep_for(chrono::milliseconds(1000)); - for(const string& topic : subscribedTopics()) + for (const string &topic : subscribedTopics()) unsubscribe(topic); - for(const string& topic : psubscribedTopics()) + for (const string &topic : psubscribedTopics()) punsubscribe(topic); unique_lock ul(cv_unsub_guard_); cv_unsub_.wait(ul, [this] { - std::lock_guard lg(subscribed_topics_guard_); + lock_guard lg(subscribed_topics_guard_); return (subscribed_topics_.size() == 0); }); unique_lock ul2(cv_punsub_guard_); cv_punsub_.wait(ul, [this] { - std::lock_guard lg(subscribed_topics_guard_); + lock_guard lg(subscribed_topics_guard_); return (psubscribed_topics_.size() == 0); }); - for(Command* c : commands_) + for (Command *c : commands_) c->free(); rdx_.stop(); } // For debugging only -void debugReply(Command c) { +void debugReply(Command c) { - redisReply* reply = c.reply(); + redisReply *reply = c.reply(); cout << "------" << endl; cout << c.cmd() << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl; - for(size_t i = 0; i < reply->elements; i++) { - redisReply* r = reply->element[i]; + for (size_t i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; cout << "element " << i << ", reply type = " << r->type << " "; - if(r->type == REDIS_REPLY_STRING) cout << r->str << endl; - else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl; - else cout << "some other type" << endl; + if (r->type == REDIS_REPLY_STRING) + cout << r->str << endl; + else if (r->type == REDIS_REPLY_INTEGER) + cout << r->integer << endl; + else + cout << "some other type" << endl; } cout << "------" << endl; } void Subscriber::subscribeBase(const string cmd_name, const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback -) { + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback) { - Command& sub_cmd = rdx_.commandLoop({cmd_name, topic}, - [this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command& c) { + Command &sub_cmd = rdx_.commandLoop( + {cmd_name, topic}, + [this, topic, msg_callback, err_callback, sub_callback, unsub_callback]( + Command &c) { if (!c.ok()) { num_pending_subs_--; - if (err_callback) err_callback(topic, c.status()); + if (err_callback) + err_callback(topic, c.status()); return; } - redisReply* reply = c.reply(); + redisReply *reply = c.reply(); // If the last entry is an integer, then it is a [p]sub/[p]unsub command if ((reply->type == REDIS_REPLY_ARRAY) && (reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) { - std::lock_guard lg(subscribed_topics_guard_); - std::lock_guard lg2(psubscribed_topics_guard_); + lock_guard lg(subscribed_topics_guard_); + lock_guard lg2(psubscribed_topics_guard_); if (!strncmp(reply->element[0]->str, "sub", 3)) { subscribed_topics_.insert(topic); num_pending_subs_--; - if (sub_callback) sub_callback(topic); + if (sub_callback) + sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "psub", 4)) { psubscribed_topics_.insert(topic); num_pending_subs_--; - if (sub_callback) sub_callback(topic); + if (sub_callback) + sub_callback(topic); } else if (!strncmp(reply->element[0]->str, "uns", 3)) { subscribed_topics_.erase(topic); - if (unsub_callback) unsub_callback(topic); + if (unsub_callback) + unsub_callback(topic); cv_unsub_.notify_all(); } else if (!strncmp(reply->element[0]->str, "puns", 4)) { psubscribed_topics_.erase(topic); - if (unsub_callback) unsub_callback(topic); + if (unsub_callback) + unsub_callback(topic); cv_punsub_.notify_all(); } - else logger_.error() << "Unknown pubsub message: " << reply->element[0]->str; + else + logger_.error() << "Unknown pubsub message: " << reply->element[0]->str; } - // Message for subscribe + // Message for subscribe else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) { - char* msg = reply->element[2]->str; - if (msg && msg_callback) msg_callback(topic, reply->element[2]->str); + char *msg = reply->element[2]->str; + if (msg && msg_callback) + msg_callback(topic, reply->element[2]->str); } - // Message for psubscribe + // Message for psubscribe else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) { - char* msg = reply->element[2]->str; - if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str); + char *msg = reply->element[2]->str; + if (msg && msg_callback) + msg_callback(reply->element[2]->str, reply->element[3]->str); } - else logger_.error() << "Unknown pubsub message of type " << reply->type; + else + logger_.error() << "Unknown pubsub message of type " << reply->type; }, 1e10 // To keep the command around for a few hundred years - ); + ); // Add it to the command list commands_.insert(&sub_cmd); @@ -161,13 +171,12 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, } void Subscriber::subscribe(const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback -) { - std::lock_guard lg(subscribed_topics_guard_); - if(subscribed_topics_.find(topic) != subscribed_topics_.end()) { + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback) { + lock_guard lg(subscribed_topics_guard_); + if (subscribed_topics_.find(topic) != subscribed_topics_.end()) { logger_.warning() << "Already subscribed to " << topic << "!"; return; } @@ -175,13 +184,12 @@ void Subscriber::subscribe(const string topic, } void Subscriber::psubscribe(const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback -) { - std::lock_guard lg(psubscribed_topics_guard_); - if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) { + function msg_callback, + function sub_callback, + function unsub_callback, + function err_callback) { + lock_guard lg(psubscribed_topics_guard_); + if (psubscribed_topics_.find(topic) != psubscribed_topics_.end()) { logger_.warning() << "Already psubscribed to " << topic << "!"; return; } @@ -189,23 +197,19 @@ void Subscriber::psubscribe(const string topic, } void Subscriber::unsubscribeBase(const string cmd_name, const string topic, - function err_callback -) { - rdx_.command({cmd_name, topic}, - [topic, err_callback](Command& c) { - if(!c.ok()) { - if (err_callback) err_callback(topic, c.status()); - return; - } - } - ); + function err_callback) { + rdx_.command({cmd_name, topic}, [topic, err_callback](Command &c) { + if (!c.ok()) { + if (err_callback) + err_callback(topic, c.status()); + return; + } + }); } -void Subscriber::unsubscribe(const string topic, - function err_callback -) { - std::lock_guard lg(subscribed_topics_guard_); - if(subscribed_topics_.find(topic) == subscribed_topics_.end()) { +void Subscriber::unsubscribe(const string topic, function err_callback) { + lock_guard lg(subscribed_topics_guard_); + if (subscribed_topics_.find(topic) == subscribed_topics_.end()) { logger_.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; return; } @@ -213,10 +217,9 @@ void Subscriber::unsubscribe(const string topic, } void Subscriber::punsubscribe(const string topic, - function err_callback -) { - std::lock_guard lg(psubscribed_topics_guard_); - if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) { + function err_callback) { + lock_guard lg(psubscribed_topics_guard_); + if (psubscribed_topics_.find(topic) == psubscribed_topics_.end()) { logger_.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; return; } diff --git a/test/test.cpp b/test/test.cpp index bfd168f..c457641 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -35,7 +35,6 @@ using redox::Command; class RedoxTest : public ::testing::Test { protected: - Redox rdx; RedoxTest() {} @@ -57,19 +56,18 @@ protected: mutex cmd_waiter_lock; // To make the callback code nicer - template - using Callback = std::function&)>; + template using Callback = std::function &)>; /** * Helper function that returns a command callback to print out the * command/reply and to test the reply against the provided value. */ - template - Callback check(const ReplyT& value) { + template Callback check(const ReplyT &value) { cmd_count++; - return [this, value](Command& c) { + return [this, value](Command &c) { EXPECT_TRUE(c.ok()); - if(c.ok()) EXPECT_EQ(value, c.reply()); + if (c.ok()) + EXPECT_EQ(value, c.reply()); cmd_count--; cmd_waiter.notify_all(); }; @@ -78,10 +76,10 @@ protected: /** * Wrapper for the callback that also prints out the command. */ - template - Callback print(Callback callback) { - return [callback](Command& c) { - if(c.ok()) cout << "[ASYNC] " << c.cmd() << ": " << c.reply() << endl; + template Callback print(Callback callback) { + return [callback](Command &c) { + if (c.ok()) + cout << "[ASYNC] " << c.cmd() << ": " << c.reply() << endl; callback(c); }; } @@ -89,21 +87,19 @@ protected: /** * Combination of print and check for simplicity. */ - template - Callback print_and_check(const ReplyT& value) { + template Callback print_and_check(const ReplyT &value) { return print(check(value)); } /** * Check the error */ - template - Callback print_and_check_error(const ReplyT& value) { + template Callback print_and_check_error(const ReplyT &value) { cmd_count++; - return [this, value](Command& c) { + return [this, value](Command &c) { EXPECT_FALSE(c.ok()); EXPECT_FALSE(c.lastError().empty()); -// EXPECT_EQ(value, c.reply()); + // EXPECT_EQ(value, c.reply()); cout << c.cmd() << ": " << c.lastError() << endl; cmd_count--; cmd_waiter.notify_all(); @@ -120,15 +116,13 @@ protected: rdx.disconnect(); } - template - void check_sync(Command& c, const ReplyT& value) { + template void check_sync(Command &c, const ReplyT &value) { ASSERT_TRUE(c.ok()); EXPECT_EQ(c.reply(), value); c.free(); } - template - void print_and_check_sync(Command& c, const ReplyT& value) { + template void print_and_check_sync(Command &c, const ReplyT &value) { ASSERT_TRUE(c.ok()); EXPECT_EQ(c.reply(), value); cout << "[SYNC] " << c.cmd() << ": " << c.reply() << endl; @@ -138,12 +132,11 @@ protected: /** * Check the error */ - template - void print_and_check_error_sync(Command& c, const ReplyT& value) { - EXPECT_FALSE(c.ok()); - EXPECT_FALSE(c.lastError().empty()); -// EXPECT_EQ(value, c.reply()); - cout << c.cmd() << ": " << c.lastError() << endl; + template void print_and_check_error_sync(Command &c, const ReplyT &value) { + EXPECT_FALSE(c.ok()); + EXPECT_FALSE(c.lastError().empty()); + // EXPECT_EQ(value, c.reply()); + cout << c.cmd() << ": " << c.lastError() << endl; } }; @@ -151,18 +144,14 @@ protected: // Core unit tests - asynchronous // ------------------------------------------- -TEST_F(RedoxTest, TestConnection) { - EXPECT_TRUE(rdx.connect("localhost", 6379)); -} +TEST_F(RedoxTest, TestConnection) { EXPECT_TRUE(rdx.connect("localhost", 6379)); } -TEST_F(RedoxTest, TestConnectionFailure) { - EXPECT_FALSE(rdx.connect("localhost", 6380)); -} +TEST_F(RedoxTest, TestConnectionFailure) { EXPECT_FALSE(rdx.connect("localhost", 6380)); } TEST_F(RedoxTest, GetSet) { connect(); rdx.command({"SET", "redox_test:a", "apple"}, print_and_check("OK")); - rdx.command({"GET", "redox_test:a"}, print_and_check("apple")); + rdx.command({"GET", "redox_test:a"}, print_and_check("apple")); wait_for_replies(); } @@ -177,8 +166,8 @@ TEST_F(RedoxTest, Delete) { TEST_F(RedoxTest, Incr) { connect(); int count = 100; - for(int i = 0; i < count; i++) { - rdx.command({"INCR", "redox_test:a"}, check(i+1)); + for (int i = 0; i < count; i++) { + rdx.command({"INCR", "redox_test:a"}, check(i + 1)); } rdx.command({"GET", "redox_test:a"}, print_and_check(to_string(count))); wait_for_replies(); @@ -197,12 +186,8 @@ TEST_F(RedoxTest, Loop) { int count = 0; int target_count = 20; double dt = 0.005; - Command& cmd = rdx.commandLoop({"INCR", "redox_test:a"}, - [this, &count](Command& c) { - check(++count)(c); - }, - dt - ); + Command &cmd = rdx.commandLoop( + {"INCR", "redox_test:a"}, [this, &count](Command &c) { check(++count)(c); }, dt); double wait_time = dt * (target_count - 0.5); this_thread::sleep_for(std::chrono::duration(wait_time)); @@ -215,7 +200,7 @@ TEST_F(RedoxTest, Loop) { TEST_F(RedoxTest, GetSetError) { connect(); rdx.command({"SET", "redox_test:a", "apple"}, print_and_check("OK")); - rdx.command({"GET", "redox_test:a"}, print_and_check_error(3)); + rdx.command({"GET", "redox_test:a"}, print_and_check_error(3)); wait_for_replies(); } @@ -241,8 +226,8 @@ TEST_F(RedoxTest, DeleteSync) { TEST_F(RedoxTest, IncrSync) { connect(); int count = 100; - for(int i = 0; i < count; i++) { - check_sync(rdx.commandSync({"INCR", "redox_test:a"}), i+1); + for (int i = 0; i < count; i++) { + check_sync(rdx.commandSync({"INCR", "redox_test:a"}), i + 1); } print_and_check_sync(rdx.commandSync({"GET", "redox_test:a"}), to_string(count)); rdx.disconnect(); @@ -259,7 +244,7 @@ TEST_F(RedoxTest, GetSetSyncError) { // End tests // ------------------------------------------- -} // namespace +} // namespace int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv);