Client.cpp
15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
/*
* Copyright (c) TrueBrain
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "TrueMQTT.h"
#include "ClientImpl.h"
#include "Connection.h"
#include "Log.h"
#include <sstream>
// Create a client. Every argument is forwarded unchanged to a newly created
// Client::Impl, which is the object all other Client methods operate on.
TrueMQTT::Client::Client(const std::string_view host,
                         int port,
                         const std::string_view client_id,
                         std::chrono::milliseconds connection_timeout,
                         std::chrono::milliseconds connection_backoff,
                         std::chrono::milliseconds connection_backoff_max,
                         std::chrono::milliseconds keep_alive_interval)
    : m_impl(std::make_unique<Client::Impl>(host, port, client_id, connection_timeout, connection_backoff, connection_backoff_max, keep_alive_interval))
{
    LOG_TRACE(m_impl, "Constructor of client called");
}
// Destroy the client, disconnecting first when that is still needed.
//
// disconnect() logs an error when the client is already disconnected, so it
// is only called when the state says there is something to tear down. This
// keeps destroying a never-connected (or already disconnected) client silent.
TrueMQTT::Client::~Client()
{
    LOG_TRACE(m_impl, "Destructor of client called");

    bool needs_disconnect = false;
    {
        // Keep this scope minimal: disconnect() acquires the same mutex itself.
        std::scoped_lock lock(m_impl->m_state_mutex);
        needs_disconnect = m_impl->m_state != Client::State::DISCONNECTED;
    }

    if (needs_disconnect)
    {
        disconnect();
    }
}
// Store the connection settings. The constructor body is empty: it only
// copies its arguments into the corresponding members.
TrueMQTT::Client::Impl::Impl(const std::string_view host,
                             int port,
                             const std::string_view client_id,
                             std::chrono::milliseconds connection_timeout,
                             std::chrono::milliseconds connection_backoff,
                             std::chrono::milliseconds connection_backoff_max,
                             std::chrono::milliseconds keep_alive_interval)
    : m_host(host)
    , m_port(port)
    , m_client_id(client_id)
    , m_connection_timeout(connection_timeout)
    , m_connection_backoff(connection_backoff)
    , m_connection_backoff_max(connection_backoff_max)
    , m_keep_alive_interval(keep_alive_interval)
{
}
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably placed here so the members of Impl are destroyed
// where their types are complete — confirm against ClientImpl.h.
TrueMQTT::Client::Impl::~Impl() = default;
// Replace the log level and the logging callback.
//
// The first trace line is emitted before the new settings are stored, the
// debug line after, so the latter already goes through the new logger.
void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function<void(Client::LogLevel, std::string_view)> &logger) const
{
    LOG_TRACE(m_impl, "Setting logger to log level " + std::to_string(log_level));

    auto &impl = *m_impl;
    impl.m_log_level = log_level;
    impl.m_logger = logger;

    LOG_DEBUG(m_impl, "Log level now on " + std::to_string(m_impl->m_log_level));
}
// Store the last-will topic, message and retain flag on the implementation.
//
// Only accepted while the client is DISCONNECTED; in any other state an
// error is logged and the stored values are left untouched.
// NOTE(review): m_state is read here without taking m_state_mutex, unlike
// connect() / publish() in this file — confirm this is intentional.
void TrueMQTT::Client::setLastWill(const std::string_view topic, const std::string_view message, bool retain) const
{
    const bool is_disconnected = m_impl->m_state == Client::State::DISCONNECTED;
    if (!is_disconnected)
    {
        LOG_ERROR(m_impl, "Cannot set last will when not disconnected");
        return;
    }

    LOG_TRACE(m_impl, "Setting last will to topic " + std::string(topic) + " with message " + std::string(message) + " and retain " + std::to_string(retain));

    auto &impl = *m_impl;
    impl.m_last_will_topic = topic;
    impl.m_last_will_message = message;
    impl.m_last_will_retain = retain;
}
// Replace the stored error callback with a copy of the given one.
// Unlike the queue / last-will setters there is no state check here.
void TrueMQTT::Client::setErrorCallback(const std::function<void(Error, std::string_view)> &callback) const
{
    LOG_TRACE(m_impl, "Setting error callback");

    auto &impl = *m_impl;
    impl.m_error_callback = callback;
}
// Store the publish-queue policy and its maximum size.
//
// Only accepted while the client is DISCONNECTED; in any other state an
// error is logged and the current settings are kept.
// NOTE(review): m_state is read here without taking m_state_mutex, unlike
// connect() / publish() in this file — confirm this is intentional.
void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) const
{
    const bool is_disconnected = m_impl->m_state == Client::State::DISCONNECTED;
    if (!is_disconnected)
    {
        LOG_ERROR(m_impl, "Cannot set publish queue when not disconnected");
        return;
    }

    LOG_TRACE(m_impl, "Setting publish queue to type " + std::to_string(queue_type) + " and size " + std::to_string(size));

    auto &impl = *m_impl;
    impl.m_publish_queue_type = queue_type;
    impl.m_publish_queue_size = size;
}
// Store the maximum size of the send queue.
//
// Only accepted while the client is DISCONNECTED; in any other state an
// error is logged and the current size is kept.
// NOTE(review): m_state is read here without taking m_state_mutex, unlike
// connect() / publish() in this file — confirm this is intentional.
void TrueMQTT::Client::setSendQueue(size_t size) const
{
    const bool is_disconnected = m_impl->m_state == Client::State::DISCONNECTED;
    if (!is_disconnected)
    {
        LOG_ERROR(m_impl, "Cannot set send queue when not disconnected");
        return;
    }

    LOG_TRACE(m_impl, "Setting send queue to size " + std::to_string(size));

    auto &impl = *m_impl;
    impl.m_send_queue_size = size;
}
// Replace the stored state-change callback with a copy of the given one.
// This callback is invoked by connect(), disconnect() and
// Impl::connectionStateChange() right after they update the state.
void TrueMQTT::Client::setStateChangeCallback(const std::function<void(State)> &callback) const
{
    LOG_TRACE(m_impl, "Setting state callback");

    auto &impl = *m_impl;
    impl.m_state_change_callback = callback;
}
void TrueMQTT::Client::connect() const
{
std::scoped_lock lock(m_impl->m_state_mutex);
if (m_impl->m_state != Client::State::DISCONNECTED)
{
LOG_ERROR(m_impl, "Can't connect when already connecting / connected");
return;
}
LOG_INFO(m_impl, "Connecting to " + m_impl->m_host + ":" + std::to_string(m_impl->m_port));
m_impl->m_state = Client::State::CONNECTING;
m_impl->m_state_change_callback(m_impl->m_state);
m_impl->connect();
}
void TrueMQTT::Client::disconnect() const
{
std::scoped_lock lock(m_impl->m_state_mutex);
if (m_impl->m_state == Client::State::DISCONNECTED)
{
LOG_ERROR(m_impl, "Can't disconnect when already disconnected");
return;
}
LOG_INFO(m_impl, "Disconnecting from broker");
m_impl->m_state = Client::State::DISCONNECTED;
m_impl->m_state_change_callback(m_impl->m_state);
m_impl->disconnect();
}
bool TrueMQTT::Client::publish(const std::string_view topic, const std::string_view message, bool retain) const
{
std::scoped_lock lock(m_impl->m_state_mutex);
LOG_DEBUG(m_impl, "Publishing message on topic '" + std::string(topic) + "': " + std::string(message) + " (" + (retain ? "retained" : "not retained") + ")");
switch (m_impl->m_state)
{
case Client::State::DISCONNECTED:
LOG_ERROR(m_impl, "Cannot publish when disconnected");
return false;
case Client::State::CONNECTING:
return m_impl->toPublishQueue(topic, message, retain);
case Client::State::CONNECTED:
return m_impl->sendPublish(topic, message, retain);
}
return false;
}
void TrueMQTT::Client::subscribe(const std::string_view topic, const std::function<void(std::string_view, std::string_view)> &callback) const
{
std::scoped_lock lock(m_impl->m_state_mutex);
if (m_impl->m_state == Client::State::DISCONNECTED)
{
LOG_ERROR(m_impl, "Cannot subscribe when disconnected");
return;
}
LOG_DEBUG(m_impl, "Subscribing to topic '" + std::string(topic) + "'");
// Find where in the tree the callback for this subscription should be added.
Client::Impl::SubscriptionPart *subscriptions = nullptr;
std::string_view topic_search = topic;
while (true)
{
std::string_view part = topic_search;
// Find the next part of the topic.
auto pos = topic_search.find('/');
if (pos != std::string_view::npos)
{
part = topic_search.substr(0, pos);
topic_search.remove_prefix(pos + 1);
}
// Find the next subscription in the tree.
if (subscriptions == nullptr)
{
subscriptions = &m_impl->m_subscriptions.try_emplace(std::string(part)).first->second;
}
else
{
subscriptions = &subscriptions->children.try_emplace(std::string(part)).first->second;
}
// If this was the last element, we're done.
if (pos == std::string_view::npos)
{
break;
}
}
// Add the callback to the leaf node.
subscriptions->callbacks.push_back(callback);
m_impl->m_subscription_topics.emplace(topic);
if (m_impl->m_state == Client::State::CONNECTED)
{
if (!m_impl->sendSubscribe(topic))
{
LOG_ERROR(m_impl, "Failed to send subscribe message. Closing connection to broker and trying again");
m_impl->disconnect();
m_impl->connect();
}
}
}
// Remove all callbacks for a topic and, when connected, unsubscribe at the broker.
//
// An error is logged and nothing changes when the client is disconnected or
// when the topic is not in m_subscription_topics. Otherwise the callbacks on
// the topic's node are cleared, nodes left without callbacks and children are
// pruned from the subscription tree, and the topic is forgotten. When sending
// the unsubscribe message fails, the implementation is told to disconnect and
// connect again.
void TrueMQTT::Client::unsubscribe(const std::string_view topic) const
{
    std::scoped_lock lock(m_impl->m_state_mutex);

    if (m_impl->m_state == Client::State::DISCONNECTED)
    {
        LOG_ERROR(m_impl, "Cannot unsubscribe when disconnected");
        return;
    }

    const auto topic_it = m_impl->m_subscription_topics.find(topic);
    if (topic_it == m_impl->m_subscription_topics.end())
    {
        LOG_ERROR(m_impl, "Cannot unsubscribe from topic '" + std::string(topic) + "' because we are not subscribed to it");
        return;
    }

    LOG_DEBUG(m_impl, "Unsubscribing from topic '" + std::string(topic) + "'");

    // The path from the root to the leaf, as (topic part, node) pairs. The
    // parts are views into `topic`, so they stay valid for this whole call.
    std::vector<std::tuple<std::string_view, Client::Impl::SubscriptionPart *>> reverse;

    // Find where in the tree the callbacks for this subscription live.
    Client::Impl::SubscriptionPart *subscriptions = nullptr;
    std::string_view topic_search = topic;
    while (true)
    {
        std::string_view part = topic_search;

        // Find the next part of the topic.
        const auto pos = topic_search.find('/');
        if (pos != std::string_view::npos)
        {
            part = topic_search.substr(0, pos);
            topic_search.remove_prefix(pos + 1);
        }

        // Find the next subscription in the tree.
        if (subscriptions == nullptr)
        {
            subscriptions = &m_impl->m_subscriptions.find(part)->second;
        }
        else
        {
            subscriptions = &subscriptions->children.find(part)->second;
        }

        // Update the reverse lookup.
        reverse.emplace_back(part, subscriptions);

        // If this was the last element, we're done.
        if (pos == std::string_view::npos)
        {
            break;
        }
    }

    // Clear the callbacks in the leaf node.
    subscriptions->callbacks.clear();

    // Bookkeeping: remove any empty nodes, walking from the leaf back to the
    // root. Otherwise we will slowly grow in memory if a user does a lot of
    // unsubscribes on different topics.
    // A separate flag marks "a child has to be removed": a topic level can be
    // the empty string (e.g. "a/" or "a//b"), so the name itself cannot double
    // as the marker.
    bool remove_child = false;
    std::string_view child_name;
    for (auto it = reverse.rbegin(); it != reverse.rend(); ++it)
    {
        const auto &[part, node] = *it;

        if (remove_child)
        {
            node->children.erase(node->children.find(child_name));
            remove_child = false;
        }

        if (node->callbacks.empty() && node->children.empty())
        {
            child_name = part;
            remove_child = true;
        }
        else
        {
            // This node stays, so every ancestor still has a child and stays too.
            break;
        }
    }
    if (remove_child)
    {
        m_impl->m_subscriptions.erase(m_impl->m_subscriptions.find(child_name));
    }

    m_impl->m_subscription_topics.erase(topic_it);

    if (m_impl->m_state == Client::State::CONNECTED)
    {
        if (!m_impl->sendUnsubscribe(topic))
        {
            LOG_ERROR(m_impl, "Failed to send unsubscribe message. Closing connection to broker and trying again");
            m_impl->disconnect();
            m_impl->connect();
        }
    }
}
// React to the connection going up or down.
//
// On a lost connection the state becomes CONNECTING and the state-change
// callback is told about it. On a new connection the state becomes CONNECTED,
// the callback is told about it, every topic in m_subscription_topics is
// subscribed again, and the publish queue is sent out and then emptied.
void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
{
    std::scoped_lock lock(m_state_mutex);

    if (!connected)
    {
        LOG_INFO(this, "Disconnected from broker");
        m_state = Client::State::CONNECTING;
        m_state_change_callback(m_state);
        return;
    }

    LOG_INFO(this, "Connected to broker");
    m_state = Client::State::CONNECTED;
    m_state_change_callback(m_state);

    // Everything below happens with the lock still held, so that
    // \ref disconnect cannot run while messages are still being sent.
    // The price is that \ref publish and \ref subscribe block too when they
    // are called right at the moment a connection is established. That is
    // unlikely enough to accept in exchange for a much simpler implementation.

    // Step one: bring back every subscription.
    for (const auto &subscribed_topic : m_subscription_topics)
    {
        if (sendSubscribe(subscribed_topic))
        {
            continue;
        }

        LOG_ERROR(this, "Failed to send subscribe message. Closing connection to broker and trying again");
        disconnect();
        connect();
        return;
    }

    // Step two: send whatever was queued while there was no connection.
    // The queue is emptied afterwards, also when sending stopped half-way.
    for (const auto &[queued_topic, queued_message, queued_retain] : m_publish_queue)
    {
        const bool sent = sendPublish(queued_topic, queued_message, queued_retain);
        if (!sent)
        {
            LOG_ERROR(this, "Failed to send queued publish message. Discarding rest of queue");
            break;
        }
    }
    m_publish_queue.clear();
}
// Queue a publish message for sending once the connection is established.
//
// Returns true when the message was added to the queue. Returns false when
// the client is not in the CONNECTING state, when the queue type is DROP, or
// when the queue is configured with a size of zero.
// For FIFO a full queue drops its oldest entry to make room; for LIFO it
// drops its newest entry.
bool TrueMQTT::Client::Impl::toPublishQueue(const std::string_view topic, const std::string_view message, bool retain)
{
    if (m_state != Client::State::CONNECTING)
    {
        LOG_ERROR(this, "Cannot queue publish message when not connecting");
        return false;
    }

    switch (m_publish_queue_type)
    {
        case Client::PublishQueueType::DROP:
            LOG_WARNING(this, "Publish queue is disabled, dropping message");
            return false;

        case Client::PublishQueueType::FIFO:
        case Client::PublishQueueType::LIFO:
            // A queue of size zero can never hold a message. Without this check
            // the "queue is full" test below is true for an empty queue, and
            // popping from an empty container is undefined behaviour.
            if (m_publish_queue_size == 0)
            {
                LOG_WARNING(this, "Publish queue has size zero, dropping message");
                return false;
            }

            if (m_publish_queue.size() >= m_publish_queue_size)
            {
                if (m_publish_queue_type == Client::PublishQueueType::FIFO)
                {
                    LOG_WARNING(this, "Publish queue is full, dropping oldest message on queue");
                    m_publish_queue.pop_front();
                }
                else
                {
                    LOG_WARNING(this, "Publish queue is full, dropping newest message on queue");
                    m_publish_queue.pop_back();
                }
            }
            break;
    }

    LOG_TRACE(this, "Adding message to publish queue");
    m_publish_queue.emplace_back(topic, message, retain);
    return true;
}
void TrueMQTT::Client::Impl::findSubscriptionMatch(std::string_view topic, std::string_view message, std::string_view topic_search, const std::map<std::string, Client::Impl::SubscriptionPart, std::less<>> &subscriptions)
{
std::string_view part = topic_search;
// Find the next part of the topic.
auto pos = topic_search.find('/');
if (pos != std::string_view::npos)
{
part = topic_search.substr(0, pos);
topic_search.remove_prefix(pos + 1);
}
// Find the match based on the part.
auto it = subscriptions.find(part);
if (it != subscriptions.end())
{
LOG_TRACE(this, "Found subscription match for part '" + std::string(part) + "' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
for (const auto &callback : it->second.callbacks)
{
callback(topic, message);
}
// Recursively find the match for the next part if we didn't reach the end.
if (pos != std::string_view::npos)
{
findSubscriptionMatch(topic, message, topic_search, it->second.children);
}
}
// Find the match if this part is a wildcard.
it = subscriptions.find("+");
if (it != subscriptions.end())
{
LOG_TRACE(this, "Found subscription match for '+' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
for (const auto &callback : it->second.callbacks)
{
callback(topic, message);
}
// Recursively find the match for the next part if we didn't reach the end.
if (pos != std::string_view::npos)
{
findSubscriptionMatch(topic, message, topic_search, it->second.children);
}
}
// Find the match if the remaining is a wildcard.
it = subscriptions.find("#");
if (it != subscriptions.end())
{
LOG_TRACE(this, "Found subscription match for '#' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
for (const auto &callback : it->second.callbacks)
{
callback(topic, message);
}
// No more recursion here, as we implicit consume the rest of the parts too.
}
}
// Dispatch a received message to the callbacks of all matching subscriptions.
//
// Runs under the state mutex. Messages that arrive while the client is not
// CONNECTED are reported as an error and dropped.
void TrueMQTT::Client::Impl::messageReceived(std::string_view topic, std::string_view message)
{
    std::scoped_lock lock(m_state_mutex);

    LOG_TRACE(this, "Message received on topic '" + std::string(topic) + "': " + std::string(message));

    if (m_state == State::CONNECTED)
    {
        // Matching starts at the root of the tree with the full topic.
        findSubscriptionMatch(topic, message, topic, m_subscriptions);
        return;
    }

    // This is easy to hit: disconnect while subscribed to a really busy topic.
    LOG_ERROR(this, "Received message while not connected");
}