Commit 87fe34d7e16ec2c27cb86c4da3fa821c4907d635

Authored by Patric Stout
1 parent 584030be

feat: implement last-will support

example/pubsub/main.cpp
... ... @@ -19,6 +19,7 @@ int main()
19 19 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10);
20 20 client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message)
21 21 { std::cout << "Error " << error << ": " << message << std::endl; });
  22 + client.setLastWill("test/lastwill", "example pubsub finished", true);
22 23  
23 24 client.connect();
24 25 std::this_thread::sleep_for(std::chrono::milliseconds(100));
... ...
include/TrueMQTT.h
... ... @@ -146,12 +146,12 @@ namespace TrueMQTT
146 146 * @brief Set the last will message on the connection.
147 147 *
148 148 * @param topic The topic to publish the last will message to.
149   - * @param payload The payload of the last will message.
  149 + * @param message The message of the last will message.
150 150 * @param retain Whether to retain the last will message.
151 151 *
152 152 * @note Cannot be called after \ref connect.
153 153 */
154   - void setLastWill(const std::string &topic, const std::string &payload, bool retain) const;
  154 + void setLastWill(const std::string &topic, const std::string &message, bool retain) const;
155 155  
156 156 /**
157 157 * @brief Set the error callback, called when any error occurs.
... ... @@ -198,14 +198,14 @@ namespace TrueMQTT
198 198 void disconnect() const;
199 199  
200 200 /**
201   - * @brief Publish a payload on a topic.
  201 + * @brief Publish a message on a topic.
202 202 *
203 203 * After \ref connect is called, this function will either publish the message
204 204 * immediately (if connected) or queue it for later (if still connecting).
205 205 * In the latter case, it will be published as soon as the connection is established.
206 206 *
207   - * @param topic The topic to publish the payload on.
208   - * @param payload The payload to publish.
  207 + * @param topic The topic to publish the message on.
  208 + * @param message The message to publish.
209 209 * @param retain Whether to retain the message on the broker.
210 210 *
211 211 * @note All messages are always published under QoS 0, and this library supports no
... ... @@ -218,7 +218,7 @@ namespace TrueMQTT
218 218 * moment the connection to the broker is established, and there are messages in the
219 219 * publish queue and/or subscriptions.
220 220 */
221   - void publish(const std::string &topic, const std::string &payload, bool retain) const;
  221 + void publish(const std::string &topic, const std::string &message, bool retain) const;
222 222  
223 223 /**
224 224 * @brief Subscribe to a topic, and call the callback function when a message arrives.
... ...
src/Client.cpp
... ... @@ -64,7 +64,7 @@ void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function
64 64 LOG_DEBUG(m_impl, "Log level now on " + std::to_string(m_impl->m_log_level));
65 65 }
66 66  
67   -void TrueMQTT::Client::setLastWill(const std::string &topic, const std::string &payload, bool retain) const
  67 +void TrueMQTT::Client::setLastWill(const std::string &topic, const std::string &message, bool retain) const
68 68 {
69 69 if (m_impl->m_state != Client::Impl::State::DISCONNECTED)
70 70 {
... ... @@ -72,10 +72,10 @@ void TrueMQTT::Client::setLastWill(const std::string &amp;topic, const std::string &amp;
72 72 return;
73 73 }
74 74  
75   - LOG_TRACE(m_impl, "Setting last will to topic " + topic + " with payload " + payload + " and retain " + std::to_string(retain));
  75 + LOG_TRACE(m_impl, "Setting last will to topic " + topic + " with message " + message + " and retain " + std::to_string(retain));
76 76  
77 77 m_impl->m_last_will_topic = topic;
78   - m_impl->m_last_will_payload = payload;
  78 + m_impl->m_last_will_message = message;
79 79 m_impl->m_last_will_retain = retain;
80 80 }
81 81  
... ... @@ -131,11 +131,11 @@ void TrueMQTT::Client::disconnect() const
131 131 m_impl->disconnect();
132 132 }
133 133  
134   -void TrueMQTT::Client::publish(const std::string &topic, const std::string &payload, bool retain) const
  134 +void TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const
135 135 {
136 136 std::scoped_lock lock(m_impl->m_state_mutex);
137 137  
138   - LOG_DEBUG(m_impl, "Publishing message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")");
  138 + LOG_DEBUG(m_impl, "Publishing message on topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")");
139 139  
140 140 switch (m_impl->m_state)
141 141 {
... ... @@ -143,10 +143,10 @@ void TrueMQTT::Client::publish(const std::string &amp;topic, const std::string &amp;payl
143 143 LOG_ERROR(m_impl, "Cannot publish when disconnected");
144 144 return;
145 145 case Client::Impl::State::CONNECTING:
146   - m_impl->toPublishQueue(topic, payload, retain);
  146 + m_impl->toPublishQueue(topic, message, retain);
147 147 return;
148 148 case Client::Impl::State::CONNECTED:
149   - m_impl->sendPublish(topic, payload, retain);
  149 + m_impl->sendPublish(topic, message, retain);
150 150 return;
151 151 }
152 152 }
... ... @@ -266,9 +266,9 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
266 266 sendSubscribe(subscription);
267 267 }
268 268 // Flush the publish queue.
269   - for (const auto &[topic, payload, retain] : m_publish_queue)
  269 + for (const auto &[topic, message, retain] : m_publish_queue)
270 270 {
271   - sendPublish(topic, payload, retain);
  271 + sendPublish(topic, message, retain);
272 272 }
273 273 m_publish_queue.clear();
274 274 }
... ... @@ -279,7 +279,7 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
279 279 }
280 280 }
281 281  
282   -void TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain)
  282 +void TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain)
283 283 {
284 284 if (m_state != Client::Impl::State::CONNECTING)
285 285 {
... ... @@ -309,7 +309,7 @@ void TrueMQTT::Client::Impl::toPublishQueue(const std::string &amp;topic, const std:
309 309 }
310 310  
311 311 LOG_TRACE(this, "Adding message to publish queue");
312   - m_publish_queue.emplace_back(topic, payload, retain);
  312 + m_publish_queue.emplace_back(topic, message, retain);
313 313 }
314 314  
315 315 void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector<std::function<void(std::string, std::string)>> &matching_callbacks, const std::map<std::string, Client::Impl::SubscriptionPart> &subscriptions, std::deque<std::string> &parts)
... ... @@ -357,9 +357,9 @@ void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector&lt;std::function&lt;voi
357 357 }
358 358 }
359 359  
360   -void TrueMQTT::Client::Impl::messageReceived(std::string topic, std::string payload)
  360 +void TrueMQTT::Client::Impl::messageReceived(std::string topic, std::string message)
361 361 {
362   - LOG_TRACE(this, "Message received on topic '" + topic + "': " + payload);
  362 + LOG_TRACE(this, "Message received on topic '" + topic + "': " + message);
363 363  
364 364 // Split the topic on the / in parts.
365 365 std::string part;
... ... @@ -378,14 +378,14 @@ void TrueMQTT::Client::Impl::messageReceived(std::string topic, std::string payl
378 378  
379 379 if (matching_callbacks.size() == 1)
380 380 {
381   - // For a single callback there is no need to copy the topic/payload.
382   - matching_callbacks[0](std::move(topic), std::move(payload));
  381 + // For a single callback there is no need to copy the topic/message.
  382 + matching_callbacks[0](std::move(topic), std::move(message));
383 383 }
384 384 else
385 385 {
386 386 for (const auto &callback : matching_callbacks)
387 387 {
388   - callback(topic, payload);
  388 + callback(topic, message);
389 389 }
390 390 }
391 391 }
... ...
src/ClientImpl.h
... ... @@ -47,12 +47,12 @@ public:
47 47  
48 48 void connect(); ///< Connect to the broker.
49 49 void disconnect(); ///< Disconnect from the broker.
50   - void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker.
  50 + void sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker.
51 51 void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
52 52 void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
53 53 void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa.
54   - void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue.
55   - void messageReceived(std::string topic, std::string payload); ///< Called when a message is received from the broker.
  54 + void toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue.
  55 + void messageReceived(std::string topic, std::string message); ///< Called when a message is received from the broker.
56 56  
57 57 void findSubscriptionMatch(std::vector<std::function<void(std::string, std::string)>> &callbacks, const std::map<std::string, SubscriptionPart> &subscriptions, std::deque<std::string> &parts); ///< Recursive function to find any matching subscription based on parts.
58 58  
... ... @@ -71,7 +71,7 @@ public:
71 71 std::function<void(Client::LogLevel, std::string)> m_logger = [](Client::LogLevel, std::string) { /* empty */ }; ///< Logger callback.
72 72  
73 73 std::string m_last_will_topic = ""; ///< Topic to publish the last will message to.
74   - std::string m_last_will_payload = ""; ///< Payload of the last will message.
  74 + std::string m_last_will_message = ""; ///< Message to publish on the last will topic.
75 75 bool m_last_will_retain = false; ///< Whether to retain the last will message.
76 76  
77 77 std::function<void(Error, std::string)> m_error_callback = [](Error, std::string) { /* empty */ }; ///< Error callback.
... ...
src/Packet.cpp
... ... @@ -265,12 +265,12 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop()
265 265 return false;
266 266 }
267 267  
268   - std::string payload;
269   - packet.read_remaining(payload);
  268 + std::string message;
  269 + packet.read_remaining(message);
270 270  
271   - LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + topic + ": " + payload);
  271 + LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + topic + ": " + message);
272 272  
273   - m_impl.messageReceived(std::move(topic), std::move(payload));
  273 + m_impl.messageReceived(std::move(topic), std::move(message));
274 274 break;
275 275 }
276 276 case Packet::PacketType::SUBACK:
... ... @@ -366,7 +366,16 @@ void TrueMQTT::Client::Impl::Connection::sendConnect()
366 366  
367 367 uint8_t flags = 0;
368 368 flags |= 1 << 1; // Clean session
369   - // TODO -- Support for last-will
  369 + if (!m_impl.m_last_will_topic.empty())
  370 + {
  371 + flags |= 1 << 2; // Last will
  372 + flags |= 0 << 3; // Last will QoS
  373 +
  374 + if (m_impl.m_last_will_retain)
  375 + {
  376 + flags |= 1 << 5; // Last will retain
  377 + }
  378 + }
370 379  
371 380 Packet packet(Packet::PacketType::CONNECT, 0);
372 381  
... ... @@ -377,14 +386,18 @@ void TrueMQTT::Client::Impl::Connection::sendConnect()
377 386 packet.write_uint16(30); // Keep-alive
378 387  
379 388 packet.write_string(client_id); // Client ID
380   - // TODO -- Last will topic & message
  389 + if (!m_impl.m_last_will_topic.empty())
  390 + {
  391 + packet.write_string(m_impl.m_last_will_topic);
  392 + packet.write_string(m_impl.m_last_will_message);
  393 + }
381 394  
382 395 send(packet);
383 396 }
384 397  
385   -void TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &payload, bool retain)
  398 +void TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain)
386 399 {
387   - LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")");
  400 + LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")");
388 401  
389 402 uint8_t flags = 0;
390 403 flags |= (retain ? 1 : 0) << 0; // Retain
... ... @@ -394,7 +407,7 @@ void TrueMQTT::Client::Impl::sendPublish(const std::string &amp;topic, const std::st
394 407 Packet packet(Packet::PacketType::PUBLISH, flags);
395 408  
396 409 packet.write_string(topic);
397   - packet.write(payload.c_str(), payload.size());
  410 + packet.write(message.c_str(), message.size());
398 411  
399 412 m_connection->send(packet);
400 413 }
... ...