Commit 26e28ae0cb2ef11b8e80f548850ef1a138c7d5e1

Authored by Patric Stout
1 parent 03833229

fix(connection): handle full socket sndbuf and failing send() graceful

send() no longer is blocking, and all sendNNN calls now return
false if the call couldn't be executed.

Additionally, the library now recovers much better from issues,
like unexpected broker disconnects.
include/TrueMQTT.h
... ... @@ -208,6 +208,8 @@ namespace TrueMQTT
208 208 * @param message The message to publish.
209 209 * @param retain Whether to retain the message on the broker.
210 210 *
  211 + * @return True iff the publish request is either queued or sent.
  212 + *
211 213 * @note All messages are always published under QoS 0, and this library supports no
212 214 * other QoS level.
213 215 * @note This call is non-blocking, and it is not possible to know whether the message
... ... @@ -217,8 +219,11 @@ namespace TrueMQTT
217 219 * @note This function can stall for a short moment if you publish just at the
218 220 * moment the connection to the broker is established, and there are messages in the
219 221 * publish queue and/or subscriptions.
  222 + * @note If the return value is false, but there is a connection with the broker,
  223 + * this means the sndbuf of the socket is full. It is up to the caller to consider
  224 + * what to do in this case.
220 225 */
221   - void publish(const std::string &topic, const std::string &message, bool retain) const;
  226 + bool publish(const std::string &topic, const std::string &message, bool retain) const;
222 227  
223 228 /**
224 229 * @brief Subscribe to a topic, and call the callback function when a message arrives.
... ...
src/Client.cpp
... ... @@ -131,7 +131,7 @@ void TrueMQTT::Client::disconnect() const
131 131 m_impl->disconnect();
132 132 }
133 133  
134   -void TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const
  134 +bool TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const
135 135 {
136 136 std::scoped_lock lock(m_impl->m_state_mutex);
137 137  
... ... @@ -141,14 +141,14 @@ void TrueMQTT::Client::publish(const std::string &topic, const std::string &mess
141 141 {
142 142 case Client::Impl::State::DISCONNECTED:
143 143 LOG_ERROR(m_impl, "Cannot publish when disconnected");
144   - return;
  144 + return false;
145 145 case Client::Impl::State::CONNECTING:
146   - m_impl->toPublishQueue(topic, message, retain);
147   - return;
  146 + return m_impl->toPublishQueue(topic, message, retain);
148 147 case Client::Impl::State::CONNECTED:
149   - m_impl->sendPublish(topic, message, retain);
150   - return;
  148 + return m_impl->sendPublish(topic, message, retain);
151 149 }
  150 +
  151 + return false;
152 152 }
153 153  
154 154 void TrueMQTT::Client::subscribe(const std::string &topic, const std::function<void(std::string, std::string)> &callback) const
... ... @@ -180,7 +180,12 @@ void TrueMQTT::Client::subscribe(const std::string &amp;topic, const std::function&lt;v
180 180 m_impl->m_subscription_topics.insert(topic);
181 181 if (m_impl->m_state == Client::Impl::State::CONNECTED)
182 182 {
183   - m_impl->sendSubscribe(topic);
  183 + if (!m_impl->sendSubscribe(topic))
  184 + {
  185 + LOG_ERROR(m_impl, "Failed to send subscribe message. Closing connection to broker and trying again");
  186 + m_impl->disconnect();
  187 + m_impl->connect();
  188 + }
184 189 }
185 190 }
186 191  
... ... @@ -238,7 +243,12 @@ void TrueMQTT::Client::unsubscribe(const std::string &amp;topic) const
238 243 m_impl->m_subscription_topics.erase(topic);
239 244 if (m_impl->m_state == Client::Impl::State::CONNECTED)
240 245 {
241   - m_impl->sendUnsubscribe(topic);
  246 + if (!m_impl->sendUnsubscribe(topic))
  247 + {
  248 + LOG_ERROR(m_impl, "Failed to send subscribe message. Closing connection to broker and trying again");
  249 + m_impl->disconnect();
  250 + m_impl->connect();
  251 + }
242 252 }
243 253 }
244 254  
... ... @@ -263,12 +273,22 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
263 273 // First restore any subscription.
264 274 for (auto &subscription : m_subscription_topics)
265 275 {
266   - sendSubscribe(subscription);
  276 + if (!sendSubscribe(subscription))
  277 + {
  278 + LOG_ERROR(this, "Failed to send subscribe message. Closing connection to broker and trying again");
  279 + disconnect();
  280 + connect();
  281 + return;
  282 + }
267 283 }
268 284 // Flush the publish queue.
269 285 for (const auto &[topic, message, retain] : m_publish_queue)
270 286 {
271   - sendPublish(topic, message, retain);
  287 + if (!sendPublish(topic, message, retain))
  288 + {
  289 + LOG_ERROR(this, "Failed to send queued publish message. Discarding rest of queue");
  290 + break;
  291 + }
272 292 }
273 293 m_publish_queue.clear();
274 294 }
... ... @@ -279,19 +299,19 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
279 299 }
280 300 }
281 301  
282   -void TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain)
  302 +bool TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain)
283 303 {
284 304 if (m_state != Client::Impl::State::CONNECTING)
285 305 {
286 306 LOG_ERROR(this, "Cannot queue publish message when not connecting");
287   - return;
  307 + return false;
288 308 }
289 309  
290 310 switch (m_publish_queue_type)
291 311 {
292 312 case Client::PublishQueueType::DROP:
293 313 LOG_WARNING(this, "Publish queue is disabled, dropping message");
294   - return;
  314 + return false;
295 315 case Client::PublishQueueType::FIFO:
296 316 if (m_publish_queue.size() >= m_publish_queue_size)
297 317 {
... ... @@ -310,6 +330,7 @@ void TrueMQTT::Client::Impl::toPublishQueue(const std::string &amp;topic, const std:
310 330  
311 331 LOG_TRACE(this, "Adding message to publish queue");
312 332 m_publish_queue.emplace_back(topic, message, retain);
  333 + return true;
313 334 }
314 335  
315 336 void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector<std::function<void(std::string, std::string)>> &matching_callbacks, const std::map<std::string, Client::Impl::SubscriptionPart> &subscriptions, std::deque<std::string> &parts)
... ...
src/ClientImpl.h
... ... @@ -47,11 +47,11 @@ public:
47 47  
48 48 void connect(); ///< Connect to the broker.
49 49 void disconnect(); ///< Disconnect from the broker.
50   - void sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker.
51   - void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
52   - void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
  50 + bool sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker.
  51 + bool sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
  52 + bool sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
53 53 void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa.
54   - void toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue.
  54 + bool toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue.
55 55 void messageReceived(std::string topic, std::string message); ///< Called when a message is received from the broker.
56 56  
57 57 void findSubscriptionMatch(std::vector<std::function<void(std::string, std::string)>> &callbacks, const std::map<std::string, SubscriptionPart> &subscriptions, std::deque<std::string> &parts); ///< Recursive function to find any matching subscription based on parts.
... ...
src/Connection.cpp
... ... @@ -91,17 +91,16 @@ void TrueMQTT::Client::Impl::Connection::run()
91 91 {
92 92 break;
93 93 }
94   - if (m_socket != INVALID_SOCKET)
95   - {
96   - closesocket(m_socket);
97   - m_socket = INVALID_SOCKET;
98   - }
99   - m_state = State::BACKOFF;
100   - m_impl.connectionStateChange(false);
  94 + socketError();
101 95 }
102 96 break;
103 97 }
104 98  
  99 + case State::SOCKET_ERROR:
  100 + m_state = State::BACKOFF;
  101 + m_impl.connectionStateChange(false);
  102 + break;
  103 +
105 104 case State::STOP:
106 105 if (m_socket != INVALID_SOCKET)
107 106 {
... ... @@ -113,6 +112,16 @@ void TrueMQTT::Client::Impl::Connection::run()
113 112 }
114 113 }
115 114  
  115 +void TrueMQTT::Client::Impl::Connection::socketError()
  116 +{
  117 + m_state = State::SOCKET_ERROR;
  118 + if (m_socket != INVALID_SOCKET)
  119 + {
  120 + closesocket(m_socket);
  121 + m_socket = INVALID_SOCKET;
  122 + }
  123 +}
  124 +
116 125 void TrueMQTT::Client::Impl::Connection::resolve()
117 126 {
118 127 m_address_current = 0;
... ... @@ -321,22 +330,24 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny()
321 330 m_socket_to_address.clear();
322 331 m_sockets.clear();
323 332  
324   - // Disable non-blocking, as we will be reading from a thread, which can be blocking.
325   - int nonblocking = 0;
326   - if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0)
327   - {
328   - LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact");
329   - }
330   -
331   - m_backoff = m_impl.m_connection_backoff;
332 333 m_socket = socket_connected;
333 334  
334 335 // Only change the state if no disconnect() has been requested in the mean time.
335 336 if (m_state != State::STOP)
336 337 {
337 338 m_state = State::AUTHENTICATING;
338   - sendConnect();
  339 + if (!sendConnect())
  340 + {
  341 + // We couldn't send the connect packet. That is unusual, so disconnect, and retry.
  342 + LOG_ERROR(&m_impl, "Could not send first packet to broker. Disconnecting.");
  343 + closesocket(m_socket);
  344 + m_socket = INVALID_SOCKET;
  345 + return false;
  346 + }
339 347 }
  348 +
  349 + m_backoff = m_impl.m_connection_backoff;
  350 +
340 351 return true;
341 352 }
342 353  
... ...
src/Connection.h
... ... @@ -29,7 +29,8 @@ public:
29 29 Connection(TrueMQTT::Client::Impl &impl);
30 30 ~Connection();
31 31  
32   - void send(Packet &packet) const;
  32 + bool send(Packet &packet) const;
  33 + void socketError();
33 34  
34 35 private:
35 36 // Implemented in Connection.cpp
... ... @@ -43,7 +44,7 @@ private:
43 44 // Implemented in Packet.cpp
44 45 ssize_t recv(char *buffer, size_t length) const;
45 46 bool recvLoop();
46   - void sendConnect();
  47 + bool sendConnect();
47 48  
48 49 enum class State
49 50 {
... ... @@ -52,6 +53,7 @@ private:
52 53 AUTHENTICATING,
53 54 CONNECTED,
54 55 BACKOFF,
  56 + SOCKET_ERROR,
55 57 STOP,
56 58 };
57 59  
... ...
src/Packet.cpp
... ... @@ -123,33 +123,39 @@ public:
123 123  
124 124 ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const
125 125 {
126   - // We idle-check every 100ms if we are requested to stop, as otherwise
127   - // this thread will block till the server disconnects us.
128   - while (m_state != State::STOP)
  126 + // We idle-check every 10ms if we are requested to stop or if there was
  127 + // an error. This is to prevent the recv() call from blocking forever.
  128 + while (true)
129 129 {
130 130 // Check if there is any data available on the socket.
131 131 fd_set read_fds;
132 132 FD_ZERO(&read_fds);
133 133 FD_SET(m_socket, &read_fds);
134   - timeval timeout = {0, 100};
  134 + timeval timeout = {0, 10};
135 135 size_t ret = select(m_socket + 1, &read_fds, nullptr, nullptr, &timeout);
136 136  
  137 + if (m_state == State::SOCKET_ERROR || m_state == State::STOP)
  138 + {
  139 +#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE
  140 + if (m_state == State::STOP)
  141 + {
  142 + LOG_TRACE(&m_impl, "Closing connection as STOP has been requested");
  143 + }
  144 +#endif
  145 + return -1;
  146 + }
  147 +
137 148 if (ret == 0)
138 149 {
139 150 continue;
140 151 }
141 152 break;
142 153 }
143   - if (m_state == State::STOP)
144   - {
145   - LOG_TRACE(&m_impl, "Closing connection as STOP has been requested");
146   - return -1;
147   - }
148 154  
149 155 ssize_t res = ::recv(m_socket, buffer, length, 0);
150 156 if (res == 0)
151 157 {
152   - LOG_INFO(&m_impl, "Connection closed by peer");
  158 + LOG_WARNING(&m_impl, "Connection closed by broker");
153 159 return -1;
154 160 }
155 161 if (res < 0)
... ... @@ -322,8 +328,20 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop()
322 328 return true;
323 329 }
324 330  
325   -void TrueMQTT::Client::Impl::Connection::send(Packet &packet) const
  331 +bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const
326 332 {
  333 + if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED)
  334 + {
  335 + // This happens in the small window the connection thread hasn't
  336 + // spotted yet the connection is closed, while this function closed
  337 + // the socket earlier due to the broker closing the connection.
  338 + // Basically, it can only be caused if the broker actively closes
  339 + // the connection due to a write while publishing a lot of data
  340 + // quickly.
  341 + LOG_DEBUG(&m_impl, "Attempted to send packet while not connected");
  342 + return false;
  343 + }
  344 +
327 345 LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size()));
328 346  
329 347 std::vector<uint8_t> buffer;
... ... @@ -347,17 +365,43 @@ void TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const
347 365 // Write header and packet.
348 366 if (::send(m_socket, (char *)buffer.data(), buffer.size(), MSG_NOSIGNAL) < 0)
349 367 {
  368 + if (errno == EAGAIN)
  369 + {
  370 + // sndbuf is full, so we hand it back to the sender to deal with this.
  371 + return false;
  372 + }
  373 + if (errno == ECONNRESET || errno == EPIPE)
  374 + {
  375 + LOG_ERROR(&m_impl, "Connection closed by broker");
  376 + m_impl.m_connection->socketError();
  377 + return false;
  378 + }
  379 +
350 380 LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno)));
351   - return;
  381 + return false;
352 382 }
353 383 if (::send(m_socket, (char *)packet.m_buffer.data(), packet.m_buffer.size(), MSG_NOSIGNAL) < 0)
354 384 {
  385 + if (errno == EAGAIN)
  386 + {
  387 + // sndbuf is full, so we hand it back to the sender to deal with this.
  388 + return false;
  389 + }
  390 + if (errno == ECONNRESET || errno == EPIPE)
  391 + {
  392 + LOG_ERROR(&m_impl, "Connection closed by broker");
  393 + m_impl.m_connection->socketError();
  394 + return false;
  395 + }
  396 +
355 397 LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno)));
356   - return;
  398 + return false;
357 399 }
  400 +
  401 + return true;
358 402 }
359 403  
360   -void TrueMQTT::Client::Impl::Connection::sendConnect()
  404 +bool TrueMQTT::Client::Impl::Connection::sendConnect()
361 405 {
362 406 LOG_TRACE(&m_impl, "Sending CONNECT packet");
363 407  
... ... @@ -391,10 +435,10 @@ void TrueMQTT::Client::Impl::Connection::sendConnect()
391 435 packet.write_string(m_impl.m_last_will_message);
392 436 }
393 437  
394   - send(packet);
  438 + return send(packet);
395 439 }
396 440  
397   -void TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain)
  441 +bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain)
398 442 {
399 443 LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")");
400 444  
... ... @@ -408,10 +452,10 @@ void TrueMQTT::Client::Impl::sendPublish(const std::string &amp;topic, const std::st
408 452 packet.write_string(topic);
409 453 packet.write(message.c_str(), message.size());
410 454  
411   - m_connection->send(packet);
  455 + return m_connection->send(packet);
412 456 }
413 457  
414   -void TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic)
  458 +bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic)
415 459 {
416 460 LOG_TRACE(this, "Sending SUBSCRIBE packet for topic '" + topic + "'");
417 461  
... ... @@ -427,10 +471,10 @@ void TrueMQTT::Client::Impl::sendSubscribe(const std::string &amp;topic)
427 471 packet.write_string(topic);
428 472 packet.write_uint8(0); // QoS
429 473  
430   - m_connection->send(packet);
  474 + return m_connection->send(packet);
431 475 }
432 476  
433   -void TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic)
  477 +bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic)
434 478 {
435 479 LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'");
436 480  
... ... @@ -445,5 +489,5 @@ void TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &amp;topic)
445 489 packet.write_uint16(m_packet_id++);
446 490 packet.write_string(topic);
447 491  
448   - m_connection->send(packet);
  492 + return m_connection->send(packet);
449 493 }
... ...