From 49cd81e23234674c41529dbc83bc11c3fc661801 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sat, 10 Sep 2022 09:32:56 +0200 Subject: [PATCH] feat(client): finish everything up till the actual socket communication --- CMakeLists.txt | 2 +- example/pubsub/main.cpp | 20 +++++++++++++++----- include/TrueMQTT.h | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------- src/Client.cpp | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------- src/Log.h | 57 ++++++++++++++++++++++++++++----------------------------- 5 files changed, 288 insertions(+), 62 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4029bef..6e7b2fe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,7 @@ project(truemqtt VERSION 1.0.0 DESCRIPTION "A modern C++ MQTT Client library") set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) -set(MIN_LOGGER_LEVEL "INFO" CACHE STRING "Set minimal logger level (TRACE, DEBUG, INFO, WARN, ERROR). No logs below this level will be omitted.") +set(MIN_LOGGER_LEVEL "INFO" CACHE STRING "Set minimal logger level (TRACE, DEBUG, INFO, WARNING, ERROR). No logs below this level will be omitted.") include(GNUInstallDirs) diff --git a/example/pubsub/main.cpp b/example/pubsub/main.cpp index 42fdd96..c387a88 100644 --- a/example/pubsub/main.cpp +++ b/example/pubsub/main.cpp @@ -7,26 +7,36 @@ #include #include +#include int main() { // Create a connection to the local broker. TrueMQTT::Client client("localhost", 1883, "test"); - client.setLogger(TrueMQTT::Client::LogLevel::TRACE, [](TrueMQTT::Client::LogLevel level, std::string message) { - std::cout << "Log " << level << ": " << message << std::endl; - }); + client.setLogger(TrueMQTT::Client::LogLevel::TRACE, [](TrueMQTT::Client::LogLevel level, std::string message) + { std::cout << "Log " << level << ": " << message << std::endl; }); + client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10); client.connect(); + bool stop = false; + // Subscribe to the topic we will be publishing under in a bit. - client.subscribe("test", [](const std::string &topic, const std::string &payload) { + client.subscribe("test", [&stop](const std::string &topic, const std::string &payload) + { std::cout << "Received message on topic " << topic << ": " << payload << std::endl; - }); + stop = true; }); // Publish a message on the same topic as we subscribed too. client.publish("test", "Hello World!", false); + // Wait till we receive the message back on our subscription. + while (!stop) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + client.disconnect(); return 0; diff --git a/include/TrueMQTT.h b/include/TrueMQTT.h index d2ed2df..4adb444 100644 --- a/include/TrueMQTT.h +++ b/include/TrueMQTT.h @@ -32,13 +32,45 @@ namespace TrueMQTT /** * @brief The type of queue that can be set for publishing messages. + * + * When there is no connection to a broker, some choices have to be made what + * to do with messages that are published. + * + * - Do we queue the message? + * - What do we do when the queue is full? + * + * After all, memory is finite, so this allows you to configure what scenario + * works best for you. */ - enum QueueType + enum PublishQueueType { - DROP, ///< Do not queue. - FIFO, ///< Global FIFO. - LIFO, ///< Global LIFO. - LIFO_PER_TOPIC, ///< Per topic LIFO. + DROP, ///< Do not queue. + + /** + * @brief First-in-First-out queue. + * + * For a size 3 queue this means if we publish 6 messages (M1 .. M6), the result is: + * + * [ M4, M5, M6 ] + * + * When publishing the next message (M7) it becomes: + * + * [ M5, M6, M7 ] + */ + FIFO, + + /** + * @brief Last-in-First-out queue. + * + * For a size 3 queue this means if we publish 6 messages (M1 .. M6), the result is: + * + * [ M1, M2, M6 ] + * + * When publishing the next message (M7) it becomes: + * + * [ M1, M2, M7 ] + */ + LIFO, }; /** @@ -90,11 +122,14 @@ namespace TrueMQTT * @param topic The topic to publish the last will message to. * @param payload The payload of the last will message. * @param retain Whether to retain the last will message. + * + * @note Cannot be called after \ref connect. */ void setLastWill(const std::string &topic, const std::string &payload, bool retain); /** * @brief Set the error callback, called when any error occurs. + * * @param callback The callback to call when an error occurs. */ void setErrorCallback(std::function callback); @@ -102,10 +137,12 @@ namespace TrueMQTT /** * @brief Set the publish queue to use. * - * @param queue_type The \ref QueueType to use for the publish queue. + * @param queue_type The \ref PublishQueueType to use for the publish queue. * @param size The size of the queue. If the queue is full, the type of queue defines what happens. + * + * @note Cannot be called after \ref connect. */ - void setPublishQueue(QueueType queue_type, int size); + void setPublishQueue(PublishQueueType queue_type, size_t size); /** * @brief Connect to the broker. @@ -128,12 +165,19 @@ namespace TrueMQTT * Additionally, it will clean any publish / subscribe information it has. * * @note Calling disconnect twice has no effect. + * @note This function can stall for a short moment if you disconnect just at the + * moment the connection to the broker is established, and there are messages in the + * publish queue and/or subscriptions. */ void disconnect(); /** * @brief Publish a payload on a topic. * + * After \ref connect is called, this function will either publish the message + * immediately (if connected) or queue it for later (if still connecting). + * In the latter case, it will be published as soon as the connection is established. + * * @param topic The topic to publish the payload on. * @param payload The payload to publish. * @param retain Whether to retain the message on the broker. @@ -142,12 +186,23 @@ namespace TrueMQTT * other QoS level. * @note This call is non-blocking, and it is not possible to know whether the message * was actually published or not. + * @note You cannot publish a message if you are disconnected from the broker. Call + * \ref connect first. + * @note This function can stall for a short moment if you publish just at the + * moment the connection to the broker is established, and there are messages in the + * publish queue and/or subscriptions. */ void publish(const std::string &topic, const std::string &payload, bool retain); /** * @brief Subscribe to a topic, and call the callback function when a message arrives. * + * After \ref connect is called, this function will either subscribe to the topic + * immediately (if connected) or subscribe to it once a connection has been made. + * In case of a reconnect, it will also automatically resubscribe. + * + * If the broker refuses the subscribe request, the error-callback is called. + * * @param topic The topic to subscribe to. * @param callback The callback to call when a message arrives on this topic. * @@ -155,15 +210,27 @@ namespace TrueMQTT * If you do, the callback of the first subscription will be overwritten. * In other words, "a/+" and "a/b" is fine, and callbacks for both subscribes will be * called when something is published on "a/b". + * @note You cannot subscribe a topic if you are disconnected from the broker. Call + * \ref connect first. + * @note This function can stall for a short moment if you publish just at the + * moment the connection to the broker is established, and there are messages in the + * publish queue and/or subscriptions. */ void subscribe(const std::string &topic, std::function callback); /** * @brief Unsubscribe from a topic. * + * If the broker refuses the unsubscribe request, the error-callback is called. + * * @param topic The topic to unsubscribe from. * * @note If you unsubscribe from a topic you were not subscribed too, nothing happens. + * @note You cannot unsubscribe from a topic if you are disconnected from the broker. + * Call \ref connect (and \ref subscribe) first. + * @note This function can stall for a short moment if you publish just at the + * moment the connection to the broker is established, and there are messages in the + * publish queue and/or subscriptions. */ void unsubscribe(const std::string &topic); diff --git a/src/Client.cpp b/src/Client.cpp index 64de5af..c70398d 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -8,6 +8,9 @@ #include "TrueMQTT.h" #include "Log.h" +#include +#include +#include #include using TrueMQTT::Client; @@ -29,12 +32,19 @@ public: enum State { - DISCONNECTED, - CONNECTING, - CONNECTED, + DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect. + CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process. + CONNECTED, ///< The client is connected to the broker. }; + void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker. + void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker. + void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker. + void changeToConnected(); ///< Called when a connection goes from CONNECTING state to CONNECTED state. + void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue. + State state = State::DISCONNECTED; ///< The current state of the client. + std::mutex state_mutex; ///< Mutex to protect state changes. std::string host; ///< Host of the broker. int port; ///< Port of the broker. @@ -52,37 +62,46 @@ public: std::function error_callback = [](Error, std::string &) {}; ///< Error callback. - Client::QueueType publish_queue_type = Client::QueueType::DROP; ///< The type of queue to use for the publish queue. - int publish_queue_size = -1; ///< Size of the publish queue. + Client::PublishQueueType publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue. + size_t publish_queue_size = -1; ///< Size of the publish queue. + std::deque> publish_queue; ///< Queue of publish messages to send to the broker. + + std::map> subscriptions; ///< Map of active subscriptions. }; Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) { this->m_impl = std::make_unique(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval); - LOG_TRACE("Constructor of client called"); + LOG_TRACE(this->m_impl, "Constructor of client called"); } Client::~Client() { - LOG_TRACE("Destructor of client called"); + LOG_TRACE(this->m_impl, "Destructor of client called"); this->disconnect(); } void Client::setLogger(Client::LogLevel log_level, std::function logger) { - LOG_TRACE("Setting logger to log level " + std::to_string(log_level)); + LOG_TRACE(this->m_impl, "Setting logger to log level " + std::to_string(log_level)); this->m_impl->log_level = log_level; this->m_impl->logger = logger; - LOG_DEBUG("Log level now on " + std::to_string(this->m_impl->log_level)); + LOG_DEBUG(this->m_impl, "Log level now on " + std::to_string(this->m_impl->log_level)); } void Client::setLastWill(const std::string &topic, const std::string &payload, bool retain) { - LOG_TRACE("Setting last will to topic " + topic + " with payload " + payload + " and retain " + std::to_string(retain)); + if (this->m_impl->state != Client::Impl::State::DISCONNECTED) + { + LOG_ERROR(this->m_impl, "Cannot set last will when not disconnected"); + return; + } + + LOG_TRACE(this->m_impl, "Setting last will to topic " + topic + " with payload " + payload + " and retain " + std::to_string(retain)); this->m_impl->last_will_topic = topic; this->m_impl->last_will_payload = payload; @@ -91,14 +110,20 @@ void Client::setLastWill(const std::string &topic, const std::string &payload, b void Client::setErrorCallback(std::function callback) { - LOG_TRACE("Setting error callback"); + LOG_TRACE(this->m_impl, "Setting error callback"); this->m_impl->error_callback = callback; } -void Client::setPublishQueue(Client::QueueType queue_type, int size) +void Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) { - LOG_TRACE("Setting publish queue to type " + std::to_string(queue_type) + " and size " + std::to_string(size)); + if (this->m_impl->state != Client::Impl::State::DISCONNECTED) + { + LOG_ERROR(this->m_impl, "Cannot set publish queue when not disconnected"); + return; + } + + LOG_TRACE(this->m_impl, "Setting publish queue to type " + std::to_string(queue_type) + " and size " + std::to_string(size)); this->m_impl->publish_queue_type = queue_type; this->m_impl->publish_queue_size = size; @@ -106,42 +131,167 @@ void Client::setPublishQueue(Client::QueueType queue_type, int size) void Client::connect() { + std::scoped_lock lock(this->m_impl->state_mutex); + if (this->m_impl->state != Client::Impl::State::DISCONNECTED) { return; } - LOG_INFO("Connecting to " + this->m_impl->host + ":" + std::to_string(this->m_impl->port)); + LOG_INFO(this->m_impl, "Connecting to " + this->m_impl->host + ":" + std::to_string(this->m_impl->port)); this->m_impl->state = Client::Impl::State::CONNECTING; } void Client::disconnect() { + std::scoped_lock lock(this->m_impl->state_mutex); + if (this->m_impl->state == Client::Impl::State::DISCONNECTED) { - LOG_TRACE("Already disconnected"); + LOG_TRACE(this->m_impl, "Already disconnected"); return; } - LOG_INFO("Disconnecting from broker"); + LOG_INFO(this->m_impl, "Disconnecting from broker"); this->m_impl->state = Client::Impl::State::DISCONNECTED; + this->m_impl->subscriptions.clear(); } void Client::publish(const std::string &topic, const std::string &payload, bool retain) { - LOG_DEBUG("Publishing message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")"); + std::scoped_lock lock(this->m_impl->state_mutex); + + LOG_DEBUG(this->m_impl, "Publishing message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")"); + + switch (this->m_impl->state) + { + case Client::Impl::State::DISCONNECTED: + LOG_ERROR(this->m_impl, "Cannot publish when disconnected"); + return; + case Client::Impl::State::CONNECTING: + this->m_impl->toPublishQueue(topic, payload, retain); + return; + case Client::Impl::State::CONNECTED: + this->m_impl->sendPublish(topic, payload, retain); + return; + } } void Client::subscribe(const std::string &topic, std::function callback) { - LOG_DEBUG("Subscribing to topic '" + topic + "'"); + std::scoped_lock lock(this->m_impl->state_mutex); + + if (this->m_impl->state == Client::Impl::State::DISCONNECTED) + { + LOG_ERROR(this->m_impl, "Cannot subscribe when disconnected"); + return; + } + + LOG_DEBUG(this->m_impl, "Subscribing to topic '" + topic + "'"); - (void)callback; + this->m_impl->subscriptions[topic] = callback; + + if (this->m_impl->state == Client::Impl::State::CONNECTED) + { + this->m_impl->sendSubscribe(topic); + } } void Client::unsubscribe(const std::string &topic) { - LOG_DEBUG("Unsubscribing from topic '" + topic + "'"); + std::scoped_lock lock(this->m_impl->state_mutex); + + if (this->m_impl->state == Client::Impl::State::DISCONNECTED) + { + LOG_ERROR(this->m_impl, "Cannot unsubscribe when disconnected"); + return; + } + + LOG_DEBUG(this->m_impl, "Unsubscribing from topic '" + topic + "'"); + + this->m_impl->subscriptions.erase(topic); + + if (this->m_impl->state == Client::Impl::State::CONNECTED) + { + this->m_impl->sendUnsubscribe(topic); + } +} + +void Client::Impl::sendPublish(const std::string &topic, const std::string &payload, bool retain) +{ + LOG_TRACE(this, "Sending publish message on topic '" + topic + "': " + payload + " (" + (retain ? "retained" : "not retained") + ")"); +} + +void Client::Impl::sendSubscribe(const std::string &topic) +{ + LOG_TRACE(this, "Sending subscribe message for topic '" + topic + "'"); +} + +void Client::Impl::sendUnsubscribe(const std::string &topic) +{ + LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'"); +} + +void Client::Impl::changeToConnected() +{ + std::scoped_lock lock(this->state_mutex); + + LOG_INFO(this, "Connected to broker"); + + this->state = Client::Impl::State::CONNECTED; + + // Restoring subscriptions and flushing the queue is done while still under + // the lock. This to prevent \ref disconnect from being called while we are + // still sending messages. + // The drawback is that we are blocking \ref publish and \ref subscribe too + // when they are called just when we create a connection. But in the grand + // scheme of things, this is not likely, and this makes for a far easier + // implementation. + + // First restore any subscription. + for (auto &subscription : this->subscriptions) + { + this->sendSubscribe(subscription.first); + } + // Flush the publish queue. + for (auto &message : this->publish_queue) + { + this->sendPublish(std::get<0>(message), std::get<1>(message), std::get<2>(message)); + } + this->publish_queue.clear(); +} + +void Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain) +{ + if (this->state != Client::Impl::State::CONNECTING) + { + LOG_ERROR(this, "Cannot queue publish message when not connecting"); + return; + } + + switch (this->publish_queue_type) + { + case Client::PublishQueueType::DROP: + LOG_WARNING(this, "Publish queue is disabled, dropping message"); + return; + case Client::PublishQueueType::FIFO: + if (this->publish_queue.size() >= this->publish_queue_size) + { + LOG_WARNING(this, "Publish queue is full, dropping oldest message on queue"); + this->publish_queue.pop_front(); + } + break; + case Client::PublishQueueType::LIFO: + if (this->publish_queue.size() >= this->publish_queue_size) + { + LOG_WARNING(this, "Publish queue is full, dropping newest message on queue"); + this->publish_queue.pop_back(); + } + break; + } + + LOG_TRACE(this, "Adding message to publish queue"); + this->publish_queue.push_back({topic, payload, retain}); } diff --git a/src/Log.h b/src/Log.h index 1e875dc..aa713c7 100644 --- a/src/Log.h +++ b/src/Log.h @@ -8,11 +8,10 @@ #pragma once // Wrappers to make logging a tiny bit easier to read. -// It heavily depends on Client.cpp's structure, and assumes -// this->m_impl is reachable. +// The "obj" is the Client::Impl instance to point to. #define LOGGER_LEVEL_ERROR 0 -#define LOGGER_LEVEL_WARN 1 +#define LOGGER_LEVEL_WARNING 1 #define LOGGER_LEVEL_INFO 2 #define LOGGER_LEVEL_DEBUG 3 #define LOGGER_LEVEL_TRACE 4 @@ -23,51 +22,51 @@ #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_ERROR -#define LOG_ERROR(x) \ - if (this->m_impl->log_level >= Client::LogLevel::ERROR) \ - { \ - this->m_impl->logger(Client::LogLevel::ERROR, x); \ +#define LOG_ERROR(obj, x) \ + if (obj->log_level >= Client::LogLevel::ERROR) \ + { \ + obj->logger(Client::LogLevel::ERROR, x); \ } #else -#define LOG_ERROR(x) +#define LOG_ERROR(obj, x) #endif -#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARN -#define LOG_WARN(x) \ - if (this->m_impl->log_level >= Client::LogLevel::WARN) \ - { \ - this->m_impl->logger(Client::LogLevel::WARN, x); \ +#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARNING +#define LOG_WARNING(obj, x) \ + if (obj->log_level >= Client::LogLevel::WARNING) \ + { \ + obj->logger(Client::LogLevel::WARNING, x); \ } #else -#define LOG_WARN(x) +#define LOG_WARNING(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_INFO -#define LOG_INFO(x) \ - if (this->m_impl->log_level >= Client::LogLevel::INFO) \ - { \ - this->m_impl->logger(Client::LogLevel::INFO, x); \ +#define LOG_INFO(obj, x) \ + if (obj->log_level >= Client::LogLevel::INFO) \ + { \ + obj->logger(Client::LogLevel::INFO, x); \ } #else -#define LOG_INFO(x) +#define LOG_INFO(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG -#define LOG_DEBUG(x) \ - if (this->m_impl->log_level >= Client::LogLevel::DEBUG) \ - { \ - this->m_impl->logger(Client::LogLevel::DEBUG, x); \ +#define LOG_DEBUG(obj, x) \ + if (obj->log_level >= Client::LogLevel::DEBUG) \ + { \ + obj->logger(Client::LogLevel::DEBUG, x); \ } #else -#define LOG_DEBUG(x) +#define LOG_DEBUG(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE -#define LOG_TRACE(x) \ - if (this->m_impl->log_level >= Client::LogLevel::TRACE) \ - { \ - this->m_impl->logger(Client::LogLevel::TRACE, x); \ +#define LOG_TRACE(obj, x) \ + if (obj->log_level >= Client::LogLevel::TRACE) \ + { \ + obj->logger(Client::LogLevel::TRACE, x); \ } #else -#define LOG_TRACE(x) +#define LOG_TRACE(obj, x) #endif -- libgit2 0.21.4