Commit 3c3a273b70a3fa09ffe1c7b1ad9aaaa9efc8ad12

Authored by Patric Stout
1 parent 6f0b3228

feat: callback to be informed about state changes

Useful if you want to take action when a connection to the broker
is established or lost. For example, after a reconnection you can
expect to receive all retained messages again on the topics you are
subscribed too.
example/pubsub/main.cpp
@@ -6,7 +6,9 @@ @@ -6,7 +6,9 @@
6 */ 6 */
7 7
8 #include <TrueMQTT.h> 8 #include <TrueMQTT.h>
  9 +
9 #include <iostream> 10 #include <iostream>
  11 +#include <magic_enum.hpp>
10 #include <thread> 12 #include <thread>
11 13
12 int main() 14 int main()
@@ -15,10 +17,10 @@ int main() @@ -15,10 +17,10 @@ int main()
15 TrueMQTT::Client client("localhost", 1883, "test"); 17 TrueMQTT::Client client("localhost", 1883, "test");
16 18
17 client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message) 19 client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message)
18 - { std::cout << "Log " << level << ": " << message << std::endl; }); 20 + { std::cout << "Log " << std::string(magic_enum::enum_name(level)) << ": " << message << std::endl; });
19 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10); 21 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10);
20 client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message) 22 client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message)
21 - { std::cout << "Error " << error << ": " << message << std::endl; }); 23 + { std::cout << "Error " << std::string(magic_enum::enum_name(error)) << ": " << message << std::endl; });
22 client.setLastWill("example/pubsub/lastwill", "example pubsub finished", true); 24 client.setLastWill("example/pubsub/lastwill", "example pubsub finished", true);
23 25
24 client.connect(); 26 client.connect();
example/stress/main.cpp
@@ -6,7 +6,9 @@ @@ -6,7 +6,9 @@
6 */ 6 */
7 7
8 #include <TrueMQTT.h> 8 #include <TrueMQTT.h>
  9 +
9 #include <iostream> 10 #include <iostream>
  11 +#include <magic_enum.hpp>
10 #include <thread> 12 #include <thread>
11 13
12 int main() 14 int main()
@@ -15,10 +17,10 @@ int main() @@ -15,10 +17,10 @@ int main()
15 TrueMQTT::Client client("localhost", 1883, "test"); 17 TrueMQTT::Client client("localhost", 1883, "test");
16 18
17 client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message) 19 client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message)
18 - { std::cout << "Log " << level << ": " << message << std::endl; }); 20 + { std::cout << "Log " << std::string(magic_enum::enum_name(level)) << ": " << message << std::endl; });
19 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 100); 21 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 100);
20 client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message) 22 client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message)
21 - { std::cout << "Error " << error << ": " << message << std::endl; }); 23 + { std::cout << "Error " << std::string(magic_enum::enum_name(error)) << ": " << message << std::endl; });
22 client.setLastWill("test/lastwill", "example pubsub finished", true); 24 client.setLastWill("test/lastwill", "example pubsub finished", true);
23 25
24 client.connect(); 26 client.connect();
include/TrueMQTT.h
@@ -24,7 +24,7 @@ namespace TrueMQTT @@ -24,7 +24,7 @@ namespace TrueMQTT
24 /** 24 /**
25 * @brief Error codes that can be returned in the callback set by \ref setErrorCallback. 25 * @brief Error codes that can be returned in the callback set by \ref setErrorCallback.
26 */ 26 */
27 - enum Error 27 + enum class Error
28 { 28 {
29 /** 29 /**
30 * @brief The hostname could not be resolved into either an IPv4 or IPv6 address. 30 * @brief The hostname could not be resolved into either an IPv4 or IPv6 address.
@@ -62,7 +62,7 @@ namespace TrueMQTT @@ -62,7 +62,7 @@ namespace TrueMQTT
62 * After all, memory is finite, so this allows you to configure what scenario 62 * After all, memory is finite, so this allows you to configure what scenario
63 * works best for you. 63 * works best for you.
64 */ 64 */
65 - enum PublishQueueType 65 + enum class PublishQueueType
66 { 66 {
67 DROP, ///< Do not queue. 67 DROP, ///< Do not queue.
68 68
@@ -96,7 +96,7 @@ namespace TrueMQTT @@ -96,7 +96,7 @@ namespace TrueMQTT
96 /** 96 /**
97 * @brief The log levels used by this library. 97 * @brief The log levels used by this library.
98 */ 98 */
99 - enum LogLevel 99 + enum class LogLevel
100 { 100 {
101 NONE, ///< Do not log anything (default). 101 NONE, ///< Do not log anything (default).
102 ERROR, ///< Something went wrong and the library cannot recover. 102 ERROR, ///< Something went wrong and the library cannot recover.
@@ -107,6 +107,16 @@ namespace TrueMQTT @@ -107,6 +107,16 @@ namespace TrueMQTT
107 }; 107 };
108 108
109 /** 109 /**
  110 + * @brief The state of the connection.
  111 + */
  112 + enum class State
  113 + {
  114 + DISCONNECTED, ///< The client is not connected to a broker.
  115 + CONNECTING, ///< The client is (re)connecting to a broker.
  116 + CONNECTED, ///< The client is connected to a broker.
  117 + };
  118 +
  119 + /**
110 * @brief Constructor for the MQTT client. 120 * @brief Constructor for the MQTT client.
111 * 121 *
112 * @param host The hostname of the MQTT broker. Can be either an IP or a domain name. 122 * @param host The hostname of the MQTT broker. Can be either an IP or a domain name.
@@ -195,6 +205,13 @@ namespace TrueMQTT @@ -195,6 +205,13 @@ namespace TrueMQTT
195 void setSendQueue(size_t size) const; 205 void setSendQueue(size_t size) const;
196 206
197 /** 207 /**
  208 + * @brief Set the state-change callback.
  209 + *
  210 + * @param callback The callback to call when the state changes.
  211 + */
  212 + void setStateChangeCallback(const std::function<void(State)> &callback) const;
  213 +
  214 + /**
198 * @brief Connect to the broker. 215 * @brief Connect to the broker.
199 * 216 *
200 * After calling this function, the library will try a connection to the broker. 217 * After calling this function, the library will try a connection to the broker.
src/Client.cpp
@@ -64,7 +64,7 @@ void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function @@ -64,7 +64,7 @@ void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function
64 64
65 void TrueMQTT::Client::setLastWill(const std::string_view topic, const std::string_view message, bool retain) const 65 void TrueMQTT::Client::setLastWill(const std::string_view topic, const std::string_view message, bool retain) const
66 { 66 {
67 - if (m_impl->m_state != Client::Impl::State::DISCONNECTED) 67 + if (m_impl->m_state != Client::State::DISCONNECTED)
68 { 68 {
69 LOG_ERROR(m_impl, "Cannot set last will when not disconnected"); 69 LOG_ERROR(m_impl, "Cannot set last will when not disconnected");
70 return; 70 return;
@@ -86,7 +86,7 @@ void TrueMQTT::Client::setErrorCallback(const std::function&lt;void(Error, std::str @@ -86,7 +86,7 @@ void TrueMQTT::Client::setErrorCallback(const std::function&lt;void(Error, std::str
86 86
87 void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) const 87 void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) const
88 { 88 {
89 - if (m_impl->m_state != Client::Impl::State::DISCONNECTED) 89 + if (m_impl->m_state != Client::State::DISCONNECTED)
90 { 90 {
91 LOG_ERROR(m_impl, "Cannot set publish queue when not disconnected"); 91 LOG_ERROR(m_impl, "Cannot set publish queue when not disconnected");
92 return; 92 return;
@@ -100,7 +100,7 @@ void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size @@ -100,7 +100,7 @@ void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size
100 100
101 void TrueMQTT::Client::setSendQueue(size_t size) const 101 void TrueMQTT::Client::setSendQueue(size_t size) const
102 { 102 {
103 - if (m_impl->m_state != Client::Impl::State::DISCONNECTED) 103 + if (m_impl->m_state != Client::State::DISCONNECTED)
104 { 104 {
105 LOG_ERROR(m_impl, "Cannot set send queue when not disconnected"); 105 LOG_ERROR(m_impl, "Cannot set send queue when not disconnected");
106 return; 106 return;
@@ -111,18 +111,27 @@ void TrueMQTT::Client::setSendQueue(size_t size) const @@ -111,18 +111,27 @@ void TrueMQTT::Client::setSendQueue(size_t size) const
111 m_impl->m_send_queue_size = size; 111 m_impl->m_send_queue_size = size;
112 } 112 }
113 113
  114 +void TrueMQTT::Client::setStateChangeCallback(const std::function<void(State)> &callback) const
  115 +{
  116 + LOG_TRACE(m_impl, "Setting state callback");
  117 +
  118 + m_impl->m_state_change_callback = callback;
  119 +}
  120 +
114 void TrueMQTT::Client::connect() const 121 void TrueMQTT::Client::connect() const
115 { 122 {
116 std::scoped_lock lock(m_impl->m_state_mutex); 123 std::scoped_lock lock(m_impl->m_state_mutex);
117 124
118 - if (m_impl->m_state != Client::Impl::State::DISCONNECTED) 125 + if (m_impl->m_state != Client::State::DISCONNECTED)
119 { 126 {
  127 + LOG_ERROR(m_impl, "Can't connect when already connecting / connected");
120 return; 128 return;
121 } 129 }
122 130
123 LOG_INFO(m_impl, "Connecting to " + m_impl->m_host + ":" + std::to_string(m_impl->m_port)); 131 LOG_INFO(m_impl, "Connecting to " + m_impl->m_host + ":" + std::to_string(m_impl->m_port));
124 132
125 - m_impl->m_state = Client::Impl::State::CONNECTING; 133 + m_impl->m_state = Client::State::CONNECTING;
  134 + m_impl->m_state_change_callback(m_impl->m_state);
126 m_impl->connect(); 135 m_impl->connect();
127 } 136 }
128 137
@@ -130,15 +139,16 @@ void TrueMQTT::Client::disconnect() const @@ -130,15 +139,16 @@ void TrueMQTT::Client::disconnect() const
130 { 139 {
131 std::scoped_lock lock(m_impl->m_state_mutex); 140 std::scoped_lock lock(m_impl->m_state_mutex);
132 141
133 - if (m_impl->m_state == Client::Impl::State::DISCONNECTED) 142 + if (m_impl->m_state == Client::State::DISCONNECTED)
134 { 143 {
135 - LOG_TRACE(m_impl, "Already disconnected"); 144 + LOG_ERROR(m_impl, "Can't disconnect when already disconnected");
136 return; 145 return;
137 } 146 }
138 147
139 LOG_INFO(m_impl, "Disconnecting from broker"); 148 LOG_INFO(m_impl, "Disconnecting from broker");
140 149
141 - m_impl->m_state = Client::Impl::State::DISCONNECTED; 150 + m_impl->m_state = Client::State::DISCONNECTED;
  151 + m_impl->m_state_change_callback(m_impl->m_state);
142 m_impl->disconnect(); 152 m_impl->disconnect();
143 } 153 }
144 154
@@ -150,12 +160,12 @@ bool TrueMQTT::Client::publish(const std::string_view topic, const std::string_v @@ -150,12 +160,12 @@ bool TrueMQTT::Client::publish(const std::string_view topic, const std::string_v
150 160
151 switch (m_impl->m_state) 161 switch (m_impl->m_state)
152 { 162 {
153 - case Client::Impl::State::DISCONNECTED: 163 + case Client::State::DISCONNECTED:
154 LOG_ERROR(m_impl, "Cannot publish when disconnected"); 164 LOG_ERROR(m_impl, "Cannot publish when disconnected");
155 return false; 165 return false;
156 - case Client::Impl::State::CONNECTING: 166 + case Client::State::CONNECTING:
157 return m_impl->toPublishQueue(topic, message, retain); 167 return m_impl->toPublishQueue(topic, message, retain);
158 - case Client::Impl::State::CONNECTED: 168 + case Client::State::CONNECTED:
159 return m_impl->sendPublish(topic, message, retain); 169 return m_impl->sendPublish(topic, message, retain);
160 } 170 }
161 171
@@ -166,7 +176,7 @@ void TrueMQTT::Client::subscribe(const std::string_view topic, const std::functi @@ -166,7 +176,7 @@ void TrueMQTT::Client::subscribe(const std::string_view topic, const std::functi
166 { 176 {
167 std::scoped_lock lock(m_impl->m_state_mutex); 177 std::scoped_lock lock(m_impl->m_state_mutex);
168 178
169 - if (m_impl->m_state == Client::Impl::State::DISCONNECTED) 179 + if (m_impl->m_state == Client::State::DISCONNECTED)
170 { 180 {
171 LOG_ERROR(m_impl, "Cannot subscribe when disconnected"); 181 LOG_ERROR(m_impl, "Cannot subscribe when disconnected");
172 return; 182 return;
@@ -210,7 +220,7 @@ void TrueMQTT::Client::subscribe(const std::string_view topic, const std::functi @@ -210,7 +220,7 @@ void TrueMQTT::Client::subscribe(const std::string_view topic, const std::functi
210 subscriptions->callbacks.push_back(callback); 220 subscriptions->callbacks.push_back(callback);
211 221
212 m_impl->m_subscription_topics.emplace(topic); 222 m_impl->m_subscription_topics.emplace(topic);
213 - if (m_impl->m_state == Client::Impl::State::CONNECTED) 223 + if (m_impl->m_state == Client::State::CONNECTED)
214 { 224 {
215 if (!m_impl->sendSubscribe(topic)) 225 if (!m_impl->sendSubscribe(topic))
216 { 226 {
@@ -225,7 +235,7 @@ void TrueMQTT::Client::unsubscribe(const std::string_view topic) const @@ -225,7 +235,7 @@ void TrueMQTT::Client::unsubscribe(const std::string_view topic) const
225 { 235 {
226 std::scoped_lock lock(m_impl->m_state_mutex); 236 std::scoped_lock lock(m_impl->m_state_mutex);
227 237
228 - if (m_impl->m_state == Client::Impl::State::DISCONNECTED) 238 + if (m_impl->m_state == Client::State::DISCONNECTED)
229 { 239 {
230 LOG_ERROR(m_impl, "Cannot unsubscribe when disconnected"); 240 LOG_ERROR(m_impl, "Cannot unsubscribe when disconnected");
231 return; 241 return;
@@ -302,7 +312,7 @@ void TrueMQTT::Client::unsubscribe(const std::string_view topic) const @@ -302,7 +312,7 @@ void TrueMQTT::Client::unsubscribe(const std::string_view topic) const
302 } 312 }
303 313
304 m_impl->m_subscription_topics.erase(m_impl->m_subscription_topics.find(topic)); 314 m_impl->m_subscription_topics.erase(m_impl->m_subscription_topics.find(topic));
305 - if (m_impl->m_state == Client::Impl::State::CONNECTED) 315 + if (m_impl->m_state == Client::State::CONNECTED)
306 { 316 {
307 if (!m_impl->sendUnsubscribe(topic)) 317 if (!m_impl->sendUnsubscribe(topic))
308 { 318 {
@@ -321,7 +331,8 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected) @@ -321,7 +331,8 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
321 { 331 {
322 LOG_INFO(this, "Connected to broker"); 332 LOG_INFO(this, "Connected to broker");
323 333
324 - m_state = Client::Impl::State::CONNECTED; 334 + m_state = Client::State::CONNECTED;
  335 + m_state_change_callback(m_state);
325 336
326 // Restoring subscriptions and flushing the queue is done while still under 337 // Restoring subscriptions and flushing the queue is done while still under
327 // the lock. This to prevent \ref disconnect from being called while we are 338 // the lock. This to prevent \ref disconnect from being called while we are
@@ -356,13 +367,14 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected) @@ -356,13 +367,14 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
356 else 367 else
357 { 368 {
358 LOG_INFO(this, "Disconnected from broker"); 369 LOG_INFO(this, "Disconnected from broker");
359 - m_state = Client::Impl::State::CONNECTING; 370 + m_state = Client::State::CONNECTING;
  371 + m_state_change_callback(m_state);
360 } 372 }
361 } 373 }
362 374
363 bool TrueMQTT::Client::Impl::toPublishQueue(const std::string_view topic, const std::string_view message, bool retain) 375 bool TrueMQTT::Client::Impl::toPublishQueue(const std::string_view topic, const std::string_view message, bool retain)
364 { 376 {
365 - if (m_state != Client::Impl::State::CONNECTING) 377 + if (m_state != Client::State::CONNECTING)
366 { 378 {
367 LOG_ERROR(this, "Cannot queue publish message when not connecting"); 379 LOG_ERROR(this, "Cannot queue publish message when not connecting");
368 return false; 380 return false;
src/ClientImpl.h
@@ -31,13 +31,6 @@ public: @@ -31,13 +31,6 @@ public:
31 std::chrono::milliseconds keep_alive_interval); 31 std::chrono::milliseconds keep_alive_interval);
32 ~Impl(); 32 ~Impl();
33 33
34 - enum State  
35 - {  
36 - DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect.  
37 - CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process.  
38 - CONNECTED, ///< The client is connected to the broker.  
39 - };  
40 -  
41 class SubscriptionPart 34 class SubscriptionPart
42 { 35 {
43 public: 36 public:
@@ -70,6 +63,8 @@ public: @@ -70,6 +63,8 @@ public:
70 Client::LogLevel m_log_level = Client::LogLevel::NONE; ///< The log level to use. 63 Client::LogLevel m_log_level = Client::LogLevel::NONE; ///< The log level to use.
71 std::function<void(Client::LogLevel, std::string_view)> m_logger = [](Client::LogLevel, std::string_view) { /* empty */ }; ///< Logger callback. 64 std::function<void(Client::LogLevel, std::string_view)> m_logger = [](Client::LogLevel, std::string_view) { /* empty */ }; ///< Logger callback.
72 65
  66 + std::function<void(State)> m_state_change_callback = [](State) { /* empty */ }; ///< Callback when the connection state changes.
  67 +
73 std::string m_last_will_topic = ""; ///< Topic to publish the last will message to. 68 std::string m_last_will_topic = ""; ///< Topic to publish the last will message to.
74 std::string m_last_will_message = ""; ///< Message to publish on the last will topic. 69 std::string m_last_will_message = ""; ///< Message to publish on the last will topic.
75 bool m_last_will_retain = false; ///< Whether to retain the last will message. 70 bool m_last_will_retain = false; ///< Whether to retain the last will message.
src/Packet.cpp
@@ -10,8 +10,7 @@ @@ -10,8 +10,7 @@
10 #include "Log.h" 10 #include "Log.h"
11 #include "Packet.h" 11 #include "Packet.h"
12 12
13 -#include "magic_enum.hpp"  
14 - 13 +#include <magic_enum.hpp>
15 #include <string.h> 14 #include <string.h>
16 15
17 ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) 16 ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length)