From 2eed0cfe0e2b0c73567b158ec612c5b34c844cb0 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sun, 25 Sep 2022 16:08:54 +0200 Subject: [PATCH] feat(keep-alive): send keep-alive every interval --- src/Connection.cpp | 2 ++ src/Connection.h | 10 +++++++--- src/Packet.cpp | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/src/Connection.cpp b/src/Connection.cpp index b44b7b1..4abc4d2 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -396,6 +396,8 @@ bool TrueMQTT::Client::Impl::Connection::connectToAny() } m_socket = socket_connected; + m_last_sent_packet = std::chrono::steady_clock::now(); + m_last_received_packet = std::chrono::steady_clock::now(); // Only change the state if no disconnect() has been requested in the mean time. if (m_state != State::STOP) diff --git a/src/Connection.h b/src/Connection.h index a500a84..a89985c 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -32,7 +32,7 @@ public: Connection(TrueMQTT::Client::Impl &impl); ~Connection(); - bool send(Packet packet); + bool send(Packet packet, bool has_priority = false); void socketError(); private: @@ -47,10 +47,11 @@ private: std::optional popSendQueueBlocking(); // Implemented in Packet.cpp - ssize_t recv(char *buffer, size_t length) const; + ssize_t recv(char *buffer, size_t length); bool recvLoop(); bool sendConnect(); - void sendPacket(Packet &packet) const; + bool sendPingRequest(); + void sendPacket(Packet &packet); enum class State { @@ -85,4 +86,7 @@ private: std::deque m_send_queue = {}; ///< Queue of packets to send to the broker. std::mutex m_send_queue_mutex; ///< Mutex to protect the send queue. std::condition_variable m_send_queue_cv; ///< Condition variable to wake up the write thread when the send queue is not empty. + + std::chrono::steady_clock::time_point m_last_sent_packet = {}; ///< Time of the last packet sent to the broker. + std::chrono::steady_clock::time_point m_last_received_packet = {}; ///< Time of the last packet received from the broker. }; diff --git a/src/Packet.cpp b/src/Packet.cpp index 0cd8fcb..ed85892 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -14,7 +14,7 @@ #include -ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const +ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) { // We idle-check every 100ms if we are requested to stop or if there was // an error. This is to prevent the recv() call from blocking forever. @@ -38,6 +38,26 @@ ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) co return -1; } + auto now = std::chrono::steady_clock::now(); + + // Send a keep-alive to the broker if we haven't sent anything for a while. + if (m_last_sent_packet + m_impl.m_keep_alive_interval < now) + { + // Force updating the time, as it might be a while before the ping actually leaves + // the send queue. And we don't need to send more than one for the whole interval. + m_last_sent_packet = std::chrono::steady_clock::now(); + sendPingRequest(); + } + + // As the broker should send a ping every keep-alive interval, we really should + // see something no later than every 1.5 times the keep-alive interval. If not, + // we assume the connection is dead. + if (m_last_received_packet + m_impl.m_keep_alive_interval * 15 / 10 < now) + { + LOG_ERROR(&m_impl, "Connection timed out"); + return -1; + } + if (ret == 0) { continue; @@ -213,18 +233,24 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop() break; } + case Packet::PacketType::PINGRESP: + { + LOG_DEBUG(&m_impl, "Received PINGRESP"); + break; + } default: LOG_ERROR(&m_impl, "Received unexpected packet type " + std::string(magic_enum::enum_name(packet_type)) + " from broker, closing connection"); return false; } + m_last_received_packet = std::chrono::steady_clock::now(); return true; } -bool TrueMQTT::Client::Impl::Connection::send(Packet packet) +bool TrueMQTT::Client::Impl::Connection::send(Packet packet, bool has_priority) { // Push back if the internal queue gets too big. - if (m_send_queue.size() > m_impl.m_send_queue_size) + if (!has_priority && m_send_queue.size() > m_impl.m_send_queue_size) { return false; } @@ -254,7 +280,15 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet packet) // Add the packet to the queue. { std::scoped_lock lock(m_send_queue_mutex); - m_send_queue.push_back(std::move(packet)); + if (has_priority) + { + m_send_queue.push_front(std::move(packet)); + } + else + { + m_send_queue.push_back(std::move(packet)); + } + } // Notify the write thread that there is a new packet. m_send_queue_cv.notify_one(); @@ -262,7 +296,7 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet packet) return true; } -void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet) const +void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet) { if (m_state != State::AUTHENTICATING && m_state != State::CONNECTED) { @@ -290,6 +324,8 @@ void TrueMQTT::Client::Impl::Connection::sendPacket(Packet &packet) const } packet.m_write_offset += res; } + + m_last_sent_packet = std::chrono::steady_clock::now(); } bool TrueMQTT::Client::Impl::Connection::sendConnect() @@ -329,6 +365,15 @@ bool TrueMQTT::Client::Impl::Connection::sendConnect() return send(std::move(packet)); } +bool TrueMQTT::Client::Impl::Connection::sendPingRequest() +{ + LOG_TRACE(&m_impl, "Sending PINGREQ packet"); + + Packet packet(Packet::PacketType::PINGREQ, 0); + + return send(std::move(packet), true); +} + bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain) { LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")"); -- libgit2 0.21.4