diff --git a/CMakeLists.txt b/CMakeLists.txt index ba78727..f15bff6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ option(examples "Build all examples." ON) # Use RelWithDebInfo if no configuration specified if(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) - set(CMAKE_BUILD_TYPE RelWithDebInfo) + set(CMAKE_BUILD_TYPE Release) endif(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") @@ -22,7 +22,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -Wall") set(SRC_DIR ${CMAKE_SOURCE_DIR}/src) set(SRC_CORE - ${SRC_DIR}/redox.cpp + ${SRC_DIR}/client.cpp ${SRC_DIR}/command.cpp ${SRC_DIR}/subscriber.cpp ) @@ -31,11 +31,24 @@ set(SRC_LOGGER ${SRC_DIR}/utils/logger.cpp) set(SRC_ALL ${SRC_CORE} ${SRC_LOGGER}) +include_directories(${SRC_DIR}) + +# --------------------------------------------------------- +# Linked libraries +# --------------------------------------------------------- + +set(LIB_REDOX redox hiredis ev pthread) + # --------------------------------------------------------- -# Libraries +# Library generation # --------------------------------------------------------- -set(LIB_REDIS hiredis ev pthread) +if (library) + + add_library(redox SHARED ${SRC_ALL}) + add_library(redox_static STATIC ${SRC_ALL}) + +endif() # --------------------------------------------------------- # Test suite @@ -44,11 +57,11 @@ if (tests) enable_testing() find_package(GTest REQUIRED) - include_directories(${GTEST_INCLUDE_DIRS}) - add_executable(test_redox test/test.cpp ${SRC_ALL}) + add_executable(test_redox test/test.cpp) - target_link_libraries(test_redox ${LIB_REDIS} gtest) + target_include_directories(test_redox PUBLIC ${GTEST_INCLUDE_DIRS}) + target_link_libraries(test_redox ${LIB_REDOX} gtest) # So that we can run 'make test' add_test(test_redox test_redox) @@ -60,37 +73,37 @@ endif() # --------------------------------------------------------- if (examples) - add_executable(basic examples/basic.cpp ${SRC_ALL}) - target_link_libraries(basic ${LIB_REDIS}) + add_executable(basic examples/basic.cpp) + target_link_libraries(basic ${LIB_REDOX}) - add_executable(basic_threaded examples/basic_threaded.cpp ${SRC_ALL}) - target_link_libraries(basic_threaded ${LIB_REDIS}) + add_executable(basic_threaded examples/basic_threaded.cpp) + target_link_libraries(basic_threaded ${LIB_REDOX}) - add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) - target_link_libraries(lpush_benchmark ${LIB_REDIS}) + add_executable(lpush_benchmark examples/lpush_benchmark.cpp) + target_link_libraries(lpush_benchmark ${LIB_REDOX}) - add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) - target_link_libraries(speed_test_async ${LIB_REDIS}) + add_executable(speed_test_async examples/speed_test_async.cpp) + target_link_libraries(speed_test_async ${LIB_REDOX}) - add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) - target_link_libraries(speed_test_sync ${LIB_REDIS}) + add_executable(speed_test_sync examples/speed_test_sync.cpp) + target_link_libraries(speed_test_sync ${LIB_REDOX}) - add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) - target_link_libraries(speed_test_async_multi ${LIB_REDIS}) + add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp) + target_link_libraries(speed_test_async_multi ${LIB_REDOX}) - add_executable(data_types examples/data_types.cpp ${SRC_ALL}) - target_link_libraries(data_types ${LIB_REDIS}) + add_executable(data_types examples/data_types.cpp) + target_link_libraries(data_types ${LIB_REDOX}) - add_executable(multi_client examples/multi-client.cpp ${SRC_ALL}) - target_link_libraries(multi_client ${LIB_REDIS}) + add_executable(multi_client examples/multi-client.cpp) + target_link_libraries(multi_client ${LIB_REDOX}) - add_executable(binary_data examples/binary_data.cpp ${SRC_ALL}) - target_link_libraries(binary_data ${LIB_REDIS}) + add_executable(binary_data examples/binary_data.cpp) + target_link_libraries(binary_data ${LIB_REDOX}) - add_executable(pub_sub examples/pub_sub.cpp ${SRC_ALL}) - target_link_libraries(pub_sub ${LIB_REDIS}) + add_executable(pub_sub examples/pub_sub.cpp) + target_link_libraries(pub_sub ${LIB_REDOX}) add_executable(speed_test_pubsub examples/speed_test_pubsub ${SRC_ALL}) - target_link_libraries(speed_test_pubsub ${LIB_REDIS}) + target_link_libraries(speed_test_pubsub ${LIB_REDOX}) endif() diff --git a/examples/basic.cpp b/examples/basic.cpp index 866880c..90155ba 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -3,7 +3,7 @@ */ #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/basic_threaded.cpp b/examples/basic_threaded.cpp index 999b252..11f0acd 100644 --- a/examples/basic_threaded.cpp +++ b/examples/basic_threaded.cpp @@ -5,7 +5,7 @@ #include #include #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/binary_data.cpp b/examples/binary_data.cpp index d6d7c9b..484b20f 100644 --- a/examples/binary_data.cpp +++ b/examples/binary_data.cpp @@ -5,7 +5,7 @@ #include #include #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/data_types.cpp b/examples/data_types.cpp index 1c17c6e..00afc51 100644 --- a/examples/data_types.cpp +++ b/examples/data_types.cpp @@ -3,7 +3,7 @@ */ #include -#include "../src/redox.hpp" +#include "redox.hpp" #include #include #include diff --git a/examples/lpush_benchmark.cpp b/examples/lpush_benchmark.cpp index 65bd2e4..dba9bfa 100644 --- a/examples/lpush_benchmark.cpp +++ b/examples/lpush_benchmark.cpp @@ -3,7 +3,7 @@ */ #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/multi-client.cpp b/examples/multi-client.cpp index a35882b..98d5001 100644 --- a/examples/multi-client.cpp +++ b/examples/multi-client.cpp @@ -3,7 +3,7 @@ */ #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/pub_sub.cpp b/examples/pub_sub.cpp index 68c4623..8a369b3 100644 --- a/examples/pub_sub.cpp +++ b/examples/pub_sub.cpp @@ -4,8 +4,7 @@ #include #include -#include "../src/redox.hpp" -#include "../src/subscriber.hpp" +#include "redox.hpp" using namespace std; diff --git a/examples/speed_test_async.cpp b/examples/speed_test_async.cpp index 0ec5a3f..05970be 100644 --- a/examples/speed_test_async.cpp +++ b/examples/speed_test_async.cpp @@ -5,7 +5,7 @@ */ #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/speed_test_async_multi.cpp b/examples/speed_test_async_multi.cpp index a66ad66..ea25061 100644 --- a/examples/speed_test_async_multi.cpp +++ b/examples/speed_test_async_multi.cpp @@ -6,7 +6,7 @@ #include #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/speed_test_pubsub.cpp b/examples/speed_test_pubsub.cpp index aa03529..97d527f 100644 --- a/examples/speed_test_pubsub.cpp +++ b/examples/speed_test_pubsub.cpp @@ -1,6 +1,5 @@ #include -#include "../src/redox.hpp" -#include "../src/subscriber.hpp" +#include "redox.hpp" using namespace std; using redox::Redox; diff --git a/examples/speed_test_sync.cpp b/examples/speed_test_sync.cpp index 3674c20..5cfe11d 100644 --- a/examples/speed_test_sync.cpp +++ b/examples/speed_test_sync.cpp @@ -5,7 +5,7 @@ */ #include -#include "../src/redox.hpp" +#include "redox.hpp" using namespace std; using namespace redox; diff --git a/src/redox.cpp b/src/client.cpp index 4bc3da9..4bc3da9 100644 --- a/src/redox.cpp +++ b/src/client.cpp diff --git a/src/client.hpp b/src/client.hpp new file mode 100644 index 0000000..5b43037 --- /dev/null +++ b/src/client.hpp @@ -0,0 +1,435 @@ +/** +* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis +* +* https://github.com/hmartiro/redox +* +* Copyright 2015 - Hayk Martirosyan +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "utils/logger.hpp" +#include "command.hpp" + +namespace redox { + +static const std::string REDIS_DEFAULT_HOST = "localhost"; +static const int REDIS_DEFAULT_PORT = 6379; + +/** +* Redox intro here. +*/ +class Redox { + +public: + + // Connection states + static const int NOT_YET_CONNECTED = 0; + static const int CONNECTED = 1; + static const int DISCONNECTED = 2; + static const int CONNECT_ERROR = 3; + static const int DISCONNECT_ERROR = 4; + + // ------------------------------------------------ + // Core public API + // ------------------------------------------------ + + /** + * Initializes everything, connects over TCP to a Redis server. + */ + Redox( + const std::string& host = REDIS_DEFAULT_HOST, + const int port = REDIS_DEFAULT_PORT, + std::function connection_callback = nullptr, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Info + ); + + /** + * Initializes everything, connects over unix sockets to a Redis server. + */ + Redox( + const std::string& path, + std::function connection_callback, + std::ostream& log_stream = std::cout, + log::Level log_level = log::Info + ); + + /** + * Disconnects from the Redis server, shuts down the event loop, and cleans up. + * Internally calls disconnect() and wait(). + */ + ~Redox(); + + /** + * Connects to Redis and starts an event loop in a separate thread. Returns + * true once everything is ready, or false on failure. + */ + bool connect(); + + /** + * Signal the event loop thread to disconnect from Redis and shut down. + */ + void disconnect(); + + /** + * Blocks until the event loop exits and disconnection is complete, then returns. + * Usually no need to call manually as it is handled in the destructor. + */ + void wait(); + + /** + * Asynchronously runs a command and invokes the callback when a reply is + * received or there is an error. The callback is guaranteed to be invoked + * exactly once. The Command object is provided to the callback, and the + * memory for it is automatically freed when the callback returns. + */ + template + void command( + const std::string& cmd, + const std::function&)>& callback = nullptr + ); + + /** + * Asynchronously runs a command and ignores any errors or replies. + */ + void command(const std::string& cmd); + + /** + * Synchronously runs a command, returning the Command object only once + * a reply is received or there is an error. The user is responsible for + * calling .free() on the returned Command object. + */ + template + Command& commandSync(const std::string& cmd); + + /** + * Synchronously runs a command, returning only once a reply is received + * or there's an error. Returns true on successful reply, false on error. + */ + bool commandSync(const std::string& cmd); + + /** + * Creates an asynchronous command that is run every [repeat] seconds, + * with the first one run in [after] seconds. If [repeat] is 0, the + * command is run only once. The user is responsible for calling .free() + * on the returned Command object. + */ + template + Command& commandLoop( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after = 0.0 + ); + + /** + * Creates an asynchronous command that is run once after a given + * delay. The callback is invoked exactly once on a successful reply + * or error, and the Command object memory is automatically freed + * after the callback returns. + */ + template + void commandDelayed( + const std::string& cmd, + const std::function&)>& callback, + double after + ); + + // ------------------------------------------------ + // Wrapper methods for convenience only + // ------------------------------------------------ + + /** + * Redis GET command wrapper - return the value for the given key, or throw + * an exception if there is an error. Blocking call. + */ + std::string get(const std::string& key); + + /** + * Redis SET command wrapper - set the value for the given key. Return + * true if succeeded, false if error. Blocking call. + */ + bool set(const std::string& key, const std::string& value); + + /** + * Redis DEL command wrapper - delete the given key. Return true if succeeded, + * false if error. Blocking call. + */ + bool del(const std::string& key); + + /** + * Redis PUBLISH command wrapper - publish the given message to all subscribers. + * Non-blocking call. + */ + void publish(const std::string& topic, const std::string& msg); + + // ------------------------------------------------ + // Public members + // ------------------------------------------------ + + // Hiredis context, left public to allow low-level access + redisAsyncContext * ctx_; + + // Redox server over TCP + const std::string host_; + const int port_; + + // Redox server over unix + const std::string path_; + + // Logger + log::Logger logger_; + +private: + + // ------------------------------------------------ + // Private methods + // ------------------------------------------------ + + // One stop shop for creating commands. The base of all public + // methods that run commands. + template + Command& createCommand( + const std::string& cmd, + const std::function&)>& callback = nullptr, + double repeat = 0.0, + double after = 0.0, + bool free_memory = true + ); + + // Setup code for the constructors + void init_ev(); + void init_hiredis(); + + // Callbacks invoked on server connection/disconnection + static void connectedCallback(const redisAsyncContext* c, int status); + static void disconnectedCallback(const redisAsyncContext* c, int status); + + // Main event loop, run in a separate thread + void runEventLoop(); + + // Return the command map corresponding to the templated reply type + template + std::unordered_map*>& getCommandMap(); + + // Return the given Command from the relevant command map, or nullptr if not there + template + Command* findCommand(long id); + + // Send all commands in the command queue to the server + static void processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); + + // Process the command with the given ID. Return true if the command had the + // templated type, and false if it was not in the command map of that type. + template + bool processQueuedCommand(long id); + + // Callback given to libev for a Command's timer watcher, to be processed in + // a deferred or looping state + template + static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents); + + // Submit an asynchronous command to the Redox server. Return + // true if succeeded, false otherwise. + template + static bool submitToServer(Command* c); + + // Callback given to hiredis to invoke when a reply is received + template + static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata); + + // Free all commands in the commands_to_free_ queue + static void freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); + + // Free the command with the given ID. Return true if the command had the templated + // type, and false if it was not in the command map of that type. + template + bool freeQueuedCommand(long id); + + // Invoked by Command objects when they are completed. Removes them + // from the command map. + template + void deregisterCommand(const long id) { + std::lock_guard lg1(command_map_guard_); + getCommandMap().erase(id); + commands_deleted_ += 1; + } + + // Free all commands remaining in the command maps + long freeAllCommands(); + + // Helper function for freeAllCommands to access a specific command map + template + long freeAllCommandsOfType(); + + // ------------------------------------------------ + // Private members + // ------------------------------------------------ + + // Manage connection state + std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; + std::mutex connect_lock_; + std::condition_variable connect_waiter_; + + // User connect/disconnect callbacks + std::function user_connection_callback_; + + // Dynamically allocated libev event loop + struct ev_loop* evloop_; + + // Asynchronous watchers + ev_async watcher_command_; // For processing commands + ev_async watcher_stop_; // For breaking the loop + ev_async watcher_free_; // For freeing commands + + // Track of Command objects allocated. Also provides unique Command IDs. + std::atomic_long commands_created_ = {0}; + std::atomic_long commands_deleted_ = {0}; + + // Separate thread to have a non-blocking event loop + std::thread event_loop_thread_; + + // Variable and CV to know when the event loop starts running + std::atomic_bool running_ = {false}; + std::mutex running_waiter_lock_; + std::condition_variable running_waiter_; + + // Variable and CV to know when the event loop stops running + std::atomic_bool to_exit_ = {false}; // Signal to exit + std::atomic_bool exited_ = {false}; // Event thread exited + std::mutex exit_waiter_lock_; + std::condition_variable exit_waiter_; + + // Maps of each Command, fetchable by the unique ID number + // In C++14, member variable templates will replace all of these types + // with a single templated declaration + // --------- + // template + // std::unordered_map*> commands_; + // --------- + std::unordered_map*> commands_redis_reply_; + std::unordered_map*> commands_string_; + std::unordered_map*> commands_char_p_; + std::unordered_map*> commands_int_; + std::unordered_map*> commands_long_long_int_; + std::unordered_map*> commands_null_; + std::unordered_map>*> commands_vector_string_; + std::unordered_map>*> commands_set_string_; + std::unordered_map>*> commands_unordered_set_string_; + std::mutex command_map_guard_; // Guards access to all of the above + + // Command IDs pending to be sent to the server + std::queue command_queue_; + std::mutex queue_guard_; + + // Commands IDs pending to be freed by the event loop + std::queue commands_to_free_; + std::mutex free_queue_guard_; + + // Commands use this method to deregister themselves from Redox, + // give it access to private members + template + friend void Command::free(); +}; + +// ------------------------------------------------ +// Implementation of templated methods +// ------------------------------------------------ + +template +Command& Redox::createCommand( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after, + bool free_memory +) { + + if(!running_) { + throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); + } + + commands_created_ += 1; + auto* c = new Command( + this, commands_created_, cmd, + callback, repeat, after, free_memory, logger_ + ); + + std::lock_guard lg(queue_guard_); + std::lock_guard lg2(command_map_guard_); + + getCommandMap()[c->id_] = c; + command_queue_.push(c->id_); + + // Signal the event loop to process this command + ev_async_send(evloop_, &watcher_command_); + + return *c; +} + +template +void Redox::command( + const std::string& cmd, + const std::function&)>& callback +) { + createCommand(cmd, callback); +} + +template +Command& Redox::commandLoop( + const std::string& cmd, + const std::function&)>& callback, + double repeat, + double after +) { + return createCommand(cmd, callback, repeat, after, false); +} + +template +void Redox::commandDelayed( + const std::string& cmd, + const std::function&)>& callback, + double after +) { + createCommand(cmd, callback, 0, after, true); +} + +template +Command& Redox::commandSync(const std::string& cmd) { + auto& c = createCommand(cmd, nullptr, 0, 0, false); + c.wait(); + return c; +} + +} // End namespace redis diff --git a/src/command.cpp b/src/command.cpp index 01e9514..3f94782 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -23,7 +23,7 @@ #include #include "command.hpp" -#include "redox.hpp" +#include "client.hpp" using namespace std; diff --git a/src/redox.hpp b/src/redox.hpp index 5b43037..07791b0 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -20,416 +20,6 @@ #pragma once -#include -#include - -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "utils/logger.hpp" +#include "client.hpp" #include "command.hpp" - -namespace redox { - -static const std::string REDIS_DEFAULT_HOST = "localhost"; -static const int REDIS_DEFAULT_PORT = 6379; - -/** -* Redox intro here. -*/ -class Redox { - -public: - - // Connection states - static const int NOT_YET_CONNECTED = 0; - static const int CONNECTED = 1; - static const int DISCONNECTED = 2; - static const int CONNECT_ERROR = 3; - static const int DISCONNECT_ERROR = 4; - - // ------------------------------------------------ - // Core public API - // ------------------------------------------------ - - /** - * Initializes everything, connects over TCP to a Redis server. - */ - Redox( - const std::string& host = REDIS_DEFAULT_HOST, - const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Info - ); - - /** - * Initializes everything, connects over unix sockets to a Redis server. - */ - Redox( - const std::string& path, - std::function connection_callback, - std::ostream& log_stream = std::cout, - log::Level log_level = log::Info - ); - - /** - * Disconnects from the Redis server, shuts down the event loop, and cleans up. - * Internally calls disconnect() and wait(). - */ - ~Redox(); - - /** - * Connects to Redis and starts an event loop in a separate thread. Returns - * true once everything is ready, or false on failure. - */ - bool connect(); - - /** - * Signal the event loop thread to disconnect from Redis and shut down. - */ - void disconnect(); - - /** - * Blocks until the event loop exits and disconnection is complete, then returns. - * Usually no need to call manually as it is handled in the destructor. - */ - void wait(); - - /** - * Asynchronously runs a command and invokes the callback when a reply is - * received or there is an error. The callback is guaranteed to be invoked - * exactly once. The Command object is provided to the callback, and the - * memory for it is automatically freed when the callback returns. - */ - template - void command( - const std::string& cmd, - const std::function&)>& callback = nullptr - ); - - /** - * Asynchronously runs a command and ignores any errors or replies. - */ - void command(const std::string& cmd); - - /** - * Synchronously runs a command, returning the Command object only once - * a reply is received or there is an error. The user is responsible for - * calling .free() on the returned Command object. - */ - template - Command& commandSync(const std::string& cmd); - - /** - * Synchronously runs a command, returning only once a reply is received - * or there's an error. Returns true on successful reply, false on error. - */ - bool commandSync(const std::string& cmd); - - /** - * Creates an asynchronous command that is run every [repeat] seconds, - * with the first one run in [after] seconds. If [repeat] is 0, the - * command is run only once. The user is responsible for calling .free() - * on the returned Command object. - */ - template - Command& commandLoop( - const std::string& cmd, - const std::function&)>& callback, - double repeat, - double after = 0.0 - ); - - /** - * Creates an asynchronous command that is run once after a given - * delay. The callback is invoked exactly once on a successful reply - * or error, and the Command object memory is automatically freed - * after the callback returns. - */ - template - void commandDelayed( - const std::string& cmd, - const std::function&)>& callback, - double after - ); - - // ------------------------------------------------ - // Wrapper methods for convenience only - // ------------------------------------------------ - - /** - * Redis GET command wrapper - return the value for the given key, or throw - * an exception if there is an error. Blocking call. - */ - std::string get(const std::string& key); - - /** - * Redis SET command wrapper - set the value for the given key. Return - * true if succeeded, false if error. Blocking call. - */ - bool set(const std::string& key, const std::string& value); - - /** - * Redis DEL command wrapper - delete the given key. Return true if succeeded, - * false if error. Blocking call. - */ - bool del(const std::string& key); - - /** - * Redis PUBLISH command wrapper - publish the given message to all subscribers. - * Non-blocking call. - */ - void publish(const std::string& topic, const std::string& msg); - - // ------------------------------------------------ - // Public members - // ------------------------------------------------ - - // Hiredis context, left public to allow low-level access - redisAsyncContext * ctx_; - - // Redox server over TCP - const std::string host_; - const int port_; - - // Redox server over unix - const std::string path_; - - // Logger - log::Logger logger_; - -private: - - // ------------------------------------------------ - // Private methods - // ------------------------------------------------ - - // One stop shop for creating commands. The base of all public - // methods that run commands. - template - Command& createCommand( - const std::string& cmd, - const std::function&)>& callback = nullptr, - double repeat = 0.0, - double after = 0.0, - bool free_memory = true - ); - - // Setup code for the constructors - void init_ev(); - void init_hiredis(); - - // Callbacks invoked on server connection/disconnection - static void connectedCallback(const redisAsyncContext* c, int status); - static void disconnectedCallback(const redisAsyncContext* c, int status); - - // Main event loop, run in a separate thread - void runEventLoop(); - - // Return the command map corresponding to the templated reply type - template - std::unordered_map*>& getCommandMap(); - - // Return the given Command from the relevant command map, or nullptr if not there - template - Command* findCommand(long id); - - // Send all commands in the command queue to the server - static void processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); - - // Process the command with the given ID. Return true if the command had the - // templated type, and false if it was not in the command map of that type. - template - bool processQueuedCommand(long id); - - // Callback given to libev for a Command's timer watcher, to be processed in - // a deferred or looping state - template - static void submitCommandCallback(struct ev_loop* loop, ev_timer* timer, int revents); - - // Submit an asynchronous command to the Redox server. Return - // true if succeeded, false otherwise. - template - static bool submitToServer(Command* c); - - // Callback given to hiredis to invoke when a reply is received - template - static void commandCallback(redisAsyncContext* ctx, void* r, void* privdata); - - // Free all commands in the commands_to_free_ queue - static void freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents); - - // Free the command with the given ID. Return true if the command had the templated - // type, and false if it was not in the command map of that type. - template - bool freeQueuedCommand(long id); - - // Invoked by Command objects when they are completed. Removes them - // from the command map. - template - void deregisterCommand(const long id) { - std::lock_guard lg1(command_map_guard_); - getCommandMap().erase(id); - commands_deleted_ += 1; - } - - // Free all commands remaining in the command maps - long freeAllCommands(); - - // Helper function for freeAllCommands to access a specific command map - template - long freeAllCommandsOfType(); - - // ------------------------------------------------ - // Private members - // ------------------------------------------------ - - // Manage connection state - std::atomic_int connect_state_ = {NOT_YET_CONNECTED}; - std::mutex connect_lock_; - std::condition_variable connect_waiter_; - - // User connect/disconnect callbacks - std::function user_connection_callback_; - - // Dynamically allocated libev event loop - struct ev_loop* evloop_; - - // Asynchronous watchers - ev_async watcher_command_; // For processing commands - ev_async watcher_stop_; // For breaking the loop - ev_async watcher_free_; // For freeing commands - - // Track of Command objects allocated. Also provides unique Command IDs. - std::atomic_long commands_created_ = {0}; - std::atomic_long commands_deleted_ = {0}; - - // Separate thread to have a non-blocking event loop - std::thread event_loop_thread_; - - // Variable and CV to know when the event loop starts running - std::atomic_bool running_ = {false}; - std::mutex running_waiter_lock_; - std::condition_variable running_waiter_; - - // Variable and CV to know when the event loop stops running - std::atomic_bool to_exit_ = {false}; // Signal to exit - std::atomic_bool exited_ = {false}; // Event thread exited - std::mutex exit_waiter_lock_; - std::condition_variable exit_waiter_; - - // Maps of each Command, fetchable by the unique ID number - // In C++14, member variable templates will replace all of these types - // with a single templated declaration - // --------- - // template - // std::unordered_map*> commands_; - // --------- - std::unordered_map*> commands_redis_reply_; - std::unordered_map*> commands_string_; - std::unordered_map*> commands_char_p_; - std::unordered_map*> commands_int_; - std::unordered_map*> commands_long_long_int_; - std::unordered_map*> commands_null_; - std::unordered_map>*> commands_vector_string_; - std::unordered_map>*> commands_set_string_; - std::unordered_map>*> commands_unordered_set_string_; - std::mutex command_map_guard_; // Guards access to all of the above - - // Command IDs pending to be sent to the server - std::queue command_queue_; - std::mutex queue_guard_; - - // Commands IDs pending to be freed by the event loop - std::queue commands_to_free_; - std::mutex free_queue_guard_; - - // Commands use this method to deregister themselves from Redox, - // give it access to private members - template - friend void Command::free(); -}; - -// ------------------------------------------------ -// Implementation of templated methods -// ------------------------------------------------ - -template -Command& Redox::createCommand( - const std::string& cmd, - const std::function&)>& callback, - double repeat, - double after, - bool free_memory -) { - - if(!running_) { - throw std::runtime_error("[ERROR] Need to connect Redox before running commands!"); - } - - commands_created_ += 1; - auto* c = new Command( - this, commands_created_, cmd, - callback, repeat, after, free_memory, logger_ - ); - - std::lock_guard lg(queue_guard_); - std::lock_guard lg2(command_map_guard_); - - getCommandMap()[c->id_] = c; - command_queue_.push(c->id_); - - // Signal the event loop to process this command - ev_async_send(evloop_, &watcher_command_); - - return *c; -} - -template -void Redox::command( - const std::string& cmd, - const std::function&)>& callback -) { - createCommand(cmd, callback); -} - -template -Command& Redox::commandLoop( - const std::string& cmd, - const std::function&)>& callback, - double repeat, - double after -) { - return createCommand(cmd, callback, repeat, after, false); -} - -template -void Redox::commandDelayed( - const std::string& cmd, - const std::function&)>& callback, - double after -) { - createCommand(cmd, callback, 0, after, true); -} - -template -Command& Redox::commandSync(const std::string& cmd) { - auto& c = createCommand(cmd, nullptr, 0, 0, false); - c.wait(); - return c; -} - -} // End namespace redis +#include "subscriber.hpp" diff --git a/src/subscriber.hpp b/src/subscriber.hpp index 234a0eb..c2e455c 100644 --- a/src/subscriber.hpp +++ b/src/subscriber.hpp @@ -20,7 +20,7 @@ #pragma once -#include "redox.hpp" +#include "client.hpp" namespace redox { diff --git a/test/test.cpp b/test/test.cpp index ca86f07..359353e 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -20,7 +20,7 @@ #include -#include "../src/redox.hpp" +#include "redox.hpp" #include "gtest/gtest.h" namespace {