Commit d5d8534062b76364f621ef49f790450997d7e6f4

Authored by Patric Stout
1 parent 15005b6b

fix(packet): use a single buffer for a packet

Before this commit, we had one small buffer telling the packet
type and length, and another buffer with the payload. They were
send to the kernel one by one. For small packets, this is a
problem, as NODELAY causes the first buffer to be send on the
IP stack, and the payload after. This increases the IP overhead
for no good reason.

Now instead, already reserve room in the packet to write the
header, and send it as one single unit to the kernel.
Showing 1 changed file with 33 additions and 26 deletions
src/Packet.cpp
@@ -38,6 +38,12 @@ public: @@ -38,6 +38,12 @@ public:
38 : m_packet_type(packet_type), 38 : m_packet_type(packet_type),
39 m_flags(flags) 39 m_flags(flags)
40 { 40 {
  41 + // Reserve space for the header.
  42 + m_buffer.push_back(0); // Packet type and flags.
  43 + m_buffer.push_back(0); // Remaining length (at most 4 bytes).
  44 + m_buffer.push_back(0);
  45 + m_buffer.push_back(0);
  46 + m_buffer.push_back(0);
41 } 47 }
42 48
43 Packet(PacketType packet_type, uint8_t flags, std::vector<uint8_t> data) 49 Packet(PacketType packet_type, uint8_t flags, std::vector<uint8_t> data)
@@ -344,13 +350,17 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const @@ -344,13 +350,17 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const
344 350
345 LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size())); 351 LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size()));
346 352
347 - std::vector<uint8_t> buffer; 353 + // Calculate where in the header we need to start writing, to create
  354 + // a contiguous buffer. The buffer size is including the header, but
  355 + // the length should be without. Hence the minus five.
  356 + size_t length = packet.m_buffer.size() - 5;
  357 + size_t offset = length <= 127 ? 3 : (length <= 16383 ? 2 : (length <= 2097151 ? 1 : 0));
  358 + size_t bufferOffset = offset;
348 359
349 - // Create the header.  
350 - buffer.push_back((static_cast<uint8_t>(packet.m_packet_type) << 4) | packet.m_flags); 360 + // Set the header.
  361 + packet.m_buffer[offset++] = (static_cast<uint8_t>(packet.m_packet_type) << 4) | packet.m_flags;
351 362
352 - // Calculate the length buffer.  
353 - size_t length = packet.m_buffer.size(); 363 + // Set the remaining length.
354 do 364 do
355 { 365 {
356 uint8_t byte = length & 0x7F; 366 uint8_t byte = length & 0x7F;
@@ -359,43 +369,40 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const @@ -359,43 +369,40 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const
359 { 369 {
360 byte |= 0x80; 370 byte |= 0x80;
361 } 371 }
362 - buffer.push_back(byte); 372 + packet.m_buffer[offset++] = byte;
363 } while (length > 0); 373 } while (length > 0);
364 374
365 - // Write header and packet.  
366 - if (::send(m_socket, (char *)buffer.data(), buffer.size(), MSG_NOSIGNAL) < 0) 375 + ssize_t res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL);
  376 + // If the first packet is rejected in full, return this to the caller.
  377 + if (res < 0)
367 { 378 {
368 if (errno == EAGAIN) 379 if (errno == EAGAIN)
369 { 380 {
370 // sndbuf is full, so we hand it back to the sender to deal with this. 381 // sndbuf is full, so we hand it back to the sender to deal with this.
371 return false; 382 return false;
372 } 383 }
373 - if (errno == ECONNRESET || errno == EPIPE)  
374 - {  
375 - LOG_ERROR(&m_impl, "Connection closed by broker");  
376 - m_impl.m_connection->socketError();  
377 - return false;  
378 - }  
379 384
380 - LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); 385 + LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno)));
  386 + m_impl.m_connection->socketError();
381 return false; 387 return false;
382 } 388 }
383 - if (::send(m_socket, (char *)packet.m_buffer.data(), packet.m_buffer.size(), MSG_NOSIGNAL) < 0) 389 + // If we still have data to send for this packet, keep trying to send the data till we succeed.
  390 + bufferOffset += res;
  391 + while (bufferOffset < packet.m_buffer.size())
384 { 392 {
385 - if (errno == EAGAIN) 393 + res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL);
  394 + if (res < 0)
386 { 395 {
387 - // sndbuf is full, so we hand it back to the sender to deal with this.  
388 - return false;  
389 - }  
390 - if (errno == ECONNRESET || errno == EPIPE)  
391 - {  
392 - LOG_ERROR(&m_impl, "Connection closed by broker"); 396 + if (errno == EAGAIN)
  397 + {
  398 + continue;
  399 + }
  400 +
  401 + LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno)));
393 m_impl.m_connection->socketError(); 402 m_impl.m_connection->socketError();
394 return false; 403 return false;
395 } 404 }
396 -  
397 - LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno)));  
398 - return false; 405 + bufferOffset += res;
399 } 406 }
400 407
401 return true; 408 return true;