From 1f70ca5de647292a16373cfcde39b8c616262e53 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sun, 25 Sep 2022 11:45:02 +0200 Subject: [PATCH] feat(send): deligate sending of packets to its own thread --- include/TrueMQTT.h | 28 ++++++++++++++++++++++++++-- src/Client.cpp | 13 +++++++++++++ src/ClientImpl.h | 2 ++ src/Connection.cpp | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- src/Connection.h | 21 ++++++++++++++++----- src/Packet.cpp | 192 ++++++++++++++++++++++++++++++++++++++---------------------------------------------------------------------------------------------------------------------------------------------------------- src/Packet.h | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 290 insertions(+), 165 deletions(-) create mode 100644 src/Packet.h diff --git a/include/TrueMQTT.h b/include/TrueMQTT.h index 1dafc30..98e2b23 100644 --- a/include/TrueMQTT.h +++ b/include/TrueMQTT.h @@ -134,6 +134,8 @@ namespace TrueMQTT /** * @brief Set the logger callback and level. * + * By default no logger is set. + * * @param log_level The \ref LogLevel to use for logging. * @param logger The callback to call when a log message is generated. * @@ -145,6 +147,8 @@ namespace TrueMQTT /** * @brief Set the last will message on the connection. * + * By default no last will is set. + * * @param topic The topic to publish the last will message to. * @param message The message of the last will message. * @param retain Whether to retain the last will message. @@ -163,6 +167,8 @@ namespace TrueMQTT /** * @brief Set the publish queue to use. * + * The default is DROP. + * * @param queue_type The \ref PublishQueueType to use for the publish queue. * @param size The size of the queue. If the queue is full, the type of queue defines what happens. * @@ -171,6 +177,23 @@ namespace TrueMQTT void setPublishQueue(PublishQueueType queue_type, size_t size) const; /** + * @brief Set the size of the send queue. + * + * The send queue is used to transfer MQTT packets from the main thread to the + * network thread. This queue is used to prevent the main thread from blocking + * when sending a lot of data. + * + * Setting the queue too big will cause the memory usage to increase, while + * setting it too small will cause functions like \ref publish to return false, + * as the queue is full. + * + * The default is 1000. + * + * @param size Size of the send queue. + */ + void setSendQueue(size_t size) const; + + /** * @brief Connect to the broker. * * After calling this function, the library will try a connection to the broker. @@ -220,8 +243,9 @@ namespace TrueMQTT * moment the connection to the broker is established, and there are messages in the * publish queue and/or subscriptions. * @note If the return value is false, but there is a connection with the broker, - * this means the sndbuf of the socket is full. It is up to the caller to consider - * what to do in this case. + * this means the send queue is full. It is up to the caller to consider what to do + * in this case, but it is wise to back off for a while before sending something + * again. */ bool publish(const std::string &topic, const std::string &message, bool retain) const; diff --git a/src/Client.cpp b/src/Client.cpp index c7064f2..4d1f183 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -100,6 +100,19 @@ void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size m_impl->m_publish_queue_size = size; } +void TrueMQTT::Client::setSendQueue(size_t size) const +{ + if (m_impl->m_state != Client::Impl::State::DISCONNECTED) + { + LOG_ERROR(m_impl, "Cannot set send queue when not disconnected"); + return; + } + + LOG_TRACE(m_impl, "Setting send queue to size " + std::to_string(size)); + + m_impl->m_send_queue_size = size; +} + void TrueMQTT::Client::connect() const { std::scoped_lock lock(m_impl->m_state_mutex); diff --git a/src/ClientImpl.h b/src/ClientImpl.h index cb2c6a5..53c78bf 100644 --- a/src/ClientImpl.h +++ b/src/ClientImpl.h @@ -80,6 +80,8 @@ public: size_t m_publish_queue_size = -1; ///< Size of the publish queue. std::deque> m_publish_queue; ///< Queue of publish messages to send to the broker. + size_t m_send_queue_size = 1000; ///< Size of the send queue. + std::set m_subscription_topics; ///< Flat list of topics the client is subscribed to. std::map m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic. diff --git a/src/Connection.cpp b/src/Connection.cpp index dbe7383..f2845ef 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -18,7 +18,8 @@ TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl) : m_impl(impl), - m_thread(&Connection::run, this), + m_thread_read(&Connection::runRead, this), + m_thread_write(&Connection::runWrite, this), m_backoff(impl.m_connection_backoff) { } @@ -26,11 +27,16 @@ TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl) TrueMQTT::Client::Impl::Connection::~Connection() { m_state = State::STOP; + m_send_queue_cv.notify_one(); // Make sure the connection thread is terminated. - if (m_thread.joinable()) + if (m_thread_read.joinable()) { - m_thread.join(); + m_thread_read.join(); + } + if (m_thread_write.joinable()) + { + m_thread_write.join(); } // freeaddrinfo() is one of those functions that doesn't take kind to NULL pointers @@ -50,7 +56,7 @@ std::string TrueMQTT::Client::Impl::Connection::addrinfoToString(const addrinfo return std::string(host); } -void TrueMQTT::Client::Impl::Connection::run() +void TrueMQTT::Client::Impl::Connection::runRead() { while (true) { @@ -112,6 +118,58 @@ void TrueMQTT::Client::Impl::Connection::run() } } +std::optional TrueMQTT::Client::Impl::Connection::popSendQueueBlocking() +{ + std::unique_lock lock(m_send_queue_mutex); + if (!m_send_queue.empty()) + { + auto packet = m_send_queue.front(); + m_send_queue.pop_front(); + return packet; + } + + m_send_queue_cv.wait(lock, [this] + { return !m_send_queue.empty() || m_state == State::STOP; }); + + if (m_state == State::STOP) + { + return {}; + } + + Packet packet = m_send_queue.front(); + m_send_queue.pop_front(); + return packet; +} + +void TrueMQTT::Client::Impl::Connection::runWrite() +{ + while (true) + { + switch (m_state) + { + case State::AUTHENTICATING: + case State::CONNECTED: + { + auto packet = popSendQueueBlocking(); + if (!packet) + { + break; + } + sendPacket(packet.value()); + break; + } + + case State::STOP: + return; + + default: + // Sleep for a bit to avoid hogging the CPU. + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + break; + } + } +} + void TrueMQTT::Client::Impl::Connection::socketError() { m_state = State::SOCKET_ERROR; @@ -330,6 +388,13 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() m_socket_to_address.clear(); m_sockets.clear(); + // Disable non-blocking, as we will be reading/writing from a thread, which can be blocking. + int nonblocking = 0; + if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0) + { + LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact"); + } + m_socket = socket_connected; // Only change the state if no disconnect() has been requested in the mean time. diff --git a/src/Connection.h b/src/Connection.h index c4e5e4a..a500a84 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -8,10 +8,15 @@ #pragma once #include "ClientImpl.h" +#include "Packet.h" #include +#include +#include +#include #include #include +#include #include #include #include @@ -21,30 +26,31 @@ #define INVALID_SOCKET -1 #define closesocket close -class Packet; - class TrueMQTT::Client::Impl::Connection { public: Connection(TrueMQTT::Client::Impl &impl); ~Connection(); - bool send(Packet &packet) const; + bool send(Packet packet); void socketError(); private: // Implemented in Connection.cpp - void run(); + void runRead(); + void runWrite(); void resolve(); bool tryNextAddress(); void connect(addrinfo *address); bool connectToAny(); std::string addrinfoToString(const addrinfo *address) const; + std::optional popSendQueueBlocking(); // Implemented in Packet.cpp ssize_t recv(char *buffer, size_t length) const; bool recvLoop(); bool sendConnect(); + void sendPacket(Packet &packet) const; enum class State { @@ -60,7 +66,8 @@ private: TrueMQTT::Client::Impl &m_impl; State m_state = State::RESOLVING; ///< Current state of the connection. - std::thread m_thread; ///< Current thread used to run this connection. + std::thread m_thread_read; ///< Current read thread used to run this connection. + std::thread m_thread_write; ///< Current write thread used to run this connection. std::chrono::milliseconds m_backoff; ///< Current backoff time. @@ -74,4 +81,8 @@ private: std::map m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to. SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected. + + std::deque m_send_queue = {}; ///< Queue of packets to send to the broker. + std::mutex m_send_queue_mutex; ///< Mutex to protect the send queue. + std::condition_variable m_send_queue_cv; ///< Condition variable to wake up the write thread when the send queue is not empty. }; diff --git a/src/Packet.cpp b/src/Packet.cpp index 8dcf058..121ea7d 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -8,125 +8,12 @@ #include "ClientImpl.h" #include "Connection.h" #include "Log.h" +#include "Packet.h" #include "magic_enum.hpp" #include -class Packet -{ -public: - enum class PacketType - { - CONNECT = 1, - CONNACK = 2, - PUBLISH = 3, - PUBACK = 4, - PUBREC = 5, - PUBREL = 6, - PUBCOMP = 7, - SUBSCRIBE = 8, - SUBACK = 9, - UNSUBSCRIBE = 10, - UNSUBACK = 11, - PINGREQ = 12, - PINGRESP = 13, - DISCONNECT = 14, - }; - - Packet(PacketType packet_type, uint8_t flags) - : m_packet_type(packet_type), - m_flags(flags) - { - // Reserve space for the header. - m_buffer.push_back(0); // Packet type and flags. - m_buffer.push_back(0); // Remaining length (at most 4 bytes). - m_buffer.push_back(0); - m_buffer.push_back(0); - m_buffer.push_back(0); - } - - Packet(PacketType packet_type, uint8_t flags, std::vector data) - : m_buffer(std::move(data)), - m_packet_type(packet_type), - m_flags(flags) - { - } - - void write_uint8(uint8_t value) - { - m_buffer.push_back(value); - } - - void write_uint16(uint16_t value) - { - m_buffer.push_back(value >> 8); - m_buffer.push_back(value & 0xFF); - } - - void write(const char *data, size_t length) - { - m_buffer.insert(m_buffer.end(), data, data + length); - } - - void write_string(const std::string &str) - { - write_uint16(static_cast(str.size())); - write(str.c_str(), str.size()); - } - - bool read_uint8(uint8_t &value) - { - if (m_buffer.size() < m_read_offset + 1) - { - return false; - } - value = m_buffer[m_read_offset++]; - return true; - } - - bool read_uint16(uint16_t &value) - { - if (m_buffer.size() < m_read_offset + 2) - { - return false; - } - value = m_buffer[m_read_offset++] << 8; - value |= m_buffer[m_read_offset++]; - return true; - } - - bool read_string(std::string &str) - { - uint16_t length; - if (!read_uint16(length)) - { - return false; - } - if (m_buffer.size() < m_read_offset + length) - { - return false; - } - const char *data = reinterpret_cast(m_buffer.data()) + m_read_offset; - str.assign(data, length); - m_read_offset += length; - return true; - } - - void read_remaining(std::string &str) - { - const char *data = reinterpret_cast(m_buffer.data()) + m_read_offset; - str.assign(data, m_buffer.size() - m_read_offset); - m_read_offset = m_buffer.size(); - } - - std::vector m_buffer; - size_t m_read_offset = 0; - - PacketType m_packet_type; - uint8_t m_flags; -}; - ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const { // We idle-check every 10ms if we are requested to stop or if there was @@ -334,28 +221,20 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop() return true; } -bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const +bool TrueMQTT::Client::Impl::Connection::send(Packet packet) { - if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED) + // Push back if the internal queue gets too big. + if (m_send_queue.size() > m_impl.m_send_queue_size) { - // This happens in the small window the connection thread hasn't - // spotted yet the connection is closed, while this function closed - // the socket earlier due to the broker closing the connection. - // Basically, it can only be caused if the broker actively closes - // the connection due to a write while publishing a lot of data - // quickly. - LOG_DEBUG(&m_impl, "Attempted to send packet while not connected"); return false; } - LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size())); - // Calculate where in the header we need to start writing, to create // a contiguous buffer. The buffer size is including the header, but // the length should be without. Hence the minus five. size_t length = packet.m_buffer.size() - 5; size_t offset = length <= 127 ? 3 : (length <= 16383 ? 2 : (length <= 2097151 ? 1 : 0)); - size_t bufferOffset = offset; + packet.m_write_offset = offset; // Set the header. packet.m_buffer[offset++] = (static_cast(packet.m_packet_type) << 4) | packet.m_flags; @@ -372,40 +251,45 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const packet.m_buffer[offset++] = byte; } while (length > 0); - ssize_t res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL); - // If the first packet is rejected in full, return this to the caller. - if (res < 0) + // Add the packet to the queue. { - if (errno == EAGAIN) - { - // sndbuf is full, so we hand it back to the sender to deal with this. - return false; - } + std::scoped_lock lock(m_send_queue_mutex); + m_send_queue.push_back(std::move(packet)); + } + // Notify the write thread that there is a new packet. + m_send_queue_cv.notify_one(); - LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno))); - m_impl.m_connection->socketError(); - return false; + return true; +} + +void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet) const +{ + if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED) + { + // This happens in the small window the connection thread hasn't + // spotted yet the connection is closed, while this function closed + // the socket earlier due to the broker closing the connection. + // Basically, it can only be caused if the broker actively closes + // the connection due to a write while publishing a lot of data + // quickly. + LOG_DEBUG(&m_impl, "Attempted to send packet while not connected"); + return; } - // If we still have data to send for this packet, keep trying to send the data till we succeed. - bufferOffset += res; - while (bufferOffset < packet.m_buffer.size()) + + LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size() - 5)); + + // Send the packet to the broker. + while (packet.m_write_offset < packet.m_buffer.size()) { - res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL); + ssize_t res = ::send(m_socket, (char *)packet.m_buffer.data() + packet.m_write_offset, packet.m_buffer.size() - packet.m_write_offset, MSG_NOSIGNAL); if (res < 0) { - if (errno == EAGAIN) - { - continue; - } - LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno))); m_impl.m_connection->socketError(); - return false; + return; } - bufferOffset += res; + packet.m_write_offset += res; } - - return true; } bool TrueMQTT::Client::Impl::Connection::sendConnect() @@ -442,7 +326,7 @@ bool TrueMQTT::Client::Impl::Connection::sendConnect() packet.write_string(m_impl.m_last_will_message); } - return send(packet); + return send(std::move(packet)); } bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain) @@ -459,7 +343,7 @@ bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::st packet.write_string(topic); packet.write(message.c_str(), message.size()); - return m_connection->send(packet); + return m_connection->send(std::move(packet)); } bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) @@ -478,7 +362,7 @@ bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) packet.write_string(topic); packet.write_uint8(0); // QoS - return m_connection->send(packet); + return m_connection->send(std::move(packet)); } bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) @@ -496,5 +380,5 @@ bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) packet.write_uint16(m_packet_id++); packet.write_string(topic); - return m_connection->send(packet); + return m_connection->send(std::move(packet)); } diff --git a/src/Packet.h b/src/Packet.h new file mode 100644 index 0000000..4a47604 --- /dev/null +++ b/src/Packet.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) TrueBrain + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +class Packet +{ +public: + enum class PacketType + { + CONNECT = 1, + CONNACK = 2, + PUBLISH = 3, + PUBACK = 4, + PUBREC = 5, + PUBREL = 6, + PUBCOMP = 7, + SUBSCRIBE = 8, + SUBACK = 9, + UNSUBSCRIBE = 10, + UNSUBACK = 11, + PINGREQ = 12, + PINGRESP = 13, + DISCONNECT = 14, + }; + + Packet(PacketType packet_type, uint8_t flags) + : m_packet_type(packet_type), + m_flags(flags) + { + // Reserve space for the header. + m_buffer.push_back(0); // Packet type and flags. + m_buffer.push_back(0); // Remaining length (at most 4 bytes). + m_buffer.push_back(0); + m_buffer.push_back(0); + m_buffer.push_back(0); + } + + Packet(PacketType packet_type, uint8_t flags, std::vector data) + : m_buffer(std::move(data)), + m_packet_type(packet_type), + m_flags(flags) + { + } + + void write_uint8(uint8_t value) + { + m_buffer.push_back(value); + } + + void write_uint16(uint16_t value) + { + m_buffer.push_back(value >> 8); + m_buffer.push_back(value & 0xFF); + } + + void write(const char *data, size_t length) + { + m_buffer.insert(m_buffer.end(), data, data + length); + } + + void write_string(const std::string &str) + { + write_uint16(static_cast(str.size())); + write(str.c_str(), str.size()); + } + + bool read_uint8(uint8_t &value) + { + if (m_buffer.size() < m_read_offset + 1) + { + return false; + } + value = m_buffer[m_read_offset++]; + return true; + } + + bool read_uint16(uint16_t &value) + { + if (m_buffer.size() < m_read_offset + 2) + { + return false; + } + value = m_buffer[m_read_offset++] << 8; + value |= m_buffer[m_read_offset++]; + return true; + } + + bool read_string(std::string &str) + { + uint16_t length; + if (!read_uint16(length)) + { + return false; + } + if (m_buffer.size() < m_read_offset + length) + { + return false; + } + const char *data = reinterpret_cast(m_buffer.data()) + m_read_offset; + str.assign(data, length); + m_read_offset += length; + return true; + } + + void read_remaining(std::string &str) + { + const char *data = reinterpret_cast(m_buffer.data()) + m_read_offset; + str.assign(data, m_buffer.size() - m_read_offset); + m_read_offset = m_buffer.size(); + } + + std::vector m_buffer; + size_t m_read_offset = 0; + size_t m_write_offset = 0; + + PacketType m_packet_type; + uint8_t m_flags; +}; -- libgit2 0.21.4