Commit 995dd53fb41fbd457b17cafe381a23f2edb9cedc

Authored by Patric Stout
1 parent 7b1eeefa

feat(connection): connection logic using Happy Eyeballs

By using Happy Eyeballs, we stagger connections of a host resolves
into multiple IPs. This is useful for IPv6 / IPv4 hosts, where
one of the two can stutter.

Sadly, creating a connection is rather complex, with many odd
things that can happen along the way. For example, a writeable
socket doesn't mean it is actually connected; it can also mean
the socket is in an error state.

This implementation is inspired by my own work on OpenTTD's
variant of this.
CMakeLists.txt
... ... @@ -11,6 +11,7 @@ project(truemqtt VERSION 1.0.0 DESCRIPTION "A modern C++ MQTT Client library")
11 11  
12 12 set(CMAKE_CXX_STANDARD 17)
13 13 set(CMAKE_CXX_STANDARD_REQUIRED True)
  14 +set(THREADS_PREFER_PTHREAD_FLAG ON)
14 15  
15 16 set(MIN_LOGGER_LEVEL "INFO" CACHE STRING "Set minimal logger level (TRACE, DEBUG, INFO, WARNING, ERROR). No logs below this level will be omitted.")
16 17  
... ... @@ -18,9 +19,13 @@ include(GNUInstallDirs)
18 19  
19 20 add_library(${PROJECT_NAME}
20 21 src/Client.cpp
  22 + src/Connection.cpp
21 23 )
22 24 target_include_directories(${PROJECT_NAME} PUBLIC include PRIVATE src)
23 25  
  26 +find_package(Threads REQUIRED)
  27 +target_link_libraries(${PROJECT_NAME} PRIVATE Threads::Threads)
  28 +
24 29 set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION 1 PUBLIC_HEADER include/TrueMQTT.h)
25 30 configure_file(truemqtt.pc.in truemqtt.pc @ONLY)
26 31  
... ...
example/pubsub/main.cpp
... ... @@ -17,6 +17,8 @@ int main()
17 17 client.setLogger(TrueMQTT::Client::LogLevel::TRACE, [](TrueMQTT::Client::LogLevel level, std::string message)
18 18 { std::cout << "Log " << level << ": " << message << std::endl; });
19 19 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10);
  20 + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message)
  21 + { std::cout << "Error " << error << ": " << message << std::endl; });
20 22  
21 23 client.connect();
22 24  
... ...
include/TrueMQTT.h
... ... @@ -24,10 +24,37 @@ namespace TrueMQTT
24 24 */
25 25 enum Error
26 26 {
27   - SUBSCRIBE_FAILED, ///< The subscription failed. The topic that failed to subscribe is passed as the second argument.
28   - UNSUBSCRIBE_FAILED, ///< The unsubscription failed. The topic that failed to unsubscribe is passed as the second argument.
29   - DISCONNECTED, ///< The connection was lost. The reason for the disconnection is passed as the second argument.
30   - CONNECTION_FAILED, ///< The connection failed. The reason for the failure is passed as the second argument.
  27 + /**
  28 + * @brief The hostname could not be resolved into either an IPv4 or IPv6 address.
  29 + *
  30 + * This happens if the DNS server didn't return any valid IPv4 or IPv6 address
  31 + * based on the hostname given.
  32 + *
  33 + * Due to the nature of this error, this library has no way to recover from
  34 + * this. As such, this is considered a fatal error and the library takes no
  35 + * attempt to gracefully handle this.
  36 + *
  37 + * @note This is a fatal error. You have to call \ref disconnect after this.
  38 + */
  39 + HOSTNAME_LOOKUP_FAILED,
  40 +
  41 + /**
  42 + * @brief The subscription failed.
  43 + *
  44 + * The topic that failed to subscribe is passed as the second argument.
  45 + *
  46 + * @note This error is non-fatal.
  47 + */
  48 + SUBSCRIBE_FAILED,
  49 +
  50 + /**
  51 + * @brief The unsubscription failed.
  52 + *
  53 + * The topic that failed to unsubscribe is passed as the second argument.
  54 + *
  55 + * @note This error is non-fatal.
  56 + */
  57 + UNSUBSCRIBE_FAILED,
31 58 };
32 59  
33 60 /**
... ... @@ -132,7 +159,7 @@ namespace TrueMQTT
132 159 *
133 160 * @param callback The callback to call when an error occurs.
134 161 */
135   - void setErrorCallback(std::function<void(Error, std::string &)> callback);
  162 + void setErrorCallback(std::function<void(Error, std::string)> callback);
136 163  
137 164 /**
138 165 * @brief Set the publish queue to use.
... ...
src/Client.cpp
... ... @@ -6,69 +6,12 @@
6 6 */
7 7  
8 8 #include "TrueMQTT.h"
9   -#include "Log.h"
10 9  
11   -#include <deque>
12   -#include <map>
13   -#include <mutex>
14   -#include <string>
  10 +#include "ClientImpl.h"
  11 +#include "Log.h"
15 12  
16 13 using TrueMQTT::Client;
17 14  
18   -// This class tracks all internal variables of the client. This way the header
19   -// doesn't need to include the internal implementation of the Client.
20   -class Client::Impl
21   -{
22   -public:
23   - Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval)
24   - : host(host),
25   - port(port),
26   - client_id(client_id),
27   - connection_timeout(connection_timeout),
28   - connection_backoff_max(connection_backoff_max),
29   - keep_alive_interval(keep_alive_interval)
30   - {
31   - }
32   -
33   - enum State
34   - {
35   - DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect.
36   - CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process.
37   - CONNECTED, ///< The client is connected to the broker.
38   - };
39   -
40   - void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker.
41   - void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
42   - void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
43   - void changeToConnected(); ///< Called when a connection goes from CONNECTING state to CONNECTED state.
44   - void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue.
45   -
46   - State state = State::DISCONNECTED; ///< The current state of the client.
47   - std::mutex state_mutex; ///< Mutex to protect state changes.
48   -
49   - std::string host; ///< Host of the broker.
50   - int port; ///< Port of the broker.
51   - std::string client_id; ///< Client ID to use when connecting to the broker.
52   - int connection_timeout; ///< Timeout in seconds for the connection to the broker.
53   - int connection_backoff_max; ///< Maximum time between backoff attempts in seconds.
54   - int keep_alive_interval; ///< Interval in seconds between keep-alive messages.
55   -
56   - Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use.
57   - std::function<void(Client::LogLevel, std::string)> logger = [](Client::LogLevel, std::string) {}; ///< Logger callback.
58   -
59   - std::string last_will_topic = ""; ///< Topic to publish the last will message to.
60   - std::string last_will_payload = ""; ///< Payload of the last will message.
61   - bool last_will_retain = false; ///< Whether to retain the last will message.
62   -
63   - std::function<void(Error, std::string &)> error_callback = [](Error, std::string &) {}; ///< Error callback.
64   -
65   - Client::PublishQueueType publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue.
66   - size_t publish_queue_size = -1; ///< Size of the publish queue.
67   - std::deque<std::tuple<std::string, std::string, bool>> publish_queue; ///< Queue of publish messages to send to the broker.
68   -
69   - std::map<std::string, std::function<void(std::string, std::string)>> subscriptions; ///< Map of active subscriptions.
70   -};
71   -
72 15 Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval)
73 16 {
74 17 this->m_impl = std::make_unique<Client::Impl>(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval);
... ... @@ -108,7 +51,7 @@ void Client::setLastWill(const std::string &amp;topic, const std::string &amp;payload, b
108 51 this->m_impl->last_will_retain = retain;
109 52 }
110 53  
111   -void Client::setErrorCallback(std::function<void(Error, std::string &)> callback)
  54 +void Client::setErrorCallback(std::function<void(Error, std::string)> callback)
112 55 {
113 56 LOG_TRACE(this->m_impl, "Setting error callback");
114 57  
... ... @@ -141,6 +84,7 @@ void Client::connect()
141 84 LOG_INFO(this->m_impl, "Connecting to " + this->m_impl->host + ":" + std::to_string(this->m_impl->port));
142 85  
143 86 this->m_impl->state = Client::Impl::State::CONNECTING;
  87 + this->m_impl->connect();
144 88 }
145 89  
146 90 void Client::disconnect()
... ... @@ -156,7 +100,7 @@ void Client::disconnect()
156 100 LOG_INFO(this->m_impl, "Disconnecting from broker");
157 101  
158 102 this->m_impl->state = Client::Impl::State::DISCONNECTED;
159   - this->m_impl->subscriptions.clear();
  103 + this->m_impl->disconnect();
160 104 }
161 105  
162 106 void Client::publish(const std::string &topic, const std::string &payload, bool retain)
... ... @@ -234,33 +178,41 @@ void Client::Impl::sendUnsubscribe(const std::string &amp;topic)
234 178 LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'");
235 179 }
236 180  
237   -void Client::Impl::changeToConnected()
  181 +void Client::Impl::connectionStateChange(bool connected)
238 182 {
239 183 std::scoped_lock lock(this->state_mutex);
240 184  
241   - LOG_INFO(this, "Connected to broker");
  185 + if (connected)
  186 + {
  187 + LOG_INFO(this, "Connected to broker");
242 188  
243   - this->state = Client::Impl::State::CONNECTED;
  189 + this->state = Client::Impl::State::CONNECTED;
244 190  
245   - // Restoring subscriptions and flushing the queue is done while still under
246   - // the lock. This to prevent \ref disconnect from being called while we are
247   - // still sending messages.
248   - // The drawback is that we are blocking \ref publish and \ref subscribe too
249   - // when they are called just when we create a connection. But in the grand
250   - // scheme of things, this is not likely, and this makes for a far easier
251   - // implementation.
  191 + // Restoring subscriptions and flushing the queue is done while still under
  192 + // the lock. This to prevent \ref disconnect from being called while we are
  193 + // still sending messages.
  194 + // The drawback is that we are blocking \ref publish and \ref subscribe too
  195 + // when they are called just when we create a connection. But in the grand
  196 + // scheme of things, this is not likely, and this makes for a far easier
  197 + // implementation.
252 198  
253   - // First restore any subscription.
254   - for (auto &subscription : this->subscriptions)
255   - {
256   - this->sendSubscribe(subscription.first);
  199 + // First restore any subscription.
  200 + for (auto &subscription : this->subscriptions)
  201 + {
  202 + this->sendSubscribe(subscription.first);
  203 + }
  204 + // Flush the publish queue.
  205 + for (auto &message : this->publish_queue)
  206 + {
  207 + this->sendPublish(std::get<0>(message), std::get<1>(message), std::get<2>(message));
  208 + }
  209 + this->publish_queue.clear();
257 210 }
258   - // Flush the publish queue.
259   - for (auto &message : this->publish_queue)
  211 + else
260 212 {
261   - this->sendPublish(std::get<0>(message), std::get<1>(message), std::get<2>(message));
  213 + LOG_INFO(this, "Disconnected from broker");
  214 + this->state = Client::Impl::State::CONNECTING;
262 215 }
263   - this->publish_queue.clear();
264 216 }
265 217  
266 218 void Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain)
... ...
src/ClientImpl.h 0 โ†’ 100644
  1 +/*
  2 + * Copyright (c) TrueBrain
  3 + *
  4 + * This source code is licensed under the MIT license found in the
  5 + * LICENSE file in the root directory of this source tree.
  6 + */
  7 +
  8 +#pragma once
  9 +
  10 +#include "TrueMQTT.h"
  11 +
  12 +#include "Connection.h"
  13 +
  14 +#include <deque>
  15 +#include <map>
  16 +#include <mutex>
  17 +#include <string>
  18 +#include <thread>
  19 +
  20 +// This class tracks all internal variables of the client. This way the header
  21 +// doesn't need to include the internal implementation of the Client.
  22 +class TrueMQTT::Client::Impl
  23 +{
  24 +public:
  25 + Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval)
  26 + : host(host),
  27 + port(port),
  28 + client_id(client_id),
  29 + connection_timeout(connection_timeout),
  30 + connection_backoff_max(connection_backoff_max),
  31 + keep_alive_interval(keep_alive_interval)
  32 + {
  33 + }
  34 +
  35 + enum State
  36 + {
  37 + DISCONNECTED, ///< The client is not connected to the broker, nor is it trying to connect.
  38 + CONNECTING, ///< The client is either connecting or reconnecting to the broker. This can be in any stage of the connection process.
  39 + CONNECTED, ///< The client is connected to the broker.
  40 + };
  41 +
  42 + void connect(); ///< Connect to the broker.
  43 + void disconnect(); ///< Disconnect from the broker.
  44 + void sendPublish(const std::string &topic, const std::string &payload, bool retain); ///< Send a publish message to the broker.
  45 + void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
  46 + void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
  47 + void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa.
  48 + void toPublishQueue(const std::string &topic, const std::string &payload, bool retain); ///< Add a publish message to the publish queue.
  49 +
  50 + State state = State::DISCONNECTED; ///< The current state of the client.
  51 + std::mutex state_mutex; ///< Mutex to protect state changes.
  52 +
  53 + std::string host; ///< Host of the broker.
  54 + int port; ///< Port of the broker.
  55 + std::string client_id; ///< Client ID to use when connecting to the broker.
  56 + int connection_timeout; ///< Timeout in seconds for the connection to the broker.
  57 + int connection_backoff_max; ///< Maximum time between backoff attempts in seconds.
  58 + int keep_alive_interval; ///< Interval in seconds between keep-alive messages.
  59 +
  60 + Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use.
  61 + std::function<void(Client::LogLevel, std::string)> logger = [](Client::LogLevel, std::string) {}; ///< Logger callback.
  62 +
  63 + std::string last_will_topic = ""; ///< Topic to publish the last will message to.
  64 + std::string last_will_payload = ""; ///< Payload of the last will message.
  65 + bool last_will_retain = false; ///< Whether to retain the last will message.
  66 +
  67 + std::function<void(Error, std::string)> error_callback = [](Error, std::string) {}; ///< Error callback.
  68 +
  69 + Client::PublishQueueType publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue.
  70 + size_t publish_queue_size = -1; ///< Size of the publish queue.
  71 + std::deque<std::tuple<std::string, std::string, bool>> publish_queue; ///< Queue of publish messages to send to the broker.
  72 +
  73 + std::map<std::string, std::function<void(std::string, std::string)>> subscriptions; ///< Map of active subscriptions.
  74 +
  75 + std::unique_ptr<Connection> connection; ///< Connection to the broker.
  76 +};
... ...
src/Connection.cpp 0 โ†’ 100644
  1 +/*
  2 + * Copyright (c) TrueBrain
  3 + *
  4 + * This source code is licensed under the MIT license found in the
  5 + * LICENSE file in the root directory of this source tree.
  6 + */
  7 +
  8 +#include "ClientImpl.h"
  9 +#include "Connection.h"
  10 +#include "Log.h"
  11 +
  12 +#include <memory.h>
  13 +#include <netinet/tcp.h>
  14 +#include <sys/ioctl.h>
  15 +#include <sys/socket.h>
  16 +#include <unistd.h>
  17 +#include <vector>
  18 +
  19 +Connection::Connection(TrueMQTT::Client::LogLevel log_level,
  20 + const std::function<void(TrueMQTT::Client::LogLevel, std::string)> logger,
  21 + const std::function<void(TrueMQTT::Client::Error, std::string)> error_callback,
  22 + const std::function<void(bool)> connection_change_callback,
  23 + const std::string &host,
  24 + int port)
  25 + : log_level(log_level),
  26 + logger(std::move(logger)),
  27 + m_error_callback(std::move(error_callback)),
  28 + m_connection_change_callback(std::move(connection_change_callback)),
  29 + m_host(host),
  30 + m_port(port),
  31 + m_thread(&Connection::run, this)
  32 +{
  33 +}
  34 +
  35 +Connection::~Connection()
  36 +{
  37 + // Make sure the connection thread is terminated.
  38 + if (m_thread.joinable())
  39 + {
  40 + m_thread.join();
  41 + }
  42 +
  43 + // freeaddrinfo() is one of those functions that doesn't take kind to NULL pointers
  44 + // on some platforms.
  45 + if (this->m_host_resolved != NULL)
  46 + {
  47 + freeaddrinfo(this->m_host_resolved);
  48 + this->m_host_resolved = NULL;
  49 + }
  50 +}
  51 +
  52 +std::string Connection::addrinfo_to_string(addrinfo *address)
  53 +{
  54 + char host[NI_MAXHOST];
  55 + getnameinfo(address->ai_addr, address->ai_addrlen, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
  56 +
  57 + return std::string(host);
  58 +}
  59 +
  60 +void Connection::run()
  61 +{
  62 + while (true)
  63 + {
  64 + switch (m_state)
  65 + {
  66 + case State::RESOLVING:
  67 + resolve();
  68 + break;
  69 +
  70 + case State::CONNECTING:
  71 + if (!connect_to_any())
  72 + {
  73 + m_state = State::BACKOFF;
  74 + }
  75 + break;
  76 +
  77 + case State::BACKOFF:
  78 + LOG_WARNING(this, "Connection failed; will retry in NNN seconds");
  79 +
  80 + // TODO: use the configuration
  81 + std::this_thread::sleep_for(std::chrono::seconds(5));
  82 +
  83 + m_state = State::RESOLVING;
  84 + break;
  85 +
  86 + case State::CONNECTED:
  87 + {
  88 + char buf[9000];
  89 + ssize_t res = recv(m_socket, buf, sizeof(buf), 0);
  90 +
  91 + if (res == 0)
  92 + {
  93 + LOG_WARNING(this, "Connection closed by peer");
  94 + m_state = State::BACKOFF;
  95 + m_connection_change_callback(false);
  96 + }
  97 + else if (res < 0)
  98 + {
  99 + LOG_WARNING(this, "Connection read error: " + std::string(strerror(errno)));
  100 + m_state = State::BACKOFF;
  101 + m_connection_change_callback(false);
  102 + }
  103 + else
  104 + {
  105 + LOG_TRACE(this, "Received " + std::to_string(res) + " bytes");
  106 + }
  107 +
  108 + break;
  109 + }
  110 + }
  111 + }
  112 +}
  113 +
  114 +void Connection::resolve()
  115 +{
  116 + m_address_current = 0;
  117 + m_socket = INVALID_SOCKET;
  118 + m_addresses.clear();
  119 +
  120 + addrinfo hints;
  121 + memset(&hints, 0, sizeof(hints));
  122 + hints.ai_family = AF_UNSPEC; // Request IPv4 and IPv6.
  123 + hints.ai_socktype = SOCK_STREAM;
  124 + hints.ai_flags = AI_ADDRCONFIG;
  125 +
  126 + // Request the OS to resolve the hostname into an IP address.
  127 + // We do this even if the hostname is already an IP address, as that
  128 + // makes for far easier code.
  129 + int error = getaddrinfo(m_host.c_str(), std::to_string(m_port).c_str(), &hints, &m_host_resolved);
  130 + if (error != 0)
  131 + {
  132 + m_error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, std::string(gai_strerror(error)));
  133 + return;
  134 + }
  135 +
  136 + // Split the list of addresses in two lists, one for IPv4 and one for
  137 + // IPv6.
  138 + std::deque<addrinfo *> addresses_ipv4;
  139 + std::deque<addrinfo *> addresses_ipv6;
  140 + for (addrinfo *ai = this->m_host_resolved; ai != nullptr; ai = ai->ai_next)
  141 + {
  142 + if (ai->ai_family == AF_INET6)
  143 + {
  144 + addresses_ipv6.emplace_back(ai);
  145 + }
  146 + else if (ai->ai_family == AF_INET)
  147 + {
  148 + addresses_ipv4.emplace_back(ai);
  149 + }
  150 + // Sometimes there can also be other types of families, but we are
  151 + // not interested in those results.
  152 + }
  153 +
  154 + // Interweave the IPv6 and IPv4 addresses. For connections we apply
  155 + // "Happy Eyeballs", where we try an IPv6 connection first, and if that
  156 + // doesn't connect after 100ms, we try an IPv4 connection.
  157 + // This is to prevent long timeouts when IPv6 is not available, but
  158 + // still prefer IPv6 where possible.
  159 + while (!addresses_ipv6.empty() || !addresses_ipv4.empty())
  160 + {
  161 + if (!addresses_ipv6.empty())
  162 + {
  163 + m_addresses.emplace_back(addresses_ipv6.front());
  164 + addresses_ipv6.pop_front();
  165 + }
  166 + if (!addresses_ipv4.empty())
  167 + {
  168 + m_addresses.emplace_back(addresses_ipv4.front());
  169 + addresses_ipv4.pop_front();
  170 + }
  171 + }
  172 +
  173 +#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG
  174 + // For debugging, print the addresses we resolved into.
  175 + if (this->log_level >= TrueMQTT::Client::LogLevel::DEBUG)
  176 + {
  177 + LOG_DEBUG(this, "Resolved hostname '" + m_host + "' to:");
  178 + for (addrinfo *res : m_addresses)
  179 + {
  180 + LOG_DEBUG(this, "- " + addrinfo_to_string(res));
  181 + }
  182 + }
  183 +#endif
  184 +
  185 + // In some odd cases, the list can be empty. This is a fatal error.
  186 + if (m_addresses.empty())
  187 + {
  188 + m_error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, "");
  189 + return;
  190 + }
  191 +
  192 + m_state = State::CONNECTING;
  193 +}
  194 +
  195 +bool Connection::connect_to_any()
  196 +{
  197 + // Check if we have pending attempts. If not, queue a new attempt.
  198 + if (m_sockets.empty())
  199 + {
  200 + return try_next_address();
  201 + }
  202 +
  203 + // Check for at most 100ms if there is any activity on the sockets.
  204 + timeval timeout;
  205 + timeout.tv_sec = 0;
  206 + timeout.tv_usec = 100;
  207 +
  208 + fd_set write_fds;
  209 + FD_ZERO(&write_fds);
  210 + for (const auto &socket : m_sockets)
  211 + {
  212 + FD_SET(socket, &write_fds);
  213 + }
  214 +
  215 + int result = select(FD_SETSIZE, NULL, &write_fds, NULL, &timeout);
  216 +
  217 + // Check if there was an error on select(). This is hard to recover from.
  218 + if (result < 0)
  219 + {
  220 + LOG_ERROR(this, "select() failed: " + std::string(strerror(errno)));
  221 + return true;
  222 + }
  223 +
  224 + // A result of zero means there was no activity on any of the sockets.
  225 + if (result == 0)
  226 + {
  227 + // Check if it was more than 250ms ago since we started our last connection.
  228 + if (std::chrono::steady_clock::now() < m_last_attempt + std::chrono::milliseconds(250))
  229 + {
  230 + return true;
  231 + }
  232 +
  233 + // Try to queue the next address for a connection.
  234 + if (try_next_address())
  235 + {
  236 + return true;
  237 + }
  238 +
  239 + // Check if it is more than the timeout ago since we last tried a connection.
  240 + // TODO -- Used to configured value
  241 + if (std::chrono::steady_clock::now() < m_last_attempt + std::chrono::seconds(10))
  242 + {
  243 + return true;
  244 + }
  245 +
  246 + LOG_ERROR(this, "Connection attempt to broker timed out");
  247 +
  248 + // Cleanup all sockets.
  249 + for (const auto &socket : m_sockets)
  250 + {
  251 + closesocket(socket);
  252 + }
  253 + m_socket_to_address.clear();
  254 + m_sockets.clear();
  255 +
  256 + return false;
  257 + }
  258 +
  259 + // A socket that reports to be writeable is either connected or in error-state.
  260 + // Remove all sockets that are in error-state. The first that is left and writeable,
  261 + // will be the socket to use for the connection.
  262 + SOCKET socket_connected = INVALID_SOCKET;
  263 + for (auto socket_it = m_sockets.begin(); socket_it != m_sockets.end(); /* nothing */)
  264 + {
  265 + // Check if the socket is in error-state.
  266 + int err;
  267 + socklen_t len = sizeof(err);
  268 + getsockopt(*socket_it, SOL_SOCKET, SO_ERROR, (char *)&err, &len);
  269 + if (err != 0)
  270 + {
  271 + // It is in error-state: report about it, and remove it.
  272 + LOG_ERROR(this, "Could not connect to " + addrinfo_to_string(m_socket_to_address[*socket_it]) + ": " + std::string(strerror(err)));
  273 + closesocket(*socket_it);
  274 + m_socket_to_address.erase(*socket_it);
  275 + socket_it = m_sockets.erase(socket_it);
  276 + continue;
  277 + }
  278 +
  279 + if (socket_connected == INVALID_SOCKET && FD_ISSET(*socket_it, &write_fds))
  280 + {
  281 + socket_connected = *socket_it;
  282 + }
  283 +
  284 + socket_it++;
  285 + }
  286 +
  287 + if (socket_connected == INVALID_SOCKET)
  288 + {
  289 + // No socket is connected yet. Continue waiting.
  290 + return true;
  291 + }
  292 +
  293 + // We have a connected socket.
  294 + LOG_DEBUG(this, "Connected to " + addrinfo_to_string(m_socket_to_address[socket_connected]));
  295 +
  296 + // Close all other pending connections.
  297 + for (const auto &socket : m_sockets)
  298 + {
  299 + if (socket != socket_connected)
  300 + {
  301 + closesocket(socket);
  302 + }
  303 + }
  304 + m_socket_to_address.clear();
  305 + m_sockets.clear();
  306 +
  307 + // Disable non-blocking, as we will be reading from a thread, which can be blocking.
  308 + int nonblocking = 0;
  309 + if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0)
  310 + {
  311 + LOG_WARNING(this, "Could not set socket to non-blocking; expect performance impact");
  312 + }
  313 +
  314 + m_socket = socket_connected;
  315 + m_state = State::CONNECTED;
  316 + m_connection_change_callback(true);
  317 + return true;
  318 +}
  319 +
  320 +bool Connection::try_next_address()
  321 +{
  322 + if (m_address_current >= m_addresses.size())
  323 + {
  324 + return false;
  325 + }
  326 +
  327 + m_last_attempt = std::chrono::steady_clock::now();
  328 + connect(m_addresses[m_address_current++]);
  329 +
  330 + return true;
  331 +}
  332 +
  333 +void Connection::connect(addrinfo *address)
  334 +{
  335 + // Create a new socket based on the resolved information.
  336 + SOCKET sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
  337 + if (sock == INVALID_SOCKET)
  338 + {
  339 + LOG_ERROR(this, "Could not create new socket");
  340 + return;
  341 + }
  342 +
  343 + // Set socket to no-delay; this improves latency, but reduces throughput.
  344 + int flags = 1;
  345 + /* The (const char*) cast is needed for Windows */
  346 + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags)) != 0)
  347 + {
  348 + LOG_WARNING(this, "Could not set TCP_NODELAY on socket");
  349 + }
  350 + // Set socket to non-blocking; this allows for multiple connects to be pending. This is
  351 + // needed to apply Happy Eyeballs.
  352 + int nonblocking = 1;
  353 + if (ioctl(sock, FIONBIO, &nonblocking) != 0)
  354 + {
  355 + LOG_WARNING(this, "Could not set socket to non-blocking; expect performance impact");
  356 + }
  357 +
  358 + // Start the actual connection attempt.
  359 + LOG_DEBUG(this, "Connecting to " + addrinfo_to_string(address));
  360 + int err = ::connect(sock, address->ai_addr, (int)address->ai_addrlen);
  361 + if (err != 0 && errno != EINPROGRESS)
  362 + {
  363 + // As we are non-blocking, normally this returns "in progress". If anything
  364 + // else, something is wrong. Report the error and close the socket.
  365 + closesocket(sock);
  366 +
  367 + LOG_ERROR(this, "Could not connect to " + addrinfo_to_string(address) + ": " + std::string(strerror(errno)));
  368 + return;
  369 + }
  370 +
  371 + // Connection is pending.
  372 + m_socket_to_address[sock] = address;
  373 + m_sockets.push_back(sock);
  374 +}
  375 +
  376 +void TrueMQTT::Client::Impl::connect()
  377 +{
  378 + this->connection = std::make_unique<Connection>(
  379 + this->log_level, this->logger, this->error_callback, [this](bool connected)
  380 + { this->connectionStateChange(connected); },
  381 + this->host, this->port);
  382 +}
  383 +
  384 +void TrueMQTT::Client::Impl::disconnect()
  385 +{
  386 + this->subscriptions.clear();
  387 + this->publish_queue.clear();
  388 +
  389 + this->connection.reset();
  390 +}
... ...
src/Connection.h 0 โ†’ 100644
  1 +/*
  2 + * Copyright (c) TrueBrain
  3 + *
  4 + * This source code is licensed under the MIT license found in the
  5 + * LICENSE file in the root directory of this source tree.
  6 + */
  7 +
  8 +#pragma once
  9 +
  10 +#include "TrueMQTT.h"
  11 +
  12 +#include <string>
  13 +#include <map>
  14 +#include <netdb.h>
  15 +#include <thread>
  16 +#include <vector>
  17 +
  18 +// Some definitions to make future cross-platform work easier.
  19 +#define SOCKET int
  20 +#define INVALID_SOCKET -1
  21 +#define closesocket close
  22 +
  23 +class Connection
  24 +{
  25 +public:
  26 + Connection(TrueMQTT::Client::LogLevel log_level,
  27 + const std::function<void(TrueMQTT::Client::LogLevel, std::string)> logger,
  28 + const std::function<void(TrueMQTT::Client::Error, std::string)> error_callback,
  29 + const std::function<void(bool)> connection_change_callback,
  30 + const std::string &host,
  31 + int port);
  32 + ~Connection();
  33 +
  34 +private:
  35 + void run();
  36 + void resolve();
  37 + bool try_next_address();
  38 + void connect(addrinfo *address);
  39 + bool connect_to_any();
  40 + std::string addrinfo_to_string(addrinfo *address);
  41 +
  42 + enum class State
  43 + {
  44 + RESOLVING,
  45 + CONNECTING,
  46 + CONNECTED,
  47 + BACKOFF,
  48 + };
  49 +
  50 + TrueMQTT::Client::LogLevel log_level;
  51 + const std::function<void(TrueMQTT::Client::LogLevel, std::string)> logger;
  52 +
  53 + const std::function<void(TrueMQTT::Client::Error, std::string)> m_error_callback;
  54 + const std::function<void(bool)> m_connection_change_callback;
  55 +
  56 + const std::string &m_host; ///< The hostname or IP address to connect to.
  57 + int m_port; ///< The port to connect to.
  58 +
  59 + State m_state = State::RESOLVING;
  60 + std::thread m_thread; ///< Current thread used to run this connection.
  61 +
  62 + addrinfo *m_host_resolved = nullptr; ///< Address info of the hostname, once looked up.
  63 + std::vector<addrinfo *> m_addresses = {}; ///< List of addresses to try to connect to.
  64 + size_t m_address_current = 0; ///< Index of the address we are currently trying to connect to.
  65 + std::chrono::steady_clock::time_point m_last_attempt = {}; ///< Time of the last attempt to connect to the current address.
  66 + std::vector<SOCKET> m_sockets = {}; ///< List of sockets we are currently trying to connect to.
  67 + std::map<SOCKET, addrinfo *> m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to.
  68 + SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected.
  69 +};
... ...
src/Log.h
... ... @@ -8,7 +8,7 @@
8 8 #pragma once
9 9  
10 10 // Wrappers to make logging a tiny bit easier to read.
11   -// The "obj" is the Client::Impl instance to point to.
  11 +// The "obj" is the TrueMQTT::Client::Impl instance to point to.
12 12  
13 13 #define LOGGER_LEVEL_ERROR 0
14 14 #define LOGGER_LEVEL_WARNING 1
... ... @@ -22,50 +22,50 @@
22 22 #endif
23 23  
24 24 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_ERROR
25   -#define LOG_ERROR(obj, x) \
26   - if (obj->log_level >= Client::LogLevel::ERROR) \
27   - { \
28   - obj->logger(Client::LogLevel::ERROR, x); \
  25 +#define LOG_ERROR(obj, x) \
  26 + if (obj->log_level >= TrueMQTT::Client::LogLevel::ERROR) \
  27 + { \
  28 + obj->logger(TrueMQTT::Client::LogLevel::ERROR, x); \
29 29 }
30 30 #else
31 31 #define LOG_ERROR(obj, x)
32 32 #endif
33 33  
34 34 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARNING
35   -#define LOG_WARNING(obj, x) \
36   - if (obj->log_level >= Client::LogLevel::WARNING) \
37   - { \
38   - obj->logger(Client::LogLevel::WARNING, x); \
  35 +#define LOG_WARNING(obj, x) \
  36 + if (obj->log_level >= TrueMQTT::Client::LogLevel::WARNING) \
  37 + { \
  38 + obj->logger(TrueMQTT::Client::LogLevel::WARNING, x); \
39 39 }
40 40 #else
41 41 #define LOG_WARNING(obj, x)
42 42 #endif
43 43  
44 44 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_INFO
45   -#define LOG_INFO(obj, x) \
46   - if (obj->log_level >= Client::LogLevel::INFO) \
47   - { \
48   - obj->logger(Client::LogLevel::INFO, x); \
  45 +#define LOG_INFO(obj, x) \
  46 + if (obj->log_level >= TrueMQTT::Client::LogLevel::INFO) \
  47 + { \
  48 + obj->logger(TrueMQTT::Client::LogLevel::INFO, x); \
49 49 }
50 50 #else
51 51 #define LOG_INFO(obj, x)
52 52 #endif
53 53  
54 54 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG
55   -#define LOG_DEBUG(obj, x) \
56   - if (obj->log_level >= Client::LogLevel::DEBUG) \
57   - { \
58   - obj->logger(Client::LogLevel::DEBUG, x); \
  55 +#define LOG_DEBUG(obj, x) \
  56 + if (obj->log_level >= TrueMQTT::Client::LogLevel::DEBUG) \
  57 + { \
  58 + obj->logger(TrueMQTT::Client::LogLevel::DEBUG, x); \
59 59 }
60 60 #else
61 61 #define LOG_DEBUG(obj, x)
62 62 #endif
63 63  
64 64 #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE
65   -#define LOG_TRACE(obj, x) \
66   - if (obj->log_level >= Client::LogLevel::TRACE) \
67   - { \
68   - obj->logger(Client::LogLevel::TRACE, x); \
  65 +#define LOG_TRACE(obj, x) \
  66 + if (obj->log_level >= TrueMQTT::Client::LogLevel::TRACE) \
  67 + { \
  68 + obj->logger(TrueMQTT::Client::LogLevel::TRACE, x); \
69 69 }
70 70 #else
71 71 #define LOG_TRACE(obj, x)
... ...