Commit 1a01c9dd300f6351cb4c99e4b71195c46c933217

Authored by Hayk Martirosyan
1 parent 3bb3949a

Implement .get and .set helper commands, running check

Easy few lines for get and set commands that are easy to use.

Implemented a condition variable that blocks run() until the event
loop is running and Redis is connected. Also added a check to
command() that will throw an exception if you try to add a command
before calling run().
CMakeLists.txt
@@ -41,11 +41,11 @@ target_link_libraries(basic_threaded ${LIB_REDIS}) @@ -41,11 +41,11 @@ target_link_libraries(basic_threaded ${LIB_REDIS})
41 #add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL}) 41 #add_executable(lpush_benchmark examples/lpush_benchmark.cpp ${SRC_ALL})
42 #target_link_libraries(lpush_benchmark ${LIB_REDIS}) 42 #target_link_libraries(lpush_benchmark ${LIB_REDIS})
43 43
44 -add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL})  
45 -target_link_libraries(speed_test_async ${LIB_REDIS}) 44 +#add_executable(speed_test_async examples/speed_test_async.cpp ${SRC_ALL})
  45 +#target_link_libraries(speed_test_async ${LIB_REDIS})
46 46
47 -add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL})  
48 -target_link_libraries(speed_test_sync ${LIB_REDIS}) 47 +#add_executable(speed_test_sync examples/speed_test_sync.cpp ${SRC_ALL})
  48 +#target_link_libraries(speed_test_sync ${LIB_REDIS})
49 49
50 add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL}) 50 add_executable(speed_test_async_multi examples/speed_test_async_multi.cpp ${SRC_ALL})
51 target_link_libraries(speed_test_async_multi ${LIB_REDIS}) 51 target_link_libraries(speed_test_async_multi ${LIB_REDIS})
examples/basic.cpp
@@ -10,14 +10,12 @@ using namespace std; @@ -10,14 +10,12 @@ using namespace std;
10 int main(int argc, char* argv[]) { 10 int main(int argc, char* argv[]) {
11 11
12 redox::Redox rdx = {"localhost", 6379}; 12 redox::Redox rdx = {"localhost", 6379};
  13 + rdx.run();
13 14
14 - rdx.command<string>("SET alaska rules!", [](const string &cmd, const string &value) {  
15 - cout << cmd << ": " << value << endl;  
16 - }); 15 + if(!rdx.set("alaska", "rules"))
  16 + cerr << "Failed to set key!" << endl;
17 17
18 - rdx.command<string>("GET alaska", [](const string &cmd, const string &value) {  
19 - cout << cmd << ": " << value << endl;  
20 - }); 18 + cout << "alaska: " << rdx.get("alaska") << endl;
21 19
22 - rdx.run_blocking(); 20 + rdx.stop();
23 } 21 }
src/command.hpp
@@ -64,6 +64,7 @@ public: @@ -64,6 +64,7 @@ public:
64 64
65 const ReplyT& reply(); 65 const ReplyT& reply();
66 int status() { return reply_status; }; 66 int status() { return reply_status; };
  67 + bool ok() { return reply_status == REDOX_OK; }
67 bool is_completed() { return completed; } 68 bool is_completed() { return completed; }
68 69
69 /** 70 /**
@@ -212,7 +213,7 @@ void Command&lt;ReplyT&gt;::free() { @@ -212,7 +213,7 @@ void Command&lt;ReplyT&gt;::free() {
212 213
213 template<class ReplyT> 214 template<class ReplyT>
214 const ReplyT& Command<ReplyT>::reply() { 215 const ReplyT& Command<ReplyT>::reply() {
215 - if(reply_status != REDOX_OK) { 216 + if(!ok()) {
216 std::cout << "[WARNING] " << cmd 217 std::cout << "[WARNING] " << cmd
217 << ": Accessing value of reply with status != OK." << std::endl; 218 << ": Accessing value of reply with status != OK." << std::endl;
218 } 219 }
src/redox.cpp
@@ -88,6 +88,9 @@ void Redox::run_blocking() { @@ -88,6 +88,9 @@ void Redox::run_blocking() {
88 connected_lock.lock(); 88 connected_lock.lock();
89 connected_lock.unlock(); 89 connected_lock.unlock();
90 90
  91 + running = true;
  92 + running_waiter.notify_one();
  93 +
91 // Continuously create events and handle them 94 // Continuously create events and handle them
92 while (!to_exit) { 95 while (!to_exit) {
93 process_queued_commands(); 96 process_queued_commands();
@@ -107,6 +110,7 @@ void Redox::run_blocking() { @@ -107,6 +110,7 @@ void Redox::run_blocking() {
107 } 110 }
108 111
109 exited = true; 112 exited = true;
  113 + running = false;
110 114
111 // Let go for block_until_stopped method 115 // Let go for block_until_stopped method
112 exit_waiter.notify_one(); 116 exit_waiter.notify_one();
@@ -118,8 +122,9 @@ void Redox::run() { @@ -118,8 +122,9 @@ void Redox::run() {
118 122
119 event_loop_thread = thread([this] { run_blocking(); }); 123 event_loop_thread = thread([this] { run_blocking(); });
120 124
121 - // Don't return until connected  
122 - lock_guard<mutex> lg(connected_lock); 125 + // Block until connected and running the event loop
  126 + unique_lock<mutex> ul(running_waiter_lock);
  127 + running_waiter.wait(ul, [this] { return running.load(); });
123 } 128 }
124 129
125 void Redox::stop_signal() { 130 void Redox::stop_signal() {
@@ -299,4 +304,20 @@ bool Redox::command_blocking(const string&amp; cmd) { @@ -299,4 +304,20 @@ bool Redox::command_blocking(const string&amp; cmd) {
299 return succeeded; 304 return succeeded;
300 } 305 }
301 306
  307 +string Redox::get(const string& key) {
  308 +
  309 + auto c = command_blocking<char*>("GET " + key);
  310 + if(!c->ok()) {
  311 + throw runtime_error("[FATAL] Error getting key " + key + ": " + to_string(c->status()));
  312 + }
  313 + string reply = c->reply();
  314 + c->free();
  315 + return reply;
  316 +};
  317 +
  318 +bool Redox::set(const std::string& key, const std::string& value) {
  319 +
  320 + return command_blocking("SET " + key + " " + value);
  321 +}
  322 +
302 } // End namespace redis 323 } // End namespace redis
src/redox.hpp
@@ -35,7 +35,6 @@ public: @@ -35,7 +35,6 @@ public:
35 redisAsyncContext *ctx; 35 redisAsyncContext *ctx;
36 36
37 void run(); 37 void run();
38 - void run_blocking();  
39 38
40 void stop_signal(); 39 void stop_signal();
41 void block(); 40 void block();
@@ -86,6 +85,10 @@ public: @@ -86,6 +85,10 @@ public:
86 template<class ReplyT> 85 template<class ReplyT>
87 std::unordered_map<long, Command<ReplyT>*>& get_command_map(); 86 std::unordered_map<long, Command<ReplyT>*>& get_command_map();
88 87
  88 + // Helpers
  89 + std::string get(const std::string& key);
  90 + bool set(const std::string& key, const std::string& value);
  91 +
89 private: 92 private:
90 93
91 // Redox server 94 // Redox server
@@ -101,6 +104,10 @@ private: @@ -101,6 +104,10 @@ private:
101 // Number of commands processed 104 // Number of commands processed
102 std::atomic_long cmd_count = {0}; 105 std::atomic_long cmd_count = {0};
103 106
  107 + std::atomic_bool running = {false};
  108 + std::mutex running_waiter_lock;
  109 + std::condition_variable running_waiter;
  110 +
104 std::atomic_bool to_exit = {false}; // Signal to exit 111 std::atomic_bool to_exit = {false}; // Signal to exit
105 std::atomic_bool exited = {false}; // Event thread exited 112 std::atomic_bool exited = {false}; // Event thread exited
106 std::mutex exit_waiter_lock; 113 std::mutex exit_waiter_lock;
@@ -122,6 +129,8 @@ private: @@ -122,6 +129,8 @@ private:
122 129
123 template<class ReplyT> 130 template<class ReplyT>
124 bool process_queued_command(long id); 131 bool process_queued_command(long id);
  132 +
  133 + void run_blocking();
125 }; 134 };
126 135
127 // --------------------------- 136 // ---------------------------
@@ -136,6 +145,10 @@ Command&lt;ReplyT&gt;* Redox::command( @@ -136,6 +145,10 @@ Command&lt;ReplyT&gt;* Redox::command(
136 bool free_memory 145 bool free_memory
137 ) { 146 ) {
138 147
  148 + if(!running) {
  149 + throw std::runtime_error("[ERROR] Need to start Redox before running commands!");
  150 + }
  151 +
139 commands_created += 1; 152 commands_created += 1;
140 auto* c = new Command<ReplyT>(this, commands_created, cmd, 153 auto* c = new Command<ReplyT>(this, commands_created, cmd,
141 callback, error_callback, repeat, after, free_memory); 154 callback, error_callback, repeat, after, free_memory);