command.hpp
4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis
*
* https://github.com/hmartiro/redox
*
* Copyright 2015 - Hayk Martirosyan <hayk.mart at gmail dot com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <hiredis/adapters/libev.h>
#include <hiredis/async.h>
#include "utils/logger.hpp"
namespace redox {
class Redox;
/**
* The Command class represents a single command string to be sent to
* a Redis server, for both synchronous and asynchronous usage. It manages
* all of the state relevant to a single command string. A Command can also
* represent a deferred or looping command, in which case the success or
* error callbacks are invoked more than once.
*
* Command objects are non-copyable, and their constructor is private:
* only the Redox class (declared a friend) can create them.
*
* @tparam ReplyT The type the server reply is parsed into and that
*                reply() returns.
*/
template<class ReplyT>
class Command {

public:

  // Reply codes
  static const int NO_REPLY = -1;   // No reply yet
  static const int OK_REPLY = 0;    // Successful reply of the expected type
  static const int NIL_REPLY = 1;   // Got a nil reply
  static const int ERROR_REPLY = 2; // Got an error reply
  static const int SEND_ERROR = 3;  // Could not send to server
  static const int WRONG_TYPE = 4;  // Got reply, but it was not the expected type
  static const int TIMEOUT = 5;     // No reply, timed out

  /**
  * Returns the reply status of this command, as one of the reply
  * codes declared above (NO_REPLY until a status has been stored).
  */
  int status() const { return reply_status_; }

  /**
  * Returns true if this command got a successful reply,
  * i.e. status() == OK_REPLY.
  */
  bool ok() const { return reply_status_ == OK_REPLY; }

  /**
  * Returns the reply value, if the reply was successful (ok() == true).
  */
  ReplyT reply();

  /**
  * Tells the event loop to free memory for this command. The user is
  * responsible for calling this on synchronous or looping commands,
  * AKA when free_memory_ = false.
  */
  void free();

  /**
  * This method returns once this command's callback has been invoked
  * (or would have been invoked if there is none) since the last call
  * to wait(). If it is the first call, then returns once the callback
  * is invoked for the first time.
  */
  void wait();

  /**
  * Returns the command string represented by this object.
  */
  const std::string& cmd() const { return cmd_; }

  // Allow public access to constructed data. Every member below is const,
  // so it is fixed for the lifetime of the Command.
  Redox* const rdx_;        // Redox instance passed to the constructor
  const long id_;           // Identifier passed to the constructor
  const std::string cmd_;   // The command string, also exposed through cmd()
  const double repeat_;     // Repeat parameter (for looping commands; units not visible in this header)
  const double after_;      // Delay parameter (for deferred commands; units not visible in this header)
  const bool free_memory_;  // When false, the user must call free() (see free())

private:

  // Private constructor: instances are only created by the friend class
  // Redox. The definition lives outside this header.
  Command(
      Redox* rdx,
      long id,
      const std::string& cmd,
      const std::function<void(Command<ReplyT>&)>& callback,
      double repeat, double after,
      bool free_memory,
      log::Logger& logger
  );

  // Handles a new reply from the server
  void processReply(redisReply* r);

  // Invoke a user callback from the reply object. This method is specialized
  // for each ReplyT of Command.
  void parseReplyObject();

  // Directly invoke the user callback if it exists. An empty std::function
  // is skipped, so commands created without a callback are safe.
  void invoke() { if(callback_) callback_(*this); }

  // Helpers that classify the last server reply. Their definitions are
  // outside this header.
  bool checkErrorReply();
  bool checkNilReply();
  bool isExpectedReply(int type);
  bool isExpectedReply(int typeA, int typeB);

  // If needed, free the redisReply
  void freeReply();

  // The last server reply
  redisReply* reply_obj_ = nullptr;

  // User callback
  const std::function<void(Command<ReplyT>&)> callback_;

  // Place to store the reply value and status.
  // reply_val_ is value-initialized so that scalar and pointer ReplyT
  // types never hold an indeterminate value before the first reply.
  ReplyT reply_val_{};
  // Starts at NO_REPLY so status()/ok() are well defined before any reply;
  // a default-constructed atomic would otherwise be uninitialized.
  std::atomic_int reply_status_ = {NO_REPLY};

  // How many messages sent to server but not received reply
  std::atomic_int pending_ = {0};

  // Whether a repeating or delayed command is canceled
  std::atomic_bool canceled_ = {false};

  // libev timer watcher, with its accompanying mutex
  ev_timer timer_;
  std::mutex timer_guard_;

  // Access the reply value only when not being changed
  std::mutex reply_guard_;

  // For synchronous use (see wait())
  std::condition_variable waiter_;
  std::mutex waiter_lock_;
  std::atomic_bool waiting_done_ = {false};

  // Passed on from Redox class
  log::Logger& logger_;

  // Explicitly delete copy constructor and assignment operator,
  // Command objects should never be copied because they hold
  // state with a network resource.
  Command(const Command&) = delete;
  Command& operator=(const Command&) = delete;

  friend class Redox;
};
} // End namespace redox