From 995dd53fb41fbd457b17cafe381a23f2edb9cedc Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sat, 10 Sep 2022 15:34:21 +0200 Subject: [PATCH] feat(connection): connection logic using Happy Eyeballs --- CMakeLists.txt | 5 +++++ example/pubsub/main.cpp | 2 ++ include/TrueMQTT.h | 37 ++++++++++++++++++++++++++++++++----- src/Client.cpp | 110 +++++++++++++++++++++++++++++++------------------------------------------------------------------------------- src/ClientImpl.h | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Connection.cpp | 390 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Connection.h | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Log.h | 42 +++++++++++++++++++++--------------------- 8 files changed, 626 insertions(+), 105 deletions(-) create mode 100644 src/ClientImpl.h create mode 100644 src/Connection.cpp create mode 100644 src/Connection.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e7b2fe..01346d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ project(truemqtt VERSION 1.0.0 DESCRIPTION "A modern C++ MQTT Client library") set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) +set(THREADS_PREFER_PTHREAD_FLAG ON) set(MIN_LOGGER_LEVEL "INFO" CACHE STRING "Set minimal logger level (TRACE, DEBUG, INFO, WARNING, ERROR). No logs below this level will be omitted.") @@ -18,9 +19,13 @@ include(GNUInstallDirs) add_library(${PROJECT_NAME} src/Client.cpp + src/Connection.cpp ) target_include_directories(${PROJECT_NAME} PUBLIC include PRIVATE src) +find_package(Threads REQUIRED) +target_link_libraries(${PROJECT_NAME} PRIVATE Threads::Threads) + set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION 1 PUBLIC_HEADER include/TrueMQTT.h) configure_file(truemqtt.pc.in truemqtt.pc @ONLY) diff --git a/example/pubsub/main.cpp b/example/pubsub/main.cpp index c387a88..eab126f 100644 --- a/example/pubsub/main.cpp +++ b/example/pubsub/main.cpp @@ -17,6 +17,8 @@ int main() client.setLogger(TrueMQTT::Client::LogLevel::TRACE, [](TrueMQTT::Client::LogLevel level, std::string message) { std::cout << "Log " << level << ": " << message << std::endl; }); client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10); + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message) + { std::cout << "Error " << error << ": " << message << std::endl; }); client.connect(); diff --git a/include/TrueMQTT.h b/include/TrueMQTT.h index 4adb444..5ebd705 100644 --- a/include/TrueMQTT.h +++ b/include/TrueMQTT.h @@ -24,10 +24,37 @@ namespace TrueMQTT */ enum Error { - SUBSCRIBE_FAILED, ///< The subscription failed. The topic that failed to subscribe is passed as the second argument. - UNSUBSCRIBE_FAILED, ///< The unsubscription failed. The topic that failed to unsubscribe is passed as the second argument. - DISCONNECTED, ///< The connection was lost. The reason for the disconnection is passed as the second argument. - CONNECTION_FAILED, ///< The connection failed. The reason for the failure is passed as the second argument. + /** + * @brief The hostname could not be resolved into either an IPv4 or IPv6 address. + * + * This happens if the DNS server didn't return any valid IPv4 or IPv6 address + * based on the hostname given. + * + * Due to the nature of this error, this library has no way to recover from + * this. As such, this is considered a fatal error and the library takes no + * attempt to gracefully handle this. + * + * @note This is a fatal error. You have to call \ref disconnect after this. + */ + HOSTNAME_LOOKUP_FAILED, + + /** + * @brief The subscription failed. + * + * The topic that failed to subscribe is passed as the second argument. + * + * @note This error is non-fatal. + */ + SUBSCRIBE_FAILED, + + /** + * @brief The unsubscription failed. + * + * The topic that failed to unsubscribe is passed as the second argument. + * + * @note This error is non-fatal. + */ + UNSUBSCRIBE_FAILED, }; /** @@ -132,7 +159,7 @@ namespace TrueMQTT * * @param callback The callback to call when an error occurs. */ - void setErrorCallback(std::function callback); + void setErrorCallback(std::function callback); /** * @brief Set the publish queue to use. diff --git a/src/Client.cpp b/src/Client.cpp index c70398d..c8c1ccb 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -6,69 +6,12 @@ */ #include "TrueMQTT.h" -#include "Log.h" -#include -#include -#include -#include +#include "ClientImpl.h" +#include "Log.h" using TrueMQTT::Client; -// This class tracks all internal variables of the client. This way the header -// doesn't need to include the internal implementation of the Client. -class Client::Impl -{ -public: - Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) - : host(host), - port(port), - client_id(client_id), - connection_timeout(connection_timeout), - connection_backoff_max(connection_backoff_max), - keep_alive_interval(keep_alive_interval) - { - } - - enum State - { - DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect. - CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process. - CONNECTED, ///< The client is connected to the broker. - }; - - void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker. - void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker. - void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker. - void changeToConnected(); ///< Called when a connection goes from CONNECTING state to CONNECTED state. - void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue. - - State state = State::DISCONNECTED; ///< The current state of the client. - std::mutex state_mutex; ///< Mutex to protect state changes. - - std::string host; ///< Host of the broker. - int port; ///< Port of the broker. - std::string client_id; ///< Client ID to use when connecting to the broker. - int connection_timeout; ///< Timeout in seconds for the connection to the broker. - int connection_backoff_max; ///< Maximum time between backoff attempts in seconds. - int keep_alive_interval; ///< Interval in seconds between keep-alive messages. - - Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use. - std::function logger = [](Client::LogLevel, std::string) {}; ///< Logger callback. - - std::string last_will_topic = ""; ///< Topic to publish the last will message to. - std::string last_will_payload = ""; ///< Payload of the last will message. - bool last_will_retain = false; ///< Whether to retain the last will message. - - std::function error_callback = [](Error, std::string &) {}; ///< Error callback. - - Client::PublishQueueType publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue. - size_t publish_queue_size = -1; ///< Size of the publish queue. - std::deque> publish_queue; ///< Queue of publish messages to send to the broker. - - std::map> subscriptions; ///< Map of active subscriptions. -}; - Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) { this->m_impl = std::make_unique(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval); @@ -108,7 +51,7 @@ void Client::setLastWill(const std::string &topic, const std::string &payload, b this->m_impl->last_will_retain = retain; } -void Client::setErrorCallback(std::function callback) +void Client::setErrorCallback(std::function callback) { LOG_TRACE(this->m_impl, "Setting error callback"); @@ -141,6 +84,7 @@ void Client::connect() LOG_INFO(this->m_impl, "Connecting to " + this->m_impl->host + ":" + std::to_string(this->m_impl->port)); this->m_impl->state = Client::Impl::State::CONNECTING; + this->m_impl->connect(); } void Client::disconnect() @@ -156,7 +100,7 @@ void Client::disconnect() LOG_INFO(this->m_impl, "Disconnecting from broker"); this->m_impl->state = Client::Impl::State::DISCONNECTED; - this->m_impl->subscriptions.clear(); + this->m_impl->disconnect(); } void Client::publish(const std::string &topic, const std::string &payload, bool retain) @@ -234,33 +178,41 @@ void Client::Impl::sendUnsubscribe(const std::string &topic) LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'"); } -void Client::Impl::changeToConnected() +void Client::Impl::connectionStateChange(bool connected) { std::scoped_lock lock(this->state_mutex); - LOG_INFO(this, "Connected to broker"); + if (connected) + { + LOG_INFO(this, "Connected to broker"); - this->state = Client::Impl::State::CONNECTED; + this->state = Client::Impl::State::CONNECTED; - // Restoring subscriptions and flushing the queue is done while still under - // the lock. This to prevent \ref disconnect from being called while we are - // still sending messages. - // The drawback is that we are blocking \ref publish and \ref subscribe too - // when they are called just when we create a connection. But in the grand - // scheme of things, this is not likely, and this makes for a far easier - // implementation. + // Restoring subscriptions and flushing the queue is done while still under + // the lock. This to prevent \ref disconnect from being called while we are + // still sending messages. + // The drawback is that we are blocking \ref publish and \ref subscribe too + // when they are called just when we create a connection. But in the grand + // scheme of things, this is not likely, and this makes for a far easier + // implementation. - // First restore any subscription. - for (auto &subscription : this->subscriptions) - { - this->sendSubscribe(subscription.first); + // First restore any subscription. + for (auto &subscription : this->subscriptions) + { + this->sendSubscribe(subscription.first); + } + // Flush the publish queue. + for (auto &message : this->publish_queue) + { + this->sendPublish(std::get<0>(message), std::get<1>(message), std::get<2>(message)); + } + this->publish_queue.clear(); } - // Flush the publish queue. - for (auto &message : this->publish_queue) + else { - this->sendPublish(std::get<0>(message), std::get<1>(message), std::get<2>(message)); + LOG_INFO(this, "Disconnected from broker"); + this->state = Client::Impl::State::CONNECTING; } - this->publish_queue.clear(); } void Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain) diff --git a/src/ClientImpl.h b/src/ClientImpl.h new file mode 100644 index 0000000..bd33176 --- /dev/null +++ b/src/ClientImpl.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) TrueBrain + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "TrueMQTT.h" + +#include "Connection.h" + +#include +#include +#include +#include +#include + +// This class tracks all internal variables of the client. This way the header +// doesn't need to include the internal implementation of the Client. +class TrueMQTT::Client::Impl +{ +public: + Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) + : host(host), + port(port), + client_id(client_id), + connection_timeout(connection_timeout), + connection_backoff_max(connection_backoff_max), + keep_alive_interval(keep_alive_interval) + { + } + + enum State + { + DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect. + CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process. + CONNECTED, ///< The client is connected to the broker. + }; + + void connect(); ///< Connect to the broker. + void disconnect(); ///< Disconnect from the broker. + void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker. + void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker. + void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker. + void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa. + void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue. + + State state = State::DISCONNECTED; ///< The current state of the client. + std::mutex state_mutex; ///< Mutex to protect state changes. + + std::string host; ///< Host of the broker. + int port; ///< Port of the broker. + std::string client_id; ///< Client ID to use when connecting to the broker. + int connection_timeout; ///< Timeout in seconds for the connection to the broker. + int connection_backoff_max; ///< Maximum time between backoff attempts in seconds. + int keep_alive_interval; ///< Interval in seconds between keep-alive messages. + + Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use. + std::function logger = [](Client::LogLevel, std::string) {}; ///< Logger callback. + + std::string last_will_topic = ""; ///< Topic to publish the last will message to. + std::string last_will_payload = ""; ///< Payload of the last will message. + bool last_will_retain = false; ///< Whether to retain the last will message. + + std::function error_callback = [](Error, std::string) {}; ///< Error callback. + + Client::PublishQueueType publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue. + size_t publish_queue_size = -1; ///< Size of the publish queue. + std::deque> publish_queue; ///< Queue of publish messages to send to the broker. + + std::map> subscriptions; ///< Map of active subscriptions. + + std::unique_ptr connection; ///< Connection to the broker. +}; diff --git a/src/Connection.cpp b/src/Connection.cpp new file mode 100644 index 0000000..6c8c20e --- /dev/null +++ b/src/Connection.cpp @@ -0,0 +1,390 @@ +/* + * Copyright (c) TrueBrain + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "ClientImpl.h" +#include "Connection.h" +#include "Log.h" + +#include +#include +#include +#include +#include +#include + +Connection::Connection(TrueMQTT::Client::LogLevel log_level, + const std::function logger, + const std::function error_callback, + const std::function connection_change_callback, + const std::string &host, + int port) + : log_level(log_level), + logger(std::move(logger)), + m_error_callback(std::move(error_callback)), + m_connection_change_callback(std::move(connection_change_callback)), + m_host(host), + m_port(port), + m_thread(&Connection::run, this) +{ +} + +Connection::~Connection() +{ + // Make sure the connection thread is terminated. + if (m_thread.joinable()) + { + m_thread.join(); + } + + // freeaddrinfo() is one of those functions that doesn't take kind to NULL pointers + // on some platforms. + if (this->m_host_resolved != NULL) + { + freeaddrinfo(this->m_host_resolved); + this->m_host_resolved = NULL; + } +} + +std::string Connection::addrinfo_to_string(addrinfo *address) +{ + char host[NI_MAXHOST]; + getnameinfo(address->ai_addr, address->ai_addrlen, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); + + return std::string(host); +} + +void Connection::run() +{ + while (true) + { + switch (m_state) + { + case State::RESOLVING: + resolve(); + break; + + case State::CONNECTING: + if (!connect_to_any()) + { + m_state = State::BACKOFF; + } + break; + + case State::BACKOFF: + LOG_WARNING(this, "Connection failed; will retry in NNN seconds"); + + // TODO: use the configuration + std::this_thread::sleep_for(std::chrono::seconds(5)); + + m_state = State::RESOLVING; + break; + + case State::CONNECTED: + { + char buf[9000]; + ssize_t res = recv(m_socket, buf, sizeof(buf), 0); + + if (res == 0) + { + LOG_WARNING(this, "Connection closed by peer"); + m_state = State::BACKOFF; + m_connection_change_callback(false); + } + else if (res < 0) + { + LOG_WARNING(this, "Connection read error: " + std::string(strerror(errno))); + m_state = State::BACKOFF; + m_connection_change_callback(false); + } + else + { + LOG_TRACE(this, "Received " + std::to_string(res) + " bytes"); + } + + break; + } + } + } +} + +void Connection::resolve() +{ + m_address_current = 0; + m_socket = INVALID_SOCKET; + m_addresses.clear(); + + addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; // Request IPv4 and IPv6. + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG; + + // Request the OS to resolve the hostname into an IP address. + // We do this even if the hostname is already an IP address, as that + // makes for far easier code. + int error = getaddrinfo(m_host.c_str(), std::to_string(m_port).c_str(), &hints, &m_host_resolved); + if (error != 0) + { + m_error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, std::string(gai_strerror(error))); + return; + } + + // Split the list of addresses in two lists, one for IPv4 and one for + // IPv6. + std::deque addresses_ipv4; + std::deque addresses_ipv6; + for (addrinfo *ai = this->m_host_resolved; ai != nullptr; ai = ai->ai_next) + { + if (ai->ai_family == AF_INET6) + { + addresses_ipv6.emplace_back(ai); + } + else if (ai->ai_family == AF_INET) + { + addresses_ipv4.emplace_back(ai); + } + // Sometimes there can also be other types of families, but we are + // not interested in those results. + } + + // Interweave the IPv6 and IPv4 addresses. For connections we apply + // "Happy Eyeballs", where we try an IPv6 connection first, and if that + // doesn't connect after 100ms, we try an IPv4 connection. + // This is to prevent long timeouts when IPv6 is not available, but + // still prefer IPv6 where possible. + while (!addresses_ipv6.empty() || !addresses_ipv4.empty()) + { + if (!addresses_ipv6.empty()) + { + m_addresses.emplace_back(addresses_ipv6.front()); + addresses_ipv6.pop_front(); + } + if (!addresses_ipv4.empty()) + { + m_addresses.emplace_back(addresses_ipv4.front()); + addresses_ipv4.pop_front(); + } + } + +#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG + // For debugging, print the addresses we resolved into. + if (this->log_level >= TrueMQTT::Client::LogLevel::DEBUG) + { + LOG_DEBUG(this, "Resolved hostname '" + m_host + "' to:"); + for (addrinfo *res : m_addresses) + { + LOG_DEBUG(this, "- " + addrinfo_to_string(res)); + } + } +#endif + + // In some odd cases, the list can be empty. This is a fatal error. + if (m_addresses.empty()) + { + m_error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, ""); + return; + } + + m_state = State::CONNECTING; +} + +bool Connection::connect_to_any() +{ + // Check if we have pending attempts. If not, queue a new attempt. + if (m_sockets.empty()) + { + return try_next_address(); + } + + // Check for at most 100ms if there is any activity on the sockets. + timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 100; + + fd_set write_fds; + FD_ZERO(&write_fds); + for (const auto &socket : m_sockets) + { + FD_SET(socket, &write_fds); + } + + int result = select(FD_SETSIZE, NULL, &write_fds, NULL, &timeout); + + // Check if there was an error on select(). This is hard to recover from. + if (result < 0) + { + LOG_ERROR(this, "select() failed: " + std::string(strerror(errno))); + return true; + } + + // A result of zero means there was no activity on any of the sockets. + if (result == 0) + { + // Check if it was more than 250ms ago since we started our last connection. + if (std::chrono::steady_clock::now() < m_last_attempt + std::chrono::milliseconds(250)) + { + return true; + } + + // Try to queue the next address for a connection. + if (try_next_address()) + { + return true; + } + + // Check if it is more than the timeout ago since we last tried a connection. + // TODO -- Used to configured value + if (std::chrono::steady_clock::now() < m_last_attempt + std::chrono::seconds(10)) + { + return true; + } + + LOG_ERROR(this, "Connection attempt to broker timed out"); + + // Cleanup all sockets. + for (const auto &socket : m_sockets) + { + closesocket(socket); + } + m_socket_to_address.clear(); + m_sockets.clear(); + + return false; + } + + // A socket that reports to be writeable is either connected or in error-state. + // Remove all sockets that are in error-state. The first that is left and writeable, + // will be the socket to use for the connection. + SOCKET socket_connected = INVALID_SOCKET; + for (auto socket_it = m_sockets.begin(); socket_it != m_sockets.end(); /* nothing */) + { + // Check if the socket is in error-state. + int err; + socklen_t len = sizeof(err); + getsockopt(*socket_it, SOL_SOCKET, SO_ERROR, (char *)&err, &len); + if (err != 0) + { + // It is in error-state: report about it, and remove it. + LOG_ERROR(this, "Could not connect to " + addrinfo_to_string(m_socket_to_address[*socket_it]) + ": " + std::string(strerror(err))); + closesocket(*socket_it); + m_socket_to_address.erase(*socket_it); + socket_it = m_sockets.erase(socket_it); + continue; + } + + if (socket_connected == INVALID_SOCKET && FD_ISSET(*socket_it, &write_fds)) + { + socket_connected = *socket_it; + } + + socket_it++; + } + + if (socket_connected == INVALID_SOCKET) + { + // No socket is connected yet. Continue waiting. + return true; + } + + // We have a connected socket. + LOG_DEBUG(this, "Connected to " + addrinfo_to_string(m_socket_to_address[socket_connected])); + + // Close all other pending connections. + for (const auto &socket : m_sockets) + { + if (socket != socket_connected) + { + closesocket(socket); + } + } + m_socket_to_address.clear(); + m_sockets.clear(); + + // Disable non-blocking, as we will be reading from a thread, which can be blocking. + int nonblocking = 0; + if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0) + { + LOG_WARNING(this, "Could not set socket to non-blocking; expect performance impact"); + } + + m_socket = socket_connected; + m_state = State::CONNECTED; + m_connection_change_callback(true); + return true; +} + +bool Connection::try_next_address() +{ + if (m_address_current >= m_addresses.size()) + { + return false; + } + + m_last_attempt = std::chrono::steady_clock::now(); + connect(m_addresses[m_address_current++]); + + return true; +} + +void Connection::connect(addrinfo *address) +{ + // Create a new socket based on the resolved information. + SOCKET sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); + if (sock == INVALID_SOCKET) + { + LOG_ERROR(this, "Could not create new socket"); + return; + } + + // Set socket to no-delay; this improves latency, but reduces throughput. + int flags = 1; + /* The (const char*) cast is needed for Windows */ + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags)) != 0) + { + LOG_WARNING(this, "Could not set TCP_NODELAY on socket"); + } + // Set socket to non-blocking; this allows for multiple connects to be pending. This is + // needed to apply Happy Eyeballs. + int nonblocking = 1; + if (ioctl(sock, FIONBIO, &nonblocking) != 0) + { + LOG_WARNING(this, "Could not set socket to non-blocking; expect performance impact"); + } + + // Start the actual connection attempt. + LOG_DEBUG(this, "Connecting to " + addrinfo_to_string(address)); + int err = ::connect(sock, address->ai_addr, (int)address->ai_addrlen); + if (err != 0 && errno != EINPROGRESS) + { + // As we are non-blocking, normally this returns "in progress". If anything + // else, something is wrong. Report the error and close the socket. + closesocket(sock); + + LOG_ERROR(this, "Could not connect to " + addrinfo_to_string(address) + ": " + std::string(strerror(errno))); + return; + } + + // Connection is pending. + m_socket_to_address[sock] = address; + m_sockets.push_back(sock); +} + +void TrueMQTT::Client::Impl::connect() +{ + this->connection = std::make_unique( + this->log_level, this->logger, this->error_callback, [this](bool connected) + { this->connectionStateChange(connected); }, + this->host, this->port); +} + +void TrueMQTT::Client::Impl::disconnect() +{ + this->subscriptions.clear(); + this->publish_queue.clear(); + + this->connection.reset(); +} diff --git a/src/Connection.h b/src/Connection.h new file mode 100644 index 0000000..ad1e3eb --- /dev/null +++ b/src/Connection.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) TrueBrain + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "TrueMQTT.h" + +#include +#include +#include +#include +#include + +// Some definitions to make future cross-platform work easier. +#define SOCKET int +#define INVALID_SOCKET -1 +#define closesocket close + +class Connection +{ +public: + Connection(TrueMQTT::Client::LogLevel log_level, + const std::function logger, + const std::function error_callback, + const std::function connection_change_callback, + const std::string &host, + int port); + ~Connection(); + +private: + void run(); + void resolve(); + bool try_next_address(); + void connect(addrinfo *address); + bool connect_to_any(); + std::string addrinfo_to_string(addrinfo *address); + + enum class State + { + RESOLVING, + CONNECTING, + CONNECTED, + BACKOFF, + }; + + TrueMQTT::Client::LogLevel log_level; + const std::function logger; + + const std::function m_error_callback; + const std::function m_connection_change_callback; + + const std::string &m_host; ///< The hostname or IP address to connect to. + int m_port; ///< The port to connect to. + + State m_state = State::RESOLVING; + std::thread m_thread; ///< Current thread used to run this connection. + + addrinfo *m_host_resolved = nullptr; ///< Address info of the hostname, once looked up. + std::vector m_addresses = {}; ///< List of addresses to try to connect to. + size_t m_address_current = 0; ///< Index of the address we are currently trying to connect to. + std::chrono::steady_clock::time_point m_last_attempt = {}; ///< Time of the last attempt to connect to the current address. + std::vector m_sockets = {}; ///< List of sockets we are currently trying to connect to. + std::map m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to. + SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected. +}; diff --git a/src/Log.h b/src/Log.h index aa713c7..9cfccc0 100644 --- a/src/Log.h +++ b/src/Log.h @@ -8,7 +8,7 @@ #pragma once // Wrappers to make logging a tiny bit easier to read. -// The "obj" is the Client::Impl instance to point to. +// The "obj" is the TrueMQTT::Client::Impl instance to point to. #define LOGGER_LEVEL_ERROR 0 #define LOGGER_LEVEL_WARNING 1 @@ -22,50 +22,50 @@ #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_ERROR -#define LOG_ERROR(obj, x) \ - if (obj->log_level >= Client::LogLevel::ERROR) \ - { \ - obj->logger(Client::LogLevel::ERROR, x); \ +#define LOG_ERROR(obj, x) \ + if (obj->log_level >= TrueMQTT::Client::LogLevel::ERROR) \ + { \ + obj->logger(TrueMQTT::Client::LogLevel::ERROR, x); \ } #else #define LOG_ERROR(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARNING -#define LOG_WARNING(obj, x) \ - if (obj->log_level >= Client::LogLevel::WARNING) \ - { \ - obj->logger(Client::LogLevel::WARNING, x); \ +#define LOG_WARNING(obj, x) \ + if (obj->log_level >= TrueMQTT::Client::LogLevel::WARNING) \ + { \ + obj->logger(TrueMQTT::Client::LogLevel::WARNING, x); \ } #else #define LOG_WARNING(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_INFO -#define LOG_INFO(obj, x) \ - if (obj->log_level >= Client::LogLevel::INFO) \ - { \ - obj->logger(Client::LogLevel::INFO, x); \ +#define LOG_INFO(obj, x) \ + if (obj->log_level >= TrueMQTT::Client::LogLevel::INFO) \ + { \ + obj->logger(TrueMQTT::Client::LogLevel::INFO, x); \ } #else #define LOG_INFO(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG -#define LOG_DEBUG(obj, x) \ - if (obj->log_level >= Client::LogLevel::DEBUG) \ - { \ - obj->logger(Client::LogLevel::DEBUG, x); \ +#define LOG_DEBUG(obj, x) \ + if (obj->log_level >= TrueMQTT::Client::LogLevel::DEBUG) \ + { \ + obj->logger(TrueMQTT::Client::LogLevel::DEBUG, x); \ } #else #define LOG_DEBUG(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE -#define LOG_TRACE(obj, x) \ - if (obj->log_level >= Client::LogLevel::TRACE) \ - { \ - obj->logger(Client::LogLevel::TRACE, x); \ +#define LOG_TRACE(obj, x) \ + if (obj->log_level >= TrueMQTT::Client::LogLevel::TRACE) \ + { \ + obj->logger(TrueMQTT::Client::LogLevel::TRACE, x); \ } #else #define LOG_TRACE(obj, x) -- libgit2 0.21.4