Commit 81f19eca4dcadb52f6d261f62dbb87c5691e8a9c

Authored by Patric Stout
1 parent d502382e

fix(connection): implement configurable connection timeout/backoff

include/TrueMQTT.h
@@ -7,6 +7,7 @@ @@ -7,6 +7,7 @@
7 7
8 #pragma once 8 #pragma once
9 9
  10 +#include <chrono>
10 #include <functional> 11 #include <functional>
11 #include <memory> 12 #include <memory>
12 13
@@ -111,10 +112,17 @@ namespace TrueMQTT @@ -111,10 +112,17 @@ namespace TrueMQTT
111 * @param port Port of the MQTT broker. 112 * @param port Port of the MQTT broker.
112 * @param client_id Client ID to use when connecting to the broker. 113 * @param client_id Client ID to use when connecting to the broker.
113 * @param connection_timeout Timeout in seconds for the connection to the broker. 114 * @param connection_timeout Timeout in seconds for the connection to the broker.
  115 + * @param connection_backoff Backoff time when connection to the broker failed. This is doubled every time a connection fails, up till \ref connection_backoff_max.
114 * @param connection_backoff_max Maximum time between backoff attempts in seconds. 116 * @param connection_backoff_max Maximum time between backoff attempts in seconds.
115 * @param keep_alive_interval Interval in seconds between keep-alive messages. 117 * @param keep_alive_interval Interval in seconds between keep-alive messages.
116 */ 118 */
117 - Client(const std::string &host, int port, const std::string &client_id, int connection_timeout = 5, int connection_backoff_max = 30, int keep_alive_interval = 30); 119 + Client(const std::string &host,
  120 + int port,
  121 + const std::string &client_id,
  122 + std::chrono::milliseconds connection_timeout = std::chrono::milliseconds(5000),
  123 + std::chrono::milliseconds connection_backoff = std::chrono::milliseconds(1000),
  124 + std::chrono::milliseconds connection_backoff_max = std::chrono::milliseconds(30000),
  125 + std::chrono::milliseconds keep_alive_interval = std::chrono::milliseconds(30000));
118 126
119 /** 127 /**
120 * @brief Destructor of the MQTT client. 128 * @brief Destructor of the MQTT client.
src/Client.cpp
@@ -13,9 +13,15 @@ @@ -13,9 +13,15 @@
13 13
14 #include <sstream> 14 #include <sstream>
15 15
16 -TrueMQTT::Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) 16 +TrueMQTT::Client::Client(const std::string &host,
  17 + int port,
  18 + const std::string &client_id,
  19 + std::chrono::milliseconds connection_timeout,
  20 + std::chrono::milliseconds connection_backoff,
  21 + std::chrono::milliseconds connection_backoff_max,
  22 + std::chrono::milliseconds keep_alive_interval)
17 { 23 {
18 - this->m_impl = std::make_unique<Client::Impl>(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval); 24 + this->m_impl = std::make_unique<Client::Impl>(host, port, client_id, connection_timeout, connection_backoff, connection_backoff_max, keep_alive_interval);
19 25
20 LOG_TRACE(this->m_impl, "Constructor of client called"); 26 LOG_TRACE(this->m_impl, "Constructor of client called");
21 } 27 }
@@ -27,11 +33,18 @@ TrueMQTT::Client::~Client() @@ -27,11 +33,18 @@ TrueMQTT::Client::~Client()
27 this->disconnect(); 33 this->disconnect();
28 } 34 }
29 35
30 -TrueMQTT::Client::Impl::Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) 36 +TrueMQTT::Client::Impl::Impl(const std::string &host,
  37 + int port,
  38 + const std::string &client_id,
  39 + std::chrono::milliseconds connection_timeout,
  40 + std::chrono::milliseconds connection_backoff,
  41 + std::chrono::milliseconds connection_backoff_max,
  42 + std::chrono::milliseconds keep_alive_interval)
31 : host(host), 43 : host(host),
32 port(port), 44 port(port),
33 client_id(client_id), 45 client_id(client_id),
34 connection_timeout(connection_timeout), 46 connection_timeout(connection_timeout),
  47 + connection_backoff(connection_backoff),
35 connection_backoff_max(connection_backoff_max), 48 connection_backoff_max(connection_backoff_max),
36 keep_alive_interval(keep_alive_interval) 49 keep_alive_interval(keep_alive_interval)
37 { 50 {
src/ClientImpl.h
@@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
9 9
10 #include "TrueMQTT.h" 10 #include "TrueMQTT.h"
11 11
  12 +#include <chrono>
12 #include <deque> 13 #include <deque>
13 #include <map> 14 #include <map>
14 #include <mutex> 15 #include <mutex>
@@ -21,7 +22,13 @@ @@ -21,7 +22,13 @@
21 class TrueMQTT::Client::Impl 22 class TrueMQTT::Client::Impl
22 { 23 {
23 public: 24 public:
24 - Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval); 25 + Impl(const std::string &host,
  26 + int port,
  27 + const std::string &client_id,
  28 + std::chrono::milliseconds connection_timeout,
  29 + std::chrono::milliseconds connection_backoff,
  30 + std::chrono::milliseconds connection_backoff_max,
  31 + std::chrono::milliseconds keep_alive_interval);
25 ~Impl(); 32 ~Impl();
26 33
27 enum State 34 enum State
@@ -52,12 +59,13 @@ public: @@ -52,12 +59,13 @@ public:
52 State state = State::DISCONNECTED; ///< The current state of the client. 59 State state = State::DISCONNECTED; ///< The current state of the client.
53 std::mutex state_mutex; ///< Mutex to protect state changes. 60 std::mutex state_mutex; ///< Mutex to protect state changes.
54 61
55 - std::string host; ///< Host of the broker.  
56 - int port; ///< Port of the broker.  
57 - std::string client_id; ///< Client ID to use when connecting to the broker.  
58 - int connection_timeout; ///< Timeout in seconds for the connection to the broker.  
59 - int connection_backoff_max; ///< Maximum time between backoff attempts in seconds.  
60 - int keep_alive_interval; ///< Interval in seconds between keep-alive messages. 62 + std::string host; ///< Host of the broker.
  63 + int port; ///< Port of the broker.
  64 + std::string client_id; ///< Client ID to use when connecting to the broker.
  65 + std::chrono::milliseconds connection_timeout; ///< Timeout in seconds for the connection to the broker.
  66 + std::chrono::milliseconds connection_backoff; ///< Backoff time when connection to the broker failed. This is doubled every time a connection fails, up till \ref connection_backoff_max.
  67 + std::chrono::milliseconds connection_backoff_max; ///< Maximum time between backoff attempts in seconds.
  68 + std::chrono::milliseconds keep_alive_interval; ///< Interval in seconds between keep-alive messages.
61 69
62 Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use. 70 Client::LogLevel log_level = Client::LogLevel::NONE; ///< The log level to use.
63 std::function<void(Client::LogLevel, std::string)> logger = [](Client::LogLevel, std::string) { /* empty */ }; ///< Logger callback. 71 std::function<void(Client::LogLevel, std::string)> logger = [](Client::LogLevel, std::string) { /* empty */ }; ///< Logger callback.
src/Connection.cpp
@@ -18,7 +18,8 @@ @@ -18,7 +18,8 @@
18 18
19 TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl) 19 TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl)
20 : m_impl(impl), 20 : m_impl(impl),
21 - m_thread(&Connection::run, this) 21 + m_thread(&Connection::run, this),
  22 + m_backoff(impl.connection_backoff)
22 { 23 {
23 } 24 }
24 25
@@ -67,10 +68,16 @@ void TrueMQTT::Client::Impl::Connection::run() @@ -67,10 +68,16 @@ void TrueMQTT::Client::Impl::Connection::run()
67 break; 68 break;
68 69
69 case State::BACKOFF: 70 case State::BACKOFF:
70 - LOG_WARNING(&m_impl, "Connection failed; will retry in NNN seconds"); 71 + LOG_WARNING(&m_impl, "Connection failed; will retry in " + std::to_string(m_backoff.count()) + "ms");
71 72
72 - // TODO: use the configuration  
73 - std::this_thread::sleep_for(std::chrono::seconds(5)); 73 + std::this_thread::sleep_for(m_backoff);
  74 +
  75 + // Calculate the next backoff time, slowly reducing how often we retry.
  76 + m_backoff *= 2;
  77 + if (m_backoff > m_impl.connection_backoff_max)
  78 + {
  79 + m_backoff = m_impl.connection_backoff_max;
  80 + }
74 81
75 m_state = State::RESOLVING; 82 m_state = State::RESOLVING;
76 break; 83 break;
@@ -96,6 +103,11 @@ void TrueMQTT::Client::Impl::Connection::run() @@ -96,6 +103,11 @@ void TrueMQTT::Client::Impl::Connection::run()
96 } 103 }
97 104
98 case State::STOP: 105 case State::STOP:
  106 + if (m_socket != INVALID_SOCKET)
  107 + {
  108 + closesocket(m_socket);
  109 + m_socket = INVALID_SOCKET;
  110 + }
99 return; 111 return;
100 } 112 }
101 } 113 }
@@ -243,8 +255,7 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() @@ -243,8 +255,7 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny()
243 } 255 }
244 256
245 // Check if it is more than the timeout ago since we last tried a connection. 257 // Check if it is more than the timeout ago since we last tried a connection.
246 - // TODO -- Used to configured value  
247 - if (std::chrono::steady_clock::now() < m_last_attempt + std::chrono::seconds(10)) 258 + if (std::chrono::steady_clock::now() < m_last_attempt + m_impl.connection_timeout)
248 { 259 {
249 return true; 260 return true;
250 } 261 }
@@ -317,11 +328,13 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() @@ -317,11 +328,13 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny()
317 LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact"); 328 LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact");
318 } 329 }
319 330
  331 + m_backoff = m_impl.connection_backoff;
  332 + m_socket = socket_connected;
  333 +
320 // Only change the state if no disconnect() has been requested in the mean time. 334 // Only change the state if no disconnect() has been requested in the mean time.
321 if (m_state != State::STOP) 335 if (m_state != State::STOP)
322 { 336 {
323 m_state = State::AUTHENTICATING; 337 m_state = State::AUTHENTICATING;
324 - m_socket = socket_connected;  
325 sendConnect(); 338 sendConnect();
326 } 339 }
327 return true; 340 return true;
src/Connection.h
@@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
9 9
10 #include "ClientImpl.h" 10 #include "ClientImpl.h"
11 11
  12 +#include <chrono>
12 #include <string> 13 #include <string>
13 #include <map> 14 #include <map>
14 #include <netdb.h> 15 #include <netdb.h>
@@ -56,14 +57,19 @@ private: @@ -56,14 +57,19 @@ private:
56 57
57 TrueMQTT::Client::Impl &m_impl; 58 TrueMQTT::Client::Impl &m_impl;
58 59
59 - State m_state = State::RESOLVING;  
60 - std::thread m_thread; ///< Current thread used to run this connection. 60 + State m_state = State::RESOLVING; ///< Current state of the connection.
  61 + std::thread m_thread; ///< Current thread used to run this connection.
  62 +
  63 + std::chrono::milliseconds m_backoff; ///< Current backoff time.
  64 +
  65 + addrinfo *m_host_resolved = nullptr; ///< Address info of the hostname, once looked up.
  66 + std::vector<addrinfo *> m_addresses = {}; ///< List of addresses to try to connect to.
61 67
62 - addrinfo *m_host_resolved = nullptr; ///< Address info of the hostname, once looked up.  
63 - std::vector<addrinfo *> m_addresses = {}; ///< List of addresses to try to connect to.  
64 size_t m_address_current = 0; ///< Index of the address we are currently trying to connect to. 68 size_t m_address_current = 0; ///< Index of the address we are currently trying to connect to.
65 std::chrono::steady_clock::time_point m_last_attempt = {}; ///< Time of the last attempt to connect to the current address. 69 std::chrono::steady_clock::time_point m_last_attempt = {}; ///< Time of the last attempt to connect to the current address.
66 - std::vector<SOCKET> m_sockets = {}; ///< List of sockets we are currently trying to connect to.  
67 - std::map<SOCKET, addrinfo *> m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to.  
68 - SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected. 70 +
  71 + std::vector<SOCKET> m_sockets = {}; ///< List of sockets we are currently trying to connect to.
  72 + std::map<SOCKET, addrinfo *> m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to.
  73 +
  74 + SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected.
69 }; 75 };