From c75df219798da3859274fb5f7b8fedbec19daffa Mon Sep 17 00:00:00 2001 From: Hayk Martirosyan Date: Tue, 23 Dec 2014 02:54:30 -0500 Subject: [PATCH] Start of looped command implementation --- examples/basic.cpp | 45 ++++++++++++++++++++++++--------------------- src/redisx.cpp | 33 +++++++++++++++++++++++++++++++++ src/redisx.hpp | 3 +++ 3 files changed, 60 insertions(+), 21 deletions(-) diff --git a/examples/basic.cpp b/examples/basic.cpp index bf867ed..4f165dd 100644 --- a/examples/basic.cpp +++ b/examples/basic.cpp @@ -18,7 +18,7 @@ unsigned long time_ms() { int main(int argc, char* argv[]) { - redisx::Redis r(REDIS_HOST, REDIS_PORT); + redisx::Redis r = {REDIS_HOST, REDIS_PORT}; r.command("GET blah", [](const string& cmd, const string& value) { cout << "[COMMAND] " << cmd << ": " << value << endl; @@ -45,28 +45,31 @@ int main(int argc, char* argv[]) { cout << "num deleted: " << num_deleted << endl; }); - unsigned long t0 = time_ms(); - unsigned long t1 = t0; - - int len = 1000000; - int count = 0; - - for(int i = 0; i < len; i++) { - r.command("set blah wefoiwef", [&t0, &t1, &count, len](const string& cmd, const string& reply) { - - count++; - if(count == len) { - cout << cmd << ": " << reply << endl; - cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl; - cout << "Time to receive all: " << time_ms() - t1 << "ms" << endl; - cout << "Total time: " << time_ms() - t0 << "ms" << endl; - } - }); - } - t1 = time_ms(); + r.command_loop("LPUSH count 1", 0, 1); + r.command_loop("LPUSH count2 1", 0, 1); + +// unsigned long t0 = time_ms(); +// unsigned long t1 = t0; +// +// int len = 1000000; +// int count = 0; +// +// for(int i = 0; i < len; i++) { +// r.command("set blah wefoiwef", [&t0, &t1, &count, len](const string& cmd, const string& reply) { +// +// count++; +// if(count == len) { +// cout << cmd << ": " << reply << endl; +// cout << "Time to queue async commands: " << t1 - t0 << "ms" << endl; +// cout << "Time to receive all: " << time_ms() - t1 << "ms" << endl; +// cout << "Total time: " << time_ms() - t0 << "ms" << endl; +// } +// }); +// } +// t1 = time_ms(); thread loop([&r] { r.start(); }); loop.join(); return 0; -} +}; diff --git a/src/redisx.cpp b/src/redisx.cpp index b5c892a..6d71a44 100644 --- a/src/redisx.cpp +++ b/src/redisx.cpp @@ -54,6 +54,9 @@ void Redis::start() { event_base_dispatch(base); } + +// ---------------------------- + void Redis::command(const char* cmd) { int status = redisAsyncCommand(c, NULL, NULL, cmd); if (status != REDIS_OK) { @@ -64,6 +67,36 @@ void Redis::command(const char* cmd) { // ---------------------------- +void e_callback(evutil_socket_t fd, short what, void *arg) { + + const char* cmd = "LPUSH count 1"; + + redisAsyncContext* c = (redisAsyncContext*)arg; + + int status = redisAsyncCommand(c, NULL, NULL, cmd); + if (status != REDIS_OK) { + cerr << "[ERROR] Async command \"" << cmd << "\": " << c->errstr << endl; + return; + } +} + +struct event* Redis::command_loop(const char* cmd, long interval_s, long interval_us) { + + struct event* e; + if(interval_s == 0 && interval_us == 0) { + e = event_new(base, -1, EV_PERSIST, e_callback, c); + event_add(e, NULL); + } else { + struct timeval e_time = {interval_s, interval_us}; + e = event_new(base, -1, EV_TIMEOUT | EV_PERSIST, e_callback, c); + event_add(e, &e_time); + } + + return e; +} + +// ---------------------------- + template<> void invoke_callback(const CommandAsync* cmd_obj, redisReply* reply) { cmd_obj->invoke(reply); diff --git a/src/redisx.hpp b/src/redisx.hpp index 9b3ad62..951bbbb 100644 --- a/src/redisx.hpp +++ b/src/redisx.hpp @@ -29,6 +29,8 @@ public: void command(const char* command); + struct event* command_loop(const char* command, long interval_s, long interval_us); + void get(const char* key, std::function callback); void set(const char* key, const char* value); @@ -105,4 +107,5 @@ void Redis::command(const std::string& cmd, const std::function