From d5d8534062b76364f621ef49f790450997d7e6f4 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sun, 25 Sep 2022 10:31:28 +0200 Subject: [PATCH] fix(packet): use a single buffer for a packet --- src/Packet.cpp | 59 +++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/src/Packet.cpp b/src/Packet.cpp index 25c31c8..8dcf058 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -38,6 +38,12 @@ public: : m_packet_type(packet_type), m_flags(flags) { + // Reserve space for the header. + m_buffer.push_back(0); // Packet type and flags. + m_buffer.push_back(0); // Remaining length (at most 4 bytes). + m_buffer.push_back(0); + m_buffer.push_back(0); + m_buffer.push_back(0); } Packet(PacketType packet_type, uint8_t flags, std::vector data) @@ -344,13 +350,17 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size())); - std::vector buffer; + // Calculate where in the header we need to start writing, to create + // a contiguous buffer. The buffer size is including the header, but + // the length should be without. Hence the minus five. + size_t length = packet.m_buffer.size() - 5; + size_t offset = length <= 127 ? 3 : (length <= 16383 ? 2 : (length <= 2097151 ? 1 : 0)); + size_t bufferOffset = offset; - // Create the header. - buffer.push_back((static_cast(packet.m_packet_type) << 4) | packet.m_flags); + // Set the header. + packet.m_buffer[offset++] = (static_cast(packet.m_packet_type) << 4) | packet.m_flags; - // Calculate the length buffer. - size_t length = packet.m_buffer.size(); + // Set the remaining length. do { uint8_t byte = length & 0x7F; @@ -359,43 +369,40 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const { byte |= 0x80; } - buffer.push_back(byte); + packet.m_buffer[offset++] = byte; } while (length > 0); - // Write header and packet. - if (::send(m_socket, (char *)buffer.data(), buffer.size(), MSG_NOSIGNAL) < 0) + ssize_t res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL); + // If the first packet is rejected in full, return this to the caller. + if (res < 0) { if (errno == EAGAIN) { // sndbuf is full, so we hand it back to the sender to deal with this. return false; } - if (errno == ECONNRESET || errno == EPIPE) - { - LOG_ERROR(&m_impl, "Connection closed by broker"); - m_impl.m_connection->socketError(); - return false; - } - LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); + LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno))); + m_impl.m_connection->socketError(); return false; } - if (::send(m_socket, (char *)packet.m_buffer.data(), packet.m_buffer.size(), MSG_NOSIGNAL) < 0) + // If we still have data to send for this packet, keep trying to send the data till we succeed. + bufferOffset += res; + while (bufferOffset < packet.m_buffer.size()) { - if (errno == EAGAIN) + res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL); + if (res < 0) { - // sndbuf is full, so we hand it back to the sender to deal with this. - return false; - } - if (errno == ECONNRESET || errno == EPIPE) - { - LOG_ERROR(&m_impl, "Connection closed by broker"); + if (errno == EAGAIN) + { + continue; + } + + LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno))); m_impl.m_connection->socketError(); return false; } - - LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); - return false; + bufferOffset += res; } return true; -- libgit2 0.21.4