From 7a15a7f2b983c1a8a25f46cd96efefac2e55268b Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Tue, 27 Jan 2015 15:07:59 -0800 Subject: [PATCH] Separated out include directory, started tutorial --- CMakeLists.txt | 30 +++++++++++++++++++----------- README.md | 43 +++++++++++++++++++++++++++++++------------ include/redox.hpp | 25 +++++++++++++++++++++++++ include/redox/client.hpp | 441 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ include/redox/command.hpp | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ include/redox/subscriber.hpp | 182 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ include/redox/utils/logger.hpp | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/client.cpp | 2 +- src/client.hpp | 441 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/command.hpp | 172 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/redox.hpp | 25 ------------------------- src/subscriber.hpp | 182 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/utils/logger.cpp | 2 +- src/utils/logger.hpp | 83 ----------------------------------------------------------------------------------- 14 files changed, 955 insertions(+), 928 deletions(-) create mode 100644 include/redox.hpp create mode 100644 include/redox/client.hpp create mode 100644 include/redox/command.hpp create mode 100644 include/redox/subscriber.hpp create mode 100644 include/redox/utils/logger.hpp delete mode 100644 src/client.hpp delete mode 100644 src/command.hpp delete mode 100644 src/redox.hpp delete mode 100644 src/subscriber.hpp delete mode 100644 src/utils/logger.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6eca8c0..e0a7e22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,23 +26,32 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") # --------------------------------------------------------- set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) +set(INC_DIR ${CMAKE_SOURCE_DIR}/include/redox) set(SRC_CORE ${SRC_DIR}/client.cpp ${SRC_DIR}/command.cpp - ${SRC_DIR}/subscriber.cpp -) + ${SRC_DIR}/subscriber.cpp) -set(SRC_LOGGER ${SRC_DIR}/utils/logger.cpp) +set(INC_CORE + ${INC_DIR}/client.hpp + ${INC_DIR}/subscriber.hpp + ${INC_DIR}/command.hpp) -set(SRC_ALL ${SRC_CORE} ${SRC_LOGGER}) +set(SRC_UTILS ${SRC_DIR}/utils/logger.cpp) +set(INC_UTILS ${INC_DIR}/utils/logger.hpp) -include_directories(${SRC_DIR}) +set(INC_WRAPPER ${CMAKE_SOURCE_DIR}/include/redox.hpp) + +set(SRC_ALL ${SRC_CORE} ${SRC_UTILS}) +set(INC_ALL ${INC_CORE} ${INC_UTILS} ${INC_WRAPPER}) + +include_directories(${INC_DIR} ${INC_DIR/redox}) # Dependent libraries - you may have to change # pthread to whatever C++11 threads depends on # for your platform -set(REDOX_LIB_DEPS hiredis ev pthread) +set(REDOX_LIB_DEPS ev pthread hiredis) # --------------------------------------------------------- # Library generation @@ -50,7 +59,7 @@ set(REDOX_LIB_DEPS hiredis ev pthread) if (lib) - add_library(redox SHARED ${SRC_ALL}) + add_library(redox SHARED ${SRC_ALL} ${INC_CORE}) target_link_libraries(redox ${REDOX_LIB_DEPS}) set_target_properties(redox @@ -139,17 +148,16 @@ endif() # Install (sudo make install) # --------------------------------------------------------- +set(CMAKE_INSTALL_PREFIX /usr/) + # Install the dynamic library to /usr/lib install(TARGETS redox DESTINATION lib) # Install the headers into /usr/include/redox -set(INC_CORE ${SRC_DIR}/client.hpp ${SRC_DIR}/subscriber.hpp) -set(INC_UTILS ${SRC_DIR}/utils/logger.hpp) install(FILES ${INC_CORE} DESTINATION include/redox) install(FILES ${INC_UTILS} DESTINATION include/redox/utils) -# Install the top-level header into /usr/include directly -set(INC_WRAPPER ${SRC_DIR}/redox.hpp) +# Install the top-level header directly into /usr/include install(FILES ${INC_WRAPPER} DESTINATION include) # --------------------------------------------------------- diff --git a/README.md b/README.md index 2e60aae..838b438 100644 --- a/README.md +++ b/README.md @@ -27,25 +27,23 @@ Redox is built on top of asynchronous API of hiredis, even for synchronous commands. There is no dependency on Boost or any other libraries. -### Performance Benchmarks +## Benchmarks Benchmarks are given by averaging the results of five trials of the speed tests in `examples/` on an AWS t2.medium instance running Ubuntu 14.04 (64-bit). Local Redis server, TCP connection: - * 100 commandLoop calls (`speed_test_async_multi`): **710,014 commands/s** - * One commandLoop call (`speed_test_async`): **195,159 commands/s** - * Looped commandSync call (`speed_test_sync`): **23,609 commands/s** + * 100 command loops (`speed_test_async_multi`): **710,014 commands/s** + * One command loop (`speed_test_async`): **195,159 commands/s** + * Looped synchronous command (`speed_test_sync`): **28,609 commands/s** -Results are comparable to that of an -average laptop. On a high-end laptop or PC, `speed_test_async_multi` usually tops -1 million commands per second. All results are slightly faster if over Unix sockets -than TCP. +Results are comparable to that of an average laptop. On a high-end machine, +`speed_test_async_multi` usually tops 1,000,000 commands/s. -## Install +## Installation Instructions provided are for Ubuntu, but all components are platform-independent. -### Build library from source +#### Build from source Get the build environment and dependencies: sudo apt-get install git cmake build-essential @@ -61,7 +59,7 @@ Install into system directories (optional): sudo make install -### Build examples and test suite +#### Build examples and test suite Enable examples using ccmake or the following: cmake -Dexamples=ON .. @@ -76,4 +74,25 @@ then: ./test_redox ## Tutorial -Coming soon. Take a look at `examples/` for now. +Here is a hello world program for redox: + + #include + #include "redox.hpp" + + int main(int argc, char* argv[]) { + + redox::Redox rdx = {"localhost", 6379}; + if(!rdx.connect()) return 1; + + rdx.set("hello", "world!"); + std::cout << "Hello, " << rdx.get("hello") << std::endl; + + rdx.disconnect(); + return 0; + } + +Compile and run: + + $ g++ hello.cpp -o hello -std=c++11 -lredox -lev -lhiredis + $ ./hello + Hello, world! diff --git a/include/redox.hpp b/include/redox.hpp new file mode 100644 index 0000000..785e693 --- /dev/null +++ b/include/redox.hpp @@ -0,0 +1,25 @@ +/** +* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis +* +* https://github.com/hmartiro/redox +* +* Copyright 2015 - Hayk Martirosyan +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#pragma once + +#include "redox/client.hpp" +#include "redox/command.hpp" +#include "redox/subscriber.hpp" diff --git a/include/redox/client.hpp b/include/redox/client.hpp new file mode 100644 index 0000000..d5f7570 --- /dev/null +++ b/include/redox/client.hpp @@ -0,0 +1,441 @@ +/** +* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis +* +* https://github.com/hmartiro/redox +* +* Copyright 2015 - Hayk Martirosyan +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "utils/logger.hpp" +#include "command.hpp" + +namespace redox { + +static const std::string REDIS_DEFAULT_HOST = "localhost"; +static const int REDIS_DEFAULT_PORT = 6379; + +/** +* Redox intro here. +*/ +class Redox { + +public: + + // Connection states + static const int NOT_YET_CONNECTED = 0; + static const int CONNECTED = 1; + static const int DISCONNECTED = 2; + static const int CONNECT_ERROR = 3; + static const int DISCONNECT_ERROR = 4; + + // ------------------------------------------------ + // Core public API + // ------------------------------------------------ + + /** + * Initializes everything, connects over TCP to a Redis server. + */ + Redox( + const std::string& host = REDIS_DEFAULT_HOST, + const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Warning + ); + + /** + * Initializes everything, connects over unix sockets to a Redis server. + */ + Redox( + const std::string& path, + std::function connection_callback, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Warning + ); + + /** + * Disconnects from the Redis server, shuts down the event loop, and cleans up. + * Internally calls disconnect() and wait(). + */ + ~Redox(); + + /** + * Connects to Redis and starts an event loop in a separate thread. Returns + * true once everything is ready, or false on failure. + */ + bool connect(); + + /** + * Disconnect from Redis, shut down the event loop, then return. A simple + * combination of .stop() and .wait(). + */ + void disconnect(); + + /** + * Signal the event loop thread to disconnect from Redis and shut down. + */ + void stop(); + + /** + * Blocks until the event loop exits and disconnection is complete, then returns. + * Usually no need to call manually as it is handled in the destructor. + */ + void wait(); + + /** + * Asynchronously runs a command and invokes the callback when a reply is + * received or there is an error. The callback is guaranteed to be invoked + * exactly once. The Command object is provided to the callback, and the + * memory for it is automatically freed when the callback returns. + */ + template + void command( + const std::string& cmd, + const std::function&)>& callback = nullptr + ); + + /** + * Asynchronously runs a command and ignores any errors or replies. + */ + void command(const std::string& cmd); + + /** + * Synchronously runs a command, returning the Command object only once + * a reply is received or there is an error. The user is responsible for + * calling .free() on the returned Command object. + */ + template + Command& commandSync(const std::string& cmd); + + /** + * Synchronously runs a command, returning only once a reply is received + * or there's an error. Returns true on successful reply, false on error. + */ + bool commandSync(const std::string& cmd); + + /** + * Creates an asynchronous command that is run every [repeat] seconds, + * with the first one run in [after] seconds. If [repeat] is 0, the + * command is run only once. The user is responsible for calling .free() + * on the returned Command object. + */ + template + Command& commandLoop( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after = 0.0 + ); + + /** + * Creates an asynchronous command that is run once after a given + * delay. The callback is invoked exactly once on a successful reply + * or error, and the Command object memory is automatically freed + * after the callback returns. + */ + template + void commandDelayed( + const std::string& cmd, + const std::function&)>& callback, + double after + ); + + // ------------------------------------------------ + // Wrapper methods for convenience only + // ------------------------------------------------ + + /** + * Redis GET command wrapper - return the value for the given key, or throw + * an exception if there is an error. Blocking call. + */ + std::string get(const std::string& key); + + /** + * Redis SET command wrapper - set the value for the given key. Return + * true if succeeded, false if error. Blocking call. + */ + bool set(const std::string& key, const std::string& value); + + /** + * Redis DEL command wrapper - delete the given key. Return true if succeeded, + * false if error. Blocking call. + */ + bool del(const std::string& key); + + /** + * Redis PUBLISH command wrapper - publish the given message to all subscribers. + * Non-blocking call. + */ + void publish(const std::string& topic, const std::string& msg); + + // ------------------------------------------------ + // Public members + // ------------------------------------------------ + + // Hiredis context, left public to allow low-level access + redisAsyncContext * ctx_; + + // Redox server over TCP + const std::string host_; + const int port_; + + // Redox server over unix + const std::string path_; + + // Logger + log::Logger logger_; + +private: + + // ------------------------------------------------ + // Private methods + // ------------------------------------------------ + + // One stop shop for creating commands. The base of all public + // methods that run commands. + template + Command& createCommand( + const std::string& cmd, + const std::function&)>& callback = nullptr, + double repeat = 0.0, + double after = 0.0, + bool free_memory = true + ); + + // Setup code for the constructors + void init_ev(); + void init_hiredis(); + + // Callbacks invoked on server connection/disconnection + static void connectedCallback(const redisAsyncContext* c, int status); + static void disconnectedCallback(const redisAsyncContext* c, int status); + + // Main event loop, run in a separate thread + void runEventLoop(); + + // Return the command map corresponding to the templated reply type + template + std::unordered_map*>& getCommandMap(); + + // Return the given Command from the relevant command map, or nullptr if not there + template + Command* findCommand(long id); + + // Send all commands in the command queue to the server + static void processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); + + // Process the command with the given ID. Return true if the command had the + // templated type, and false if it was not in the command map of that type. + template + bool processQueuedCommand(long id); + + // Callback given to libev for a Command's timer watcher, to be processed in + // a deferred or looping state + template + static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents); + + // Submit an asynchronous command to the Redox server. Return + // true if succeeded, false otherwise. + template + static bool submitToServer(Command* c); + + // Callback given to hiredis to invoke when a reply is received + template + static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata); + + // Free all commands in the commands_to_free_ queue + static void freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); + + // Free the command with the given ID. Return true if the command had the templated + // type, and false if it was not in the command map of that type. + template + bool freeQueuedCommand(long id); + + // Invoked by Command objects when they are completed. Removes them + // from the command map. + template + void deregisterCommand(const long id) { + std::lock_guard lg1(command_map_guard_); + getCommandMap().erase(id); + commands_deleted_ += 1; + } + + // Free all commands remaining in the command maps + long freeAllCommands(); + + // Helper function for freeAllCommands to access a specific command map + template + long freeAllCommandsOfType(); + + // ------------------------------------------------ + // Private members + // ------------------------------------------------ + + // Manage connection state + std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; + std::mutex connect_lock_; + std::condition_variable connect_waiter_; + + // User connect/disconnect callbacks + std::function user_connection_callback_; + + // Dynamically allocated libev event loop + struct ev_loop* evloop_; + + // Asynchronous watchers + ev_async watcher_command_; // For processing commands + ev_async watcher_stop_; // For breaking the loop + ev_async watcher_free_; // For freeing commands + + // Track of Command objects allocated. Also provides unique Command IDs. + std::atomic_long commands_created_ = {0}; + std::atomic_long commands_deleted_ = {0}; + + // Separate thread to have a non-blocking event loop + std::thread event_loop_thread_; + + // Variable and CV to know when the event loop starts running + std::atomic_bool running_ = {false}; + std::mutex running_waiter_lock_; + std::condition_variable running_waiter_; + + // Variable and CV to know when the event loop stops running + std::atomic_bool to_exit_ = {false}; // Signal to exit + std::atomic_bool exited_ = {false}; // Event thread exited + std::mutex exit_waiter_lock_; + std::condition_variable exit_waiter_; + + // Maps of each Command, fetchable by the unique ID number + // In C++14, member variable templates will replace all of these types + // with a single templated declaration + // --------- + // template + // std::unordered_map*> commands_; + // --------- + std::unordered_map*> commands_redis_reply_; + std::unordered_map*> commands_string_; + std::unordered_map*> commands_char_p_; + std::unordered_map*> commands_int_; + std::unordered_map*> commands_long_long_int_; + std::unordered_map*> commands_null_; + std::unordered_map>*> commands_vector_string_; + std::unordered_map>*> commands_set_string_; + std::unordered_map>*> commands_unordered_set_string_; + std::mutex command_map_guard_; // Guards access to all of the above + + // Command IDs pending to be sent to the server + std::queue command_queue_; + std::mutex queue_guard_; + + // Commands IDs pending to be freed by the event loop + std::queue commands_to_free_; + std::mutex free_queue_guard_; + + // Commands use this method to deregister themselves from Redox, + // give it access to private members + template + friend void Command::free(); +}; + +// ------------------------------------------------ +// Implementation of templated methods +// ------------------------------------------------ + +template +Command& Redox::createCommand( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after, + bool free_memory +) { + + if(!running_) { + throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); + } + + commands_created_ += 1; + auto* c = new Command( + this, commands_created_, cmd, + callback, repeat, after, free_memory, logger_ + ); + + std::lock_guard lg(queue_guard_); + std::lock_guard lg2(command_map_guard_); + + getCommandMap()[c->id_] = c; + command_queue_.push(c->id_); + + // Signal the event loop to process this command + ev_async_send(evloop_, &watcher_command_); + + return *c; +} + +template +void Redox::command( + const std::string& cmd, + const std::function&)>& callback +) { + createCommand(cmd, callback); +} + +template +Command& Redox::commandLoop( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after +) { + return createCommand(cmd, callback, repeat, after, false); +} + +template +void Redox::commandDelayed( + const std::string& cmd, + const std::function&)>& callback, + double after +) { + createCommand(cmd, callback, 0, after, true); +} + +template +Command& Redox::commandSync(const std::string& cmd) { + auto& c = createCommand(cmd, nullptr, 0, 0, false); + c.wait(); + return c; +} + +} // End namespace redis diff --git a/include/redox/command.hpp b/include/redox/command.hpp new file mode 100644 index 0000000..6c574a7 --- /dev/null +++ b/include/redox/command.hpp @@ -0,0 +1,172 @@ +/** +* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis +* +* https://github.com/hmartiro/redox +* +* Copyright 2015 - Hayk Martirosyan +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include "utils/logger.hpp" + +namespace redox { + +class Redox; + +/** +* The Command class represents a single command string to be sent to +* a Redis server, for both synchronous and asynchronous usage. It manages +* all of the state relevant to a single command string. A Command can also +* represent a deferred or looping command, in which case the success or +* error callbacks are invoked more than once. +*/ +template +class Command { + +public: + + // Reply codes + static const int NO_REPLY = -1; // No reply yet + static const int OK_REPLY = 0; // Successful reply of the expected type + static const int NIL_REPLY = 1; // Got a nil reply + static const int ERROR_REPLY = 2; // Got an error reply + static const int SEND_ERROR = 3; // Could not send to server + static const int WRONG_TYPE = 4; // Got reply, but it was not the expected type + static const int TIMEOUT = 5; // No reply, timed out + + /** + * Returns the reply status of this command. + */ + int status() const { return reply_status_; } + + /** + * Returns true if this command got a successful reply. + */ + bool ok() const { return reply_status_ == OK_REPLY; } + + /** + * Returns the reply value, if the reply was successful (ok() == true). + */ + ReplyT reply(); + + /** + * Tells the event loop to free memory for this command. The user is + * responsible for calling this on synchronous or looping commands, + * AKA when free_memory_ = false. + */ + void free(); + + /** + * This method returns once this command's callback has been invoked + * (or would have been invoked if there is none) since the last call + * to wait(). If it is the first call, then returns once the callback + * is invoked for the first time. + */ + void wait(); + + /** + * Returns the command string represented by this object. + */ + const std::string& cmd() const { return cmd_; }; + + // Allow public access to constructed data + Redox* const rdx_; + const long id_; + const std::string cmd_; + const double repeat_; + const double after_; + const bool free_memory_; + +private: + + Command( + Redox* rdx, + long id, + const std::string& cmd, + const std::function&)>& callback, + double repeat, double after, + bool free_memory, + log::Logger& logger + ); + + // Handles a new reply from the server + void processReply(redisReply* r); + + // Invoke a user callback from the reply object. This method is specialized + // for each ReplyT of Command. + void parseReplyObject(); + + // Directly invoke the user callback if it exists + void invoke() { if(callback_) callback_(*this); } + + bool checkErrorReply(); + bool checkNilReply(); + bool isExpectedReply(int type); + bool isExpectedReply(int typeA, int typeB); + + // If needed, free the redisReply + void freeReply(); + + // The last server reply + redisReply* reply_obj_ = nullptr; + + // User callback + const std::function&)> callback_; + + // Place to store the reply value and status. + ReplyT reply_val_; + std::atomic_int reply_status_; + + // How many messages sent to server but not received reply + std::atomic_int pending_ = {0}; + + // Whether a repeating or delayed command is canceled + std::atomic_bool canceled_ = {false}; + + // libev timer watcher + ev_timer timer_; + std::mutex timer_guard_; + + // Access the reply value only when not being changed + std::mutex reply_guard_; + + // For synchronous use + std::condition_variable waiter_; + std::mutex waiter_lock_; + std::atomic_bool waiting_done_ = {false}; + + // Passed on from Redox class + log::Logger& logger_; + + // Explicitly delete copy constructor and assignment operator, + // Command objects should never be copied because they hold + // state with a network resource. + Command(const Command&) = delete; + Command& operator=(const Command&) = delete; + + friend class Redox; +}; + +} // End namespace redis diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp new file mode 100644 index 0000000..bbb444b --- /dev/null +++ b/include/redox/subscriber.hpp @@ -0,0 +1,182 @@ +/** +* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis +* +* https://github.com/hmartiro/redox +* +* Copyright 2015 - Hayk Martirosyan +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#pragma once + +#include "client.hpp" + +namespace redox { + +class Subscriber { + +public: + + /** + * Initializes everything, connects over TCP to a Redis server. + */ + Subscriber( + const std::string& host = REDIS_DEFAULT_HOST, + const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Warning + ); + + /** + * Initializes everything, connects over unix sockets to a Redis server. + */ + Subscriber( + const std::string& path, + std::function connection_callback, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Warning + ); + + /** + * Cleans up. + */ + ~Subscriber(); + + /** + * Same as .connect() on a Redox instance. + */ + bool connect() { return rdx_.connect(); } + + /** + * Same as .stop() on a Redox instance. + */ + void stop(); + + /** + * Same as .disconnect() on a Redox instance. + */ + void disconnect(); + + /** + * Same as .wait() on a Redox instance. + */ + void wait(); + + /** + * Subscribe to a topic. + * + * msg_callback: invoked whenever a message is received. + * sub_callback: invoked when successfully subscribed + * err_callback: invoked on some error state + */ + void subscribe(const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Subscribe to a topic with a pattern. + * + * msg_callback: invoked whenever a message is received. + * sub_callback: invoked when successfully subscribed + * err_callback: invoked on some error state + */ + void psubscribe(const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + /** + * Unsubscribe from a topic. + * + * err_callback: invoked on some error state + */ + void unsubscribe(const std::string topic, + std::function err_callback = nullptr + ); + + /** + * Unsubscribe from a topic with a pattern. + * + * err_callback: invoked on some error state + */ + void punsubscribe(const std::string topic, + std::function err_callback = nullptr + ); + + /** + * Return the topics that are subscribed() to. + */ + std::set subscribedTopics() { + std::lock_guard lg(subscribed_topics_guard_); + return subscribed_topics_; + } + + /** + * Return the topic patterns that are psubscribed() to. + */ + std::set psubscribedTopics() { + std::lock_guard lg(psubscribed_topics_guard_); + return psubscribed_topics_; + } + +private: + + // Base for subscribe and psubscribe + void subscribeBase(const std::string cmd_name, const std::string topic, + std::function msg_callback, + std::function sub_callback = nullptr, + std::function unsub_callback = nullptr, + std::function err_callback = nullptr + ); + + // Base for unsubscribe and punsubscribe + void unsubscribeBase(const std::string cmd_name, const std::string topic, + std::function err_callback = nullptr + ); + + // Underlying Redis client + Redox rdx_; + + // Keep track of topics because we can only unsubscribe + // from subscribed topics and punsubscribe from + // psubscribed topics, or hiredis leads to segfaults + std::set subscribed_topics_; + std::mutex subscribed_topics_guard_; + + std::set psubscribed_topics_; + std::mutex psubscribed_topics_guard_; + + // Set of persisting commands, so that we can cancel them + std::set*> commands_; + + // Reference to rdx_.logger_ for convenience + log::Logger& logger_; + + // CVs to wait for unsubscriptions + std::condition_variable cv_unsub_; + std::mutex cv_unsub_guard_; + std::condition_variable cv_punsub_; + std::mutex cv_punsub_guard_; + + // Pending subscriptions + std::atomic_int num_pending_subs_ = {0}; +}; + +} // End namespace diff --git a/include/redox/utils/logger.hpp b/include/redox/utils/logger.hpp new file mode 100644 index 0000000..14b762b --- /dev/null +++ b/include/redox/utils/logger.hpp @@ -0,0 +1,83 @@ +/** +* Simple stream-based logger for C++11. +* +* Adapted from +* http://vilipetek.com/2014/04/17/thread-safe-simple-logger-in-c11/ +*/ + +#pragma once + +#include +#include +#include +#include +#include + +namespace redox { +namespace log { + +// Log message levels +enum Level { + Trace, Debug, Info, Warning, Error, Fatal, Off +}; + +// Forward declaration +class Logger; + +// -------------------- +// Logstream +// -------------------- + +class Logstream : public std::ostringstream { +public: + Logstream(Logger &logger, Level l); + Logstream(const Logstream &ls); + ~Logstream(); +private: + Logger &m_logger; + Level m_loglevel; +}; + +// -------------------- +// Logger +// -------------------- + +class Logger { +public: + + Logger(std::string filename, Level loglevel = Level::Info); + Logger(std::ostream &outfile, Level loglevel = Level::Info); + + virtual ~Logger(); + + void level(Level l) { m_loglevel = l; } + Level level() { return m_loglevel; } + + void log(Level l, std::string oMessage); + + Logstream operator()(Level l = Level::Info) { return Logstream(*this, l); } + + // Helpers + Logstream trace() { return (*this)(Level::Trace); } + Logstream debug() { return (*this)(Level::Debug); } + Logstream info() { return (*this)(Level::Info); } + Logstream warning() { return (*this)(Level::Warning); } + Logstream error() { return (*this)(Level::Error); } + Logstream fatal() { return (*this)(Level::Fatal); } + +private: + const tm *getLocalTime(); + +private: + std::mutex m_lock; + + std::ofstream m_file; + std::ostream &m_stream; + + tm m_time; + + Level m_loglevel; +}; + +} // End namespace +} // End namespace diff --git a/src/client.cpp b/src/client.cpp index 3364b4e..a209f2a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -19,7 +19,7 @@ */ #include -#include "redox.hpp" +#include "client.hpp" using namespace std; diff --git a/src/client.hpp b/src/client.hpp deleted file mode 100644 index c00c48c..0000000 --- a/src/client.hpp +++ /dev/null @@ -1,441 +0,0 @@ -/** -* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis -* -* https://github.com/hmartiro/redox -* -* Copyright 2015 - Hayk Martirosyan -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#pragma once - -#include -#include - -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "utils/logger.hpp" -#include "command.hpp" - -namespace redox { - -static const std::string REDIS_DEFAULT_HOST = "localhost"; -static const int REDIS_DEFAULT_PORT = 6379; - -/** -* Redox intro here. -*/ -class Redox { - -public: - - // Connection states - static const int NOT_YET_CONNECTED = 0; - static const int CONNECTED = 1; - static const int DISCONNECTED = 2; - static const int CONNECT_ERROR = 3; - static const int DISCONNECT_ERROR = 4; - - // ------------------------------------------------ - // Core public API - // ------------------------------------------------ - - /** - * Initializes everything, connects over TCP to a Redis server. - */ - Redox( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Info - ); - - /** - * Initializes everything, connects over unix sockets to a Redis server. - */ - Redox( - const std::string& path, - std::function connection_callback, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Info - ); - - /** - * Disconnects from the Redis server, shuts down the event loop, and cleans up. - * Internally calls disconnect() and wait(). - */ - ~Redox(); - - /** - * Connects to Redis and starts an event loop in a separate thread. Returns - * true once everything is ready, or false on failure. - */ - bool connect(); - - /** - * Disconnect from Redis, shut down the event loop, then return. A simple - * combination of .stop() and .wait(). - */ - void disconnect(); - - /** - * Signal the event loop thread to disconnect from Redis and shut down. - */ - void stop(); - - /** - * Blocks until the event loop exits and disconnection is complete, then returns. - * Usually no need to call manually as it is handled in the destructor. - */ - void wait(); - - /** - * Asynchronously runs a command and invokes the callback when a reply is - * received or there is an error. The callback is guaranteed to be invoked - * exactly once. The Command object is provided to the callback, and the - * memory for it is automatically freed when the callback returns. - */ - template - void command( - const std::string& cmd, - const std::function&)>& callback = nullptr - ); - - /** - * Asynchronously runs a command and ignores any errors or replies. - */ - void command(const std::string& cmd); - - /** - * Synchronously runs a command, returning the Command object only once - * a reply is received or there is an error. The user is responsible for - * calling .free() on the returned Command object. - */ - template - Command& commandSync(const std::string& cmd); - - /** - * Synchronously runs a command, returning only once a reply is received - * or there's an error. Returns true on successful reply, false on error. - */ - bool commandSync(const std::string& cmd); - - /** - * Creates an asynchronous command that is run every [repeat] seconds, - * with the first one run in [after] seconds. If [repeat] is 0, the - * command is run only once. The user is responsible for calling .free() - * on the returned Command object. - */ - template - Command& commandLoop( - const std::string& cmd, - const std::function&)>& callback, - double repeat, - double after = 0.0 - ); - - /** - * Creates an asynchronous command that is run once after a given - * delay. The callback is invoked exactly once on a successful reply - * or error, and the Command object memory is automatically freed - * after the callback returns. - */ - template - void commandDelayed( - const std::string& cmd, - const std::function&)>& callback, - double after - ); - - // ------------------------------------------------ - // Wrapper methods for convenience only - // ------------------------------------------------ - - /** - * Redis GET command wrapper - return the value for the given key, or throw - * an exception if there is an error. Blocking call. - */ - std::string get(const std::string& key); - - /** - * Redis SET command wrapper - set the value for the given key. Return - * true if succeeded, false if error. Blocking call. - */ - bool set(const std::string& key, const std::string& value); - - /** - * Redis DEL command wrapper - delete the given key. Return true if succeeded, - * false if error. Blocking call. - */ - bool del(const std::string& key); - - /** - * Redis PUBLISH command wrapper - publish the given message to all subscribers. - * Non-blocking call. - */ - void publish(const std::string& topic, const std::string& msg); - - // ------------------------------------------------ - // Public members - // ------------------------------------------------ - - // Hiredis context, left public to allow low-level access - redisAsyncContext * ctx_; - - // Redox server over TCP - const std::string host_; - const int port_; - - // Redox server over unix - const std::string path_; - - // Logger - log::Logger logger_; - -private: - - // ------------------------------------------------ - // Private methods - // ------------------------------------------------ - - // One stop shop for creating commands. The base of all public - // methods that run commands. - template - Command& createCommand( - const std::string& cmd, - const std::function&)>& callback = nullptr, - double repeat = 0.0, - double after = 0.0, - bool free_memory = true - ); - - // Setup code for the constructors - void init_ev(); - void init_hiredis(); - - // Callbacks invoked on server connection/disconnection - static void connectedCallback(const redisAsyncContext* c, int status); - static void disconnectedCallback(const redisAsyncContext* c, int status); - - // Main event loop, run in a separate thread - void runEventLoop(); - - // Return the command map corresponding to the templated reply type - template - std::unordered_map*>& getCommandMap(); - - // Return the given Command from the relevant command map, or nullptr if not there - template - Command* findCommand(long id); - - // Send all commands in the command queue to the server - static void processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); - - // Process the command with the given ID. Return true if the command had the - // templated type, and false if it was not in the command map of that type. - template - bool processQueuedCommand(long id); - - // Callback given to libev for a Command's timer watcher, to be processed in - // a deferred or looping state - template - static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents); - - // Submit an asynchronous command to the Redox server. Return - // true if succeeded, false otherwise. - template - static bool submitToServer(Command* c); - - // Callback given to hiredis to invoke when a reply is received - template - static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata); - - // Free all commands in the commands_to_free_ queue - static void freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); - - // Free the command with the given ID. Return true if the command had the templated - // type, and false if it was not in the command map of that type. - template - bool freeQueuedCommand(long id); - - // Invoked by Command objects when they are completed. Removes them - // from the command map. - template - void deregisterCommand(const long id) { - std::lock_guard lg1(command_map_guard_); - getCommandMap().erase(id); - commands_deleted_ += 1; - } - - // Free all commands remaining in the command maps - long freeAllCommands(); - - // Helper function for freeAllCommands to access a specific command map - template - long freeAllCommandsOfType(); - - // ------------------------------------------------ - // Private members - // ------------------------------------------------ - - // Manage connection state - std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; - std::mutex connect_lock_; - std::condition_variable connect_waiter_; - - // User connect/disconnect callbacks - std::function user_connection_callback_; - - // Dynamically allocated libev event loop - struct ev_loop* evloop_; - - // Asynchronous watchers - ev_async watcher_command_; // For processing commands - ev_async watcher_stop_; // For breaking the loop - ev_async watcher_free_; // For freeing commands - - // Track of Command objects allocated. Also provides unique Command IDs. - std::atomic_long commands_created_ = {0}; - std::atomic_long commands_deleted_ = {0}; - - // Separate thread to have a non-blocking event loop - std::thread event_loop_thread_; - - // Variable and CV to know when the event loop starts running - std::atomic_bool running_ = {false}; - std::mutex running_waiter_lock_; - std::condition_variable running_waiter_; - - // Variable and CV to know when the event loop stops running - std::atomic_bool to_exit_ = {false}; // Signal to exit - std::atomic_bool exited_ = {false}; // Event thread exited - std::mutex exit_waiter_lock_; - std::condition_variable exit_waiter_; - - // Maps of each Command, fetchable by the unique ID number - // In C++14, member variable templates will replace all of these types - // with a single templated declaration - // --------- - // template - // std::unordered_map*> commands_; - // --------- - std::unordered_map*> commands_redis_reply_; - std::unordered_map*> commands_string_; - std::unordered_map*> commands_char_p_; - std::unordered_map*> commands_int_; - std::unordered_map*> commands_long_long_int_; - std::unordered_map*> commands_null_; - std::unordered_map>*> commands_vector_string_; - std::unordered_map>*> commands_set_string_; - std::unordered_map>*> commands_unordered_set_string_; - std::mutex command_map_guard_; // Guards access to all of the above - - // Command IDs pending to be sent to the server - std::queue command_queue_; - std::mutex queue_guard_; - - // Commands IDs pending to be freed by the event loop - std::queue commands_to_free_; - std::mutex free_queue_guard_; - - // Commands use this method to deregister themselves from Redox, - // give it access to private members - template - friend void Command::free(); -}; - -// ------------------------------------------------ -// Implementation of templated methods -// ------------------------------------------------ - -template -Command& Redox::createCommand( - const std::string& cmd, - const std::function&)>& callback, - double repeat, - double after, - bool free_memory -) { - - if(!running_) { - throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); - } - - commands_created_ += 1; - auto* c = new Command( - this, commands_created_, cmd, - callback, repeat, after, free_memory, logger_ - ); - - std::lock_guard lg(queue_guard_); - std::lock_guard lg2(command_map_guard_); - - getCommandMap()[c->id_] = c; - command_queue_.push(c->id_); - - // Signal the event loop to process this command - ev_async_send(evloop_, &watcher_command_); - - return *c; -} - -template -void Redox::command( - const std::string& cmd, - const std::function&)>& callback -) { - createCommand(cmd, callback); -} - -template -Command& Redox::commandLoop( - const std::string& cmd, - const std::function&)>& callback, - double repeat, - double after -) { - return createCommand(cmd, callback, repeat, after, false); -} - -template -void Redox::commandDelayed( - const std::string& cmd, - const std::function&)>& callback, - double after -) { - createCommand(cmd, callback, 0, after, true); -} - -template -Command& Redox::commandSync(const std::string& cmd) { - auto& c = createCommand(cmd, nullptr, 0, 0, false); - c.wait(); - return c; -} - -} // End namespace redis diff --git a/src/command.hpp b/src/command.hpp deleted file mode 100644 index 6c574a7..0000000 --- a/src/command.hpp +++ /dev/null @@ -1,172 +0,0 @@ -/** -* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis -* -* https://github.com/hmartiro/redox -* -* Copyright 2015 - Hayk Martirosyan -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#pragma once - -#include -#include -#include -#include -#include - -#include -#include - -#include "utils/logger.hpp" - -namespace redox { - -class Redox; - -/** -* The Command class represents a single command string to be sent to -* a Redis server, for both synchronous and asynchronous usage. It manages -* all of the state relevant to a single command string. A Command can also -* represent a deferred or looping command, in which case the success or -* error callbacks are invoked more than once. -*/ -template -class Command { - -public: - - // Reply codes - static const int NO_REPLY = -1; // No reply yet - static const int OK_REPLY = 0; // Successful reply of the expected type - static const int NIL_REPLY = 1; // Got a nil reply - static const int ERROR_REPLY = 2; // Got an error reply - static const int SEND_ERROR = 3; // Could not send to server - static const int WRONG_TYPE = 4; // Got reply, but it was not the expected type - static const int TIMEOUT = 5; // No reply, timed out - - /** - * Returns the reply status of this command. - */ - int status() const { return reply_status_; } - - /** - * Returns true if this command got a successful reply. - */ - bool ok() const { return reply_status_ == OK_REPLY; } - - /** - * Returns the reply value, if the reply was successful (ok() == true). - */ - ReplyT reply(); - - /** - * Tells the event loop to free memory for this command. The user is - * responsible for calling this on synchronous or looping commands, - * AKA when free_memory_ = false. - */ - void free(); - - /** - * This method returns once this command's callback has been invoked - * (or would have been invoked if there is none) since the last call - * to wait(). If it is the first call, then returns once the callback - * is invoked for the first time. - */ - void wait(); - - /** - * Returns the command string represented by this object. - */ - const std::string& cmd() const { return cmd_; }; - - // Allow public access to constructed data - Redox* const rdx_; - const long id_; - const std::string cmd_; - const double repeat_; - const double after_; - const bool free_memory_; - -private: - - Command( - Redox* rdx, - long id, - const std::string& cmd, - const std::function&)>& callback, - double repeat, double after, - bool free_memory, - log::Logger& logger - ); - - // Handles a new reply from the server - void processReply(redisReply* r); - - // Invoke a user callback from the reply object. This method is specialized - // for each ReplyT of Command. - void parseReplyObject(); - - // Directly invoke the user callback if it exists - void invoke() { if(callback_) callback_(*this); } - - bool checkErrorReply(); - bool checkNilReply(); - bool isExpectedReply(int type); - bool isExpectedReply(int typeA, int typeB); - - // If needed, free the redisReply - void freeReply(); - - // The last server reply - redisReply* reply_obj_ = nullptr; - - // User callback - const std::function&)> callback_; - - // Place to store the reply value and status. - ReplyT reply_val_; - std::atomic_int reply_status_; - - // How many messages sent to server but not received reply - std::atomic_int pending_ = {0}; - - // Whether a repeating or delayed command is canceled - std::atomic_bool canceled_ = {false}; - - // libev timer watcher - ev_timer timer_; - std::mutex timer_guard_; - - // Access the reply value only when not being changed - std::mutex reply_guard_; - - // For synchronous use - std::condition_variable waiter_; - std::mutex waiter_lock_; - std::atomic_bool waiting_done_ = {false}; - - // Passed on from Redox class - log::Logger& logger_; - - // Explicitly delete copy constructor and assignment operator, - // Command objects should never be copied because they hold - // state with a network resource. - Command(const Command&) = delete; - Command& operator=(const Command&) = delete; - - friend class Redox; -}; - -} // End namespace redis diff --git a/src/redox.hpp b/src/redox.hpp deleted file mode 100644 index 07791b0..0000000 --- a/src/redox.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/** -* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis -* -* https://github.com/hmartiro/redox -* -* Copyright 2015 - Hayk Martirosyan -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#pragma once - -#include "client.hpp" -#include "command.hpp" -#include "subscriber.hpp" diff --git a/src/subscriber.hpp b/src/subscriber.hpp deleted file mode 100644 index 4eb3dbc..0000000 --- a/src/subscriber.hpp +++ /dev/null @@ -1,182 +0,0 @@ -/** -* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis -* -* https://github.com/hmartiro/redox -* -* Copyright 2015 - Hayk Martirosyan -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#pragma once - -#include "client.hpp" - -namespace redox { - -class Subscriber { - -public: - - /** - * Initializes everything, connects over TCP to a Redis server. - */ - Subscriber( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Info - ); - - /** - * Initializes everything, connects over unix sockets to a Redis server. - */ - Subscriber( - const std::string& path, - std::function connection_callback, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Info - ); - - /** - * Cleans up. - */ - ~Subscriber(); - - /** - * Same as .connect() on a Redox instance. - */ - bool connect() { return rdx_.connect(); } - - /** - * Same as .stop() on a Redox instance. - */ - void stop(); - - /** - * Same as .disconnect() on a Redox instance. - */ - void disconnect(); - - /** - * Same as .wait() on a Redox instance. - */ - void wait(); - - /** - * Subscribe to a topic. - * - * msg_callback: invoked whenever a message is received. - * sub_callback: invoked when successfully subscribed - * err_callback: invoked on some error state - */ - void subscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); - - /** - * Subscribe to a topic with a pattern. - * - * msg_callback: invoked whenever a message is received. - * sub_callback: invoked when successfully subscribed - * err_callback: invoked on some error state - */ - void psubscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); - - /** - * Unsubscribe from a topic. - * - * err_callback: invoked on some error state - */ - void unsubscribe(const std::string topic, - std::function err_callback = nullptr - ); - - /** - * Unsubscribe from a topic with a pattern. - * - * err_callback: invoked on some error state - */ - void punsubscribe(const std::string topic, - std::function err_callback = nullptr - ); - - /** - * Return the topics that are subscribed() to. - */ - std::set subscribedTopics() { - std::lock_guard lg(subscribed_topics_guard_); - return subscribed_topics_; - } - - /** - * Return the topic patterns that are psubscribed() to. - */ - std::set psubscribedTopics() { - std::lock_guard lg(psubscribed_topics_guard_); - return psubscribed_topics_; - } - -private: - - // Base for subscribe and psubscribe - void subscribeBase(const std::string cmd_name, const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr - ); - - // Base for unsubscribe and punsubscribe - void unsubscribeBase(const std::string cmd_name, const std::string topic, - std::function err_callback = nullptr - ); - - // Underlying Redis client - Redox rdx_; - - // Keep track of topics because we can only unsubscribe - // from subscribed topics and punsubscribe from - // psubscribed topics, or hiredis leads to segfaults - std::set subscribed_topics_; - std::mutex subscribed_topics_guard_; - - std::set psubscribed_topics_; - std::mutex psubscribed_topics_guard_; - - // Set of persisting commands, so that we can cancel them - std::set*> commands_; - - // Reference to rdx_.logger_ for convenience - log::Logger& logger_; - - // CVs to wait for unsubscriptions - std::condition_variable cv_unsub_; - std::mutex cv_unsub_guard_; - std::condition_variable cv_punsub_; - std::mutex cv_punsub_guard_; - - // Pending subscriptions - std::atomic_int num_pending_subs_ = {0}; -}; - -} // End namespace diff --git a/src/utils/logger.cpp b/src/utils/logger.cpp index d336460..6d3e8b6 100644 --- a/src/utils/logger.cpp +++ b/src/utils/logger.cpp @@ -5,7 +5,7 @@ * http://vilipetek.com/2014/04/17/thread-safe-simple-logger-in-c11/ */ -#include "logger.hpp" +#include "utils/logger.hpp" #include #include diff --git a/src/utils/logger.hpp b/src/utils/logger.hpp deleted file mode 100644 index 14b762b..0000000 --- a/src/utils/logger.hpp +++ /dev/null @@ -1,83 +0,0 @@ -/** -* Simple stream-based logger for C++11. -* -* Adapted from -* http://vilipetek.com/2014/04/17/thread-safe-simple-logger-in-c11/ -*/ - -#pragma once - -#include -#include -#include -#include -#include - -namespace redox { -namespace log { - -// Log message levels -enum Level { - Trace, Debug, Info, Warning, Error, Fatal, Off -}; - -// Forward declaration -class Logger; - -// -------------------- -// Logstream -// -------------------- - -class Logstream : public std::ostringstream { -public: - Logstream(Logger &logger, Level l); - Logstream(const Logstream &ls); - ~Logstream(); -private: - Logger &m_logger; - Level m_loglevel; -}; - -// -------------------- -// Logger -// -------------------- - -class Logger { -public: - - Logger(std::string filename, Level loglevel = Level::Info); - Logger(std::ostream &outfile, Level loglevel = Level::Info); - - virtual ~Logger(); - - void level(Level l) { m_loglevel = l; } - Level level() { return m_loglevel; } - - void log(Level l, std::string oMessage); - - Logstream operator()(Level l = Level::Info) { return Logstream(*this, l); } - - // Helpers - Logstream trace() { return (*this)(Level::Trace); } - Logstream debug() { return (*this)(Level::Debug); } - Logstream info() { return (*this)(Level::Info); } - Logstream warning() { return (*this)(Level::Warning); } - Logstream error() { return (*this)(Level::Error); } - Logstream fatal() { return (*this)(Level::Fatal); } - -private: - const tm *getLocalTime(); - -private: - std::mutex m_lock; - - std::ofstream m_file; - std::ostream &m_stream; - - tm m_time; - - Level m_loglevel; -}; - -} // End namespace -} // End namespace -- libgit2 0.21.4