diff --git a/CMakeLists.txt b/CMakeLists.txt index 828db03..c9617ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,11 +41,11 @@ target_link_libraries(basic_threaded ${LIB_REDIS}) #add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) #target_link_libraries(lpush_benchmark ${LIB_REDIS}) -add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) -target_link_libraries(speed_test_async ${LIB_REDIS}) +#add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL}) +#target_link_libraries(speed_test_async ${LIB_REDIS}) -add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) -target_link_libraries(speed_test_sync ${LIB_REDIS}) +#add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL}) +#target_link_libraries(speed_test_sync ${LIB_REDIS}) add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) target_link_libraries(speed_test_async_multi ${LIB_REDIS}) diff --git a/examples/basic.cpp b/examples/basic.cpp index e10d471..c35f91d 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -10,14 +10,12 @@ using namespace std; int main(int argc, char* argv[]) { redox::Redox rdx = {"localhost", 6379}; + rdx.run(); - rdx.command("SET alaska rules!", [](const string &cmd, const string &value) { - cout << cmd << ": " << value << endl; - }); + if(!rdx.set("alaska", "rules")) + cerr << "Failed to set key!" << endl; - rdx.command("GET alaska", [](const string &cmd, const string &value) { - cout << cmd << ": " << value << endl; - }); + cout << "alaska: " << rdx.get("alaska") << endl; - rdx.run_blocking(); + rdx.stop(); } diff --git a/src/command.hpp b/src/command.hpp index 71656f9..570b70c 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -64,6 +64,7 @@ public: const ReplyT& reply(); int status() { return reply_status; }; + bool ok() { return reply_status == REDOX_OK; } bool is_completed() { return completed; } /** @@ -212,7 +213,7 @@ void Command::free() { template const ReplyT& Command::reply() { - if(reply_status != REDOX_OK) { + if(!ok()) { std::cout << "[WARNING] " << cmd << ": Accessing value of reply with status != OK." << std::endl; } diff --git a/src/redox.cpp b/src/redox.cpp index 750e54c..e770bd1 100644 --- a/src/redox.cpp +++ b/src/redox.cpp @@ -88,6 +88,9 @@ void Redox::run_blocking() { connected_lock.lock(); connected_lock.unlock(); + running = true; + running_waiter.notify_one(); + // Continuously create events and handle them while (!to_exit) { process_queued_commands(); @@ -107,6 +110,7 @@ void Redox::run_blocking() { } exited = true; + running = false; // Let go for block_until_stopped method exit_waiter.notify_one(); @@ -118,8 +122,9 @@ void Redox::run() { event_loop_thread = thread([this] { run_blocking(); }); - // Don't return until connected - lock_guard lg(connected_lock); + // Block until connected and running the event loop + unique_lock ul(running_waiter_lock); + running_waiter.wait(ul, [this] { return running.load(); }); } void Redox::stop_signal() { @@ -299,4 +304,20 @@ bool Redox::command_blocking(const string& cmd) { return succeeded; } +string Redox::get(const string& key) { + + auto c = command_blocking("GET " + key); + if(!c->ok()) { + throw runtime_error("[FATAL] Error getting key " + key + ": " + to_string(c->status())); + } + string reply = c->reply(); + c->free(); + return reply; +}; + +bool Redox::set(const std::string& key, const std::string& value) { + + return command_blocking("SET " + key + " " + value); +} + } // End namespace redis diff --git a/src/redox.hpp b/src/redox.hpp index 82e3c8b..3dfb420 100644 --- a/src/redox.hpp +++ b/src/redox.hpp @@ -35,7 +35,6 @@ public: redisAsyncContext *ctx; void run(); - void run_blocking(); void stop_signal(); void block(); @@ -86,6 +85,10 @@ public: template std::unordered_map*>& get_command_map(); + // Helpers + std::string get(const std::string& key); + bool set(const std::string& key, const std::string& value); + private: // Redox server @@ -101,6 +104,10 @@ private: // Number of commands processed std::atomic_long cmd_count = {0}; + std::atomic_bool running = {false}; + std::mutex running_waiter_lock; + std::condition_variable running_waiter; + std::atomic_bool to_exit = {false}; // Signal to exit std::atomic_bool exited = {false}; // Event thread exited std::mutex exit_waiter_lock; @@ -122,6 +129,8 @@ private: template bool process_queued_command(long id); + + void run_blocking(); }; // --------------------------- @@ -136,6 +145,10 @@ Command* Redox::command( bool free_memory ) { + if(!running) { + throw std::runtime_error("[ERROR] Need to start Redox before running commands!"); + } + commands_created += 1; auto* c = new Command(this, commands_created, cmd, callback, error_callback, repeat, after, free_memory);