Commit 2eed0cfe0e2b0c73567b158ec612c5b34c844cb0

Authored by Patric Stout
1 parent 22c2cd01

feat(keep-alive): send keep-alive every interval

Also, check if we get a response every ping-request, so we know
the connection with the broker is still good.
src/Connection.cpp
... ... @@ -396,6 +396,8 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny()
396 396 }
397 397  
398 398 m_socket = socket_connected;
  399 + m_last_sent_packet = std::chrono::steady_clock::now();
  400 + m_last_received_packet = std::chrono::steady_clock::now();
399 401  
400 402 // Only change the state if no disconnect() has been requested in the mean time.
401 403 if (m_state != State::STOP)
... ...
src/Connection.h
... ... @@ -32,7 +32,7 @@ public:
32 32 Connection(TrueMQTT::Client::Impl &impl);
33 33 ~Connection();
34 34  
35   - bool send(Packet packet);
  35 + bool send(Packet packet, bool has_priority = false);
36 36 void socketError();
37 37  
38 38 private:
... ... @@ -47,10 +47,11 @@ private:
47 47 std::optional<Packet> popSendQueueBlocking();
48 48  
49 49 // Implemented in Packet.cpp
50   - ssize_t recv(char *buffer, size_t length) const;
  50 + ssize_t recv(char *buffer, size_t length);
51 51 bool recvLoop();
52 52 bool sendConnect();
53   - void sendPacket(Packet &packet) const;
  53 + bool sendPingRequest();
  54 + void sendPacket(Packet &packet);
54 55  
55 56 enum class State
56 57 {
... ... @@ -85,4 +86,7 @@ private:
85 86 std::deque<Packet> m_send_queue = {}; ///< Queue of packets to send to the broker.
86 87 std::mutex m_send_queue_mutex; ///< Mutex to protect the send queue.
87 88 std::condition_variable m_send_queue_cv; ///< Condition variable to wake up the write thread when the send queue is not empty.
  89 +
  90 + std::chrono::steady_clock::time_point m_last_sent_packet = {}; ///< Time of the last packet sent to the broker.
  91 + std::chrono::steady_clock::time_point m_last_received_packet = {}; ///< Time of the last packet received from the broker.
88 92 };
... ...
src/Packet.cpp
... ... @@ -14,7 +14,7 @@
14 14  
15 15 #include <string.h>
16 16  
17   -ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const
  17 +ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length)
18 18 {
19 19 // We idle-check every 100ms if we are requested to stop or if there was
20 20 // an error. This is to prevent the recv() call from blocking forever.
... ... @@ -38,6 +38,26 @@ ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) co
38 38 return -1;
39 39 }
40 40  
  41 + auto now = std::chrono::steady_clock::now();
  42 +
  43 + // Send a keep-alive to the broker if we haven't sent anything for a while.
  44 + if (m_last_sent_packet + m_impl.m_keep_alive_interval < now)
  45 + {
  46 + // Force updating the time, as it might be a while before the ping actually leaves
  47 + // the send queue. And we don't need to send more than one for the whole interval.
  48 + m_last_sent_packet = std::chrono::steady_clock::now();
  49 + sendPingRequest();
  50 + }
  51 +
  52 + // As the broker should send a ping every keep-alive interval, we really should
  53 + // see something no later than every 1.5 times the keep-alive interval. If not,
  54 + // we assume the connection is dead.
  55 + if (m_last_received_packet + m_impl.m_keep_alive_interval * 15 / 10 < now)
  56 + {
  57 + LOG_ERROR(&m_impl, "Connection timed out");
  58 + return -1;
  59 + }
  60 +
41 61 if (ret == 0)
42 62 {
43 63 continue;
... ... @@ -213,18 +233,24 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop()
213 233  
214 234 break;
215 235 }
  236 + case Packet::PacketType::PINGRESP:
  237 + {
  238 + LOG_DEBUG(&m_impl, "Received PINGRESP");
  239 + break;
  240 + }
216 241 default:
217 242 LOG_ERROR(&m_impl, "Received unexpected packet type " + std::string(magic_enum::enum_name(packet_type)) + " from broker, closing connection");
218 243 return false;
219 244 }
220 245  
  246 + m_last_received_packet = std::chrono::steady_clock::now();
221 247 return true;
222 248 }
223 249  
224   -bool TrueMQTT::Client::Impl::Connection::send(Packet packet)
  250 +bool TrueMQTT::Client::Impl::Connection::send(Packet packet, bool has_priority)
225 251 {
226 252 // Push back if the internal queue gets too big.
227   - if (m_send_queue.size() > m_impl.m_send_queue_size)
  253 + if (!has_priority && m_send_queue.size() > m_impl.m_send_queue_size)
228 254 {
229 255 return false;
230 256 }
... ... @@ -254,7 +280,15 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet packet)
254 280 // Add the packet to the queue.
255 281 {
256 282 std::scoped_lock lock(m_send_queue_mutex);
257   - m_send_queue.push_back(std::move(packet));
  283 + if (has_priority)
  284 + {
  285 + m_send_queue.push_front(std::move(packet));
  286 + }
  287 + else
  288 + {
  289 + m_send_queue.push_back(std::move(packet));
  290 + }
  291 +
258 292 }
259 293 // Notify the write thread that there is a new packet.
260 294 m_send_queue_cv.notify_one();
... ... @@ -262,7 +296,7 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet packet)
262 296 return true;
263 297 }
264 298  
265   -void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet) const
  299 +void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet)
266 300 {
267 301 if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED)
268 302 {
... ... @@ -290,6 +324,8 @@ void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &amp;packet) const
290 324 }
291 325 packet.m_write_offset += res;
292 326 }
  327 +
  328 + m_last_sent_packet = std::chrono::steady_clock::now();
293 329 }
294 330  
295 331 bool TrueMQTT::Client::Impl::Connection::sendConnect()
... ... @@ -329,6 +365,15 @@ bool TrueMQTT::Client::Impl::Connection::sendConnect()
329 365 return send(std::move(packet));
330 366 }
331 367  
  368 +bool TrueMQTT::Client::Impl::Connection::sendPingRequest()
  369 +{
  370 + LOG_TRACE(&m_impl, "Sending PINGREQ packet");
  371 +
  372 + Packet packet(Packet::PacketType::PINGREQ, 0);
  373 +
  374 + return send(std::move(packet), true);
  375 +}
  376 +
332 377 bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain)
333 378 {
334 379 LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")");
... ...