subscriber.hpp
5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
* Redox - A modern, asynchronous, and wicked fast C++11 client for Redis
*
* https://github.com/hmartiro/redox
*
* Copyright 2015 - Hayk Martirosyan <hayk.mart at gmail dot com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "client.hpp"
namespace redox {
class Subscriber {

public:
  /**
  * Constructor. Same as Redox.
  *
  * log_stream: stream that log output is written to (defaults to std::cout)
  * log_level: logging threshold (defaults to log::Warning)
  */
  Subscriber(std::ostream &log_stream = std::cout, log::Level log_level = log::Warning);

  /**
  * Cleans up.
  */
  ~Subscriber();

  /**
  * Same as .noWait() on a Redox instance; forwards to the underlying client.
  */
  void noWait(bool state) { rdx_.noWait(state); }

  /**
  * Same as .connect() on a Redox instance; forwards to the underlying client
  * and returns its result.
  */
  bool connect(const std::string &host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT,
               std::function<void(int)> connection_callback = nullptr) {
    return rdx_.connect(host, port, connection_callback);
  }

  /**
  * Same as .connectUnix() on a Redox instance; forwards to the underlying
  * client and returns its result.
  */
  bool connectUnix(const std::string &path = REDIS_DEFAULT_PATH,
                   std::function<void(int)> connection_callback = nullptr) {
    return rdx_.connectUnix(path, connection_callback);
  }

  /**
  * Same as .stop() on a Redox instance.
  */
  void stop();

  /**
  * Same as .disconnect() on a Redox instance.
  */
  void disconnect();

  /**
  * Same as .wait() on a Redox instance.
  */
  void wait();

  /**
  * Subscribe to a topic.
  *
  * msg_callback: invoked whenever a message is received
  *               (arguments are presumably topic and message - confirm
  *               against the implementation)
  * sub_callback: invoked when successfully subscribed
  * unsub_callback: presumably invoked once the topic has been unsubscribed
  *                 (confirm against the implementation)
  * err_callback: invoked on some error state
  *               (the int is presumably a status code - confirm)
  */
  void subscribe(const std::string topic,
                 std::function<void(const std::string &, const std::string &)> msg_callback,
                 std::function<void(const std::string &)> sub_callback = nullptr,
                 std::function<void(const std::string &)> unsub_callback = nullptr,
                 std::function<void(const std::string &, int)> err_callback = nullptr);

  /**
  * Subscribe to a topic with a pattern.
  *
  * msg_callback: invoked whenever a message is received
  *               (arguments are presumably topic and message - confirm
  *               against the implementation)
  * sub_callback: invoked when successfully subscribed
  * unsub_callback: presumably invoked once the pattern has been unsubscribed
  *                 (confirm against the implementation)
  * err_callback: invoked on some error state
  *               (the int is presumably a status code - confirm)
  */
  void psubscribe(const std::string topic,
                  std::function<void(const std::string &, const std::string &)> msg_callback,
                  std::function<void(const std::string &)> sub_callback = nullptr,
                  std::function<void(const std::string &)> unsub_callback = nullptr,
                  std::function<void(const std::string &, int)> err_callback = nullptr);

  /**
  * Unsubscribe from a topic.
  *
  * err_callback: invoked on some error state
  */
  void unsubscribe(const std::string topic,
                   std::function<void(const std::string &, int)> err_callback = nullptr);

  /**
  * Unsubscribe from a topic with a pattern.
  *
  * err_callback: invoked on some error state
  */
  void punsubscribe(const std::string topic,
                    std::function<void(const std::string &, int)> err_callback = nullptr);

  /**
  * Return the topics that are subscribed() to.
  *
  * Returns a copy of the set, taken while holding subscribed_topics_guard_.
  * Callable on const instances.
  */
  std::set<std::string> subscribedTopics() const {
    std::lock_guard<std::mutex> lg(subscribed_topics_guard_);
    return subscribed_topics_;
  }

  /**
  * Return the topic patterns that are psubscribed() to.
  *
  * Returns a copy of the set, taken while holding psubscribed_topics_guard_.
  * Callable on const instances.
  */
  std::set<std::string> psubscribedTopics() const {
    std::lock_guard<std::mutex> lg(psubscribed_topics_guard_);
    return psubscribed_topics_;
  }

private:
  // Base for subscribe and psubscribe
  void subscribeBase(const std::string cmd_name, const std::string topic,
                     std::function<void(const std::string &, const std::string &)> msg_callback,
                     std::function<void(const std::string &)> sub_callback = nullptr,
                     std::function<void(const std::string &)> unsub_callback = nullptr,
                     std::function<void(const std::string &, int)> err_callback = nullptr);

  // Base for unsubscribe and punsubscribe
  void unsubscribeBase(const std::string cmd_name, const std::string topic,
                       std::function<void(const std::string &, int)> err_callback = nullptr);

  // Underlying Redis client
  Redox rdx_;

  // Keep track of topics because we can only unsubscribe
  // from subscribed topics and punsubscribe from
  // psubscribed topics, or hiredis leads to segfaults.
  // The guards are mutable so the const accessors above can lock them.
  std::set<std::string> subscribed_topics_;
  mutable std::mutex subscribed_topics_guard_;

  std::set<std::string> psubscribed_topics_;
  mutable std::mutex psubscribed_topics_guard_;

  // Set of persisting commands, so that we can cancel them
  std::set<Command<redisReply *> *> commands_;

  // Reference to rdx_.logger_ for convenience
  // (declared after rdx_, so rdx_ is constructed before this is bound)
  log::Logger &logger_;

  // CVs to wait for unsubscriptions
  std::condition_variable cv_unsub_;
  std::mutex cv_unsub_guard_;
  std::condition_variable cv_punsub_;
  std::mutex cv_punsub_guard_;

  // Pending subscriptions
  std::atomic_int num_pending_subs_ = {0};
};
} // End namespace