diff --git a/include/TrueMQTT.h b/include/TrueMQTT.h index 5b43970..1dafc30 100644 --- a/include/TrueMQTT.h +++ b/include/TrueMQTT.h @@ -208,6 +208,8 @@ namespace TrueMQTT * @param message The message to publish. * @param retain Whether to retain the message on the broker. * + * @return True iff the publish request is either queued or sent. + * * @note All messages are always published under QoS 0, and this library supports no * other QoS level. * @note This call is non-blocking, and it is not possible to know whether the message @@ -217,8 +219,11 @@ namespace TrueMQTT * @note This function can stall for a short moment if you publish just at the * moment the connection to the broker is established, and there are messages in the * publish queue and/or subscriptions. + * @note If the return value is false, but there is a connection with the broker, + * this means the sndbuf of the socket is full. It is up to the caller to consider + * what to do in this case. */ - void publish(const std::string &topic, const std::string &message, bool retain) const; + bool publish(const std::string &topic, const std::string &message, bool retain) const; /** * @brief Subscribe to a topic, and call the callback function when a message arrives. diff --git a/src/Client.cpp b/src/Client.cpp index 30dcb70..c7064f2 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -131,7 +131,7 @@ void TrueMQTT::Client::disconnect() const m_impl->disconnect(); } -void TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const +bool TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const { std::scoped_lock lock(m_impl->m_state_mutex); @@ -141,14 +141,14 @@ void TrueMQTT::Client::publish(const std::string &topic, const std::string &mess { case Client::Impl::State::DISCONNECTED: LOG_ERROR(m_impl, "Cannot publish when disconnected"); - return; + return false; case Client::Impl::State::CONNECTING: - m_impl->toPublishQueue(topic, message, retain); - return; + return m_impl->toPublishQueue(topic, message, retain); case Client::Impl::State::CONNECTED: - m_impl->sendPublish(topic, message, retain); - return; + return m_impl->sendPublish(topic, message, retain); } + + return false; } void TrueMQTT::Client::subscribe(const std::string &topic, const std::function &callback) const @@ -180,7 +180,12 @@ void TrueMQTT::Client::subscribe(const std::string &topic, const std::functionm_subscription_topics.insert(topic); if (m_impl->m_state == Client::Impl::State::CONNECTED) { - m_impl->sendSubscribe(topic); + if (!m_impl->sendSubscribe(topic)) + { + LOG_ERROR(m_impl, "Failed to send subscribe message. Closing connection to broker and trying again"); + m_impl->disconnect(); + m_impl->connect(); + } } } @@ -238,7 +243,12 @@ void TrueMQTT::Client::unsubscribe(const std::string &topic) const m_impl->m_subscription_topics.erase(topic); if (m_impl->m_state == Client::Impl::State::CONNECTED) { - m_impl->sendUnsubscribe(topic); + if (!m_impl->sendUnsubscribe(topic)) + { + LOG_ERROR(m_impl, "Failed to send subscribe message. Closing connection to broker and trying again"); + m_impl->disconnect(); + m_impl->connect(); + } } } @@ -263,12 +273,22 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected) // First restore any subscription. for (auto &subscription : m_subscription_topics) { - sendSubscribe(subscription); + if (!sendSubscribe(subscription)) + { + LOG_ERROR(this, "Failed to send subscribe message. Closing connection to broker and trying again"); + disconnect(); + connect(); + return; + } } // Flush the publish queue. for (const auto &[topic, message, retain] : m_publish_queue) { - sendPublish(topic, message, retain); + if (!sendPublish(topic, message, retain)) + { + LOG_ERROR(this, "Failed to send queued publish message. Discarding rest of queue"); + break; + } } m_publish_queue.clear(); } @@ -279,19 +299,19 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected) } } -void TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain) +bool TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain) { if (m_state != Client::Impl::State::CONNECTING) { LOG_ERROR(this, "Cannot queue publish message when not connecting"); - return; + return false; } switch (m_publish_queue_type) { case Client::PublishQueueType::DROP: LOG_WARNING(this, "Publish queue is disabled, dropping message"); - return; + return false; case Client::PublishQueueType::FIFO: if (m_publish_queue.size() >= m_publish_queue_size) { @@ -310,6 +330,7 @@ void TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std: LOG_TRACE(this, "Adding message to publish queue"); m_publish_queue.emplace_back(topic, message, retain); + return true; } void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector> &matching_callbacks, const std::map &subscriptions, std::deque &parts) diff --git a/src/ClientImpl.h b/src/ClientImpl.h index d1059bb..cb2c6a5 100644 --- a/src/ClientImpl.h +++ b/src/ClientImpl.h @@ -47,11 +47,11 @@ public: void connect(); ///< Connect to the broker. void disconnect(); ///< Disconnect from the broker. - void sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker. - void sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker. - void sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker. + bool sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker. + bool sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker. + bool sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker. void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa. - void toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue. + bool toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue. void messageReceived(std::string topic, std::string message); ///< Called when a message is received from the broker. void findSubscriptionMatch(std::vector> &callbacks, const std::map &subscriptions, std::deque &parts); ///< Recursive function to find any matching subscription based on parts. diff --git a/src/Connection.cpp b/src/Connection.cpp index 78ce045..dbe7383 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -91,17 +91,16 @@ void TrueMQTT::Client::Impl::Connection::run() { break; } - if (m_socket != INVALID_SOCKET) - { - closesocket(m_socket); - m_socket = INVALID_SOCKET; - } - m_state = State::BACKOFF; - m_impl.connectionStateChange(false); + socketError(); } break; } + case State::SOCKET_ERROR: + m_state = State::BACKOFF; + m_impl.connectionStateChange(false); + break; + case State::STOP: if (m_socket != INVALID_SOCKET) { @@ -113,6 +112,16 @@ void TrueMQTT::Client::Impl::Connection::run() } } +void TrueMQTT::Client::Impl::Connection::socketError() +{ + m_state = State::SOCKET_ERROR; + if (m_socket != INVALID_SOCKET) + { + closesocket(m_socket); + m_socket = INVALID_SOCKET; + } +} + void TrueMQTT::Client::Impl::Connection::resolve() { m_address_current = 0; @@ -321,22 +330,24 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() m_socket_to_address.clear(); m_sockets.clear(); - // Disable non-blocking, as we will be reading from a thread, which can be blocking. - int nonblocking = 0; - if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0) - { - LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact"); - } - - m_backoff = m_impl.m_connection_backoff; m_socket = socket_connected; // Only change the state if no disconnect() has been requested in the mean time. if (m_state != State::STOP) { m_state = State::AUTHENTICATING; - sendConnect(); + if (!sendConnect()) + { + // We couldn't send the connect packet. That is unusual, so disconnect, and retry. + LOG_ERROR(&m_impl, "Could not send first packet to broker. Disconnecting."); + closesocket(m_socket); + m_socket = INVALID_SOCKET; + return false; + } } + + m_backoff = m_impl.m_connection_backoff; + return true; } diff --git a/src/Connection.h b/src/Connection.h index 70c8767..c4e5e4a 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -29,7 +29,8 @@ public: Connection(TrueMQTT::Client::Impl &impl); ~Connection(); - void send(Packet &packet) const; + bool send(Packet &packet) const; + void socketError(); private: // Implemented in Connection.cpp @@ -43,7 +44,7 @@ private: // Implemented in Packet.cpp ssize_t recv(char *buffer, size_t length) const; bool recvLoop(); - void sendConnect(); + bool sendConnect(); enum class State { @@ -52,6 +53,7 @@ private: AUTHENTICATING, CONNECTED, BACKOFF, + SOCKET_ERROR, STOP, }; diff --git a/src/Packet.cpp b/src/Packet.cpp index 3c8ea12..25c31c8 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -123,33 +123,39 @@ public: ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const { - // We idle-check every 100ms if we are requested to stop, as otherwise - // this thread will block till the server disconnects us. - while (m_state != State::STOP) + // We idle-check every 10ms if we are requested to stop or if there was + // an error. This is to prevent the recv() call from blocking forever. + while (true) { // Check if there is any data available on the socket. fd_set read_fds; FD_ZERO(&read_fds); FD_SET(m_socket, &read_fds); - timeval timeout = {0, 100}; + timeval timeout = {0, 10}; size_t ret = select(m_socket + 1, &read_fds, nullptr, nullptr, &timeout); + if (m_state == State::SOCKET_ERROR || m_state == State::STOP) + { +#if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE + if (m_state == State::STOP) + { + LOG_TRACE(&m_impl, "Closing connection as STOP has been requested"); + } +#endif + return -1; + } + if (ret == 0) { continue; } break; } - if (m_state == State::STOP) - { - LOG_TRACE(&m_impl, "Closing connection as STOP has been requested"); - return -1; - } ssize_t res = ::recv(m_socket, buffer, length, 0); if (res == 0) { - LOG_INFO(&m_impl, "Connection closed by peer"); + LOG_WARNING(&m_impl, "Connection closed by broker"); return -1; } if (res < 0) @@ -322,8 +328,20 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop() return true; } -void TrueMQTT::Client::Impl::Connection::send(Packet &packet) const +bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const { + if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED) + { + // This happens in the small window the connection thread hasn't + // spotted yet the connection is closed, while this function closed + // the socket earlier due to the broker closing the connection. + // Basically, it can only be caused if the broker actively closes + // the connection due to a write while publishing a lot of data + // quickly. + LOG_DEBUG(&m_impl, "Attempted to send packet while not connected"); + return false; + } + LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size())); std::vector buffer; @@ -347,17 +365,43 @@ void TrueMQTT::Client::Impl::Connection::send(Packet &packet) const // Write header and packet. if (::send(m_socket, (char *)buffer.data(), buffer.size(), MSG_NOSIGNAL) < 0) { + if (errno == EAGAIN) + { + // sndbuf is full, so we hand it back to the sender to deal with this. + return false; + } + if (errno == ECONNRESET || errno == EPIPE) + { + LOG_ERROR(&m_impl, "Connection closed by broker"); + m_impl.m_connection->socketError(); + return false; + } + LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); - return; + return false; } if (::send(m_socket, (char *)packet.m_buffer.data(), packet.m_buffer.size(), MSG_NOSIGNAL) < 0) { + if (errno == EAGAIN) + { + // sndbuf is full, so we hand it back to the sender to deal with this. + return false; + } + if (errno == ECONNRESET || errno == EPIPE) + { + LOG_ERROR(&m_impl, "Connection closed by broker"); + m_impl.m_connection->socketError(); + return false; + } + LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); - return; + return false; } + + return true; } -void TrueMQTT::Client::Impl::Connection::sendConnect() +bool TrueMQTT::Client::Impl::Connection::sendConnect() { LOG_TRACE(&m_impl, "Sending CONNECT packet"); @@ -391,10 +435,10 @@ void TrueMQTT::Client::Impl::Connection::sendConnect() packet.write_string(m_impl.m_last_will_message); } - send(packet); + return send(packet); } -void TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain) +bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain) { LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")"); @@ -408,10 +452,10 @@ void TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::st packet.write_string(topic); packet.write(message.c_str(), message.size()); - m_connection->send(packet); + return m_connection->send(packet); } -void TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) +bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) { LOG_TRACE(this, "Sending SUBSCRIBE packet for topic '" + topic + "'"); @@ -427,10 +471,10 @@ void TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) packet.write_string(topic); packet.write_uint8(0); // QoS - m_connection->send(packet); + return m_connection->send(packet); } -void TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) +bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) { LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'"); @@ -445,5 +489,5 @@ void TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) packet.write_uint16(m_packet_id++); packet.write_string(topic); - m_connection->send(packet); + return m_connection->send(packet); }