subscriber.cpp
5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/**
* Redis C++11 wrapper.
*/
#include <string.h>
#include "subscriber.hpp"
using namespace std;
namespace redox {
Subscriber::Subscriber(
const std::string& host, const int port,
std::function<void(int)> connection_callback,
std::ostream& log_stream, log::Level log_level
) : rdx_(host, port, connection_callback, log_stream, log_level),
logger_(rdx_.logger_) {}
Subscriber::Subscriber(
const std::string& path,
std::function<void(int)> connection_callback,
std::ostream& log_stream, log::Level log_level
) : rdx_(path, connection_callback, log_stream, log_level),
logger_(rdx_.logger_) {}
// For debugging only
void debugReply(Command<redisReply*> c) {
redisReply* reply = c.reply();
cout << "------" << endl;
cout << c.cmd() << " " << (reply->type == REDIS_REPLY_ARRAY) << " " << (reply->elements) << endl;
for(size_t i = 0; i < reply->elements; i++) {
redisReply* r = reply->element[i];
cout << "element " << i << ", reply type = " << r->type << " ";
if(r->type == REDIS_REPLY_STRING) cout << r->str << endl;
else if(r->type == REDIS_REPLY_INTEGER) cout << r->integer << endl;
else cout << "some other type" << endl;
}
cout << "------" << endl;
}
void Subscriber::subscribeBase(const string cmd_name, const string topic,
function<void(const string&, const string&)> msg_callback,
function<void(const string&)> sub_callback,
function<void(const string&)> unsub_callback,
function<void(const string&, int)> err_callback
) {
rdx_.commandLoop<redisReply*>(cmd_name + " " + topic,
[this, topic, msg_callback, err_callback, sub_callback, unsub_callback](Command<redisReply*>& c) {
if (!c.ok()) {
if (err_callback) err_callback(topic, c.status());
return;
}
redisReply* reply = c.reply();
// TODO cancel this command on unsubscription?
// If the last entry is an integer, then it is a [p]sub/[p]unsub command
if ((reply->type == REDIS_REPLY_ARRAY) &&
(reply->element[reply->elements - 1]->type == REDIS_REPLY_INTEGER)) {
if (!strncmp(reply->element[0]->str, "sub", 3)) {
subscribed_topics_.insert(topic);
if (sub_callback) sub_callback(topic);
} else if (!strncmp(reply->element[0]->str, "psub", 4)) {
psubscribed_topics_.insert(topic);
if (sub_callback) sub_callback(topic);
} else if (!strncmp(reply->element[0]->str, "uns", 3)) {
subscribed_topics_.erase(topic);
if (unsub_callback) unsub_callback(topic);
} else if (!strncmp(reply->element[0]->str, "puns", 4)) {
psubscribed_topics_.erase(topic);
if (unsub_callback) unsub_callback(topic);
}
else logger_.error() << "Unknown pubsub message: " << reply->element[0]->str;
}
// Message for subscribe
else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 3)) {
char* msg = reply->element[2]->str;
if (msg && msg_callback) msg_callback(topic, reply->element[2]->str);
}
// Message for psubscribe
else if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements == 4)) {
char* msg = reply->element[2]->str;
if (msg && msg_callback) msg_callback(reply->element[2]->str, reply->element[3]->str);
}
else logger_.error() << "Unknown pubsub message of type " << reply->type;
},
1e10 // To keep the command around for a few hundred years
);
}
void Subscriber::subscribe(const string topic,
function<void(const string&, const string&)> msg_callback,
function<void(const string&)> sub_callback,
function<void(const string&)> unsub_callback,
function<void(const string&, int)> err_callback
) {
if(subscribed_topics_.find(topic) != subscribed_topics_.end()) {
logger_.warning() << "Already subscribed to " << topic << "!";
return;
}
subscribeBase("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
}
void Subscriber::psubscribe(const string topic,
function<void(const string&, const string&)> msg_callback,
function<void(const string&)> sub_callback,
function<void(const string&)> unsub_callback,
function<void(const string&, int)> err_callback
) {
if(psubscribed_topics_.find(topic) != psubscribed_topics_.end()) {
logger_.warning() << "Already psubscribed to " << topic << "!";
return;
}
subscribeBase("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback);
}
void Subscriber::unsubscribeBase(const string cmd_name, const string topic,
function<void(const string&, int)> err_callback
) {
rdx_.command<redisReply*>(cmd_name + " " + topic,
[topic, err_callback](Command<redisReply*>& c) {
if(!c.ok()) {
if (err_callback) err_callback(topic, c.status());
}
}
);
}
void Subscriber::unsubscribe(const string topic,
function<void(const string&, int)> err_callback
) {
if(subscribed_topics_.find(topic) == subscribed_topics_.end()) {
logger_.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!";
return;
}
unsubscribeBase("UNSUBSCRIBE", topic, err_callback);
}
void Subscriber::punsubscribe(const string topic,
function<void(const string&, int)> err_callback
) {
if(psubscribed_topics_.find(topic) == psubscribed_topics_.end()) {
logger_.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!";
return;
}
unsubscribeBase("PUNSUBSCRIBE", topic, err_callback);
}
} // End namespace