Commit 49cd81e23234674c41529dbc83bc11c3fc661801

Authored by Patric Stout
1 parent ed2f25ca

feat(client): finish everything up till the actual socket communication

All administration should been taken care of now, and the only
thing remaining is creating connect/disconnect and implementing
the sendXXX functions.
CMakeLists.txt
@@ -12,7 +12,7 @@ project(truemqtt VERSION 1.0.0 DESCRIPTION "A modern C++ MQTT Client library") @@ -12,7 +12,7 @@ project(truemqtt VERSION 1.0.0 DESCRIPTION "A modern C++ MQTT Client library")
12 set(CMAKE_CXX_STANDARD 17) 12 set(CMAKE_CXX_STANDARD 17)
13 set(CMAKE_CXX_STANDARD_REQUIRED True) 13 set(CMAKE_CXX_STANDARD_REQUIRED True)
14 14
15 -set(MIN_LOGGER_LEVEL "INFO" CACHE STRING "Set minimal logger level (TRACE, DEBUG, INFO, WARN, ERROR). No logs below this level will be omitted.") 15 +set(MIN_LOGGER_LEVEL "INFO" CACHE STRING "Set minimal logger level (TRACE, DEBUG, INFO, WARNING, ERROR). No logs below this level will be omitted.")
16 16
17 include(GNUInstallDirs) 17 include(GNUInstallDirs)
18 18
example/pubsub/main.cpp
@@ -7,26 +7,36 @@ @@ -7,26 +7,36 @@
7 7
8 #include <TrueMQTT.h> 8 #include <TrueMQTT.h>
9 #include <iostream> 9 #include <iostream>
  10 +#include <thread>
10 11
11 int main() 12 int main()
12 { 13 {
13 // Create a connection to the local broker. 14 // Create a connection to the local broker.
14 TrueMQTT::Client client("localhost", 1883, "test"); 15 TrueMQTT::Client client("localhost", 1883, "test");
15 16
16 - client.setLogger(TrueMQTT::Client::LogLevel::TRACE, [](TrueMQTT::Client::LogLevel level, std::string message) {  
17 - std::cout << "Log " << level << ": " << message << std::endl;  
18 - }); 17 + client.setLogger(TrueMQTT::Client::LogLevel::TRACE, [](TrueMQTT::Client::LogLevel level, std::string message)
  18 + { std::cout << "Log " << level << ": " << message << std::endl; });
  19 + client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10);
19 20
20 client.connect(); 21 client.connect();
21 22
  23 + bool stop = false;
  24 +
22 // Subscribe to the topic we will be publishing under in a bit. 25 // Subscribe to the topic we will be publishing under in a bit.
23 - client.subscribe("test", [](const std::string &topic, const std::string &payload) { 26 + client.subscribe("test", [&stop](const std::string &topic, const std::string &payload)
  27 + {
24 std::cout << "Received message on topic " << topic << ": " << payload << std::endl; 28 std::cout << "Received message on topic " << topic << ": " << payload << std::endl;
25 - }); 29 + stop = true; });
26 30
27 // Publish a message on the same topic as we subscribed too. 31 // Publish a message on the same topic as we subscribed too.
28 client.publish("test", "Hello World!", false); 32 client.publish("test", "Hello World!", false);
29 33
  34 + // Wait till we receive the message back on our subscription.
  35 + while (!stop)
  36 + {
  37 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  38 + }
  39 +
30 client.disconnect(); 40 client.disconnect();
31 41
32 return 0; 42 return 0;
include/TrueMQTT.h
@@ -32,13 +32,45 @@ namespace TrueMQTT @@ -32,13 +32,45 @@ namespace TrueMQTT
32 32
33 /** 33 /**
34 * @brief The type of queue that can be set for publishing messages. 34 * @brief The type of queue that can be set for publishing messages.
  35 + *
  36 + * When there is no connection to a broker, some choices have to be made what
  37 + * to do with messages that are published.
  38 + *
  39 + * - Do we queue the message?
  40 + * - What do we do when the queue is full?
  41 + *
  42 + * After all, memory is finite, so this allows you to configure what scenario
  43 + * works best for you.
35 */ 44 */
36 - enum QueueType 45 + enum PublishQueueType
37 { 46 {
38 - DROP, ///< Do not queue.  
39 - FIFO, ///< Global FIFO.  
40 - LIFO, ///< Global LIFO.  
41 - LIFO_PER_TOPIC, ///< Per topic LIFO. 47 + DROP, ///< Do not queue.
  48 +
  49 + /**
  50 + * @brief First-in-First-out queue.
  51 + *
  52 + * For a size 3 queue this means if we publish 6 messages (M1 .. M6), the result is:
  53 + *
  54 + * [ M4, M5, M6 ]
  55 + *
  56 + * When publishing the next message (M7) it becomes:
  57 + *
  58 + * [ M5, M6, M7 ]
  59 + */
  60 + FIFO,
  61 +
  62 + /**
  63 + * @brief Last-in-First-out queue.
  64 + *
  65 + * For a size 3 queue this means if we publish 6 messages (M1 .. M6), the result is:
  66 + *
  67 + * [ M1, M2, M6 ]
  68 + *
  69 + * When publishing the next message (M7) it becomes:
  70 + *
  71 + * [ M1, M2, M7 ]
  72 + */
  73 + LIFO,
42 }; 74 };
43 75
44 /** 76 /**
@@ -90,11 +122,14 @@ namespace TrueMQTT @@ -90,11 +122,14 @@ namespace TrueMQTT
90 * @param topic The topic to publish the last will message to. 122 * @param topic The topic to publish the last will message to.
91 * @param payload The payload of the last will message. 123 * @param payload The payload of the last will message.
92 * @param retain Whether to retain the last will message. 124 * @param retain Whether to retain the last will message.
  125 + *
  126 + * @note Cannot be called after \ref connect.
93 */ 127 */
94 void setLastWill(const std::string &topic, const std::string &payload, bool retain); 128 void setLastWill(const std::string &topic, const std::string &payload, bool retain);
95 129
96 /** 130 /**
97 * @brief Set the error callback, called when any error occurs. 131 * @brief Set the error callback, called when any error occurs.
  132 + *
98 * @param callback The callback to call when an error occurs. 133 * @param callback The callback to call when an error occurs.
99 */ 134 */
100 void setErrorCallback(std::function<void(Error, std::string &)> callback); 135 void setErrorCallback(std::function<void(Error, std::string &)> callback);
@@ -102,10 +137,12 @@ namespace TrueMQTT @@ -102,10 +137,12 @@ namespace TrueMQTT
102 /** 137 /**
103 * @brief Set the publish queue to use. 138 * @brief Set the publish queue to use.
104 * 139 *
105 - * @param queue_type The \ref QueueType to use for the publish queue. 140 + * @param queue_type The \ref PublishQueueType to use for the publish queue.
106 * @param size The size of the queue. If the queue is full, the type of queue defines what happens. 141 * @param size The size of the queue. If the queue is full, the type of queue defines what happens.
  142 + *
  143 + * @note Cannot be called after \ref connect.
107 */ 144 */
108 - void setPublishQueue(QueueType queue_type, int size); 145 + void setPublishQueue(PublishQueueType queue_type, size_t size);
109 146
110 /** 147 /**
111 * @brief Connect to the broker. 148 * @brief Connect to the broker.
@@ -128,12 +165,19 @@ namespace TrueMQTT @@ -128,12 +165,19 @@ namespace TrueMQTT
128 * Additionally, it will clean any publish / subscribe information it has. 165 * Additionally, it will clean any publish / subscribe information it has.
129 * 166 *
130 * @note Calling disconnect twice has no effect. 167 * @note Calling disconnect twice has no effect.
  168 + * @note This function can stall for a short moment if you disconnect just at the
  169 + * moment the connection to the broker is established, and there are messages in the
  170 + * publish queue and/or subscriptions.
131 */ 171 */
132 void disconnect(); 172 void disconnect();
133 173
134 /** 174 /**
135 * @brief Publish a payload on a topic. 175 * @brief Publish a payload on a topic.
136 * 176 *
  177 + * After \ref connect is called, this function will either publish the message
  178 + * immediately (if connected) or queue it for later (if still connecting).
  179 + * In the latter case, it will be published as soon as the connection is established.
  180 + *
137 * @param topic The topic to publish the payload on. 181 * @param topic The topic to publish the payload on.
138 * @param payload The payload to publish. 182 * @param payload The payload to publish.
139 * @param retain Whether to retain the message on the broker. 183 * @param retain Whether to retain the message on the broker.
@@ -142,12 +186,23 @@ namespace TrueMQTT @@ -142,12 +186,23 @@ namespace TrueMQTT
142 * other QoS level. 186 * other QoS level.
143 * @note This call is non-blocking, and it is not possible to know whether the message 187 * @note This call is non-blocking, and it is not possible to know whether the message
144 * was actually published or not. 188 * was actually published or not.
  189 + * @note You cannot publish a message if you are disconnected from the broker. Call
  190 + * \ref connect first.
  191 + * @note This function can stall for a short moment if you publish just at the
  192 + * moment the connection to the broker is established, and there are messages in the
  193 + * publish queue and/or subscriptions.
145 */ 194 */
146 void publish(const std::string &topic, const std::string &payload, bool retain); 195 void publish(const std::string &topic, const std::string &payload, bool retain);
147 196
148 /** 197 /**
149 * @brief Subscribe to a topic, and call the callback function when a message arrives. 198 * @brief Subscribe to a topic, and call the callback function when a message arrives.
150 * 199 *
  200 + * After \ref connect is called, this function will either subscribe to the topic
  201 + * immediately (if connected) or subscribe to it once a connection has been made.
  202 + * In case of a reconnect, it will also automatically resubscribe.
  203 + *
  204 + * If the broker refuses the subscribe request, the error-callback is called.
  205 + *
151 * @param topic The topic to subscribe to. 206 * @param topic The topic to subscribe to.
152 * @param callback The callback to call when a message arrives on this topic. 207 * @param callback The callback to call when a message arrives on this topic.
153 * 208 *
@@ -155,15 +210,27 @@ namespace TrueMQTT @@ -155,15 +210,27 @@ namespace TrueMQTT
155 * If you do, the callback of the first subscription will be overwritten. 210 * If you do, the callback of the first subscription will be overwritten.
156 * In other words, "a/+" and "a/b" is fine, and callbacks for both subscribes will be 211 * In other words, "a/+" and "a/b" is fine, and callbacks for both subscribes will be
157 * called when something is published on "a/b". 212 * called when something is published on "a/b".
  213 + * @note You cannot subscribe a topic if you are disconnected from the broker. Call
  214 + * \ref connect first.
  215 + * @note This function can stall for a short moment if you publish just at the
  216 + * moment the connection to the broker is established, and there are messages in the
  217 + * publish queue and/or subscriptions.
158 */ 218 */
159 void subscribe(const std::string &topic, std::function<void(std::string, std::string)> callback); 219 void subscribe(const std::string &topic, std::function<void(std::string, std::string)> callback);
160 220
161 /** 221 /**
162 * @brief Unsubscribe from a topic. 222 * @brief Unsubscribe from a topic.
163 * 223 *
  224 + * If the broker refuses the unsubscribe request, the error-callback is called.
  225 + *
164 * @param topic The topic to unsubscribe from. 226 * @param topic The topic to unsubscribe from.
165 * 227 *
166 * @note If you unsubscribe from a topic you were not subscribed too, nothing happens. 228 * @note If you unsubscribe from a topic you were not subscribed too, nothing happens.
  229 + * @note You cannot unsubscribe from a topic if you are disconnected from the broker.
  230 + * Call \ref connect (and \ref subscribe) first.
  231 + * @note This function can stall for a short moment if you publish just at the
  232 + * moment the connection to the broker is established, and there are messages in the
  233 + * publish queue and/or subscriptions.
167 */ 234 */
168 void unsubscribe(const std::string &topic); 235 void unsubscribe(const std::string &topic);
169 236
src/Client.cpp
@@ -8,6 +8,9 @@ @@ -8,6 +8,9 @@
8 #include "TrueMQTT.h" 8 #include "TrueMQTT.h"
9 #include "Log.h" 9 #include "Log.h"
10 10
  11 +#include <deque>
  12 +#include <map>
  13 +#include <mutex>
11 #include <string> 14 #include <string>
12 15
13 using TrueMQTT::Client; 16 using TrueMQTT::Client;
@@ -29,12 +32,19 @@ public: @@ -29,12 +32,19 @@ public:
29 32
30 enum State 33 enum State
31 { 34 {
32 - DISCONNECTED,  
33 - CONNECTING,  
34 - CONNECTED, 35 + DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect.
  36 + CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process.
  37 + CONNECTED, ///< The client is connected to the broker.
35 }; 38 };
36 39
  40 + void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker.
  41 + void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
  42 + void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
  43 + void changeToConnected(); ///< Called when a connection goes from CONNECTING state to CONNECTED state.
  44 + void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue.
  45 +
37 State state = State::DISCONNECTED; ///< The current state of the client. 46 State state = State::DISCONNECTED; ///< The current state of the client.
  47 + std::mutex state_mutex; ///< Mutex to protect state changes.
38 48
39 std::string host; ///< Host of the broker. 49 std::string host; ///< Host of the broker.
40 int port; ///< Port of the broker. 50 int port; ///< Port of the broker.
@@ -52,37 +62,46 @@ public: @@ -52,37 +62,46 @@ public:
52 62
53 std::function<void(Error, std::string &)> error_callback = [](Error, std::string &) {}; ///< Error callback. 63 std::function<void(Error, std::string &)> error_callback = [](Error, std::string &) {}; ///< Error callback.
54 64
55 - Client::QueueType publish_queue_type = Client::QueueType::DROP; ///< The type of queue to use for the publish queue.  
56 - int publish_queue_size = -1; ///< Size of the publish queue. 65 + Client::PublishQueueType publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue.
  66 + size_t publish_queue_size = -1; ///< Size of the publish queue.
  67 + std::deque<std::tuple<std::string, std::string, bool>> publish_queue; ///< Queue of publish messages to send to the broker.
  68 +
  69 + std::map<std::string, std::function<void(std::string, std::string)>> subscriptions; ///< Map of active subscriptions.
57 }; 70 };
58 71
59 Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) 72 Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval)
60 { 73 {
61 this->m_impl = std::make_unique<Client::Impl>(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval); 74 this->m_impl = std::make_unique<Client::Impl>(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval);
62 75
63 - LOG_TRACE("Constructor of client called"); 76 + LOG_TRACE(this->m_impl, "Constructor of client called");
64 } 77 }
65 78
66 Client::~Client() 79 Client::~Client()
67 { 80 {
68 - LOG_TRACE("Destructor of client called"); 81 + LOG_TRACE(this->m_impl, "Destructor of client called");
69 82
70 this->disconnect(); 83 this->disconnect();
71 } 84 }
72 85
73 void Client::setLogger(Client::LogLevel log_level, std::function<void(Client::LogLevel, std::string)> logger) 86 void Client::setLogger(Client::LogLevel log_level, std::function<void(Client::LogLevel, std::string)> logger)
74 { 87 {
75 - LOG_TRACE("Setting logger to log level " + std::to_string(log_level)); 88 + LOG_TRACE(this->m_impl, "Setting logger to log level " + std::to_string(log_level));
76 89
77 this->m_impl->log_level = log_level; 90 this->m_impl->log_level = log_level;
78 this->m_impl->logger = logger; 91 this->m_impl->logger = logger;
79 92
80 - LOG_DEBUG("Log level now on " + std::to_string(this->m_impl->log_level)); 93 + LOG_DEBUG(this->m_impl, "Log level now on " + std::to_string(this->m_impl->log_level));
81 } 94 }
82 95
83 void Client::setLastWill(const std::string &topic, const std::string &payload, bool retain) 96 void Client::setLastWill(const std::string &topic, const std::string &payload, bool retain)
84 { 97 {
85 - LOG_TRACE("Setting last will to topic " + topic + " with payload " + payload + " and retain " + std::to_string(retain)); 98 + if (this->m_impl->state != Client::Impl::State::DISCONNECTED)
  99 + {
  100 + LOG_ERROR(this->m_impl, "Cannot set last will when not disconnected");
  101 + return;
  102 + }
  103 +
  104 + LOG_TRACE(this->m_impl, "Setting last will to topic " + topic + " with payload " + payload + " and retain " + std::to_string(retain));
86 105
87 this->m_impl->last_will_topic = topic; 106 this->m_impl->last_will_topic = topic;
88 this->m_impl->last_will_payload = payload; 107 this->m_impl->last_will_payload = payload;
@@ -91,14 +110,20 @@ void Client::setLastWill(const std::string &amp;topic, const std::string &amp;payload, b @@ -91,14 +110,20 @@ void Client::setLastWill(const std::string &amp;topic, const std::string &amp;payload, b
91 110
92 void Client::setErrorCallback(std::function<void(Error, std::string &)> callback) 111 void Client::setErrorCallback(std::function<void(Error, std::string &)> callback)
93 { 112 {
94 - LOG_TRACE("Setting error callback"); 113 + LOG_TRACE(this->m_impl, "Setting error callback");
95 114
96 this->m_impl->error_callback = callback; 115 this->m_impl->error_callback = callback;
97 } 116 }
98 117
99 -void Client::setPublishQueue(Client::QueueType queue_type, int size) 118 +void Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size)
100 { 119 {
101 - LOG_TRACE("Setting publish queue to type " + std::to_string(queue_type) + " and size " + std::to_string(size)); 120 + if (this->m_impl->state != Client::Impl::State::DISCONNECTED)
  121 + {
  122 + LOG_ERROR(this->m_impl, "Cannot set publish queue when not disconnected");
  123 + return;
  124 + }
  125 +
  126 + LOG_TRACE(this->m_impl, "Setting publish queue to type " + std::to_string(queue_type) + " and size " + std::to_string(size));
102 127
103 this->m_impl->publish_queue_type = queue_type; 128 this->m_impl->publish_queue_type = queue_type;
104 this->m_impl->publish_queue_size = size; 129 this->m_impl->publish_queue_size = size;
@@ -106,42 +131,167 @@ void Client::setPublishQueue(Client::QueueType queue_type, int size) @@ -106,42 +131,167 @@ void Client::setPublishQueue(Client::QueueType queue_type, int size)
106 131
107 void Client::connect() 132 void Client::connect()
108 { 133 {
  134 + std::scoped_lock lock(this->m_impl->state_mutex);
  135 +
109 if (this->m_impl->state != Client::Impl::State::DISCONNECTED) 136 if (this->m_impl->state != Client::Impl::State::DISCONNECTED)
110 { 137 {
111 return; 138 return;
112 } 139 }
113 140
114 - LOG_INFO("Connecting to " + this->m_impl->host + ":" + std::to_string(this->m_impl->port)); 141 + LOG_INFO(this->m_impl, "Connecting to " + this->m_impl->host + ":" + std::to_string(this->m_impl->port));
115 142
116 this->m_impl->state = Client::Impl::State::CONNECTING; 143 this->m_impl->state = Client::Impl::State::CONNECTING;
117 } 144 }
118 145
119 void Client::disconnect() 146 void Client::disconnect()
120 { 147 {
  148 + std::scoped_lock lock(this->m_impl->state_mutex);
  149 +
121 if (this->m_impl->state == Client::Impl::State::DISCONNECTED) 150 if (this->m_impl->state == Client::Impl::State::DISCONNECTED)
122 { 151 {
123 - LOG_TRACE("Already disconnected"); 152 + LOG_TRACE(this->m_impl, "Already disconnected");
124 return; 153 return;
125 } 154 }
126 155
127 - LOG_INFO("Disconnecting from broker"); 156 + LOG_INFO(this->m_impl, "Disconnecting from broker");
128 157
129 this->m_impl->state = Client::Impl::State::DISCONNECTED; 158 this->m_impl->state = Client::Impl::State::DISCONNECTED;
  159 + this->m_impl->subscriptions.clear();
130 } 160 }
131 161
132 void Client::publish(const std::string &topic, const std::string &payload, bool retain) 162 void Client::publish(const std::string &topic, const std::string &payload, bool retain)
133 { 163 {
134 - LOG_DEBUG("Publishing message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")"); 164 + std::scoped_lock lock(this->m_impl->state_mutex);
  165 +
  166 + LOG_DEBUG(this->m_impl, "Publishing message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")");
  167 +
  168 + switch (this->m_impl->state)
  169 + {
  170 + case Client::Impl::State::DISCONNECTED:
  171 + LOG_ERROR(this->m_impl, "Cannot publish when disconnected");
  172 + return;
  173 + case Client::Impl::State::CONNECTING:
  174 + this->m_impl->toPublishQueue(topic, payload, retain);
  175 + return;
  176 + case Client::Impl::State::CONNECTED:
  177 + this->m_impl->sendPublish(topic, payload, retain);
  178 + return;
  179 + }
135 } 180 }
136 181
137 void Client::subscribe(const std::string &topic, std::function<void(std::string, std::string)> callback) 182 void Client::subscribe(const std::string &topic, std::function<void(std::string, std::string)> callback)
138 { 183 {
139 - LOG_DEBUG("Subscribing to topic '" + topic + "'"); 184 + std::scoped_lock lock(this->m_impl->state_mutex);
  185 +
  186 + if (this->m_impl->state == Client::Impl::State::DISCONNECTED)
  187 + {
  188 + LOG_ERROR(this->m_impl, "Cannot subscribe when disconnected");
  189 + return;
  190 + }
  191 +
  192 + LOG_DEBUG(this->m_impl, "Subscribing to topic '" + topic + "'");
140 193
141 - (void)callback; 194 + this->m_impl->subscriptions[topic] = callback;
  195 +
  196 + if (this->m_impl->state == Client::Impl::State::CONNECTED)
  197 + {
  198 + this->m_impl->sendSubscribe(topic);
  199 + }
142 } 200 }
143 201
144 void Client::unsubscribe(const std::string &topic) 202 void Client::unsubscribe(const std::string &topic)
145 { 203 {
146 - LOG_DEBUG("Unsubscribing from topic '" + topic + "'"); 204 + std::scoped_lock lock(this->m_impl->state_mutex);
  205 +
  206 + if (this->m_impl->state == Client::Impl::State::DISCONNECTED)
  207 + {
  208 + LOG_ERROR(this->m_impl, "Cannot unsubscribe when disconnected");
  209 + return;
  210 + }
  211 +
  212 + LOG_DEBUG(this->m_impl, "Unsubscribing from topic '" + topic + "'");
  213 +
  214 + this->m_impl->subscriptions.erase(topic);
  215 +
  216 + if (this->m_impl->state == Client::Impl::State::CONNECTED)
  217 + {
  218 + this->m_impl->sendUnsubscribe(topic);
  219 + }
  220 +}
  221 +
  222 +void Client::Impl::sendPublish(const std::string &topic, const std::string &payload, bool retain)
  223 +{
  224 + LOG_TRACE(this, "Sending publish message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")");
  225 +}
  226 +
  227 +void Client::Impl::sendSubscribe(const std::string &topic)
  228 +{
  229 + LOG_TRACE(this, "Sending subscribe message for topic '" + topic + "'");
  230 +}
  231 +
  232 +void Client::Impl::sendUnsubscribe(const std::string &topic)
  233 +{
  234 + LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'");
  235 +}
  236 +
  237 +void Client::Impl::changeToConnected()
  238 +{
  239 + std::scoped_lock lock(this->state_mutex);
  240 +
  241 + LOG_INFO(this, "Connected to broker");
  242 +
  243 + this->state = Client::Impl::State::CONNECTED;
  244 +
  245 + // Restoring subscriptions and flushing the queue is done while still under
  246 + // the lock. This to prevent \ref disconnect from being called while we are
  247 + // still sending messages.
  248 + // The drawback is that we are blocking \ref publish and \ref subscribe too
  249 + // when they are called just when we create a connection. But in the grand
  250 + // scheme of things, this is not likely, and this makes for a far easier
  251 + // implementation.
  252 +
  253 + // First restore any subscription.
  254 + for (auto &subscription : this->subscriptions)
  255 + {
  256 + this->sendSubscribe(subscription.first);
  257 + }
  258 + // Flush the publish queue.
  259 + for (auto &message : this->publish_queue)
  260 + {
  261 + this->sendPublish(std::get<0>(message), std::get<1>(message), std::get<2>(message));
  262 + }
  263 + this->publish_queue.clear();
  264 +}
  265 +
  266 +void Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain)
  267 +{
  268 + if (this->state != Client::Impl::State::CONNECTING)
  269 + {
  270 + LOG_ERROR(this, "Cannot queue publish message when not connecting");
  271 + return;
  272 + }
  273 +
  274 + switch (this->publish_queue_type)
  275 + {
  276 + case Client::PublishQueueType::DROP:
  277 + LOG_WARNING(this, "Publish queue is disabled, dropping message");
  278 + return;
  279 + case Client::PublishQueueType::FIFO:
  280 + if (this->publish_queue.size() >= this->publish_queue_size)
  281 + {
  282 + LOG_WARNING(this, "Publish queue is full, dropping oldest message on queue");
  283 + this->publish_queue.pop_front();
  284 + }
  285 + break;
  286 + case Client::PublishQueueType::LIFO:
  287 + if (this->publish_queue.size() >= this->publish_queue_size)
  288 + {
  289 + LOG_WARNING(this, "Publish queue is full, dropping newest message on queue");
  290 + this->publish_queue.pop_back();
  291 + }
  292 + break;
  293 + }
  294 +
  295 + LOG_TRACE(this, "Adding message to publish queue");
  296 + this->publish_queue.push_back({topic, payload, retain});
147 } 297 }
src/Log.h
@@ -8,11 +8,10 @@ @@ -8,11 +8,10 @@
8 #pragma once 8 #pragma once
9 9
10 // Wrappers to make logging a tiny bit easier to read. 10 // Wrappers to make logging a tiny bit easier to read.
11 -// It heavily depends on Client.cpp's structure, and assumes  
12 -// this->m_impl is reachable. 11 +// The "obj" is the Client::Impl instance to point to.
13 12
14 #define LOGGER_LEVEL_ERROR 0 13 #define LOGGER_LEVEL_ERROR 0
15 -#define LOGGER_LEVEL_WARN 1 14 +#define LOGGER_LEVEL_WARNING 1
16 #define LOGGER_LEVEL_INFO 2 15 #define LOGGER_LEVEL_INFO 2
17 #define LOGGER_LEVEL_DEBUG 3 16 #define LOGGER_LEVEL_DEBUG 3
18 #define LOGGER_LEVEL_TRACE 4 17 #define LOGGER_LEVEL_TRACE 4
@@ -23,51 +22,51 @@ @@ -23,51 +22,51 @@
23 #endif 22 #endif
24 23
25 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_ERROR 24 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_ERROR
26 -#define LOG_ERROR(x) \  
27 - if (this->m_impl->log_level >= Client::LogLevel::ERROR) \  
28 - { \  
29 - this->m_impl->logger(Client::LogLevel::ERROR, x); \ 25 +#define LOG_ERROR(obj, x) \
  26 + if (obj->log_level >= Client::LogLevel::ERROR) \
  27 + { \
  28 + obj->logger(Client::LogLevel::ERROR, x); \
30 } 29 }
31 #else 30 #else
32 -#define LOG_ERROR(x) 31 +#define LOG_ERROR(obj, x)
33 #endif 32 #endif
34 33
35 -#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARN  
36 -#define LOG_WARN(x) \  
37 - if (this->m_impl->log_level >= Client::LogLevel::WARN) \  
38 - { \  
39 - this->m_impl->logger(Client::LogLevel::WARN, x); \ 34 +#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARNING
  35 +#define LOG_WARNING(obj, x) \
  36 + if (obj->log_level >= Client::LogLevel::WARNING) \
  37 + { \
  38 + obj->logger(Client::LogLevel::WARNING, x); \
40 } 39 }
41 #else 40 #else
42 -#define LOG_WARN(x) 41 +#define LOG_WARNING(obj, x)
43 #endif 42 #endif
44 43
45 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_INFO 44 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_INFO
46 -#define LOG_INFO(x) \  
47 - if (this->m_impl->log_level >= Client::LogLevel::INFO) \  
48 - { \  
49 - this->m_impl->logger(Client::LogLevel::INFO, x); \ 45 +#define LOG_INFO(obj, x) \
  46 + if (obj->log_level >= Client::LogLevel::INFO) \
  47 + { \
  48 + obj->logger(Client::LogLevel::INFO, x); \
50 } 49 }
51 #else 50 #else
52 -#define LOG_INFO(x) 51 +#define LOG_INFO(obj, x)
53 #endif 52 #endif
54 53
55 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG 54 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG
56 -#define LOG_DEBUG(x) \  
57 - if (this->m_impl->log_level >= Client::LogLevel::DEBUG) \  
58 - { \  
59 - this->m_impl->logger(Client::LogLevel::DEBUG, x); \ 55 +#define LOG_DEBUG(obj, x) \
  56 + if (obj->log_level >= Client::LogLevel::DEBUG) \
  57 + { \
  58 + obj->logger(Client::LogLevel::DEBUG, x); \
60 } 59 }
61 #else 60 #else
62 -#define LOG_DEBUG(x) 61 +#define LOG_DEBUG(obj, x)
63 #endif 62 #endif
64 63
65 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE 64 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE
66 -#define LOG_TRACE(x) \  
67 - if (this->m_impl->log_level >= Client::LogLevel::TRACE) \  
68 - { \  
69 - this->m_impl->logger(Client::LogLevel::TRACE, x); \ 65 +#define LOG_TRACE(obj, x) \
  66 + if (obj->log_level >= Client::LogLevel::TRACE) \
  67 + { \
  68 + obj->logger(Client::LogLevel::TRACE, x); \
70 } 69 }
71 #else 70 #else
72 -#define LOG_TRACE(x) 71 +#define LOG_TRACE(obj, x)
73 #endif 72 #endif