Commit 6736e5e07e75823186de581eea5fd0c3e4df7cbb

Authored by Patric Stout
1 parent e79fa386

feat: reduce memory footprint for subscriptions significantly

This is a breaking API change.

The whole interface of this library now uses "std::string_view"
instead of "std::string" / "std::string &". In result, we can
now promise to only copy data once thoughout the library.

For subscriptions, this is a copy once to read the data from the
socket into a buffer.

For publish, this is a copy once to write the data in a buffer
to send over the socket.

For publish, this doesn't change the memory footprint, as because
"std::string &" was already taking care of this. For subscriptions
however, it reduces the memory usage by a factor of three. And as
result it helps with the throughput.
README.md
1 1 # TrueMQTT - A modern C++ MQTT Client library
2 2  
3 3 This project is currently a Work In Progress.
4   -Although the basics are functional, it is untested.
  4 +
  5 +All basic functionality is in there, but it is lacking some QoL functionalities, and it has not really been hardened / battle-tested yet.
5 6  
6 7 ## Development
7 8  
8 9 ```bash
9 10 mkdir build
10 11 cd build
11   -cmake .. -DBUILD_SHARED_LIBS=ON -DMIN_LOGGER_LEVEL=TRACE
  12 +cmake .. -DBUILD_SHARED_LIBS=ON -DMIN_LOGGER_LEVEL=INFO
12 13 make -j$(nproc)
13 14  
14 15 example/pubsub/truemqtt_pubsub
15 16 ```
  17 +
  18 +## Design choices
  19 +
  20 +### MQTT v3 only
  21 +
  22 +Although this is a contested choice, for now the library only supports MQTT v3.
  23 +There is added value in MQTT v5, but it comes with extra overhead, both in performance and memory.
  24 +
  25 +This library aims to supply an interface for the more common way of using MQTT, which is a simple publish / subscribe interface.
  26 +
  27 +In the future this might change, because, as said, MQTT v5 has solid additions, that might be worth delving in it.
  28 +
  29 +### Copy-once
  30 +
  31 +A packet that is received from a broker, is only copied once in memory (from `recv()` to an internal buffer).
  32 +All subscription callbacks get a `std::string_view` which is directly in this buffer.
  33 +
  34 +This way, the library only needs to allocate memory once, heavily reducing the memory footprint.
  35 +This also means the library can handle big payloads without issue.
  36 +
  37 +For publishing a similar approach is taken, and the topic / message is only copied once in an internal buffer.
  38 +The only exception here is when the client isn't connected to the broker (yet).
  39 +In this scenario, a copy of topic / message is made, and there will be two allocations for both, instead of one.
  40 +
  41 +Either way, this makes this library highly efficient in terms of memory usage.
  42 +
  43 +The drawback is that you have to be careful with your callbacks.
  44 +You always receive a `std::string_view`, that is only valid within that callback.
  45 +As soon as the callback returns, the memory becomes invalid.
  46 +
  47 +This means that if you need to keep the topic and/or message around, you need to make a copy.
  48 +
  49 +### QoS 0
  50 +
  51 +This library only supports QoS 0.
  52 +This is mainly because that is the only QoS I have ever used since using MQTT.
  53 +
  54 +The problem with other QoSes is that is is mostly pointless up till the point it is useful.
  55 +MQTT uses TCP, and as such, delivery over the socket is guaranteed if both sides are still alive.
  56 +In other words, QoS 1 doesn't add any guarantees in the normal situation, where lines / brokers aren't saturated.
  57 +When it does get saturated, QoS 1 becomes useful.
  58 +
  59 +But, there is a trade-off here.
  60 +If you always do QoS 1 for the cases where the line does get saturated, you put more pressure on the line in all cases, which results in a line that is saturated more quickly.
  61 +And in reality, it is very hard to recover from such scenarios anyway.
  62 +
  63 +MQTT 5 corrects this situation, by a bit of a cheat.
  64 +If you publish with QoS 1, but the TCP connection was working as expected, it in fact handles it as a QoS 0 request.
  65 +
  66 +For this reason, this library only supports QoS 0.
  67 +As added benefit, it makes for easier code, which is less like to have bugs / problems.
... ...
example/pubsub/main.cpp
... ... @@ -14,10 +14,10 @@ int main()
14 14 // Create a connection to the local broker.
15 15 TrueMQTT::Client client("localhost", 1883, "test");
16 16  
17   - client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string message)
  17 + client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message)
18 18 { std::cout << "Log " << level << ": " << message << std::endl; });
19 19 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 10);
20   - client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message)
  20 + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message)
21 21 { std::cout << "Error " << error << ": " << message << std::endl; });
22 22 client.setLastWill("example/pubsub/lastwill", "example pubsub finished", true);
23 23  
... ... @@ -26,23 +26,23 @@ int main()
26 26 int stop = 0;
27 27  
28 28 // Subscribe to the topic we will be publishing under in a bit.
29   - client.subscribe("example/pubsub/test/subtest", [&stop](const std::string topic, const std::string payload)
  29 + client.subscribe("example/pubsub/test/subtest", [&stop](const std::string_view topic, const std::string_view payload)
30 30 {
31 31 std::cout << "Received message on exact topic " << topic << ": " << payload << std::endl;
32 32 stop++; });
33   - client.subscribe("example/pubsub/test/subtest", [&stop](const std::string topic, const std::string payload)
  33 + client.subscribe("example/pubsub/test/subtest", [&stop](const std::string_view topic, const std::string_view payload)
34 34 {
35   - std::cout << "Received message on exact topic " << topic << ": " << payload << std::endl;
  35 + std::cout << "Received message on exact topic " << topic << " again: " << payload << std::endl;
36 36 stop++; });
37   - client.subscribe("example/pubsub/+/subtest", [&stop](const std::string topic, const std::string payload)
  37 + client.subscribe("example/pubsub/+/subtest", [&stop](const std::string_view topic, const std::string_view payload)
38 38 {
39 39 std::cout << "Received message on single wildcard topic " << topic << ": " << payload << std::endl;
40 40 stop++; });
41   - client.subscribe("example/pubsub/test/#", [&stop](const std::string topic, const std::string payload)
  41 + client.subscribe("example/pubsub/test/#", [&stop](const std::string_view topic, const std::string_view payload)
42 42 {
43 43 std::cout << "Received message on multi wildcard topic " << topic << ": " << payload << std::endl;
44 44 stop++; });
45   - client.subscribe("example/pubsub/test/+", [&stop](const std::string topic, const std::string payload)
  45 + client.subscribe("example/pubsub/test/+", [&stop](const std::string_view topic, const std::string_view payload)
46 46 {
47 47 /* Never actually called, as we unsubscribe a bit later */ });
48 48  
... ...
example/stress/main.cpp
... ... @@ -14,10 +14,10 @@ int main()
14 14 // Create a connection to the local broker.
15 15 TrueMQTT::Client client("localhost", 1883, "test");
16 16  
17   - client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string message)
  17 + client.setLogger(TrueMQTT::Client::LogLevel::WARNING, [](TrueMQTT::Client::LogLevel level, std::string_view message)
18 18 { std::cout << "Log " << level << ": " << message << std::endl; });
19 19 client.setPublishQueue(TrueMQTT::Client::PublishQueueType::FIFO, 100);
20   - client.setErrorCallback([](TrueMQTT::Client::Error error, std::string message)
  20 + client.setErrorCallback([](TrueMQTT::Client::Error error, std::string_view message)
21 21 { std::cout << "Error " << error << ": " << message << std::endl; });
22 22 client.setLastWill("test/lastwill", "example pubsub finished", true);
23 23  
... ... @@ -30,11 +30,11 @@ int main()
30 30 int64_t totalLatency = 0;
31 31  
32 32 // Subscribe to the topic we are going to stress test.
33   - client.subscribe("example/stress/+", [&received, &totalLatency](const std::string topic, const std::string payload)
  33 + client.subscribe("example/stress/+", [&received, &totalLatency](const std::string_view topic, const std::string_view payload)
34 34 {
35 35 // Calculate the latency.
36 36 auto now = std::chrono::steady_clock::now();
37   - auto then = std::chrono::time_point<std::chrono::steady_clock>(std::chrono::microseconds(std::stoll(payload)));
  37 + auto then = std::chrono::time_point<std::chrono::steady_clock>(std::chrono::microseconds(std::stoll(std::string(payload))));
38 38 auto latency = std::chrono::duration_cast<std::chrono::microseconds>(now - then).count();
39 39  
40 40 totalLatency += latency;
... ...
include/TrueMQTT.h
... ... @@ -10,6 +10,7 @@
10 10 #include <chrono>
11 11 #include <functional>
12 12 #include <memory>
  13 +#include <string_view>
13 14  
14 15 namespace TrueMQTT
15 16 {
... ... @@ -116,9 +117,9 @@ namespace TrueMQTT
116 117 * @param connection_backoff_max Maximum time between backoff attempts in seconds.
117 118 * @param keep_alive_interval Interval in seconds between keep-alive messages.
118 119 */
119   - Client(const std::string &host,
  120 + Client(const std::string_view host,
120 121 int port,
121   - const std::string &client_id,
  122 + const std::string_view client_id,
122 123 std::chrono::milliseconds connection_timeout = std::chrono::milliseconds(5000),
123 124 std::chrono::milliseconds connection_backoff = std::chrono::milliseconds(1000),
124 125 std::chrono::milliseconds connection_backoff_max = std::chrono::milliseconds(30000),
... ... @@ -142,7 +143,7 @@ namespace TrueMQTT
142 143 * @note This library doesn't contain a logger, so you need to provide one.
143 144 * If this method is not called, no logging will be done.
144 145 */
145   - void setLogger(LogLevel log_level, const std::function<void(LogLevel, std::string)> &logger) const;
  146 + void setLogger(LogLevel log_level, const std::function<void(LogLevel, std::string_view)> &logger) const;
146 147  
147 148 /**
148 149 * @brief Set the last will message on the connection.
... ... @@ -155,14 +156,14 @@ namespace TrueMQTT
155 156 *
156 157 * @note Cannot be called after \ref connect.
157 158 */
158   - void setLastWill(const std::string &topic, const std::string &message, bool retain) const;
  159 + void setLastWill(const std::string_view topic, const std::string_view message, bool retain) const;
159 160  
160 161 /**
161 162 * @brief Set the error callback, called when any error occurs.
162 163 *
163 164 * @param callback The callback to call when an error occurs.
164 165 */
165   - void setErrorCallback(const std::function<void(Error, std::string)> &callback) const;
  166 + void setErrorCallback(const std::function<void(Error, std::string_view)> &callback) const;
166 167  
167 168 /**
168 169 * @brief Set the publish queue to use.
... ... @@ -247,7 +248,7 @@ namespace TrueMQTT
247 248 * in this case, but it is wise to back off for a while before sending something
248 249 * again.
249 250 */
250   - bool publish(const std::string &topic, const std::string &message, bool retain) const;
  251 + bool publish(const std::string_view topic, const std::string_view message, bool retain) const;
251 252  
252 253 /**
253 254 * @brief Subscribe to a topic, and call the callback function when a message arrives.
... ... @@ -261,6 +262,9 @@ namespace TrueMQTT
261 262 * @param topic The topic to subscribe to.
262 263 * @param callback The callback to call when a message arrives on this topic.
263 264 *
  265 + * @note The callback receives a string_view for topic/message, which is only valid
  266 + * for the duration of the callback. If you need to retain the value of longer,
  267 + * make sure to copy the content.
264 268 * @note Subscription can overlap, even on the exact same topic. All callbacks that
265 269 * match the topic will be called.
266 270 * @note Depending on the broker, overlapping subscriptions can trigger one or more
... ... @@ -279,7 +283,7 @@ namespace TrueMQTT
279 283 * moment the connection to the broker is established, and there are messages in the
280 284 * publish queue and/or subscriptions.
281 285 */
282   - void subscribe(const std::string &topic, const std::function<void(std::string, std::string)> &callback) const;
  286 + void subscribe(const std::string_view topic, const std::function<void(std::string_view, std::string_view)> &callback) const;
283 287  
284 288 /**
285 289 * @brief Unsubscribe from a topic.
... ... @@ -295,7 +299,7 @@ namespace TrueMQTT
295 299 * moment the connection to the broker is established, and there are messages in the
296 300 * publish queue and/or subscriptions.
297 301 */
298   - void unsubscribe(const std::string &topic) const;
  302 + void unsubscribe(const std::string_view topic) const;
299 303  
300 304 private:
301 305 // Private implementation
... ...
src/Client.cpp
... ... @@ -13,9 +13,9 @@
13 13  
14 14 #include <sstream>
15 15  
16   -TrueMQTT::Client::Client(const std::string &host,
  16 +TrueMQTT::Client::Client(const std::string_view host,
17 17 int port,
18   - const std::string &client_id,
  18 + const std::string_view client_id,
19 19 std::chrono::milliseconds connection_timeout,
20 20 std::chrono::milliseconds connection_backoff,
21 21 std::chrono::milliseconds connection_backoff_max,
... ... @@ -33,9 +33,9 @@ TrueMQTT::Client::~Client()
33 33 disconnect();
34 34 }
35 35  
36   -TrueMQTT::Client::Impl::Impl(const std::string &host,
  36 +TrueMQTT::Client::Impl::Impl(const std::string_view host,
37 37 int port,
38   - const std::string &client_id,
  38 + const std::string_view client_id,
39 39 std::chrono::milliseconds connection_timeout,
40 40 std::chrono::milliseconds connection_backoff,
41 41 std::chrono::milliseconds connection_backoff_max,
... ... @@ -54,7 +54,7 @@ TrueMQTT::Client::Impl::~Impl()
54 54 {
55 55 }
56 56  
57   -void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function<void(Client::LogLevel, std::string)> &logger) const
  57 +void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function<void(Client::LogLevel, std::string_view)> &logger) const
58 58 {
59 59 LOG_TRACE(m_impl, "Setting logger to log level " + std::to_string(log_level));
60 60  
... ... @@ -64,7 +64,7 @@ void TrueMQTT::Client::setLogger(Client::LogLevel log_level, const std::function
64 64 LOG_DEBUG(m_impl, "Log level now on " + std::to_string(m_impl->m_log_level));
65 65 }
66 66  
67   -void TrueMQTT::Client::setLastWill(const std::string &topic, const std::string &message, bool retain) const
  67 +void TrueMQTT::Client::setLastWill(const std::string_view topic, const std::string_view message, bool retain) const
68 68 {
69 69 if (m_impl->m_state != Client::Impl::State::DISCONNECTED)
70 70 {
... ... @@ -72,14 +72,14 @@ void TrueMQTT::Client::setLastWill(const std::string &amp;topic, const std::string &amp;
72 72 return;
73 73 }
74 74  
75   - LOG_TRACE(m_impl, "Setting last will to topic " + topic + " with message " + message + " and retain " + std::to_string(retain));
  75 + LOG_TRACE(m_impl, "Setting last will to topic " + std::string(topic) + " with message " + std::string(message) + " and retain " + std::to_string(retain));
76 76  
77 77 m_impl->m_last_will_topic = topic;
78 78 m_impl->m_last_will_message = message;
79 79 m_impl->m_last_will_retain = retain;
80 80 }
81 81  
82   -void TrueMQTT::Client::setErrorCallback(const std::function<void(Error, std::string)> &callback) const
  82 +void TrueMQTT::Client::setErrorCallback(const std::function<void(Error, std::string_view)> &callback) const
83 83 {
84 84 LOG_TRACE(m_impl, "Setting error callback");
85 85  
... ... @@ -144,11 +144,11 @@ void TrueMQTT::Client::disconnect() const
144 144 m_impl->disconnect();
145 145 }
146 146  
147   -bool TrueMQTT::Client::publish(const std::string &topic, const std::string &message, bool retain) const
  147 +bool TrueMQTT::Client::publish(const std::string_view topic, const std::string_view message, bool retain) const
148 148 {
149 149 std::scoped_lock lock(m_impl->m_state_mutex);
150 150  
151   - LOG_DEBUG(m_impl, "Publishing message on topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")");
  151 + LOG_DEBUG(m_impl, "Publishing message on topic '" + std::string(topic) + "': " + std::string(message) + " (" + (retain ? "retained" : "not retained") + ")");
152 152  
153 153 switch (m_impl->m_state)
154 154 {
... ... @@ -164,7 +164,7 @@ bool TrueMQTT::Client::publish(const std::string &amp;topic, const std::string &amp;mess
164 164 return false;
165 165 }
166 166  
167   -void TrueMQTT::Client::subscribe(const std::string &topic, const std::function<void(std::string, std::string)> &callback) const
  167 +void TrueMQTT::Client::subscribe(const std::string_view topic, const std::function<void(std::string_view, std::string_view)> &callback) const
168 168 {
169 169 std::scoped_lock lock(m_impl->m_state_mutex);
170 170  
... ... @@ -174,23 +174,44 @@ void TrueMQTT::Client::subscribe(const std::string &amp;topic, const std::function&lt;v
174 174 return;
175 175 }
176 176  
177   - LOG_DEBUG(m_impl, "Subscribing to topic '" + topic + "'");
  177 + LOG_DEBUG(m_impl, "Subscribing to topic '" + std::string(topic) + "'");
178 178  
179   - // Split the topic on /, to find each part.
180   - std::string part;
181   - std::stringstream stopic(topic);
182   - std::getline(stopic, part, '/');
183   -
184   - // Find the root node, and walk down till we find the leaf node.
185   - Client::Impl::SubscriptionPart *subscriptions = &m_impl->m_subscriptions.try_emplace(part).first->second;
186   - while (std::getline(stopic, part, '/'))
  179 + // Find where in the tree the callback for this subscription should be added.
  180 + Client::Impl::SubscriptionPart *subscriptions = nullptr;
  181 + std::string_view topic_search = topic;
  182 + while (true)
187 183 {
188   - subscriptions = &subscriptions->children.try_emplace(part).first->second;
  184 + std::string_view part = topic_search;
  185 +
  186 + // Find the next part of the topic.
  187 + auto pos = topic_search.find('/');
  188 + if (pos != std::string_view::npos)
  189 + {
  190 + part = topic_search.substr(0, pos);
  191 + topic_search.remove_prefix(pos + 1);
  192 + }
  193 +
  194 + // Find the next subscription in the tree.
  195 + if (subscriptions == nullptr)
  196 + {
  197 + subscriptions = &m_impl->m_subscriptions.try_emplace(std::string(part)).first->second;
  198 + }
  199 + else
  200 + {
  201 + subscriptions = &subscriptions->children.try_emplace(std::string(part)).first->second;
  202 + }
  203 +
  204 + // If this was the last element, we're done.
  205 + if (pos == std::string_view::npos)
  206 + {
  207 + break;
  208 + }
189 209 }
  210 +
190 211 // Add the callback to the leaf node.
191 212 subscriptions->callbacks.push_back(callback);
192 213  
193   - m_impl->m_subscription_topics.insert(topic);
  214 + m_impl->m_subscription_topics.insert(std::string(topic));
194 215 if (m_impl->m_state == Client::Impl::State::CONNECTED)
195 216 {
196 217 if (!m_impl->sendSubscribe(topic))
... ... @@ -202,7 +223,7 @@ void TrueMQTT::Client::subscribe(const std::string &amp;topic, const std::function&lt;v
202 223 }
203 224 }
204 225  
205   -void TrueMQTT::Client::unsubscribe(const std::string &topic) const
  226 +void TrueMQTT::Client::unsubscribe(const std::string_view topic) const
206 227 {
207 228 std::scoped_lock lock(m_impl->m_state_mutex);
208 229  
... ... @@ -212,34 +233,63 @@ void TrueMQTT::Client::unsubscribe(const std::string &amp;topic) const
212 233 return;
213 234 }
214 235  
215   - LOG_DEBUG(m_impl, "Unsubscribing from topic '" + topic + "'");
  236 + if (m_impl->m_subscription_topics.find(topic) == m_impl->m_subscription_topics.end())
  237 + {
  238 + LOG_ERROR(m_impl, "Cannot unsubscribe from topic '" + std::string(topic) + "' because we are not subscribed to it");
  239 + return;
  240 + }
  241 +
  242 + LOG_DEBUG(m_impl, "Unsubscribing from topic '" + std::string(topic) + "'");
216 243  
217   - // Split the topic on /, to find each part.
218   - std::string part;
219   - std::stringstream stopic(topic);
220   - std::getline(stopic, part, '/');
  244 + std::vector<std::tuple<std::string_view, Client::Impl::SubscriptionPart *>> reverse;
221 245  
222   - // Find the root node, and walk down till we find the leaf node.
223   - std::vector<std::tuple<std::string, Client::Impl::SubscriptionPart *>> reverse;
224   - Client::Impl::SubscriptionPart *subscriptions = &m_impl->m_subscriptions[part];
225   - reverse.emplace_back(part, subscriptions);
226   - while (std::getline(stopic, part, '/'))
  246 + // Find where in the tree the callback for this subscription should be removed.
  247 + Client::Impl::SubscriptionPart *subscriptions = nullptr;
  248 + std::string_view topic_search = topic;
  249 + while (true)
227 250 {
228   - subscriptions = &subscriptions->children[part];
  251 + std::string_view part = topic_search;
  252 +
  253 + // Find the next part of the topic.
  254 + auto pos = topic_search.find('/');
  255 + if (pos != std::string_view::npos)
  256 + {
  257 + part = topic_search.substr(0, pos);
  258 + topic_search.remove_prefix(pos + 1);
  259 + }
  260 +
  261 + // Find the next subscription in the tree.
  262 + if (subscriptions == nullptr)
  263 + {
  264 + subscriptions = &m_impl->m_subscriptions.find(part)->second;
  265 + }
  266 + else
  267 + {
  268 + subscriptions = &subscriptions->children.find(part)->second;
  269 + }
  270 +
  271 + // Update the reverse lookup.
229 272 reverse.emplace_back(part, subscriptions);
  273 +
  274 + // If this was the last element, we're done.
  275 + if (pos == std::string_view::npos)
  276 + {
  277 + break;
  278 + }
230 279 }
  280 +
231 281 // Clear the callbacks in the leaf node.
232 282 subscriptions->callbacks.clear();
233 283  
234 284 // Bookkeeping: remove any empty nodes.
235 285 // Otherwise we will slowly grow in memory if a user does a lot of unsubscribes
236 286 // on different topics.
237   - std::string remove_next = "";
  287 + std::string_view remove_next = "";
238 288 for (auto it = reverse.rbegin(); it != reverse.rend(); it++)
239 289 {
240 290 if (!remove_next.empty())
241 291 {
242   - std::get<1>(*it)->children.erase(remove_next);
  292 + std::get<1>(*it)->children.erase(std::get<1>(*it)->children.find(remove_next));
243 293 remove_next = "";
244 294 }
245 295  
... ... @@ -250,10 +300,10 @@ void TrueMQTT::Client::unsubscribe(const std::string &amp;topic) const
250 300 }
251 301 if (!remove_next.empty())
252 302 {
253   - m_impl->m_subscriptions.erase(remove_next);
  303 + m_impl->m_subscriptions.erase(m_impl->m_subscriptions.find(remove_next));
254 304 }
255 305  
256   - m_impl->m_subscription_topics.erase(topic);
  306 + m_impl->m_subscription_topics.erase(m_impl->m_subscription_topics.find(topic));
257 307 if (m_impl->m_state == Client::Impl::State::CONNECTED)
258 308 {
259 309 if (!m_impl->sendUnsubscribe(topic))
... ... @@ -312,7 +362,7 @@ void TrueMQTT::Client::Impl::connectionStateChange(bool connected)
312 362 }
313 363 }
314 364  
315   -bool TrueMQTT::Client::Impl::toPublishQueue(const std::string &topic, const std::string &message, bool retain)
  365 +bool TrueMQTT::Client::Impl::toPublishQueue(const std::string_view topic, const std::string_view message, bool retain)
316 366 {
317 367 if (m_state != Client::Impl::State::CONNECTING)
318 368 {
... ... @@ -346,26 +396,34 @@ bool TrueMQTT::Client::Impl::toPublishQueue(const std::string &amp;topic, const std:
346 396 return true;
347 397 }
348 398  
349   -void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector<std::function<void(std::string, std::string)>> &matching_callbacks, const std::map<std::string, Client::Impl::SubscriptionPart> &subscriptions, std::deque<std::string> &parts)
  399 +void TrueMQTT::Client::Impl::findSubscriptionMatch(std::string_view topic, std::string_view message, std::string_view topic_search, const std::map<std::string, Client::Impl::SubscriptionPart, std::less<>> &subscriptions)
350 400 {
351   - // If we reached the end of the topic, do nothing anymore.
352   - if (parts.empty())
  401 + std::string_view part = topic_search;
  402 +
  403 + // Find the next part of the topic.
  404 + auto pos = topic_search.find('/');
  405 + if (pos != std::string_view::npos)
353 406 {
354   - return;
  407 + part = topic_search.substr(0, pos);
  408 + topic_search.remove_prefix(pos + 1);
355 409 }
356 410  
357   - LOG_TRACE(this, "Finding subscription match for part '" + parts.front() + "'");
358   -
359 411 // Find the match based on the part.
360   - auto it = subscriptions.find(parts.front());
  412 + auto it = subscriptions.find(part);
361 413 if (it != subscriptions.end())
362 414 {
363   - LOG_TRACE(this, "Found subscription match for part '" + parts.front() + "' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
  415 + LOG_TRACE(this, "Found subscription match for part '" + std::string(part) + "' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
364 416  
365   - matching_callbacks.insert(matching_callbacks.end(), it->second.callbacks.begin(), it->second.callbacks.end());
  417 + for (const auto &callback : it->second.callbacks)
  418 + {
  419 + callback(topic, message);
  420 + }
366 421  
367   - std::deque<std::string> remaining_parts(parts.begin() + 1, parts.end());
368   - findSubscriptionMatch(matching_callbacks, it->second.children, remaining_parts);
  422 + // Recursively find the match for the next part if we didn't reach the end.
  423 + if (pos != std::string_view::npos)
  424 + {
  425 + findSubscriptionMatch(topic, message, topic_search, it->second.children);
  426 + }
369 427 }
370 428  
371 429 // Find the match if this part is a wildcard.
... ... @@ -374,10 +432,16 @@ void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector&lt;std::function&lt;voi
374 432 {
375 433 LOG_TRACE(this, "Found subscription match for '+' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
376 434  
377   - matching_callbacks.insert(matching_callbacks.end(), it->second.callbacks.begin(), it->second.callbacks.end());
  435 + for (const auto &callback : it->second.callbacks)
  436 + {
  437 + callback(topic, message);
  438 + }
378 439  
379   - std::deque<std::string> remaining_parts(parts.begin() + 1, parts.end());
380   - findSubscriptionMatch(matching_callbacks, it->second.children, remaining_parts);
  440 + // Recursively find the match for the next part if we didn't reach the end.
  441 + if (pos != std::string_view::npos)
  442 + {
  443 + findSubscriptionMatch(topic, message, topic_search, it->second.children);
  444 + }
381 445 }
382 446  
383 447 // Find the match if the remaining is a wildcard.
... ... @@ -386,40 +450,27 @@ void TrueMQTT::Client::Impl::findSubscriptionMatch(std::vector&lt;std::function&lt;voi
386 450 {
387 451 LOG_TRACE(this, "Found subscription match for '#' with " + std::to_string(it->second.callbacks.size()) + " callbacks");
388 452  
389   - matching_callbacks.insert(matching_callbacks.end(), it->second.callbacks.begin(), it->second.callbacks.end());
  453 + for (const auto &callback : it->second.callbacks)
  454 + {
  455 + callback(topic, message);
  456 + }
  457 +
390 458 // No more recursion here, as we implicit consume the rest of the parts too.
391 459 }
392 460 }
393 461  
394   -void TrueMQTT::Client::Impl::messageReceived(std::string topic, std::string message)
  462 +void TrueMQTT::Client::Impl::messageReceived(std::string_view topic, std::string_view message)
395 463 {
396   - LOG_TRACE(this, "Message received on topic '" + topic + "': " + message);
397   -
398   - // Split the topic on the / in parts.
399   - std::string part;
400   - std::stringstream stopic(topic);
401   - std::deque<std::string> parts;
402   - while (std::getline(stopic, part, '/'))
403   - {
404   - parts.emplace_back(part);
405   - }
406   -
407   - // Find the matching subscription(s) with recursion.
408   - std::vector<std::function<void(std::string, std::string)>> matching_callbacks;
409   - findSubscriptionMatch(matching_callbacks, m_subscriptions, parts);
  464 + std::scoped_lock lock(m_state_mutex);
410 465  
411   - LOG_TRACE(this, "Found " + std::to_string(matching_callbacks.size()) + " subscription(s) for topic '" + topic + "'");
  466 + LOG_TRACE(this, "Message received on topic '" + std::string(topic) + "': " + std::string(message));
412 467  
413   - if (matching_callbacks.size() == 1)
  468 + if (m_state != State::CONNECTED)
414 469 {
415   - // For a single callback there is no need to copy the topic/message.
416   - matching_callbacks[0](std::move(topic), std::move(message));
417   - }
418   - else
419   - {
420   - for (const auto &callback : matching_callbacks)
421   - {
422   - callback(topic, message);
423   - }
  470 + // This happens easily when the subscribed to a really busy topic and you disconnect.
  471 + LOG_ERROR(this, "Received message while not connected");
  472 + return;
424 473 }
  474 +
  475 + findSubscriptionMatch(topic, message, topic, m_subscriptions);
425 476 }
... ...
src/ClientImpl.h
... ... @@ -22,9 +22,9 @@
22 22 class TrueMQTT::Client::Impl
23 23 {
24 24 public:
25   - Impl(const std::string &host,
  25 + Impl(const std::string_view host,
26 26 int port,
27   - const std::string &client_id,
  27 + const std::string_view client_id,
28 28 std::chrono::milliseconds connection_timeout,
29 29 std::chrono::milliseconds connection_backoff,
30 30 std::chrono::milliseconds connection_backoff_max,
... ... @@ -41,20 +41,20 @@ public:
41 41 class SubscriptionPart
42 42 {
43 43 public:
44   - std::map<std::string, SubscriptionPart> children;
45   - std::vector<std::function<void(std::string, std::string)>> callbacks;
  44 + std::map<std::string, SubscriptionPart, std::less<>> children;
  45 + std::vector<std::function<void(std::string_view, std::string_view)>> callbacks;
46 46 };
47 47  
48   - void connect(); ///< Connect to the broker.
49   - void disconnect(); ///< Disconnect from the broker.
50   - bool sendPublish(const std::string &topic, const std::string &message, bool retain); ///< Send a publish message to the broker.
51   - bool sendSubscribe(const std::string &topic); ///< Send a subscribe message to the broker.
52   - bool sendUnsubscribe(const std::string &topic); ///< Send an unsubscribe message to the broker.
53   - void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa.
54   - bool toPublishQueue(const std::string &topic, const std::string &message, bool retain); ///< Add a publish message to the publish queue.
55   - void messageReceived(std::string topic, std::string message); ///< Called when a message is received from the broker.
  48 + void connect(); ///< Connect to the broker.
  49 + void disconnect(); ///< Disconnect from the broker.
  50 + bool sendPublish(const std::string_view topic, const std::string_view message, bool retain); ///< Send a publish message to the broker.
  51 + bool sendSubscribe(const std::string_view topic); ///< Send a subscribe message to the broker.
  52 + bool sendUnsubscribe(const std::string_view topic); ///< Send an unsubscribe message to the broker.
  53 + void connectionStateChange(bool connected); ///< Called when a connection goes from CONNECTING state to CONNECTED state or visa versa.
  54 + bool toPublishQueue(const std::string_view topic, const std::string_view message, bool retain); ///< Add a publish message to the publish queue.
  55 + void messageReceived(std::string_view topic, std::string_view message); ///< Called when a message is received from the broker.
56 56  
57   - void findSubscriptionMatch(std::vector<std::function<void(std::string, std::string)>> &callbacks, const std::map<std::string, SubscriptionPart> &subscriptions, std::deque<std::string> &parts); ///< Recursive function to find any matching subscription based on parts.
  57 + void findSubscriptionMatch(std::string_view topic, std::string_view message, std::string_view topic_search, const std::map<std::string, Client::Impl::SubscriptionPart, std::less<>> &subscriptions); ///< Recursive function to find any matching subscription based on parts.
58 58  
59 59 State m_state = State::DISCONNECTED; ///< The current state of the client.
60 60 std::mutex m_state_mutex; ///< Mutex to protect state changes.
... ... @@ -67,14 +67,14 @@ public:
67 67 std::chrono::milliseconds m_connection_backoff_max; ///< Maximum time between backoff attempts in seconds.
68 68 std::chrono::milliseconds m_keep_alive_interval; ///< Interval in seconds between keep-alive messages.
69 69  
70   - Client::LogLevel m_log_level = Client::LogLevel::NONE; ///< The log level to use.
71   - std::function<void(Client::LogLevel, std::string)> m_logger = [](Client::LogLevel, std::string) { /* empty */ }; ///< Logger callback.
  70 + Client::LogLevel m_log_level = Client::LogLevel::NONE; ///< The log level to use.
  71 + std::function<void(Client::LogLevel, std::string_view)> m_logger = [](Client::LogLevel, std::string_view) { /* empty */ }; ///< Logger callback.
72 72  
73 73 std::string m_last_will_topic = ""; ///< Topic to publish the last will message to.
74 74 std::string m_last_will_message = ""; ///< Message to publish on the last will topic.
75 75 bool m_last_will_retain = false; ///< Whether to retain the last will message.
76 76  
77   - std::function<void(Error, std::string)> m_error_callback = [](Error, std::string) { /* empty */ }; ///< Error callback.
  77 + std::function<void(Error, std::string_view)> m_error_callback = [](Error, std::string_view) { /* empty */ }; ///< Error callback.
78 78  
79 79 Client::PublishQueueType m_publish_queue_type = Client::PublishQueueType::DROP; ///< The type of queue to use for the publish queue.
80 80 size_t m_publish_queue_size = -1; ///< Size of the publish queue.
... ... @@ -82,8 +82,8 @@ public:
82 82  
83 83 size_t m_send_queue_size = 1000; ///< Size of the send queue.
84 84  
85   - std::set<std::string> m_subscription_topics; ///< Flat list of topics the client is subscribed to.
86   - std::map<std::string, SubscriptionPart> m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic.
  85 + std::set<std::string, std::less<>> m_subscription_topics; ///< Flat list of topics the client is subscribed to.
  86 + std::map<std::string, SubscriptionPart, std::less<>> m_subscriptions; ///< Tree of active subscriptions build up from the parts on the topic.
87 87  
88 88 class Connection;
89 89 std::unique_ptr<Connection> m_connection; ///< Connection to the broker.
... ...
src/Packet.cpp
... ... @@ -177,19 +177,19 @@ bool TrueMQTT::Client::Impl::Connection::recvLoop()
177 177 }
178 178 case Packet::PacketType::PUBLISH:
179 179 {
180   - std::string topic;
  180 + std::string_view topic;
181 181 if (!packet.read_string(topic))
182 182 {
183 183 LOG_ERROR(&m_impl, "Malformed packet received, closing connection");
184 184 return false;
185 185 }
186 186  
187   - std::string message;
  187 + std::string_view message;
188 188 packet.read_remaining(message);
189 189  
190   - LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + topic + ": " + message);
  190 + LOG_DEBUG(&m_impl, "Received PUBLISH with topic " + std::string(topic) + ": " + std::string(message));
191 191  
192   - m_impl.messageReceived(std::move(topic), std::move(message));
  192 + m_impl.messageReceived(topic, message);
193 193 break;
194 194 }
195 195 case Packet::PacketType::SUBACK:
... ... @@ -288,7 +288,6 @@ bool TrueMQTT::Client::Impl::Connection::send(Packet packet, bool has_priority)
288 288 {
289 289 m_send_queue.push_back(std::move(packet));
290 290 }
291   -
292 291 }
293 292 // Notify the write thread that there is a new packet.
294 293 m_send_queue_cv.notify_one();
... ... @@ -374,9 +373,9 @@ bool TrueMQTT::Client::Impl::Connection::sendPingRequest()
374 373 return send(std::move(packet), true);
375 374 }
376 375  
377   -bool TrueMQTT::Client::Impl::sendPublish(const std::string &topic, const std::string &message, bool retain)
  376 +bool TrueMQTT::Client::Impl::sendPublish(const std::string_view topic, const std::string_view message, bool retain)
378 377 {
379   - LOG_TRACE(this, "Sending PUBLISH packet to topic '" + topic + "': " + message + " (" + (retain ? "retained" : "not retained") + ")");
  378 + LOG_TRACE(this, "Sending PUBLISH packet to topic '" + std::string(topic) + "': " + std::string(message) + " (" + (retain ? "retained" : "not retained") + ")");
380 379  
381 380 uint8_t flags = 0;
382 381 flags |= (retain ? 1 : 0) << 0; // Retain
... ... @@ -386,14 +385,14 @@ bool TrueMQTT::Client::Impl::sendPublish(const std::string &amp;topic, const std::st
386 385 Packet packet(Packet::PacketType::PUBLISH, flags);
387 386  
388 387 packet.write_string(topic);
389   - packet.write(message.c_str(), message.size());
  388 + packet.write(message.data(), message.size());
390 389  
391 390 return m_connection->send(std::move(packet));
392 391 }
393 392  
394   -bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &topic)
  393 +bool TrueMQTT::Client::Impl::sendSubscribe(const std::string_view topic)
395 394 {
396   - LOG_TRACE(this, "Sending SUBSCRIBE packet for topic '" + topic + "'");
  395 + LOG_TRACE(this, "Sending SUBSCRIBE packet for topic '" + std::string(topic) + "'");
397 396  
398 397 Packet packet(Packet::PacketType::SUBSCRIBE, 2);
399 398  
... ... @@ -410,9 +409,9 @@ bool TrueMQTT::Client::Impl::sendSubscribe(const std::string &amp;topic)
410 409 return m_connection->send(std::move(packet));
411 410 }
412 411  
413   -bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string &topic)
  412 +bool TrueMQTT::Client::Impl::sendUnsubscribe(const std::string_view topic)
414 413 {
415   - LOG_TRACE(this, "Sending unsubscribe message for topic '" + topic + "'");
  414 + LOG_TRACE(this, "Sending unsubscribe message for topic '" + std::string(topic) + "'");
416 415  
417 416 Packet packet(Packet::PacketType::UNSUBSCRIBE, 2);
418 417  
... ...
src/Packet.h
... ... @@ -66,10 +66,10 @@ public:
66 66 m_buffer.insert(m_buffer.end(), data, data + length);
67 67 }
68 68  
69   - void write_string(const std::string &str)
  69 + void write_string(const std::string_view str)
70 70 {
71 71 write_uint16(static_cast<uint16_t>(str.size()));
72   - write(str.c_str(), str.size());
  72 + write(str.data(), str.size());
73 73 }
74 74  
75 75 bool read_uint8(uint8_t &value)
... ... @@ -93,7 +93,7 @@ public:
93 93 return true;
94 94 }
95 95  
96   - bool read_string(std::string &str)
  96 + bool read_string(std::string_view &str)
97 97 {
98 98 uint16_t length;
99 99 if (!read_uint16(length))
... ... @@ -104,16 +104,15 @@ public:
104 104 {
105 105 return false;
106 106 }
107   - const char *data = reinterpret_cast<const char *>(m_buffer.data()) + m_read_offset;
108   - str.assign(data, length);
  107 +
  108 + str = std::string_view(reinterpret_cast<const char *>(m_buffer.data() + m_read_offset), length);
109 109 m_read_offset += length;
110 110 return true;
111 111 }
112 112  
113   - void read_remaining(std::string &str)
  113 + void read_remaining(std::string_view &str)
114 114 {
115   - const char *data = reinterpret_cast<const char *>(m_buffer.data()) + m_read_offset;
116   - str.assign(data, m_buffer.size() - m_read_offset);
  115 + str = std::string_view(reinterpret_cast<const char *>(m_buffer.data() + m_read_offset), m_buffer.size() - m_read_offset);
117 116 m_read_offset = m_buffer.size();
118 117 }
119 118  
... ...