From 6736e5e07e75823186de581eea5fd0c3e4df7cbb Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sun, 16 Oct 2022 14:05:25 +0200 Subject: [PATCH] feat: reduce memory footprint for subscriptions significantly --- README.md | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- example/pubsub/main.cpp | 16 ++++++++-------- example/stress/main.cpp | 8 ++++---- include/TrueMQTT.h | 20 ++++++++++++-------- src/Client.cpp | 211 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------- src/ClientImpl.h | 36 ++++++++++++++++++------------------ src/Packet.cpp | 23 +++++++++++------------ src/Packet.h | 15 +++++++-------- 8 files changed, 245 insertions(+), 140 deletions(-) diff --git a/README.md b/README.md index a0ebbc1..216ba9d 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,67 @@ # TrueMQTT - A modern C++ MQTT Client library This project is currently a Work In Progress. -Although the basics are functional, it is untested. + +All basic functionality is in there, but it is lacking some QoL functionalities, and it has not really been hardened / battle-tested yet. ## Development ```bash mkdir build cd build -cmake .. -DBUILD_SHARED_LIBS=ON -DMIN_LOGGER_LEVEL=TRACE +cmake .. -DBUILD_SHARED_LIBS=ON -DMIN_LOGGER_LEVEL=INFO make -j$(nproc) example/pubsub/truemqtt_pubsub ``` + +## Design choices + +### MQTT v3 only + +Although this is a contested choice, for now the library only supports MQTT v3. +There is added value in MQTT v5, but it comes with extra overhead, both in performance and memory. + +This library aims to supply an interface for the more common way of using MQTT, which is a simple publish / subscribe interface. + +In the future this might change, because, as said, MQTT v5 has solid additions, that might be worth delving in it. + +### Copy-once + +A packet that is received from a broker, is only copied once in memory (from `recv()` to an internal buffer). +All subscription callbacks get a `std::string_view` which is directly in this buffer. + +This way, the library only needs to allocate memory once, heavily reducing the memory footprint. +This also means the library can handle big payloads without issue. + +For publishing a similar approach is taken, and the topic / message is only copied once in an internal buffer. +The only exception here is when the client isn't connected to the broker (yet). +In this scenario, a copy of topic / message is made, and there will be two allocations for both, instead of one. + +Either way, this makes this library highly efficient in terms of memory usage. + +The drawback is that you have to be careful with your callbacks. +You always receive a `std::string_view`, that is only valid within that callback. +As soon as the callback returns, the memory becomes invalid. + +This means that if you need to keep the topic and/or message around, you need to make a copy. + +### QoS 0 + +This library only supports QoS 0. +This is mainly because that is the only QoS I have ever used since using MQTT. + +The problem with other QoSes is that is is mostly pointless up till the point it is useful. +MQTT uses TCP, and as such, delivery over the socket is guaranteed if both sides are still alive. +In other words, QoS 1 doesn't add any guarantees in the normal situation, where lines / brokers aren't saturated. +When it does get saturated, QoS 1 becomes useful. + +But, there is a trade-off here. +If you always do QoS 1 for the cases where the line does get saturated, you put more pressure on the line in all cases, which results in a line that is saturated more quickly. +And in reality, it is very hard to recover from such scenarios anyway. + +MQTT 5 corrects this situation, by a bit of a cheat. +If you publish with QoS 1, but the TCP connection was working as expected, it in fact handles it as a QoS 0 request. + +For this reason, this library only supports QoS 0. +As added benefit, it makes for easier code, which is less like to have bugs / problems. diff --git a/example/pubsub/main.cpp b/example/pubsub/main.cpp index 388e3ff..c33fcd8 100644 --- a/example/pubsub/main.cpp +++ b/example/pubsub/main.cpp @@ -14,10 +14,10 @@ int main() // Create a connection to the local broker. TrueMQTT::Client client("localhost", 1883, "test"); - client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string message) + client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message) { std::cout << "Log " << level << ": " << message << std::endl; }); client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10); - client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message) + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message) { std::cout << "Error " << error << ": " << message << std::endl; }); client.setLastWill("example/pubsub/lastwill", "example pubsub finished", true); @@ -26,23 +26,23 @@ int main() int stop = 0; // Subscribe to the topic we will be publishing under in a bit. - client.subscribe("example/pubsub/test/subtest", [&stop](const std::string topic, const std::string payload) + client.subscribe("example/pubsub/test/subtest", [&stop](const std::string_view topic, const std::string_view payload) { std::cout << "Received message on exact topic " << topic << ": " << payload << std::endl; stop++; }); - client.subscribe("example/pubsub/test/subtest", [&stop](const std::string topic, const std::string payload) + client.subscribe("example/pubsub/test/subtest", [&stop](const std::string_view topic, const std::string_view payload) { - std::cout << "Received message on exact topic " << topic << ": " << payload << std::endl; + std::cout << "Received message on exact topic " << topic << " again: " << payload << std::endl; stop++; }); - client.subscribe("example/pubsub/+/subtest", [&stop](const std::string topic, const std::string payload) + client.subscribe("example/pubsub/+/subtest", [&stop](const std::string_view topic, const std::string_view payload) { std::cout << "Received message on single wildcard topic " << topic << ": " << payload << std::endl; stop++; }); - client.subscribe("example/pubsub/test/#", [&stop](const std::string topic, const std::string payload) + client.subscribe("example/pubsub/test/#", [&stop](const std::string_view topic, const std::string_view payload) { std::cout << "Received message on multi wildcard topic " << topic << ": " << payload << std::endl; stop++; }); - client.subscribe("example/pubsub/test/+", [&stop](const std::string topic, const std::string payload) + client.subscribe("example/pubsub/test/+", [&stop](const std::string_view topic, const std::string_view payload) { /* Never actually called, as we unsubscribe a bit later */ }); diff --git a/example/stress/main.cpp b/example/stress/main.cpp index 2adc45b..aa071fd 100644 --- a/example/stress/main.cpp +++ b/example/stress/main.cpp @@ -14,10 +14,10 @@ int main() // Create a connection to the local broker. TrueMQTT::Client client("localhost", 1883, "test"); - client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string message) + client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message) { std::cout << "Log " << level << ": " << message << std::endl; }); client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 100); - client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message) + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message) { std::cout << "Error " << error << ": " << message << std::endl; }); client.setLastWill("test/lastwill", "example pubsub finished", true); @@ -30,11 +30,11 @@ int main() int64_t totalLatency = 0; // Subscribe to the topic we are going to stress test. - client.subscribe("example/stress/+", [&received, &totalLatency](const std::string topic, const std::string payload) + client.subscribe("example/stress/+", [&received, &totalLatency](const std::string_view topic, const std::string_view payload) { // Calculate the latency. auto now = std::chrono::steady_clock::now(); - auto then = std::chrono::time_point(std::chrono::microseconds(std::stoll(payload))); + auto then = std::chrono::time_point(std::chrono::microseconds(std::stoll(std::string(payload)))); auto latency = std::chrono::duration_cast(now - then).count(); totalLatency += latency; diff --git a/include/TrueMQTT.h b/include/TrueMQTT.h index 98e2b23..0fbaf68 100644 --- a/include/TrueMQTT.h +++ b/include/TrueMQTT.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace TrueMQTT { @@ -116,9 +117,9 @@ namespace TrueMQTT * @param connection_backoff_max Maximum time between backoff attempts in seconds. * @param keep_alive_interval Interval in seconds between keep-alive messages. */ - Client(const std::string &host, + Client(const std::string_view host, int port, - const std::string &client_id, + const std::string_view client_id, std::chrono::milliseconds connection_timeout = std::chrono::milliseconds(5000), std::chrono::milliseconds connection_backoff = std::chrono::milliseconds(1000), std::chrono::milliseconds connection_backoff_max = std::chrono::milliseconds(30000), @@ -142,7 +143,7 @@ namespace TrueMQTT * @note This library doesn't contain a logger, so you need to provide one. * If this method is not called, no logging will be done. */ - void setLogger(LogLevel log_level, const std::function &logger) const; + void setLogger(LogLevel log_level, const std::function &logger) const; /** * @brief Set the last will message on the connection. @@ -155,14 +156,14 @@ namespace TrueMQTT * * @note Cannot be called after \ref connect. */ - void setLastWill(const std::string &topic, const std::string &message, bool retain) const; + void setLastWill(const std::string_view topic, const std::string_view message, bool retain) const; /** * @brief Set the error callback, called when any error occurs. * * @param callback The callback to call when an error occurs. */ - void setErrorCallback(const std::function &callback) const; + void setErrorCallback(const std::function &callback) const; /** * @brief Set the publish queue to use. @@ -247,7 +248,7 @@ namespace TrueMQTT * in this case, but it is wise to back off for a while before sending something * again. */ - bool publish(const std::string &topic, const std::string &message, bool retain) const; + bool publish(const std::string_view topic, const std::string_view message, bool retain) const; /** * @brief Subscribe to a topic, and call the callback function when a message arrives. @@ -261,6 +262,9 @@ namespace TrueMQTT * @param topic The topic to subscribe to. * @param callback The callback to call when a message arrives on this topic. * + * @note The callback receives a string_view for topic/message, which is only valid + * for the duration of the callback. If you need to retain the value of longer, + * make sure to copy the content. * @note Subscription can overlap, even on the exact same topic. All callbacks that * match the topic will be called. * @note Depending on the broker, overlapping subscriptions can trigger one or more @@ -279,7 +283,7 @@ namespace TrueMQTT * moment the connection to the broker is established, and there are messages in the * publish queue and/or subscriptions. */ - void subscribe(const std::string &topic, const std::function &callback) const; + void subscribe(const std::string_view topic, const std::function &callback) const; /** * @brief Unsubscribe from a topic. @@ -295,7 +299,7 @@ namespace TrueMQTT * moment the connection to the broker is established, and there are messages in the * publish queue and/or subscriptions. */ - void unsubscribe(const std::string &topic) const; + void unsubscribe(const std::string_view topic) const; private: // Private implementation diff --git a/src/Client.cpp b/src/Client.cpp index 4d1f183..9b4601b 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -13,9 +13,9 @@ #include -TrueMQTT::Client::Client(const std::string &host, +TrueMQTT::Client::Client(const std::string_view host, int port, - const std::string &client_id, + const std::string_view client_id, std::chrono::milliseconds connection_timeout, std::chrono::milliseconds connection_backoff, std::chrono::milliseconds connection_backoff_max, @@ -33,9 +33,9 @@ TrueMQTT::Client::~Client() disconnect(); } -TrueMQTT::Client::Impl::Impl(const std::string &host, +TrueMQTT::Client::Impl::Impl(const std::string_view host, int port, - const std::string &client_id, + const std::string_view client_id, std::chrono::milliseconds connection_timeout, std::chrono::milliseconds connection_backoff, std::chrono::milliseconds connection_backoff_max, @@ -54,7 +54,7 @@ TrueMQTT::Client::Impl::~Impl() { } -void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function &logger) const +void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function &logger) const { LOG_TRACE(m_impl, "Setting logger to log level " + std::to_string(log_level)); @@ -64,7 +64,7 @@ void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function LOG_DEBUG(m_impl, "Log level now on " + std::to_string(m_impl->m_log_level)); } -void TrueMQTT::Client::setLastWill(const std::string &topic, const std::string &message, bool retain) const +void TrueMQTT::Client::setLastWill(const std::string_view topic, const std::string_view message, bool retain) const { if (m_impl->m_state != Client::Impl::State::DISCONNECTED) { @@ -72,14 +72,14 @@ void TrueMQTT::Client::setLastWill(const std::string &topic, const std::string & return; } - LOG_TRACE(m_impl, "Setting last will to topic " + topic + " with message " + message + " and retain " + std::to_string(retain)); + LOG_TRACE(m_impl, "Setting last will to topic " + std::string(topic) + " with message " + std::string(message) + " and retain " + std::to_string(retain)); m_impl->m_last_will_topic = topic; m_impl->m_last_will_message = message; m_impl->m_last_will_retain = retain; } -void TrueMQTT::Client::setErrorCallback(const std::function &callback) const +void TrueMQTT::Client::setErrorCallback(const std::function &callback) const { LOG_TRACE(m_impl, "Setting error callback"); @@ -144,11 +144,11 @@ void TrueMQTT::Client::disconnect() const m_impl->disconnect(); } -bool TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const +bool TrueMQTT::Client::publish(const std::string_view topic, const std::string_view message, bool retain) const { std::scoped_lock lock(m_impl->m_state_mutex); - LOG_DEBUG(m_impl, "Publishing message on topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")"); + LOG_DEBUG(m_impl, "Publishing message on topic '" + std::string(topic) + "': " + std::string(message) + " (" + (retain ? "retained" : "not retained") + ")"); switch (m_impl->m_state) { @@ -164,7 +164,7 @@ bool TrueMQTT::Client::publish(const std::string &topic, const std::string &mess return false; } -void TrueMQTT::Client::subscribe(const std::string &topic, const std::function &callback) const +void TrueMQTT::Client::subscribe(const std::string_view topic, const std::function &callback) const { std::scoped_lock lock(m_impl->m_state_mutex); @@ -174,23 +174,44 @@ void TrueMQTT::Client::subscribe(const std::string &topic, const std::functionm_subscriptions.try_emplace(part).first->second; - while (std::getline(stopic, part, '/')) + // Find where in the tree the callback for this subscription should be added. + Client::Impl::SubscriptionPart *subscriptions = nullptr; + std::string_view topic_search = topic; + while (true) { - subscriptions = &subscriptions->children.try_emplace(part).first->second; + std::string_view part = topic_search; + + // Find the next part of the topic. + auto pos = topic_search.find('/'); + if (pos != std::string_view::npos) + { + part = topic_search.substr(0, pos); + topic_search.remove_prefix(pos + 1); + } + + // Find the next subscription in the tree. + if (subscriptions == nullptr) + { + subscriptions = &m_impl->m_subscriptions.try_emplace(std::string(part)).first->second; + } + else + { + subscriptions = &subscriptions->children.try_emplace(std::string(part)).first->second; + } + + // If this was the last element, we're done. + if (pos == std::string_view::npos) + { + break; + } } + // Add the callback to the leaf node. subscriptions->callbacks.push_back(callback); - m_impl->m_subscription_topics.insert(topic); + m_impl->m_subscription_topics.insert(std::string(topic)); if (m_impl->m_state == Client::Impl::State::CONNECTED) { if (!m_impl->sendSubscribe(topic)) @@ -202,7 +223,7 @@ void TrueMQTT::Client::subscribe(const std::string &topic, const std::functionm_state_mutex); @@ -212,34 +233,63 @@ void TrueMQTT::Client::unsubscribe(const std::string &topic) const return; } - LOG_DEBUG(m_impl, "Unsubscribing from topic '" + topic + "'"); + if (m_impl->m_subscription_topics.find(topic) == m_impl->m_subscription_topics.end()) + { + LOG_ERROR(m_impl, "Cannot unsubscribe from topic '" + std::string(topic) + "' because we are not subscribed to it"); + return; + } + + LOG_DEBUG(m_impl, "Unsubscribing from topic '" + std::string(topic) + "'"); - // Split the topic on /, to find each part. - std::string part; - std::stringstream stopic(topic); - std::getline(stopic, part, '/'); + std::vector> reverse; - // Find the root node, and walk down till we find the leaf node. - std::vector> reverse; - Client::Impl::SubscriptionPart *subscriptions = &m_impl->m_subscriptions[part]; - reverse.emplace_back(part, subscriptions); - while (std::getline(stopic, part, '/')) + // Find where in the tree the callback for this subscription should be removed. + Client::Impl::SubscriptionPart *subscriptions = nullptr; + std::string_view topic_search = topic; + while (true) { - subscriptions = &subscriptions->children[part]; + std::string_view part = topic_search; + + // Find the next part of the topic. + auto pos = topic_search.find('/'); + if (pos != std::string_view::npos) + { + part = topic_search.substr(0, pos); + topic_search.remove_prefix(pos + 1); + } + + // Find the next subscription in the tree. + if (subscriptions == nullptr) + { + subscriptions = &m_impl->m_subscriptions.find(part)->second; + } + else + { + subscriptions = &subscriptions->children.find(part)->second; + } + + // Update the reverse lookup. reverse.emplace_back(part, subscriptions); + + // If this was the last element, we're done. + if (pos == std::string_view::npos) + { + break; + } } + // Clear the callbacks in the leaf node. subscriptions->callbacks.clear(); // Bookkeeping: remove any empty nodes. // Otherwise we will slowly grow in memory if a user does a lot of unsubscribes // on different topics. - std::string remove_next = ""; + std::string_view remove_next = ""; for (auto it = reverse.rbegin(); it != reverse.rend(); it++) { if (!remove_next.empty()) { - std::get<1>(*it)->children.erase(remove_next); + std::get<1>(*it)->children.erase(std::get<1>(*it)->children.find(remove_next)); remove_next = ""; } @@ -250,10 +300,10 @@ void TrueMQTT::Client::unsubscribe(const std::string &topic) const } if (!remove_next.empty()) { - m_impl->m_subscriptions.erase(remove_next); + m_impl->m_subscriptions.erase(m_impl->m_subscriptions.find(remove_next)); } - m_impl->m_subscription_topics.erase(topic); + m_impl->m_subscription_topics.erase(m_impl->m_subscription_topics.find(topic)); if (m_impl->m_state == Client::Impl::State::CONNECTED) { if (!m_impl->sendUnsubscribe(topic)) @@ -312,7 +362,7 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected) } } -bool TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain) +bool TrueMQTT::Client::Impl::toPublishQueue(const std::string_view topic, const std::string_view message, bool retain) { if (m_state != Client::Impl::State::CONNECTING) { @@ -346,26 +396,34 @@ bool TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std: return true; } -void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector> &matching_callbacks, const std::map &subscriptions, std::deque &parts) +void TrueMQTT::Client::Impl::findSubscriptionMatch(std::string_view topic, std::string_view message, std::string_view topic_search, const std::map> &subscriptions) { - // If we reached the end of the topic, do nothing anymore. - if (parts.empty()) + std::string_view part = topic_search; + + // Find the next part of the topic. + auto pos = topic_search.find('/'); + if (pos != std::string_view::npos) { - return; + part = topic_search.substr(0, pos); + topic_search.remove_prefix(pos + 1); } - LOG_TRACE(this, "Finding subscription match for part '" + parts.front() + "'"); - // Find the match based on the part. - auto it = subscriptions.find(parts.front()); + auto it = subscriptions.find(part); if (it != subscriptions.end()) { - LOG_TRACE(this, "Found subscription match for part '" + parts.front() + "' with " + std::to_string(it->second.callbacks.size()) + " callbacks"); + LOG_TRACE(this, "Found subscription match for part '" + std::string(part) + "' with " + std::to_string(it->second.callbacks.size()) + " callbacks"); - matching_callbacks.insert(matching_callbacks.end(), it->second.callbacks.begin(), it->second.callbacks.end()); + for (const auto &callback : it->second.callbacks) + { + callback(topic, message); + } - std::deque remaining_parts(parts.begin() + 1, parts.end()); - findSubscriptionMatch(matching_callbacks, it->second.children, remaining_parts); + // Recursively find the match for the next part if we didn't reach the end. + if (pos != std::string_view::npos) + { + findSubscriptionMatch(topic, message, topic_search, it->second.children); + } } // Find the match if this part is a wildcard. @@ -374,10 +432,16 @@ void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vectorsecond.callbacks.size()) + " callbacks"); - matching_callbacks.insert(matching_callbacks.end(), it->second.callbacks.begin(), it->second.callbacks.end()); + for (const auto &callback : it->second.callbacks) + { + callback(topic, message); + } - std::deque remaining_parts(parts.begin() + 1, parts.end()); - findSubscriptionMatch(matching_callbacks, it->second.children, remaining_parts); + // Recursively find the match for the next part if we didn't reach the end. + if (pos != std::string_view::npos) + { + findSubscriptionMatch(topic, message, topic_search, it->second.children); + } } // Find the match if the remaining is a wildcard. @@ -386,40 +450,27 @@ void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vectorsecond.callbacks.size()) + " callbacks"); - matching_callbacks.insert(matching_callbacks.end(), it->second.callbacks.begin(), it->second.callbacks.end()); + for (const auto &callback : it->second.callbacks) + { + callback(topic, message); + } + // No more recursion here, as we implicit consume the rest of the parts too. } } -void TrueMQTT::Client::Impl::messageReceived(std::string topic, std::string message) +void TrueMQTT::Client::Impl::messageReceived(std::string_view topic, std::string_view message) { - LOG_TRACE(this, "Message received on topic '" + topic + "': " + message); - - // Split the topic on the / in parts. - std::string part; - std::stringstream stopic(topic); - std::deque parts; - while (std::getline(stopic, part, '/')) - { - parts.emplace_back(part); - } - - // Find the matching subscription(s) with recursion. - std::vector> matching_callbacks; - findSubscriptionMatch(matching_callbacks, m_subscriptions, parts); + std::scoped_lock lock(m_state_mutex); - LOG_TRACE(this, "Found " + std::to_string(matching_callbacks.size()) + " subscription(s) for topic '" + topic + "'"); + LOG_TRACE(this, "Message received on topic '" + std::string(topic) + "': " + std::string(message)); - if (matching_callbacks.size() == 1) + if (m_state != State::CONNECTED) { - // For a single callback there is no need to copy the topic/message. - matching_callbacks[0](std::move(topic), std::move(message)); - } - else - { - for (const auto &callback : matching_callbacks) - { - callback(topic, message); - } + // This happens easily when the subscribed to a really busy topic and you disconnect. + LOG_ERROR(this, "Received message while not connected"); + return; } + + findSubscriptionMatch(topic, message, topic, m_subscriptions); } diff --git a/src/ClientImpl.h b/src/ClientImpl.h index 53c78bf..19d687f 100644 --- a/src/ClientImpl.h +++ b/src/ClientImpl.h @@ -22,9 +22,9 @@ class TrueMQTT::Client::Impl { public: - Impl(const std::string &host, + Impl(const std::string_view host, int port, - const std::string &client_id, + const std::string_view client_id, std::chrono::milliseconds connection_timeout, std::chrono::milliseconds connection_backoff, std::chrono::milliseconds connection_backoff_max, @@ -41,20 +41,20 @@ public: class SubscriptionPart { public: - std::map children; - std::vector> callbacks; + std::map> children; + std::vector> callbacks; }; - void connect(); ///< Connect to the broker. - void disconnect(); ///< Disconnect from the broker. - bool sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker. - bool sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker. - bool sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker. - void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa. - bool toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue. - void messageReceived(std::string topic, std::string message); ///< Called when a message is received from the broker. + void connect(); ///< Connect to the broker. + void disconnect(); ///< Disconnect from the broker. + bool sendPublish(const std::string_view topic, const std::string_view message, bool retain); ///< Send a publish message to the broker. + bool sendSubscribe(const std::string_view topic); ///< Send a subscribe message to the broker. + bool sendUnsubscribe(const std::string_view topic); ///< Send an unsubscribe message to the broker. + void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa. + bool toPublishQueue(const std::string_view topic, const std::string_view message, bool retain); ///< Add a publish message to the publish queue. + void messageReceived(std::string_view topic, std::string_view message); ///< Called when a message is received from the broker. - void findSubscriptionMatch(std::vector> &callbacks, const std::map &subscriptions, std::deque &parts); ///< Recursive function to find any matching subscription based on parts. + void findSubscriptionMatch(std::string_view topic, std::string_view message, std::string_view topic_search, const std::map> &subscriptions); ///< Recursive function to find any matching subscription based on parts. State m_state = State::DISCONNECTED; ///< The current state of the client. std::mutex m_state_mutex; ///< Mutex to protect state changes. @@ -67,14 +67,14 @@ public: std::chrono::milliseconds m_connection_backoff_max; ///< Maximum time between backoff attempts in seconds. std::chrono::milliseconds m_keep_alive_interval; ///< Interval in seconds between keep-alive messages. - Client::LogLevel m_log_level = Client::LogLevel::NONE; ///< The log level to use. - std::function m_logger = [](Client::LogLevel, std::string) { /* empty */ }; ///< Logger callback. + Client::LogLevel m_log_level = Client::LogLevel::NONE; ///< The log level to use. + std::function m_logger = [](Client::LogLevel, std::string_view) { /* empty */ }; ///< Logger callback. std::string m_last_will_topic = ""; ///< Topic to publish the last will message to. std::string m_last_will_message = ""; ///< Message to publish on the last will topic. bool m_last_will_retain = false; ///< Whether to retain the last will message. - std::function m_error_callback = [](Error, std::string) { /* empty */ }; ///< Error callback. + std::function m_error_callback = [](Error, std::string_view) { /* empty */ }; ///< Error callback. Client::PublishQueueType m_publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue. size_t m_publish_queue_size = -1; ///< Size of the publish queue. @@ -82,8 +82,8 @@ public: size_t m_send_queue_size = 1000; ///< Size of the send queue. - std::set m_subscription_topics; ///< Flat list of topics the client is subscribed to. - std::map m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic. + std::set> m_subscription_topics; ///< Flat list of topics the client is subscribed to. + std::map> m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic. class Connection; std::unique_ptr m_connection; ///< Connection to the broker. diff --git a/src/Packet.cpp b/src/Packet.cpp index 683bb92..b199b41 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -177,19 +177,19 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop() } case Packet::PacketType::PUBLISH: { - std::string topic; + std::string_view topic; if (!packet.read_string(topic)) { LOG_ERROR(&m_impl, "Malformed packet received, closing connection"); return false; } - std::string message; + std::string_view message; packet.read_remaining(message); - LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + topic + ": " + message); + LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + std::string(topic) + ": " + std::string(message)); - m_impl.messageReceived(std::move(topic), std::move(message)); + m_impl.messageReceived(topic, message); break; } case Packet::PacketType::SUBACK: @@ -288,7 +288,6 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet packet, bool has_priority) { m_send_queue.push_back(std::move(packet)); } - } // Notify the write thread that there is a new packet. m_send_queue_cv.notify_one(); @@ -374,9 +373,9 @@ bool TrueMQTT::Client::Impl::Connection::sendPingRequest() return send(std::move(packet), true); } -bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain) +bool TrueMQTT::Client::Impl::sendPublish(const std::string_view topic, const std::string_view message, bool retain) { - LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")"); + LOG_TRACE(this, "Sending PUBLISH packet to topic '" + std::string(topic) + "': " + std::string(message) + " (" + (retain ? "retained" : "not retained") + ")"); uint8_t flags = 0; flags |= (retain ? 1 : 0) << 0; // Retain @@ -386,14 +385,14 @@ bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::st Packet packet(Packet::PacketType::PUBLISH, flags); packet.write_string(topic); - packet.write(message.c_str(), message.size()); + packet.write(message.data(), message.size()); return m_connection->send(std::move(packet)); } -bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) +bool TrueMQTT::Client::Impl::sendSubscribe(const std::string_view topic) { - LOG_TRACE(this, "Sending SUBSCRIBE packet for topic '" + topic + "'"); + LOG_TRACE(this, "Sending SUBSCRIBE packet for topic '" + std::string(topic) + "'"); Packet packet(Packet::PacketType::SUBSCRIBE, 2); @@ -410,9 +409,9 @@ bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic) return m_connection->send(std::move(packet)); } -bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic) +bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string_view topic) { - LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'"); + LOG_TRACE(this, "Sending unsubscribe message for topic '" + std::string(topic) + "'"); Packet packet(Packet::PacketType::UNSUBSCRIBE, 2); diff --git a/src/Packet.h b/src/Packet.h index 4a47604..8039492 100644 --- a/src/Packet.h +++ b/src/Packet.h @@ -66,10 +66,10 @@ public: m_buffer.insert(m_buffer.end(), data, data + length); } - void write_string(const std::string &str) + void write_string(const std::string_view str) { write_uint16(static_cast(str.size())); - write(str.c_str(), str.size()); + write(str.data(), str.size()); } bool read_uint8(uint8_t &value) @@ -93,7 +93,7 @@ public: return true; } - bool read_string(std::string &str) + bool read_string(std::string_view &str) { uint16_t length; if (!read_uint16(length)) @@ -104,16 +104,15 @@ public: { return false; } - const char *data = reinterpret_cast(m_buffer.data()) + m_read_offset; - str.assign(data, length); + + str = std::string_view(reinterpret_cast(m_buffer.data() + m_read_offset), length); m_read_offset += length; return true; } - void read_remaining(std::string &str) + void read_remaining(std::string_view &str) { - const char *data = reinterpret_cast(m_buffer.data()) + m_read_offset; - str.assign(data, m_buffer.size() - m_read_offset); + str = std::string_view(reinterpret_cast(m_buffer.data() + m_read_offset), m_buffer.size() - m_read_offset); m_read_offset = m_buffer.size(); } -- libgit2 0.21.4