diff --git a/include/TrueMQTT.h b/include/TrueMQTT.h index cb5f216..2ea01ff 100644 --- a/include/TrueMQTT.h +++ b/include/TrueMQTT.h @@ -7,6 +7,7 @@ #pragma once +#include #include #include @@ -111,10 +112,17 @@ namespace TrueMQTT * @param port Port of the MQTT broker. * @param client_id Client ID to use when connecting to the broker. * @param connection_timeout Timeout in seconds for the connection to the broker. + * @param connection_backoff Backoff time when connection to the broker failed. This is doubled every time a connection fails, up till \ref connection_backoff_max. * @param connection_backoff_max Maximum time between backoff attempts in seconds. * @param keep_alive_interval Interval in seconds between keep-alive messages. */ - Client(const std::string &host, int port, const std::string &client_id, int connection_timeout = 5, int connection_backoff_max = 30, int keep_alive_interval = 30); + Client(const std::string &host, + int port, + const std::string &client_id, + std::chrono::milliseconds connection_timeout = std::chrono::milliseconds(5000), + std::chrono::milliseconds connection_backoff = std::chrono::milliseconds(1000), + std::chrono::milliseconds connection_backoff_max = std::chrono::milliseconds(30000), + std::chrono::milliseconds keep_alive_interval = std::chrono::milliseconds(30000)); /** * @brief Destructor of the MQTT client. diff --git a/src/Client.cpp b/src/Client.cpp index 3bc3e3d..b208f91 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -13,9 +13,15 @@ #include -TrueMQTT::Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) +TrueMQTT::Client::Client(const std::string &host, + int port, + const std::string &client_id, + std::chrono::milliseconds connection_timeout, + std::chrono::milliseconds connection_backoff, + std::chrono::milliseconds connection_backoff_max, + std::chrono::milliseconds keep_alive_interval) { - this->m_impl = std::make_unique(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval); + this->m_impl = std::make_unique(host, port, client_id, connection_timeout, connection_backoff, connection_backoff_max, keep_alive_interval); LOG_TRACE(this->m_impl, "Constructor of client called"); } @@ -27,11 +33,18 @@ TrueMQTT::Client::~Client() this->disconnect(); } -TrueMQTT::Client::Impl::Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) +TrueMQTT::Client::Impl::Impl(const std::string &host, + int port, + const std::string &client_id, + std::chrono::milliseconds connection_timeout, + std::chrono::milliseconds connection_backoff, + std::chrono::milliseconds connection_backoff_max, + std::chrono::milliseconds keep_alive_interval) : host(host), port(port), client_id(client_id), connection_timeout(connection_timeout), + connection_backoff(connection_backoff), connection_backoff_max(connection_backoff_max), keep_alive_interval(keep_alive_interval) { diff --git a/src/ClientImpl.h b/src/ClientImpl.h index 453fa54..f468f4e 100644 --- a/src/ClientImpl.h +++ b/src/ClientImpl.h @@ -9,6 +9,7 @@ #include "TrueMQTT.h" +#include #include #include #include @@ -21,7 +22,13 @@ class TrueMQTT::Client::Impl { public: - Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval); + Impl(const std::string &host, + int port, + const std::string &client_id, + std::chrono::milliseconds connection_timeout, + std::chrono::milliseconds connection_backoff, + std::chrono::milliseconds connection_backoff_max, + std::chrono::milliseconds keep_alive_interval); ~Impl(); enum State @@ -52,12 +59,13 @@ public: State state = State::DISCONNECTED; ///< The current state of the client. std::mutex state_mutex; ///< Mutex to protect state changes. - std::string host; ///< Host of the broker. - int port; ///< Port of the broker. - std::string client_id; ///< Client ID to use when connecting to the broker. - int connection_timeout; ///< Timeout in seconds for the connection to the broker. - int connection_backoff_max; ///< Maximum time between backoff attempts in seconds. - int keep_alive_interval; ///< Interval in seconds between keep-alive messages. + std::string host; ///< Host of the broker. + int port; ///< Port of the broker. + std::string client_id; ///< Client ID to use when connecting to the broker. + std::chrono::milliseconds connection_timeout; ///< Timeout in seconds for the connection to the broker. + std::chrono::milliseconds connection_backoff; ///< Backoff time when connection to the broker failed. This is doubled every time a connection fails, up till \ref connection_backoff_max. + std::chrono::milliseconds connection_backoff_max; ///< Maximum time between backoff attempts in seconds. + std::chrono::milliseconds keep_alive_interval; ///< Interval in seconds between keep-alive messages. Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use. std::function logger = [](Client::LogLevel, std::string) { /* empty */ }; ///< Logger callback. diff --git a/src/Connection.cpp b/src/Connection.cpp index 8553935..ce1be83 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -18,7 +18,8 @@ TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl) : m_impl(impl), - m_thread(&Connection::run, this) + m_thread(&Connection::run, this), + m_backoff(impl.connection_backoff) { } @@ -67,10 +68,16 @@ void TrueMQTT::Client::Impl::Connection::run() break; case State::BACKOFF: - LOG_WARNING(&m_impl, "Connection failed; will retry in NNN seconds"); + LOG_WARNING(&m_impl, "Connection failed; will retry in " + std::to_string(m_backoff.count()) + "ms"); - // TODO: use the configuration - std::this_thread::sleep_for(std::chrono::seconds(5)); + std::this_thread::sleep_for(m_backoff); + + // Calculate the next backoff time, slowly reducing how often we retry. + m_backoff *= 2; + if (m_backoff > m_impl.connection_backoff_max) + { + m_backoff = m_impl.connection_backoff_max; + } m_state = State::RESOLVING; break; @@ -96,6 +103,11 @@ void TrueMQTT::Client::Impl::Connection::run() } case State::STOP: + if (m_socket != INVALID_SOCKET) + { + closesocket(m_socket); + m_socket = INVALID_SOCKET; + } return; } } @@ -243,8 +255,7 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() } // Check if it is more than the timeout ago since we last tried a connection. - // TODO -- Used to configured value - if (std::chrono::steady_clock::now() < m_last_attempt + std::chrono::seconds(10)) + if (std::chrono::steady_clock::now() < m_last_attempt + m_impl.connection_timeout) { return true; } @@ -317,11 +328,13 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact"); } + m_backoff = m_impl.connection_backoff; + m_socket = socket_connected; + // Only change the state if no disconnect() has been requested in the mean time. if (m_state != State::STOP) { m_state = State::AUTHENTICATING; - m_socket = socket_connected; sendConnect(); } return true; diff --git a/src/Connection.h b/src/Connection.h index 445eadd..70c8767 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -9,6 +9,7 @@ #include "ClientImpl.h" +#include #include #include #include @@ -56,14 +57,19 @@ private: TrueMQTT::Client::Impl &m_impl; - State m_state = State::RESOLVING; - std::thread m_thread; ///< Current thread used to run this connection. + State m_state = State::RESOLVING; ///< Current state of the connection. + std::thread m_thread; ///< Current thread used to run this connection. + + std::chrono::milliseconds m_backoff; ///< Current backoff time. + + addrinfo *m_host_resolved = nullptr; ///< Address info of the hostname, once looked up. + std::vector m_addresses = {}; ///< List of addresses to try to connect to. - addrinfo *m_host_resolved = nullptr; ///< Address info of the hostname, once looked up. - std::vector m_addresses = {}; ///< List of addresses to try to connect to. size_t m_address_current = 0; ///< Index of the address we are currently trying to connect to. std::chrono::steady_clock::time_point m_last_attempt = {}; ///< Time of the last attempt to connect to the current address. - std::vector m_sockets = {}; ///< List of sockets we are currently trying to connect to. - std::map m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to. - SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected. + + std::vector m_sockets = {}; ///< List of sockets we are currently trying to connect to. + std::map m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to. + + SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected. };