From d502382ea8bbb2fa230c8726a116ce05e5cb0084 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sat, 17 Sep 2022 10:41:37 +0200 Subject: [PATCH] chore: rework that Connection can use Client::Impl --- src/Client.cpp | 47 ++++++++++++++++++++++++++++++----------------- src/ClientImpl.h | 14 +++----------- src/Connection.cpp | 74 ++++++++++++++++++++++++++++---------------------------------------------- src/Connection.h | 26 +++++++------------------- src/Log.h | 70 +++++++++++++++++++++++++++++++++++----------------------------------- src/Packet.cpp | 62 +++++++++++++++++++++++++++++++------------------------------- 6 files changed, 134 insertions(+), 159 deletions(-) diff --git a/src/Client.cpp b/src/Client.cpp index 72c7245..3bc3e3d 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -8,27 +8,40 @@ #include "TrueMQTT.h" #include "ClientImpl.h" +#include "Connection.h" #include "Log.h" #include -using TrueMQTT::Client; - -Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) +TrueMQTT::Client::Client(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) { this->m_impl = std::make_unique(host, port, client_id, connection_timeout, connection_backoff_max, keep_alive_interval); LOG_TRACE(this->m_impl, "Constructor of client called"); } -Client::~Client() +TrueMQTT::Client::~Client() { LOG_TRACE(this->m_impl, "Destructor of client called"); this->disconnect(); } -void Client::setLogger(Client::LogLevel log_level, const std::function &logger) const +TrueMQTT::Client::Impl::Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) + : host(host), + port(port), + client_id(client_id), + connection_timeout(connection_timeout), + connection_backoff_max(connection_backoff_max), + keep_alive_interval(keep_alive_interval) +{ +} + +TrueMQTT::Client::Impl::~Impl() +{ +} + +void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function &logger) const { LOG_TRACE(this->m_impl, "Setting logger to log level " + std::to_string(log_level)); @@ -38,7 +51,7 @@ void Client::setLogger(Client::LogLevel log_level, const std::functionm_impl, "Log level now on " + std::to_string(this->m_impl->log_level)); } -void Client::setLastWill(const std::string &topic, const std::string &payload, bool retain) const +void TrueMQTT::Client::setLastWill(const std::string &topic, const std::string &payload, bool retain) const { if (this->m_impl->state != Client::Impl::State::DISCONNECTED) { @@ -53,14 +66,14 @@ void Client::setLastWill(const std::string &topic, const std::string &payload, b this->m_impl->last_will_retain = retain; } -void Client::setErrorCallback(const std::function &callback) const +void TrueMQTT::Client::setErrorCallback(const std::function &callback) const { LOG_TRACE(this->m_impl, "Setting error callback"); this->m_impl->error_callback = callback; } -void Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) const +void TrueMQTT::Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) const { if (this->m_impl->state != Client::Impl::State::DISCONNECTED) { @@ -74,7 +87,7 @@ void Client::setPublishQueue(Client::PublishQueueType queue_type, size_t size) c this->m_impl->publish_queue_size = size; } -void Client::connect() const +void TrueMQTT::Client::connect() const { std::scoped_lock lock(this->m_impl->state_mutex); @@ -89,7 +102,7 @@ void Client::connect() const this->m_impl->connect(); } -void Client::disconnect() const +void TrueMQTT::Client::disconnect() const { std::scoped_lock lock(this->m_impl->state_mutex); @@ -105,7 +118,7 @@ void Client::disconnect() const this->m_impl->disconnect(); } -void Client::publish(const std::string &topic, const std::string &payload, bool retain) const +void TrueMQTT::Client::publish(const std::string &topic, const std::string &payload, bool retain) const { std::scoped_lock lock(this->m_impl->state_mutex); @@ -125,7 +138,7 @@ void Client::publish(const std::string &topic, const std::string &payload, bool } } -void Client::subscribe(const std::string &topic, const std::function &callback) const +void TrueMQTT::Client::subscribe(const std::string &topic, const std::function &callback) const { std::scoped_lock lock(this->m_impl->state_mutex); @@ -158,7 +171,7 @@ void Client::subscribe(const std::string &topic, const std::functionm_impl->state_mutex); @@ -216,7 +229,7 @@ void Client::unsubscribe(const std::string &topic) const } } -void Client::Impl::connectionStateChange(bool connected) +void TrueMQTT::Client::Impl::connectionStateChange(bool connected) { std::scoped_lock lock(this->state_mutex); @@ -253,7 +266,7 @@ void Client::Impl::connectionStateChange(bool connected) } } -void Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain) +void TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &payload, bool retain) { if (this->state != Client::Impl::State::CONNECTING) { @@ -286,7 +299,7 @@ void Client::Impl::toPublishQueue(const std::string &topic, const std::string &p this->publish_queue.emplace_back(topic, payload, retain); } -void Client::Impl::findSubscriptionMatch(std::vector> &matching_callbacks, const std::map &subscriptions, std::deque &parts) +void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector> &matching_callbacks, const std::map &subscriptions, std::deque &parts) { // If we reached the end of the topic, do nothing anymore. if (parts.empty()) @@ -331,7 +344,7 @@ void Client::Impl::findSubscriptionMatch(std::vector #include #include @@ -23,15 +21,8 @@ class TrueMQTT::Client::Impl { public: - Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval) - : host(host), - port(port), - client_id(client_id), - connection_timeout(connection_timeout), - connection_backoff_max(connection_backoff_max), - keep_alive_interval(keep_alive_interval) - { - } + Impl(const std::string &host, int port, const std::string &client_id, int connection_timeout, int connection_backoff_max, int keep_alive_interval); + ~Impl(); enum State { @@ -84,6 +75,7 @@ public: std::set subscription_topics; ///< Flat list of topics the client is subscribed to. std::map subscriptions; ///< Tree of active subscriptions build up from the parts on the topic. + class Connection; std::unique_ptr connection; ///< Connection to the broker. uint16_t packet_id = 0; ///< The next packet ID to use. Will overflow on 65535 to 0. }; diff --git a/src/Connection.cpp b/src/Connection.cpp index 07e236a..8553935 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -16,25 +16,13 @@ #include #include -Connection::Connection(TrueMQTT::Client::LogLevel log_level, - const std::function &logger, - const std::function &error_callback, - const std::function &publish_callback, - const std::function &connection_change_callback, - const std::string &host, - int port) - : log_level(log_level), - logger(std::move(logger)), - m_error_callback(std::move(error_callback)), - m_publish_callback(std::move(publish_callback)), - m_connection_change_callback(std::move(connection_change_callback)), - m_host(host), - m_port(port), +TrueMQTT::Client::Impl::Connection::Connection(Client::Impl &impl) + : m_impl(impl), m_thread(&Connection::run, this) { } -Connection::~Connection() +TrueMQTT::Client::Impl::Connection::~Connection() { m_state = State::STOP; @@ -53,7 +41,7 @@ Connection::~Connection() } } -std::string Connection::addrinfoToString(const addrinfo *address) const +std::string TrueMQTT::Client::Impl::Connection::addrinfoToString(const addrinfo *address) const { char host[NI_MAXHOST]; getnameinfo(address->ai_addr, address->ai_addrlen, host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST); @@ -61,7 +49,7 @@ std::string Connection::addrinfoToString(const addrinfo *address) const return std::string(host); } -void Connection::run() +void TrueMQTT::Client::Impl::Connection::run() { while (true) { @@ -79,7 +67,7 @@ void Connection::run() break; case State::BACKOFF: - LOG_WARNING(this, "Connection failed; will retry in NNN seconds"); + LOG_WARNING(&m_impl, "Connection failed; will retry in NNN seconds"); // TODO: use the configuration std::this_thread::sleep_for(std::chrono::seconds(5)); @@ -102,7 +90,7 @@ void Connection::run() m_socket = INVALID_SOCKET; } m_state = State::BACKOFF; - m_connection_change_callback(false); + m_impl.connectionStateChange(false); } break; } @@ -113,7 +101,7 @@ void Connection::run() } } -void Connection::resolve() +void TrueMQTT::Client::Impl::Connection::resolve() { m_address_current = 0; m_socket = INVALID_SOCKET; @@ -135,10 +123,10 @@ void Connection::resolve() // Request the OS to resolve the hostname into an IP address. // We do this even if the hostname is already an IP address, as that // makes for far easier code. - int error = getaddrinfo(m_host.c_str(), std::to_string(m_port).c_str(), &hints, &m_host_resolved); + int error = getaddrinfo(m_impl.host.c_str(), std::to_string(m_impl.port).c_str(), &hints, &m_host_resolved); if (error != 0) { - m_error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, std::string(gai_strerror(error))); + m_impl.error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, std::string(gai_strerror(error))); return; } @@ -181,12 +169,12 @@ void Connection::resolve() #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG // For debugging, print the addresses we resolved into. - if (this->log_level >= TrueMQTT::Client::LogLevel::DEBUG) + if (m_impl.log_level >= TrueMQTT::Client::LogLevel::DEBUG) { - LOG_DEBUG(this, "Resolved hostname '" + m_host + "' to:"); + LOG_DEBUG(&m_impl, "Resolved hostname '" + m_impl.host + "' to:"); for (const addrinfo *res : m_addresses) { - LOG_DEBUG(this, "- " + addrinfoToString(res)); + LOG_DEBUG(&m_impl, "- " + addrinfoToString(res)); } } #endif @@ -194,7 +182,7 @@ void Connection::resolve() // In some odd cases, the list can be empty. This is a fatal error. if (m_addresses.empty()) { - m_error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, ""); + m_impl.error_callback(TrueMQTT::Client::Error::HOSTNAME_LOOKUP_FAILED, ""); return; } @@ -205,7 +193,7 @@ void Connection::resolve() } } -bool Connection::connectToAny() +bool TrueMQTT::Client::Impl::Connection::connectToAny() { // Check if we have pending attempts. If not, queue a new attempt. if (m_sockets.empty()) @@ -235,7 +223,7 @@ bool Connection::connectToAny() // Check if there was an error on select(). This is hard to recover from. if (result < 0) { - LOG_ERROR(this, "select() failed: " + std::string(strerror(errno))); + LOG_ERROR(&m_impl, "select() failed: " + std::string(strerror(errno))); return true; } @@ -261,7 +249,7 @@ bool Connection::connectToAny() return true; } - LOG_ERROR(this, "Connection attempt to broker timed out"); + LOG_ERROR(&m_impl, "Connection attempt to broker timed out"); // Cleanup all sockets. for (const auto &socket : m_sockets) @@ -287,7 +275,7 @@ bool Connection::connectToAny() if (err != 0) { // It is in error-state: report about it, and remove it. - LOG_ERROR(this, "Could not connect to " + addrinfoToString(m_socket_to_address[*socket_it]) + ": " + std::string(strerror(err))); + LOG_ERROR(&m_impl, "Could not connect to " + addrinfoToString(m_socket_to_address[*socket_it]) + ": " + std::string(strerror(err))); closesocket(*socket_it); m_socket_to_address.erase(*socket_it); socket_it = m_sockets.erase(socket_it); @@ -309,7 +297,7 @@ bool Connection::connectToAny() } // We have a connected socket. - LOG_DEBUG(this, "Connected to " + addrinfoToString(m_socket_to_address[socket_connected])); + LOG_DEBUG(&m_impl, "Connected to " + addrinfoToString(m_socket_to_address[socket_connected])); // Close all other pending connections. for (const auto &socket : m_sockets) @@ -326,7 +314,7 @@ bool Connection::connectToAny() int nonblocking = 0; if (ioctl(socket_connected, FIONBIO, &nonblocking) != 0) { - LOG_WARNING(this, "Could not set socket to non-blocking; expect performance impact"); + LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact"); } // Only change the state if no disconnect() has been requested in the mean time. @@ -339,7 +327,7 @@ bool Connection::connectToAny() return true; } -bool Connection::tryNextAddress() +bool TrueMQTT::Client::Impl::Connection::tryNextAddress() { if (m_address_current >= m_addresses.size()) { @@ -352,13 +340,13 @@ bool Connection::tryNextAddress() return true; } -void Connection::connect(addrinfo *address) +void TrueMQTT::Client::Impl::Connection::connect(addrinfo *address) { // Create a new socket based on the resolved information. SOCKET sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); if (sock == INVALID_SOCKET) { - LOG_ERROR(this, "Could not create new socket"); + LOG_ERROR(&m_impl, "Could not create new socket"); return; } @@ -367,18 +355,18 @@ void Connection::connect(addrinfo *address) /* The (const char*) cast is needed for Windows */ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags)) != 0) { - LOG_WARNING(this, "Could not set TCP_NODELAY on socket"); + LOG_WARNING(&m_impl, "Could not set TCP_NODELAY on socket"); } // Set socket to non-blocking; this allows for multiple connects to be pending. This is // needed to apply Happy Eyeballs. int nonblocking = 1; if (ioctl(sock, FIONBIO, &nonblocking) != 0) { - LOG_WARNING(this, "Could not set socket to non-blocking; expect performance impact"); + LOG_WARNING(&m_impl, "Could not set socket to non-blocking; expect performance impact"); } // Start the actual connection attempt. - LOG_DEBUG(this, "Connecting to " + addrinfoToString(address)); + LOG_DEBUG(&m_impl, "Connecting to " + addrinfoToString(address)); int err = ::connect(sock, address->ai_addr, (int)address->ai_addrlen); if (err != 0 && errno != EINPROGRESS) { @@ -386,7 +374,7 @@ void Connection::connect(addrinfo *address) // else, something is wrong. Report the error and close the socket. closesocket(sock); - LOG_ERROR(this, "Could not connect to " + addrinfoToString(address) + ": " + std::string(strerror(errno))); + LOG_ERROR(&m_impl, "Could not connect to " + addrinfoToString(address) + ": " + std::string(strerror(errno))); return; } @@ -397,13 +385,7 @@ void Connection::connect(addrinfo *address) void TrueMQTT::Client::Impl::connect() { - this->connection = std::make_unique( - this->log_level, this->logger, this->error_callback, - [this](std::string topic, std::string payload) - { this->messageReceived(std::move(topic), std::move(payload)); }, - [this](bool connected) - { this->connectionStateChange(connected); }, - this->host, this->port); + this->connection = std::make_unique(*this); } void TrueMQTT::Client::Impl::disconnect() diff --git a/src/Connection.h b/src/Connection.h index 0ccc2f1..445eadd 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -7,7 +7,7 @@ #pragma once -#include "TrueMQTT.h" +#include "ClientImpl.h" #include #include @@ -20,19 +20,15 @@ #define INVALID_SOCKET -1 #define closesocket close -class Connection +class Packet; + +class TrueMQTT::Client::Impl::Connection { public: - Connection(TrueMQTT::Client::LogLevel log_level, - const std::function &logger, - const std::function &error_callback, - const std::function &publish_callback, - const std::function &connection_change_callback, - const std::string &host, - int port); + Connection(TrueMQTT::Client::Impl &impl); ~Connection(); - void send(class Packet &packet) const; + void send(Packet &packet) const; private: // Implemented in Connection.cpp @@ -58,15 +54,7 @@ private: STOP, }; - TrueMQTT::Client::LogLevel log_level; - const std::function logger; - - const std::function m_error_callback; - const std::function m_publish_callback; - const std::function m_connection_change_callback; - - const std::string m_host; ///< The hostname or IP address to connect to. - int m_port; ///< The port to connect to. + TrueMQTT::Client::Impl &m_impl; State m_state = State::RESOLVING; std::thread m_thread; ///< Current thread used to run this connection. diff --git a/src/Log.h b/src/Log.h index 12bfc3a..4e12ec1 100644 --- a/src/Log.h +++ b/src/Log.h @@ -22,65 +22,65 @@ #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_ERROR -#define LOG_ERROR(obj, x) \ - do \ - { \ - if (obj->log_level >= TrueMQTT::Client::LogLevel::ERROR) \ - { \ - obj->logger(TrueMQTT::Client::LogLevel::ERROR, x); \ - } \ +#define LOG_ERROR(obj, x) \ + do \ + { \ + if ((obj)->log_level >= TrueMQTT::Client::LogLevel::ERROR) \ + { \ + (obj)->logger(TrueMQTT::Client::LogLevel::ERROR, x); \ + } \ } while (0) #else #define LOG_ERROR(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_WARNING -#define LOG_WARNING(obj, x) \ - do \ - { \ - if (obj->log_level >= TrueMQTT::Client::LogLevel::WARNING) \ - { \ - obj->logger(TrueMQTT::Client::LogLevel::WARNING, x); \ - } \ +#define LOG_WARNING(obj, x) \ + do \ + { \ + if ((obj)->log_level >= TrueMQTT::Client::LogLevel::WARNING) \ + { \ + (obj)->logger(TrueMQTT::Client::LogLevel::WARNING, x); \ + } \ } while (0) #else #define LOG_WARNING(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_INFO -#define LOG_INFO(obj, x) \ - do \ - { \ - if (obj->log_level >= TrueMQTT::Client::LogLevel::INFO) \ - { \ - obj->logger(TrueMQTT::Client::LogLevel::INFO, x); \ - } \ +#define LOG_INFO(obj, x) \ + do \ + { \ + if ((obj)->log_level >= TrueMQTT::Client::LogLevel::INFO) \ + { \ + (obj)->logger(TrueMQTT::Client::LogLevel::INFO, x); \ + } \ } while (0) #else #define LOG_INFO(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_DEBUG -#define LOG_DEBUG(obj, x) \ - do \ - { \ - if (obj->log_level >= TrueMQTT::Client::LogLevel::DEBUG) \ - { \ - obj->logger(TrueMQTT::Client::LogLevel::DEBUG, x); \ - } \ +#define LOG_DEBUG(obj, x) \ + do \ + { \ + if ((obj)->log_level >= TrueMQTT::Client::LogLevel::DEBUG) \ + { \ + (obj)->logger(TrueMQTT::Client::LogLevel::DEBUG, x); \ + } \ } while (0) #else #define LOG_DEBUG(obj, x) #endif #if MIN_LOGGER_LEVEL >= LOGGER_LEVEL_TRACE -#define LOG_TRACE(obj, x) \ - do \ - { \ - if (obj->log_level >= TrueMQTT::Client::LogLevel::TRACE) \ - { \ - obj->logger(TrueMQTT::Client::LogLevel::TRACE, x); \ - } \ +#define LOG_TRACE(obj, x) \ + do \ + { \ + if ((obj)->log_level >= TrueMQTT::Client::LogLevel::TRACE) \ + { \ + (obj)->logger(TrueMQTT::Client::LogLevel::TRACE, x); \ + } \ } while (0) #else #define LOG_TRACE(obj, x) diff --git a/src/Packet.cpp b/src/Packet.cpp index 5469415..44b98aa 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -121,7 +121,7 @@ public: uint8_t m_flags; }; -ssize_t Connection::recv(char *buffer, size_t length) const +ssize_t TrueMQTT::Client::Impl::Connection::recv(char *buffer, size_t length) const { // We idle-check every 100ms if we are requested to stop, as otherwise // this thread will block till the server disconnects us. @@ -142,27 +142,27 @@ ssize_t Connection::recv(char *buffer, size_t length) const } if (m_state == State::STOP) { - LOG_TRACE(this, "Closing connection as STOP has been requested"); + LOG_TRACE(&m_impl, "Closing connection as STOP has been requested"); return -1; } ssize_t res = ::recv(m_socket, buffer, length, 0); if (res == 0) { - LOG_INFO(this, "Connection closed by peer"); + LOG_INFO(&m_impl, "Connection closed by peer"); return -1; } if (res < 0) { - LOG_WARNING(this, "Connection read error: " + std::string(strerror(errno))); + LOG_WARNING(&m_impl, "Connection read error: " + std::string(strerror(errno))); return -1; } - LOG_TRACE(this, "Received " + std::to_string(res) + " bytes"); + LOG_TRACE(&m_impl, "Received " + std::to_string(res) + " bytes"); return res; } -bool Connection::recvLoop() +bool TrueMQTT::Client::Impl::Connection::recvLoop() { uint8_t buffer; @@ -177,7 +177,7 @@ bool Connection::recvLoop() if (packet_type_raw < 1 || packet_type_raw > 14) { - LOG_ERROR(this, "Received invalid packet type (" + std::to_string(packet_type_raw) + ") from broker, closing connection"); + LOG_ERROR(&m_impl, "Received invalid packet type (" + std::to_string(packet_type_raw) + ") from broker, closing connection"); return false; } @@ -203,11 +203,11 @@ bool Connection::recvLoop() } if ((buffer & 0x80) != 0) { - LOG_ERROR(this, "Malformed packet length received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet length received, closing connection"); return false; } - LOG_TRACE(this, "Received packet of type " + std::string(magic_enum::enum_name(packet_type)) + " with flags " + std::to_string(flags) + " and length " + std::to_string(remaining_length)); + LOG_TRACE(&m_impl, "Received packet of type " + std::string(magic_enum::enum_name(packet_type)) + " with flags " + std::to_string(flags) + " and length " + std::to_string(remaining_length)); // Read the rest of the packet. std::vector data; @@ -235,25 +235,25 @@ bool Connection::recvLoop() if (!packet.read_uint8(acknowledge)) { - LOG_ERROR(this, "Malformed packet received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } if (!packet.read_uint8(return_code)) { - LOG_ERROR(this, "Malformed packet received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } - LOG_DEBUG(this, "Received CONNACK with acknowledge " + std::to_string(acknowledge) + " and return code " + std::to_string(return_code)); + LOG_DEBUG(&m_impl, "Received CONNACK with acknowledge " + std::to_string(acknowledge) + " and return code " + std::to_string(return_code)); if (return_code != 0) { - LOG_ERROR(this, "Broker actively refused our connection"); + LOG_ERROR(&m_impl, "Broker actively refused our connection"); return false; } m_state = State::CONNECTED; - m_connection_change_callback(true); + m_impl.connectionStateChange(true); break; } case Packet::PacketType::PUBLISH: @@ -261,16 +261,16 @@ bool Connection::recvLoop() std::string topic; if (!packet.read_string(topic)) { - LOG_ERROR(this, "Malformed packet received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } std::string payload; packet.read_remaining(payload); - LOG_DEBUG(this, "Received PUBLISH with topic " + topic + ": " + payload); + LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + topic + ": " + payload); - m_publish_callback(std::move(topic), std::move(payload)); + m_impl.messageReceived(std::move(topic), std::move(payload)); break; } case Packet::PacketType::SUBACK: @@ -280,22 +280,22 @@ bool Connection::recvLoop() if (!packet.read_uint16(packet_id)) { - LOG_ERROR(this, "Malformed packet received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } if (!packet.read_uint8(return_code)) { - LOG_ERROR(this, "Malformed packet received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } - LOG_DEBUG(this, "Received SUBACK with packet id " + std::to_string(packet_id) + " and return code " + std::to_string(return_code)); + LOG_DEBUG(&m_impl, "Received SUBACK with packet id " + std::to_string(packet_id) + " and return code " + std::to_string(return_code)); if (return_code > 2) { - LOG_WARNING(this, "Broker refused our subscription"); + LOG_WARNING(&m_impl, "Broker refused our subscription"); // TODO -- Keep track of the topic per ticket - m_error_callback(TrueMQTT::Client::Error::SUBSCRIBE_FAILED, ""); + m_impl.error_callback(TrueMQTT::Client::Error::SUBSCRIBE_FAILED, ""); } break; @@ -306,25 +306,25 @@ bool Connection::recvLoop() if (!packet.read_uint16(packet_id)) { - LOG_ERROR(this, "Malformed packet received, closing connection"); + LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } - LOG_DEBUG(this, "Received UNSUBACK with packet id " + std::to_string(packet_id)); + LOG_DEBUG(&m_impl, "Received UNSUBACK with packet id " + std::to_string(packet_id)); break; } default: - LOG_ERROR(this, "Received unexpected packet type " + std::string(magic_enum::enum_name(packet_type)) + " from broker, closing connection"); + LOG_ERROR(&m_impl, "Received unexpected packet type " + std::string(magic_enum::enum_name(packet_type)) + " from broker, closing connection"); return false; } return true; } -void Connection::send(Packet &packet) const +void TrueMQTT::Client::Impl::Connection::send(Packet &packet) const { - LOG_TRACE(this, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size())); + LOG_TRACE(&m_impl, "Sending packet of type " + std::string(magic_enum::enum_name(packet.m_packet_type)) + " with flags " + std::to_string(packet.m_flags) + " and length " + std::to_string(packet.m_buffer.size())); std::vector buffer; @@ -347,19 +347,19 @@ void Connection::send(Packet &packet) const // Write header and packet. if (::send(m_socket, (char *)buffer.data(), buffer.size(), MSG_NOSIGNAL) < 0) { - LOG_WARNING(this, "Connection write error: " + std::string(strerror(errno))); + LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); return; } if (::send(m_socket, (char *)packet.m_buffer.data(), packet.m_buffer.size(), MSG_NOSIGNAL) < 0) { - LOG_WARNING(this, "Connection write error: " + std::string(strerror(errno))); + LOG_WARNING(&m_impl, "Connection write error: " + std::string(strerror(errno))); return; } } -void Connection::sendConnect() +void TrueMQTT::Client::Impl::Connection::sendConnect() { - LOG_TRACE(this, "Sending CONNECT packet"); + LOG_TRACE(&m_impl, "Sending CONNECT packet"); static std::string protocol_name("MQTT"); static std::string client_id("ClientID"); -- libgit2 0.21.4