Commit 1f70ca5de647292a16373cfcde39b8c616262e53

Authored by Patric Stout
1 parent f2f0b866

feat(send): deligate sending of packets to its own thread

This means the socket can be blocking, which makes administration
easier. The drawback is that there is now a queue, including
signalling, between the main thread and write thread. This
consumes a bit more CPU; but in return the main thread is never
blocked.
include/TrueMQTT.h
@@ -134,6 +134,8 @@ namespace TrueMQTT @@ -134,6 +134,8 @@ namespace TrueMQTT
134 /** 134 /**
135 * @brief Set the logger callback and level. 135 * @brief Set the logger callback and level.
136 * 136 *
  137 + * By default no logger is set.
  138 + *
137 * @param log_level The \ref LogLevel to use for logging. 139 * @param log_level The \ref LogLevel to use for logging.
138 * @param logger The callback to call when a log message is generated. 140 * @param logger The callback to call when a log message is generated.
139 * 141 *
@@ -145,6 +147,8 @@ namespace TrueMQTT @@ -145,6 +147,8 @@ namespace TrueMQTT
145 /** 147 /**
146 * @brief Set the last will message on the connection. 148 * @brief Set the last will message on the connection.
147 * 149 *
  150 + * By default no last will is set.
  151 + *
148 * @param topic The topic to publish the last will message to. 152 * @param topic The topic to publish the last will message to.
149 * @param message The message of the last will message. 153 * @param message The message of the last will message.
150 * @param retain Whether to retain the last will message. 154 * @param retain Whether to retain the last will message.
@@ -163,6 +167,8 @@ namespace TrueMQTT @@ -163,6 +167,8 @@ namespace TrueMQTT
163 /** 167 /**
164 * @brief Set the publish queue to use. 168 * @brief Set the publish queue to use.
165 * 169 *
  170 + * The default is DROP.
  171 + *
166 * @param queue_type The \ref PublishQueueType to use for the publish queue. 172 * @param queue_type The \ref PublishQueueType to use for the publish queue.
167 * @param size The size of the queue. If the queue is full, the type of queue defines what happens. 173 * @param size The size of the queue. If the queue is full, the type of queue defines what happens.
168 * 174 *
@@ -171,6 +177,23 @@ namespace TrueMQTT @@ -171,6 +177,23 @@ namespace TrueMQTT
171 void setPublishQueue(PublishQueueType queue_type, size_t size) const; 177 void setPublishQueue(PublishQueueType queue_type, size_t size) const;
172 178
173 /** 179 /**
  180 + * @brief Set the size of the send queue.
  181 + *
  182 + * The send queue is used to transfer MQTT packets from the main thread to the
  183 + * network thread. This queue is used to prevent the main thread from blocking
  184 + * when sending a lot of data.
  185 + *
  186 + * Setting the queue too big will cause the memory usage to increase, while
  187 + * setting it too small will cause functions like \ref publish to return false,
  188 + * as the queue is full.
  189 + *
  190 + * The default is 1000.
  191 + *
  192 + * @param size Size of the send queue.
  193 + */
  194 + void setSendQueue(size_t size) const;
  195 +
  196 + /**
174 * @brief Connect to the broker. 197 * @brief Connect to the broker.
175 * 198 *
176 * After calling this function, the library will try a connection to the broker. 199 * After calling this function, the library will try a connection to the broker.
@@ -220,8 +243,9 @@ namespace TrueMQTT @@ -220,8 +243,9 @@ namespace TrueMQTT
220 * moment the connection to the broker is established, and there are messages in the 243 * moment the connection to the broker is established, and there are messages in the
221 * publish queue and/or subscriptions. 244 * publish queue and/or subscriptions.
222 * @note If the return value is false, but there is a connection with the broker, 245 * @note If the return value is false, but there is a connection with the broker,
223 - * this means the sndbuf of the socket is full. It is up to the caller to consider  
224 - * what to do in this case. 246 + * this means the send queue is full. It is up to the caller to consider what to do
  247 + * in this case, but it is wise to back off for a while before sending something
  248 + * again.
225 */ 249 */
226 bool publish(const std::string &topic, const std::string &message, bool retain) const; 250 bool publish(const std::string &topic, const std::string &message, bool retain) const;
227 251
src/Client.cpp
@@ -100,6 +100,19 @@ void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size @@ -100,6 +100,19 @@ void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size
100 m_impl->m_publish_queue_size = size; 100 m_impl->m_publish_queue_size = size;
101 } 101 }
102 102
  103 +void TrueMQTT::Client::setSendQueue(size_t size) const
  104 +{
  105 + if (m_impl->m_state != Client::Impl::State::DISCONNECTED)
  106 + {
  107 + LOG_ERROR(m_impl, "Cannot set send queue when not disconnected");
  108 + return;
  109 + }
  110 +
  111 + LOG_TRACE(m_impl, "Setting send queue to size " + std::to_string(size));
  112 +
  113 + m_impl->m_send_queue_size = size;
  114 +}
  115 +
103 void TrueMQTT::Client::connect() const 116 void TrueMQTT::Client::connect() const
104 { 117 {
105 std::scoped_lock lock(m_impl->m_state_mutex); 118 std::scoped_lock lock(m_impl->m_state_mutex);
src/ClientImpl.h
@@ -80,6 +80,8 @@ public: @@ -80,6 +80,8 @@ public:
80 size_t m_publish_queue_size = -1; ///< Size of the publish queue. 80 size_t m_publish_queue_size = -1; ///< Size of the publish queue.
81 std::deque<std::tuple<std::string, std::string, bool>> m_publish_queue; ///< Queue of publish messages to send to the broker. 81 std::deque<std::tuple<std::string, std::string, bool>> m_publish_queue; ///< Queue of publish messages to send to the broker.
82 82
  83 + size_t m_send_queue_size = 1000; ///< Size of the send queue.
  84 +
83 std::set<std::string> m_subscription_topics; ///< Flat list of topics the client is subscribed to. 85 std::set<std::string> m_subscription_topics; ///< Flat list of topics the client is subscribed to.
84 std::map<std::string, SubscriptionPart> m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic. 86 std::map<std::string, SubscriptionPart> m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic.
85 87
src/Connection.cpp
@@ -18,7 +18,8 @@ @@ -18,7 +18,8 @@
18 18
19 TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl) 19 TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl)
20 : m_impl(impl), 20 : m_impl(impl),
21 - m_thread(&Connection::run, this), 21 + m_thread_read(&Connection::runRead, this),
  22 + m_thread_write(&Connection::runWrite, this),
22 m_backoff(impl.m_connection_backoff) 23 m_backoff(impl.m_connection_backoff)
23 { 24 {
24 } 25 }
@@ -26,11 +27,16 @@ TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &amp;impl) @@ -26,11 +27,16 @@ TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &amp;impl)
26 TrueMQTT::Client::Impl::Connection::~Connection() 27 TrueMQTT::Client::Impl::Connection::~Connection()
27 { 28 {
28 m_state = State::STOP; 29 m_state = State::STOP;
  30 + m_send_queue_cv.notify_one();
29 31
30 // Make sure the connection thread is terminated. 32 // Make sure the connection thread is terminated.
31 - if (m_thread.joinable()) 33 + if (m_thread_read.joinable())
32 { 34 {
33 - m_thread.join(); 35 + m_thread_read.join();
  36 + }
  37 + if (m_thread_write.joinable())
  38 + {
  39 + m_thread_write.join();
34 } 40 }
35 41
36 // freeaddrinfo() is one of those functions that doesn't take kind to NULL pointers 42 // freeaddrinfo() is one of those functions that doesn't take kind to NULL pointers
@@ -50,7 +56,7 @@ std::string TrueMQTT::Client::Impl::Connection::addrinfoToString(const addrinfo @@ -50,7 +56,7 @@ std::string TrueMQTT::Client::Impl::Connection::addrinfoToString(const addrinfo
50 return std::string(host); 56 return std::string(host);
51 } 57 }
52 58
53 -void TrueMQTT::Client::Impl::Connection::run() 59 +void TrueMQTT::Client::Impl::Connection::runRead()
54 { 60 {
55 while (true) 61 while (true)
56 { 62 {
@@ -112,6 +118,58 @@ void TrueMQTT::Client::Impl::Connection::run() @@ -112,6 +118,58 @@ void TrueMQTT::Client::Impl::Connection::run()
112 } 118 }
113 } 119 }
114 120
  121 +std::optional<Packet> TrueMQTT::Client::Impl::Connection::popSendQueueBlocking()
  122 +{
  123 + std::unique_lock<std::mutex> lock(m_send_queue_mutex);
  124 + if (!m_send_queue.empty())
  125 + {
  126 + auto packet = m_send_queue.front();
  127 + m_send_queue.pop_front();
  128 + return packet;
  129 + }
  130 +
  131 + m_send_queue_cv.wait(lock, [this]
  132 + { return !m_send_queue.empty() || m_state == State::STOP; });
  133 +
  134 + if (m_state == State::STOP)
  135 + {
  136 + return {};
  137 + }
  138 +
  139 + Packet packet = m_send_queue.front();
  140 + m_send_queue.pop_front();
  141 + return packet;
  142 +}
  143 +
  144 +void TrueMQTT::Client::Impl::Connection::runWrite()
  145 +{
  146 + while (true)
  147 + {
  148 + switch (m_state)
  149 + {
  150 + case State::AUTHENTICATING:
  151 + case State::CONNECTED:
  152 + {
  153 + auto packet = popSendQueueBlocking();
  154 + if (!packet)
  155 + {
  156 + break;
  157 + }
  158 + sendPacket(packet.value());
  159 + break;
  160 + }
  161 +
  162 + case State::STOP:
  163 + return;
  164 +
  165 + default:
  166 + // Sleep for a bit to avoid hogging the CPU.
  167 + std::this_thread::sleep_for(std::chrono::milliseconds(1));
  168 + break;
  169 + }
  170 + }
  171 +}
  172 +
115 void TrueMQTT::Client::Impl::Connection::socketError() 173 void TrueMQTT::Client::Impl::Connection::socketError()
116 { 174 {
117 m_state = State::SOCKET_ERROR; 175 m_state = State::SOCKET_ERROR;
@@ -330,6 +388,13 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() @@ -330,6 +388,13 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny()
330 m_socket_to_address.clear(); 388 m_socket_to_address.clear();
331 m_sockets.clear(); 389 m_sockets.clear();
332 390
  391 + // Disable non-blocking, as we will be reading/writing from a thread, which can be blocking.
  392 + int nonblocking = 0;
  393 + if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0)
  394 + {
  395 + LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact");
  396 + }
  397 +
333 m_socket = socket_connected; 398 m_socket = socket_connected;
334 399
335 // Only change the state if no disconnect() has been requested in the mean time. 400 // Only change the state if no disconnect() has been requested in the mean time.
src/Connection.h
@@ -8,10 +8,15 @@ @@ -8,10 +8,15 @@
8 #pragma once 8 #pragma once
9 9
10 #include "ClientImpl.h" 10 #include "ClientImpl.h"
  11 +#include "Packet.h"
11 12
12 #include <chrono> 13 #include <chrono>
  14 +#include <condition_variable>
  15 +#include <deque>
  16 +#include <optional>
13 #include <string> 17 #include <string>
14 #include <map> 18 #include <map>
  19 +#include <mutex>
15 #include <netdb.h> 20 #include <netdb.h>
16 #include <thread> 21 #include <thread>
17 #include <vector> 22 #include <vector>
@@ -21,30 +26,31 @@ @@ -21,30 +26,31 @@
21 #define INVALID_SOCKET -1 26 #define INVALID_SOCKET -1
22 #define closesocket close 27 #define closesocket close
23 28
24 -class Packet;  
25 -  
26 class TrueMQTT::Client::Impl::Connection 29 class TrueMQTT::Client::Impl::Connection
27 { 30 {
28 public: 31 public:
29 Connection(TrueMQTT::Client::Impl &impl); 32 Connection(TrueMQTT::Client::Impl &impl);
30 ~Connection(); 33 ~Connection();
31 34
32 - bool send(Packet &packet) const; 35 + bool send(Packet packet);
33 void socketError(); 36 void socketError();
34 37
35 private: 38 private:
36 // Implemented in Connection.cpp 39 // Implemented in Connection.cpp
37 - void run(); 40 + void runRead();
  41 + void runWrite();
38 void resolve(); 42 void resolve();
39 bool tryNextAddress(); 43 bool tryNextAddress();
40 void connect(addrinfo *address); 44 void connect(addrinfo *address);
41 bool connectToAny(); 45 bool connectToAny();
42 std::string addrinfoToString(const addrinfo *address) const; 46 std::string addrinfoToString(const addrinfo *address) const;
  47 + std::optional<Packet> popSendQueueBlocking();
43 48
44 // Implemented in Packet.cpp 49 // Implemented in Packet.cpp
45 ssize_t recv(char *buffer, size_t length) const; 50 ssize_t recv(char *buffer, size_t length) const;
46 bool recvLoop(); 51 bool recvLoop();
47 bool sendConnect(); 52 bool sendConnect();
  53 + void sendPacket(Packet &packet) const;
48 54
49 enum class State 55 enum class State
50 { 56 {
@@ -60,7 +66,8 @@ private: @@ -60,7 +66,8 @@ private:
60 TrueMQTT::Client::Impl &m_impl; 66 TrueMQTT::Client::Impl &m_impl;
61 67
62 State m_state = State::RESOLVING; ///< Current state of the connection. 68 State m_state = State::RESOLVING; ///< Current state of the connection.
63 - std::thread m_thread; ///< Current thread used to run this connection. 69 + std::thread m_thread_read; ///< Current read thread used to run this connection.
  70 + std::thread m_thread_write; ///< Current write thread used to run this connection.
64 71
65 std::chrono::milliseconds m_backoff; ///< Current backoff time. 72 std::chrono::milliseconds m_backoff; ///< Current backoff time.
66 73
@@ -74,4 +81,8 @@ private: @@ -74,4 +81,8 @@ private:
74 std::map<SOCKET, addrinfo *> m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to. 81 std::map<SOCKET, addrinfo *> m_socket_to_address = {}; ///< Map of sockets to the address they are trying to connect to.
75 82
76 SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected. 83 SOCKET m_socket = INVALID_SOCKET; ///< The socket we are currently connected with, or INVALID_SOCKET if not connected.
  84 +
  85 + std::deque<Packet> m_send_queue = {}; ///< Queue of packets to send to the broker.
  86 + std::mutex m_send_queue_mutex; ///< Mutex to protect the send queue.
  87 + std::condition_variable m_send_queue_cv; ///< Condition variable to wake up the write thread when the send queue is not empty.
77 }; 88 };
src/Packet.cpp
@@ -8,125 +8,12 @@ @@ -8,125 +8,12 @@
8 #include "ClientImpl.h" 8 #include "ClientImpl.h"
9 #include "Connection.h" 9 #include "Connection.h"
10 #include "Log.h" 10 #include "Log.h"
  11 +#include "Packet.h"
11 12
12 #include "magic_enum.hpp" 13 #include "magic_enum.hpp"
13 14
14 #include <string.h> 15 #include <string.h>
15 16
16 -class Packet  
17 -{  
18 -public:  
19 - enum class PacketType  
20 - {  
21 - CONNECT = 1,  
22 - CONNACK = 2,  
23 - PUBLISH = 3,  
24 - PUBACK = 4,  
25 - PUBREC = 5,  
26 - PUBREL = 6,  
27 - PUBCOMP = 7,  
28 - SUBSCRIBE = 8,  
29 - SUBACK = 9,  
30 - UNSUBSCRIBE = 10,  
31 - UNSUBACK = 11,  
32 - PINGREQ = 12,  
33 - PINGRESP = 13,  
34 - DISCONNECT = 14,  
35 - };  
36 -  
37 - Packet(PacketType packet_type, uint8_t flags)  
38 - : m_packet_type(packet_type),  
39 - m_flags(flags)  
40 - {  
41 - // Reserve space for the header.  
42 - m_buffer.push_back(0); // Packet type and flags.  
43 - m_buffer.push_back(0); // Remaining length (at most 4 bytes).  
44 - m_buffer.push_back(0);  
45 - m_buffer.push_back(0);  
46 - m_buffer.push_back(0);  
47 - }  
48 -  
49 - Packet(PacketType packet_type, uint8_t flags, std::vector<uint8_t> data)  
50 - : m_buffer(std::move(data)),  
51 - m_packet_type(packet_type),  
52 - m_flags(flags)  
53 - {  
54 - }  
55 -  
56 - void write_uint8(uint8_t value)  
57 - {  
58 - m_buffer.push_back(value);  
59 - }  
60 -  
61 - void write_uint16(uint16_t value)  
62 - {  
63 - m_buffer.push_back(value >> 8);  
64 - m_buffer.push_back(value & 0xFF);  
65 - }  
66 -  
67 - void write(const char *data, size_t length)  
68 - {  
69 - m_buffer.insert(m_buffer.end(), data, data + length);  
70 - }  
71 -  
72 - void write_string(const std::string &str)  
73 - {  
74 - write_uint16(static_cast<uint16_t>(str.size()));  
75 - write(str.c_str(), str.size());  
76 - }  
77 -  
78 - bool read_uint8(uint8_t &value)  
79 - {  
80 - if (m_buffer.size() < m_read_offset + 1)  
81 - {  
82 - return false;  
83 - }  
84 - value = m_buffer[m_read_offset++];  
85 - return true;  
86 - }  
87 -  
88 - bool read_uint16(uint16_t &value)  
89 - {  
90 - if (m_buffer.size() < m_read_offset + 2)  
91 - {  
92 - return false;  
93 - }  
94 - value = m_buffer[m_read_offset++] << 8;  
95 - value |= m_buffer[m_read_offset++];  
96 - return true;  
97 - }  
98 -  
99 - bool read_string(std::string &str)  
100 - {  
101 - uint16_t length;  
102 - if (!read_uint16(length))  
103 - {  
104 - return false;  
105 - }  
106 - if (m_buffer.size() < m_read_offset + length)  
107 - {  
108 - return false;  
109 - }  
110 - const char *data = reinterpret_cast<const char *>(m_buffer.data()) + m_read_offset;  
111 - str.assign(data, length);  
112 - m_read_offset += length;  
113 - return true;  
114 - }  
115 -  
116 - void read_remaining(std::string &str)  
117 - {  
118 - const char *data = reinterpret_cast<const char *>(m_buffer.data()) + m_read_offset;  
119 - str.assign(data, m_buffer.size() - m_read_offset);  
120 - m_read_offset = m_buffer.size();  
121 - }  
122 -  
123 - std::vector<uint8_t> m_buffer;  
124 - size_t m_read_offset = 0;  
125 -  
126 - PacketType m_packet_type;  
127 - uint8_t m_flags;  
128 -};  
129 -  
130 ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const 17 ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const
131 { 18 {
132 // We idle-check every 10ms if we are requested to stop or if there was 19 // We idle-check every 10ms if we are requested to stop or if there was
@@ -334,28 +221,20 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop() @@ -334,28 +221,20 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop()
334 return true; 221 return true;
335 } 222 }
336 223
337 -bool TrueMQTT::Client::Impl::Connection::send(Packet &packet) const 224 +bool TrueMQTT::Client::Impl::Connection::send(Packet packet)
338 { 225 {
339 - if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED) 226 + // Push back if the internal queue gets too big.
  227 + if (m_send_queue.size() > m_impl.m_send_queue_size)
340 { 228 {
341 - // This happens in the small window the connection thread hasn't  
342 - // spotted yet the connection is closed, while this function closed  
343 - // the socket earlier due to the broker closing the connection.  
344 - // Basically, it can only be caused if the broker actively closes  
345 - // the connection due to a write while publishing a lot of data  
346 - // quickly.  
347 - LOG_DEBUG(&m_impl, "Attempted to send packet while not connected");  
348 return false; 229 return false;
349 } 230 }
350 231
351 - LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size()));  
352 -  
353 // Calculate where in the header we need to start writing, to create 232 // Calculate where in the header we need to start writing, to create
354 // a contiguous buffer. The buffer size is including the header, but 233 // a contiguous buffer. The buffer size is including the header, but
355 // the length should be without. Hence the minus five. 234 // the length should be without. Hence the minus five.
356 size_t length = packet.m_buffer.size() - 5; 235 size_t length = packet.m_buffer.size() - 5;
357 size_t offset = length <= 127 ? 3 : (length <= 16383 ? 2 : (length <= 2097151 ? 1 : 0)); 236 size_t offset = length <= 127 ? 3 : (length <= 16383 ? 2 : (length <= 2097151 ? 1 : 0));
358 - size_t bufferOffset = offset; 237 + packet.m_write_offset = offset;
359 238
360 // Set the header. 239 // Set the header.
361 packet.m_buffer[offset++] = (static_cast<uint8_t>(packet.m_packet_type) << 4) | packet.m_flags; 240 packet.m_buffer[offset++] = (static_cast<uint8_t>(packet.m_packet_type) << 4) | packet.m_flags;
@@ -372,40 +251,45 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const @@ -372,40 +251,45 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet &amp;packet) const
372 packet.m_buffer[offset++] = byte; 251 packet.m_buffer[offset++] = byte;
373 } while (length > 0); 252 } while (length > 0);
374 253
375 - ssize_t res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL);  
376 - // If the first packet is rejected in full, return this to the caller.  
377 - if (res < 0) 254 + // Add the packet to the queue.
378 { 255 {
379 - if (errno == EAGAIN)  
380 - {  
381 - // sndbuf is full, so we hand it back to the sender to deal with this.  
382 - return false;  
383 - } 256 + std::scoped_lock lock(m_send_queue_mutex);
  257 + m_send_queue.push_back(std::move(packet));
  258 + }
  259 + // Notify the write thread that there is a new packet.
  260 + m_send_queue_cv.notify_one();
384 261
385 - LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno)));  
386 - m_impl.m_connection->socketError();  
387 - return false; 262 + return true;
  263 +}
  264 +
  265 +void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet) const
  266 +{
  267 + if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED)
  268 + {
  269 + // This happens in the small window the connection thread hasn't
  270 + // spotted yet the connection is closed, while this function closed
  271 + // the socket earlier due to the broker closing the connection.
  272 + // Basically, it can only be caused if the broker actively closes
  273 + // the connection due to a write while publishing a lot of data
  274 + // quickly.
  275 + LOG_DEBUG(&m_impl, "Attempted to send packet while not connected");
  276 + return;
388 } 277 }
389 - // If we still have data to send for this packet, keep trying to send the data till we succeed.  
390 - bufferOffset += res;  
391 - while (bufferOffset < packet.m_buffer.size()) 278 +
  279 + LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size() - 5));
  280 +
  281 + // Send the packet to the broker.
  282 + while (packet.m_write_offset < packet.m_buffer.size())
392 { 283 {
393 - res = ::send(m_socket, (char *)packet.m_buffer.data() + bufferOffset, packet.m_buffer.size() - bufferOffset, MSG_NOSIGNAL); 284 + ssize_t res = ::send(m_socket, (char *)packet.m_buffer.data() + packet.m_write_offset, packet.m_buffer.size() - packet.m_write_offset, MSG_NOSIGNAL);
394 if (res < 0) 285 if (res < 0)
395 { 286 {
396 - if (errno == EAGAIN)  
397 - {  
398 - continue;  
399 - }  
400 -  
401 LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno))); 287 LOG_ERROR(&m_impl, "Connection write error: " + std::string(strerror(errno)));
402 m_impl.m_connection->socketError(); 288 m_impl.m_connection->socketError();
403 - return false; 289 + return;
404 } 290 }
405 - bufferOffset += res; 291 + packet.m_write_offset += res;
406 } 292 }
407 -  
408 - return true;  
409 } 293 }
410 294
411 bool TrueMQTT::Client::Impl::Connection::sendConnect() 295 bool TrueMQTT::Client::Impl::Connection::sendConnect()
@@ -442,7 +326,7 @@ bool TrueMQTT::Client::Impl::Connection::sendConnect() @@ -442,7 +326,7 @@ bool TrueMQTT::Client::Impl::Connection::sendConnect()
442 packet.write_string(m_impl.m_last_will_message); 326 packet.write_string(m_impl.m_last_will_message);
443 } 327 }
444 328
445 - return send(packet); 329 + return send(std::move(packet));
446 } 330 }
447 331
448 bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain) 332 bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain)
@@ -459,7 +343,7 @@ bool TrueMQTT::Client::Impl::sendPublish(const std::string &amp;topic, const std::st @@ -459,7 +343,7 @@ bool TrueMQTT::Client::Impl::sendPublish(const std::string &amp;topic, const std::st
459 packet.write_string(topic); 343 packet.write_string(topic);
460 packet.write(message.c_str(), message.size()); 344 packet.write(message.c_str(), message.size());
461 345
462 - return m_connection->send(packet); 346 + return m_connection->send(std::move(packet));
463 } 347 }
464 348
465 bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) 349 bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic)
@@ -478,7 +362,7 @@ bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &amp;topic) @@ -478,7 +362,7 @@ bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &amp;topic)
478 packet.write_string(topic); 362 packet.write_string(topic);
479 packet.write_uint8(0); // QoS 363 packet.write_uint8(0); // QoS
480 364
481 - return m_connection->send(packet); 365 + return m_connection->send(std::move(packet));
482 } 366 }
483 367
484 bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) 368 bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic)
@@ -496,5 +380,5 @@ bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &amp;topic) @@ -496,5 +380,5 @@ bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &amp;topic)
496 packet.write_uint16(m_packet_id++); 380 packet.write_uint16(m_packet_id++);
497 packet.write_string(topic); 381 packet.write_string(topic);
498 382
499 - return m_connection->send(packet); 383 + return m_connection->send(std::move(packet));
500 } 384 }
src/Packet.h 0 โ†’ 100644
  1 +/*
  2 + * Copyright (c) TrueBrain
  3 + *
  4 + * This source code is licensed under the MIT license found in the
  5 + * LICENSE file in the root directory of this source tree.
  6 + */
  7 +
  8 +#pragma once
  9 +
  10 +#include <cstdint>
  11 +#include <vector>
  12 +
  13 +class Packet
  14 +{
  15 +public:
  16 + enum class PacketType
  17 + {
  18 + CONNECT = 1,
  19 + CONNACK = 2,
  20 + PUBLISH = 3,
  21 + PUBACK = 4,
  22 + PUBREC = 5,
  23 + PUBREL = 6,
  24 + PUBCOMP = 7,
  25 + SUBSCRIBE = 8,
  26 + SUBACK = 9,
  27 + UNSUBSCRIBE = 10,
  28 + UNSUBACK = 11,
  29 + PINGREQ = 12,
  30 + PINGRESP = 13,
  31 + DISCONNECT = 14,
  32 + };
  33 +
  34 + Packet(PacketType packet_type, uint8_t flags)
  35 + : m_packet_type(packet_type),
  36 + m_flags(flags)
  37 + {
  38 + // Reserve space for the header.
  39 + m_buffer.push_back(0); // Packet type and flags.
  40 + m_buffer.push_back(0); // Remaining length (at most 4 bytes).
  41 + m_buffer.push_back(0);
  42 + m_buffer.push_back(0);
  43 + m_buffer.push_back(0);
  44 + }
  45 +
  46 + Packet(PacketType packet_type, uint8_t flags, std::vector<uint8_t> data)
  47 + : m_buffer(std::move(data)),
  48 + m_packet_type(packet_type),
  49 + m_flags(flags)
  50 + {
  51 + }
  52 +
  53 + void write_uint8(uint8_t value)
  54 + {
  55 + m_buffer.push_back(value);
  56 + }
  57 +
  58 + void write_uint16(uint16_t value)
  59 + {
  60 + m_buffer.push_back(value >> 8);
  61 + m_buffer.push_back(value & 0xFF);
  62 + }
  63 +
  64 + void write(const char *data, size_t length)
  65 + {
  66 + m_buffer.insert(m_buffer.end(), data, data + length);
  67 + }
  68 +
  69 + void write_string(const std::string &str)
  70 + {
  71 + write_uint16(static_cast<uint16_t>(str.size()));
  72 + write(str.c_str(), str.size());
  73 + }
  74 +
  75 + bool read_uint8(uint8_t &value)
  76 + {
  77 + if (m_buffer.size() < m_read_offset + 1)
  78 + {
  79 + return false;
  80 + }
  81 + value = m_buffer[m_read_offset++];
  82 + return true;
  83 + }
  84 +
  85 + bool read_uint16(uint16_t &value)
  86 + {
  87 + if (m_buffer.size() < m_read_offset + 2)
  88 + {
  89 + return false;
  90 + }
  91 + value = m_buffer[m_read_offset++] << 8;
  92 + value |= m_buffer[m_read_offset++];
  93 + return true;
  94 + }
  95 +
  96 + bool read_string(std::string &str)
  97 + {
  98 + uint16_t length;
  99 + if (!read_uint16(length))
  100 + {
  101 + return false;
  102 + }
  103 + if (m_buffer.size() < m_read_offset + length)
  104 + {
  105 + return false;
  106 + }
  107 + const char *data = reinterpret_cast<const char *>(m_buffer.data()) + m_read_offset;
  108 + str.assign(data, length);
  109 + m_read_offset += length;
  110 + return true;
  111 + }
  112 +
  113 + void read_remaining(std::string &str)
  114 + {
  115 + const char *data = reinterpret_cast<const char *>(m_buffer.data()) + m_read_offset;
  116 + str.assign(data, m_buffer.size() - m_read_offset);
  117 + m_read_offset = m_buffer.size();
  118 + }
  119 +
  120 + std::vector<uint8_t> m_buffer;
  121 + size_t m_read_offset = 0;
  122 + size_t m_write_offset = 0;
  123 +
  124 + PacketType m_packet_type;
  125 + uint8_t m_flags;
  126 +};