Commit 367579cc24954a856a89a48e819ba4ba788ef6e4

Authored by Wiebe Cazemier
1 parent e44119c5

Properly handle dollar topics

Also include a few stats.
FlashMQTests/tst_maintests.cpp
@@ -323,6 +323,9 @@ void MainTests::test_validSubscribePath() @@ -323,6 +323,9 @@ void MainTests::test_validSubscribePath()
323 QVERIFY(isValidSubscribePath("")); 323 QVERIFY(isValidSubscribePath(""));
324 QVERIFY(isValidSubscribePath("hello")); 324 QVERIFY(isValidSubscribePath("hello"));
325 325
  326 + QVERIFY(isValidSubscribePath("$SYS/hello"));
  327 + QVERIFY(isValidSubscribePath("hello/$SYS")); // Hmm, is this valid?
  328 +
326 QVERIFY(!isValidSubscribePath("one/tw+o/three")); 329 QVERIFY(!isValidSubscribePath("one/tw+o/three"));
327 QVERIFY(!isValidSubscribePath("one/+o/three")); 330 QVERIFY(!isValidSubscribePath("one/+o/three"));
328 QVERIFY(!isValidSubscribePath("one/a+/three")); 331 QVERIFY(!isValidSubscribePath("one/a+/three"));
@@ -707,6 +710,7 @@ void MainTests::test_validUtf8Sse() @@ -707,6 +710,7 @@ void MainTests::test_validUtf8Sse()
707 QVERIFY(!data.isValidUtf8("+", true)); 710 QVERIFY(!data.isValidUtf8("+", true));
708 QVERIFY(!data.isValidUtf8("🩰+asdfasdfasdf", true)); 711 QVERIFY(!data.isValidUtf8("🩰+asdfasdfasdf", true));
709 QVERIFY(!data.isValidUtf8("+asdfasdfasdf", true)); 712 QVERIFY(!data.isValidUtf8("+asdfasdfasdf", true));
  713 + QVERIFY(!data.isValidUtf8("$SYS/asdfasdfasdf", true));
710 714
711 std::memset(m, 0, 16); 715 std::memset(m, 0, 16);
712 m[0] = 'a'; 716 m[0] = 'a';
mainapp.cpp
@@ -187,6 +187,10 @@ MainApp::MainApp(const std::string &configFilePath) : @@ -187,6 +187,10 @@ MainApp::MainApp(const std::string &configFilePath) :
187 187
188 auto fPasswordFileReload = std::bind(&MainApp::queuePasswordFileReloadAllThreads, this); 188 auto fPasswordFileReload = std::bind(&MainApp::queuePasswordFileReloadAllThreads, this);
189 timer.addCallback(fPasswordFileReload, 2000, "Password file reload."); 189 timer.addCallback(fPasswordFileReload, 2000, "Password file reload.");
  190 +
  191 + auto fPublishStats = std::bind(&MainApp::publishStatsOnDollarTopic, this);
  192 + timer.addCallback(fPublishStats, 10000, "Publish stats on $SYS");
  193 + publishStatsOnDollarTopic();
190 } 194 }
191 195
192 MainApp::~MainApp() 196 MainApp::~MainApp()
@@ -306,6 +310,43 @@ void MainApp::setFuzzFile(const std::string &fuzzFilePath) @@ -306,6 +310,43 @@ void MainApp::setFuzzFile(const std::string &fuzzFilePath)
306 this->fuzzFilePath = fuzzFilePath; 310 this->fuzzFilePath = fuzzFilePath;
307 } 311 }
308 312
  313 +void MainApp::publishStatsOnDollarTopic()
  314 +{
  315 + uint nrOfClients = 0;
  316 + uint64_t receivedMessageCountPerSecond = 0;
  317 + uint64_t receivedMessageCount = 0;
  318 + uint64_t sentMessageCountPerSecond = 0;
  319 + uint64_t sentMessageCount = 0;
  320 +
  321 + for (std::shared_ptr<ThreadData> &thread : threads)
  322 + {
  323 + nrOfClients += thread->getNrOfClients();
  324 +
  325 + receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond();
  326 + receivedMessageCount += thread->getReceivedMessageCount();
  327 +
  328 + sentMessageCountPerSecond += thread->getSentMessagePerSecond();
  329 + sentMessageCount += thread->getSentMessageCount();
  330 + }
  331 +
  332 + publishStat("$SYS/broker/clients/total", nrOfClients);
  333 +
  334 + publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount);
  335 + publishStat("$SYS/broker/load/messages/received/persecond", receivedMessageCountPerSecond);
  336 +
  337 + publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount);
  338 + publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond);
  339 +}
  340 +
  341 +void MainApp::publishStat(const std::string &topic, uint64_t n)
  342 +{
  343 + std::vector<std::string> *subtopics = utils.splitTopic(topic);
  344 + const std::string payload = std::to_string(n);
  345 + Publish p(topic, payload, 0);
  346 + subscriptionStore->queuePacketAtSubscribers(*subtopics, p, true);
  347 + subscriptionStore->setRetainedMessage(topic, payload, 0);
  348 +}
  349 +
309 void MainApp::initMainApp(int argc, char *argv[]) 350 void MainApp::initMainApp(int argc, char *argv[])
310 { 351 {
311 if (instance != nullptr) 352 if (instance != nullptr)
mainapp.h
@@ -41,6 +41,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -41,6 +41,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
41 #include "timer.h" 41 #include "timer.h"
42 #include "scopedsocket.h" 42 #include "scopedsocket.h"
43 #include "oneinstancelock.h" 43 #include "oneinstancelock.h"
  44 +#include "threadlocalutils.h"
44 45
45 #define VERSION "0.7.0" 46 #define VERSION "0.7.0"
46 47
@@ -64,6 +65,7 @@ class MainApp @@ -64,6 +65,7 @@ class MainApp
64 std::mutex quitMutex; 65 std::mutex quitMutex;
65 std::string fuzzFilePath; 66 std::string fuzzFilePath;
66 OneInstanceLock oneInstanceLock; 67 OneInstanceLock oneInstanceLock;
  68 + Utils utils;
67 69
68 Logger *logger = Logger::getInstance(); 70 Logger *logger = Logger::getInstance();
69 71
@@ -77,6 +79,8 @@ class MainApp @@ -77,6 +79,8 @@ class MainApp
77 void queueKeepAliveCheckAtAllThreads(); 79 void queueKeepAliveCheckAtAllThreads();
78 void queuePasswordFileReloadAllThreads(); 80 void queuePasswordFileReloadAllThreads();
79 void setFuzzFile(const std::string &fuzzFilePath); 81 void setFuzzFile(const std::string &fuzzFilePath);
  82 + void publishStatsOnDollarTopic();
  83 + void publishStat(const std::string &topic, uint64_t n);
80 84
81 MainApp(const std::string &configFilePath); 85 MainApp(const std::string &configFilePath);
82 public: 86 public:
mqttpacket.cpp
@@ -497,6 +497,8 @@ void MqttPacket::handlePublish() @@ -497,6 +497,8 @@ void MqttPacket::handlePublish()
497 logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", topic.c_str(), qos, retain, dup); 497 logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", topic.c_str(), qos, retain, dup);
498 #endif 498 #endif
499 499
  500 + sender->getThreadData()->incrementReceivedMessageCount();
  501 +
500 if (qos) 502 if (qos)
501 { 503 {
502 packet_id_pos = pos; 504 packet_id_pos = pos;
session.cpp
@@ -48,7 +48,7 @@ void Session::assignActiveConnection(std::shared_ptr&lt;Client&gt; &amp;client) @@ -48,7 +48,7 @@ void Session::assignActiveConnection(std::shared_ptr&lt;Client&gt; &amp;client)
48 this->thread = client->getThreadData(); 48 this->thread = client->getThreadData();
49 } 49 }
50 50
51 -void Session::writePacket(const MqttPacket &packet, char max_qos) 51 +void Session::writePacket(const MqttPacket &packet, char max_qos, uint64_t &count)
52 { 52 {
53 assert(max_qos <= 2); 53 assert(max_qos <= 2);
54 54
@@ -62,6 +62,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos) @@ -62,6 +62,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos)
62 { 62 {
63 std::shared_ptr<Client> c = makeSharedClient(); 63 std::shared_ptr<Client> c = makeSharedClient();
64 c->writeMqttPacketAndBlameThisClient(packet, qos); 64 c->writeMqttPacketAndBlameThisClient(packet, qos);
  65 + count++;
65 } 66 }
66 } 67 }
67 else if (qos > 0) 68 else if (qos > 0)
@@ -93,6 +94,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos) @@ -93,6 +94,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos)
93 std::shared_ptr<Client> c = makeSharedClient(); 94 std::shared_ptr<Client> c = makeSharedClient();
94 c->writeMqttPacketAndBlameThisClient(*copyPacket.get(), qos); 95 c->writeMqttPacketAndBlameThisClient(*copyPacket.get(), qos);
95 copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate. 96 copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
  97 + count++;
96 } 98 }
97 } 99 }
98 } 100 }
@@ -136,8 +138,10 @@ void Session::clearQosMessage(uint16_t packet_id) @@ -136,8 +138,10 @@ void Session::clearQosMessage(uint16_t packet_id)
136 // 138 //
137 // There is a bit of a hole there, I think. When we write out a packet to a receiver, it may decide to drop it, if its buffers 139 // There is a bit of a hole there, I think. When we write out a packet to a receiver, it may decide to drop it, if its buffers
138 // are full, for instance. We are not required to (periodically) retry. TODO Perhaps I will implement that retry anyway. 140 // are full, for instance. We are not required to (periodically) retry. TODO Perhaps I will implement that retry anyway.
139 -void Session::sendPendingQosMessages() 141 +uint64_t Session::sendPendingQosMessages()
140 { 142 {
  143 + uint64_t count = 0;
  144 +
141 if (!clientDisconnected()) 145 if (!clientDisconnected())
142 { 146 {
143 std::shared_ptr<Client> c = makeSharedClient(); 147 std::shared_ptr<Client> c = makeSharedClient();
@@ -146,6 +150,7 @@ void Session::sendPendingQosMessages() @@ -146,6 +150,7 @@ void Session::sendPendingQosMessages()
146 { 150 {
147 c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get(), qosMessage.packet->getQos()); 151 c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get(), qosMessage.packet->getQos());
148 qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate. 152 qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
  153 + count++;
149 } 154 }
150 155
151 for (const uint16_t packet_id : outgoingQoS2MessageIds) 156 for (const uint16_t packet_id : outgoingQoS2MessageIds)
@@ -155,6 +160,8 @@ void Session::sendPendingQosMessages() @@ -155,6 +160,8 @@ void Session::sendPendingQosMessages()
155 c->writeMqttPacketAndBlameThisClient(packet, 2); 160 c->writeMqttPacketAndBlameThisClient(packet, 2);
156 } 161 }
157 } 162 }
  163 +
  164 + return count;
158 } 165 }
159 166
160 void Session::touch(time_t val) 167 void Session::touch(time_t val)
session.h
@@ -64,9 +64,9 @@ public: @@ -64,9 +64,9 @@ public:
64 bool clientDisconnected() const; 64 bool clientDisconnected() const;
65 std::shared_ptr<Client> makeSharedClient() const; 65 std::shared_ptr<Client> makeSharedClient() const;
66 void assignActiveConnection(std::shared_ptr<Client> &client); 66 void assignActiveConnection(std::shared_ptr<Client> &client);
67 - void writePacket(const MqttPacket &packet, char max_qos); 67 + void writePacket(const MqttPacket &packet, char max_qos, uint64_t &count);
68 void clearQosMessage(uint16_t packet_id); 68 void clearQosMessage(uint16_t packet_id);
69 - void sendPendingQosMessages(); 69 + uint64_t sendPendingQosMessages();
70 void touch(time_t val = 0); 70 void touch(time_t val = 0);
71 bool hasExpired(); 71 bool hasExpired();
72 72
subscriptionstore.cpp
@@ -78,6 +78,7 @@ SubscriptionNode *SubscriptionNode::getChildren(const std::string &amp;subtopic) con @@ -78,6 +78,7 @@ SubscriptionNode *SubscriptionNode::getChildren(const std::string &amp;subtopic) con
78 78
79 SubscriptionStore::SubscriptionStore() : 79 SubscriptionStore::SubscriptionStore() :
80 root("root"), 80 root("root"),
  81 + rootDollar("rootDollar"),
81 sessionsByIdConst(sessionsById) 82 sessionsByIdConst(sessionsById)
82 { 83 {
83 84
@@ -87,10 +88,13 @@ void SubscriptionStore::addSubscription(std::shared_ptr&lt;Client&gt; &amp;client, const s @@ -87,10 +88,13 @@ void SubscriptionStore::addSubscription(std::shared_ptr&lt;Client&gt; &amp;client, const s
87 { 88 {
88 const std::list<std::string> subtopics = split(topic, '/'); 89 const std::list<std::string> subtopics = split(topic, '/');
89 90
  91 + SubscriptionNode *deepestNode = &root;
  92 + if (topic.length() > 0 && topic[0] == '$')
  93 + deepestNode = &rootDollar;
  94 +
90 RWLockGuard lock_guard(&subscriptionsRwlock); 95 RWLockGuard lock_guard(&subscriptionsRwlock);
91 lock_guard.wrlock(); 96 lock_guard.wrlock();
92 97
93 - SubscriptionNode *deepestNode = &root;  
94 for(const std::string &subtopic : subtopics) 98 for(const std::string &subtopic : subtopics)
95 { 99 {
96 std::unique_ptr<SubscriptionNode> *selectedChildren = nullptr; 100 std::unique_ptr<SubscriptionNode> *selectedChildren = nullptr;
@@ -120,25 +124,27 @@ void SubscriptionStore::addSubscription(std::shared_ptr&lt;Client&gt; &amp;client, const s @@ -120,25 +124,27 @@ void SubscriptionStore::addSubscription(std::shared_ptr&lt;Client&gt; &amp;client, const s
120 { 124 {
121 const std::shared_ptr<Session> &ses = session_it->second; 125 const std::shared_ptr<Session> &ses = session_it->second;
122 deepestNode->addSubscriber(ses, qos); 126 deepestNode->addSubscriber(ses, qos);
123 - giveClientRetainedMessages(ses, topic, qos); 127 + uint64_t count = giveClientRetainedMessages(ses, topic, qos);
  128 + client->getThreadData()->incrementSentMessageCount(count);
124 } 129 }
125 } 130 }
126 131
127 lock_guard.unlock(); 132 lock_guard.unlock();
128 -  
129 -  
130 } 133 }
131 134
132 void SubscriptionStore::removeSubscription(std::shared_ptr<Client> &client, const std::string &topic) 135 void SubscriptionStore::removeSubscription(std::shared_ptr<Client> &client, const std::string &topic)
133 { 136 {
134 const std::list<std::string> subtopics = split(topic, '/'); 137 const std::list<std::string> subtopics = split(topic, '/');
135 138
  139 + SubscriptionNode *deepestNode = &root;
  140 + if (topic.length() > 0 && topic[0] == '$')
  141 + deepestNode = &rootDollar;
  142 +
136 RWLockGuard lock_guard(&subscriptionsRwlock); 143 RWLockGuard lock_guard(&subscriptionsRwlock);
137 lock_guard.wrlock(); 144 lock_guard.wrlock();
138 145
139 // This code looks like that for addSubscription(), but it's specifically different in that we don't want to default-create non-existing 146 // This code looks like that for addSubscription(), but it's specifically different in that we don't want to default-create non-existing
140 // nodes. We need to abort when that happens. 147 // nodes. We need to abort when that happens.
141 - SubscriptionNode *deepestNode = &root;  
142 for(const std::string &subtopic : subtopics) 148 for(const std::string &subtopic : subtopics)
143 { 149 {
144 SubscriptionNode *selectedChildren = nullptr; 150 SubscriptionNode *selectedChildren = nullptr;
@@ -208,7 +214,8 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -208,7 +214,8 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
208 214
209 session->assignActiveConnection(client); 215 session->assignActiveConnection(client);
210 client->assignSession(session); 216 client->assignSession(session);
211 - session->sendPendingQosMessages(); 217 + uint64_t count = session->sendPendingQosMessages();
  218 + client->getThreadData()->incrementSentMessageCount(count);
212 } 219 }
213 220
214 bool SubscriptionStore::sessionPresent(const std::string &clientid) 221 bool SubscriptionStore::sessionPresent(const std::string &clientid)
@@ -227,7 +234,7 @@ bool SubscriptionStore::sessionPresent(const std::string &amp;clientid) @@ -227,7 +234,7 @@ bool SubscriptionStore::sessionPresent(const std::string &amp;clientid)
227 return result; 234 return result;
228 } 235 }
229 236
230 -void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers) const 237 +void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers, uint64_t &count) const
231 { 238 {
232 for (const Subscription &sub : subscribers) 239 for (const Subscription &sub : subscribers)
233 { 240 {
@@ -235,18 +242,29 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st @@ -235,18 +242,29 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st
235 if (!session_weak.expired()) // Shared pointer expires when session has been cleaned by 'clean session' connect. 242 if (!session_weak.expired()) // Shared pointer expires when session has been cleaned by 'clean session' connect.
236 { 243 {
237 const std::shared_ptr<Session> session = session_weak.lock(); 244 const std::shared_ptr<Session> session = session_weak.lock();
238 - session->writePacket(packet, sub.qos); 245 + session->writePacket(packet, sub.qos, count);
239 } 246 }
240 } 247 }
241 } 248 }
242 249
  250 +/**
  251 + * @brief SubscriptionStore::publishRecursively
  252 + * @param cur_subtopic_it
  253 + * @param end
  254 + * @param this_node
  255 + * @param packet
  256 + * @param count as a reference (vs return value) because a return value introduces an extra call i.e. limits tail recursion optimization.
  257 + *
  258 + * As noted in the params section, this method was written so that it could be (somewhat) optimized for tail recursion by the kernel. If you refactor this,
  259 + * look at objdump --disassemble --demangle to see how many calls (not jumps) to itself are made and compare.
  260 + */
243 void SubscriptionStore::publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 261 void SubscriptionStore::publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
244 - SubscriptionNode *this_node, const MqttPacket &packet) const 262 + SubscriptionNode *this_node, const MqttPacket &packet, uint64_t &count) const
245 { 263 {
246 if (cur_subtopic_it == end) // This is the end of the topic path, so look for subscribers here. 264 if (cur_subtopic_it == end) // This is the end of the topic path, so look for subscribers here.
247 { 265 {
248 if (this_node) 266 if (this_node)
249 - publishNonRecursively(packet, this_node->getSubscribers()); 267 + publishNonRecursively(packet, this_node->getSubscribers(), count);
250 return; 268 return;
251 } 269 }
252 270
@@ -263,33 +281,44 @@ void SubscriptionStore::publishRecursively(std::vector&lt;std::string&gt;::const_itera @@ -263,33 +281,44 @@ void SubscriptionStore::publishRecursively(std::vector&lt;std::string&gt;::const_itera
263 281
264 if (this_node->childrenPound) 282 if (this_node->childrenPound)
265 { 283 {
266 - publishNonRecursively(packet, this_node->childrenPound->getSubscribers()); 284 + publishNonRecursively(packet, this_node->childrenPound->getSubscribers(), count);
267 } 285 }
268 286
269 const auto &sub_node = this_node->children.find(cur_subtop); 287 const auto &sub_node = this_node->children.find(cur_subtop);
270 if (sub_node != this_node->children.end()) 288 if (sub_node != this_node->children.end())
271 { 289 {
272 - publishRecursively(next_subtopic, end, sub_node->second.get(), packet); 290 + publishRecursively(next_subtopic, end, sub_node->second.get(), packet, count);
273 } 291 }
274 292
275 if (this_node->childrenPlus) 293 if (this_node->childrenPlus)
276 { 294 {
277 - publishRecursively(next_subtopic, end, this_node->childrenPlus.get(), packet); 295 + publishRecursively(next_subtopic, end, this_node->childrenPlus.get(), packet, count);
278 } 296 }
279 } 297 }
280 298
281 -void SubscriptionStore::queuePacketAtSubscribers(const std::vector<std::string> &subtopics, const MqttPacket &packet) 299 +void SubscriptionStore::queuePacketAtSubscribers(const std::vector<std::string> &subtopics, const MqttPacket &packet, bool dollar)
282 { 300 {
283 assert(subtopics.size() > 0); 301 assert(subtopics.size() > 0);
284 302
  303 + SubscriptionNode *startNode = dollar ? &rootDollar : &root;
  304 +
285 RWLockGuard lock_guard(&subscriptionsRwlock); 305 RWLockGuard lock_guard(&subscriptionsRwlock);
286 lock_guard.rdlock(); 306 lock_guard.rdlock();
287 307
288 - publishRecursively(subtopics.begin(), subtopics.end(), &root, packet); 308 + uint64_t count = 0;
  309 + publishRecursively(subtopics.begin(), subtopics.end(), startNode, packet, count);
  310 +
  311 + std::shared_ptr<Client> sender = packet.getSender();
  312 + if (sender)
  313 + {
  314 + sender->getThreadData()->incrementSentMessageCount(count);
  315 + }
289 } 316 }
290 317
291 -void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos) 318 +uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos)
292 { 319 {
  320 + uint64_t count = 0;
  321 +
293 RWLockGuard locker(&retainedMessagesRwlock); 322 RWLockGuard locker(&retainedMessagesRwlock);
294 locker.rdlock(); 323 locker.rdlock();
295 324
@@ -300,8 +329,12 @@ void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Session @@ -300,8 +329,12 @@ void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Session
300 const MqttPacket packet(publish); 329 const MqttPacket packet(publish);
301 330
302 if (topicsMatch(subscribe_topic, rm.topic)) 331 if (topicsMatch(subscribe_topic, rm.topic))
303 - ses->writePacket(packet, max_qos); 332 + {
  333 + ses->writePacket(packet, max_qos, count);
  334 + }
304 } 335 }
  336 +
  337 + return count;
305 } 338 }
306 339
307 void SubscriptionStore::setRetainedMessage(const std::string &topic, const std::string &payload, char qos) 340 void SubscriptionStore::setRetainedMessage(const std::string &topic, const std::string &payload, char qos)
subscriptionstore.h
@@ -72,6 +72,7 @@ public: @@ -72,6 +72,7 @@ public:
72 class SubscriptionStore 72 class SubscriptionStore
73 { 73 {
74 SubscriptionNode root; 74 SubscriptionNode root;
  75 + SubscriptionNode rootDollar;
75 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; 76 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
76 std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById; 77 std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById;
77 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst; 78 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst;
@@ -81,9 +82,9 @@ class SubscriptionStore @@ -81,9 +82,9 @@ class SubscriptionStore
81 82
82 Logger *logger = Logger::getInstance(); 83 Logger *logger = Logger::getInstance();
83 84
84 - void publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers) const; 85 + void publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers, uint64_t &count) const;
85 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 86 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
86 - SubscriptionNode *this_node, const MqttPacket &packet) const; 87 + SubscriptionNode *this_node, const MqttPacket &packet, uint64_t &count) const;
87 88
88 public: 89 public:
89 SubscriptionStore(); 90 SubscriptionStore();
@@ -93,8 +94,8 @@ public: @@ -93,8 +94,8 @@ public:
93 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client); 94 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client);
94 bool sessionPresent(const std::string &clientid); 95 bool sessionPresent(const std::string &clientid);
95 96
96 - void queuePacketAtSubscribers(const std::vector<std::string> &subtopics, const MqttPacket &packet);  
97 - void giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos); 97 + void queuePacketAtSubscribers(const std::vector<std::string> &subtopics, const MqttPacket &packet, bool dollar = false);
  98 + uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos);
98 99
99 void setRetainedMessage(const std::string &topic, const std::string &payload, char qos); 100 void setRetainedMessage(const std::string &topic, const std::string &payload, char qos);
100 101
threaddata.cpp
@@ -157,6 +157,59 @@ void ThreadData::queuePasswdFileReload() @@ -157,6 +157,59 @@ void ThreadData::queuePasswdFileReload()
157 wakeUpThread(); 157 wakeUpThread();
158 } 158 }
159 159
  160 +int ThreadData::getNrOfClients() const
  161 +{
  162 + return clients_by_fd.size();
  163 +}
  164 +
  165 +void ThreadData::incrementReceivedMessageCount()
  166 +{
  167 + receivedMessageCount++;
  168 +}
  169 +
  170 +uint64_t ThreadData::getReceivedMessageCount() const
  171 +{
  172 + return receivedMessageCount;
  173 +}
  174 +
  175 +/**
  176 + * @brief ThreadData::getReceivedMessagePerSecond gets the amount of seconds received, averaged over the last time this was called.
  177 + * @return
  178 + *
  179 + * Locking is not required, because the counter is not written to from here.
  180 + */
  181 +uint64_t ThreadData::getReceivedMessagePerSecond()
  182 +{
  183 + std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
  184 + std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - receivedMessagePreviousTime);
  185 + uint64_t messagesTimes1000 = (receivedMessageCount - receivedMessageCountPrevious) * 1000;
  186 + uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0;
  187 + receivedMessagePreviousTime = now;
  188 + receivedMessageCountPrevious = receivedMessageCount;
  189 + return result;
  190 +}
  191 +
  192 +void ThreadData::incrementSentMessageCount(uint64_t n)
  193 +{
  194 + sentMessageCount += n;
  195 +}
  196 +
  197 +uint64_t ThreadData::getSentMessageCount() const
  198 +{
  199 + return sentMessageCount;
  200 +}
  201 +
  202 +uint64_t ThreadData::getSentMessagePerSecond()
  203 +{
  204 + std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
  205 + std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - sentMessagePreviousTime);
  206 + uint64_t messagesTimes1000 = (sentMessageCount - sentMessageCountPrevious) * 1000;
  207 + uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0;
  208 + sentMessagePreviousTime = now;
  209 + sentMessageCountPrevious = sentMessageCount;
  210 + return result;
  211 +}
  212 +
160 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial? 213 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial?
161 void ThreadData::doKeepAliveCheck() 214 void ThreadData::doKeepAliveCheck()
162 { 215 {
threaddata.h
@@ -28,6 +28,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -28,6 +28,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
28 #include <mutex> 28 #include <mutex>
29 #include <shared_mutex> 29 #include <shared_mutex>
30 #include <functional> 30 #include <functional>
  31 +#include <chrono>
31 32
32 #include "forward_declarations.h" 33 #include "forward_declarations.h"
33 34
@@ -47,6 +48,15 @@ class ThreadData @@ -47,6 +48,15 @@ class ThreadData
47 std::shared_ptr<SubscriptionStore> subscriptionStore; 48 std::shared_ptr<SubscriptionStore> subscriptionStore;
48 Logger *logger; 49 Logger *logger;
49 50
  51 + uint64_t receivedMessageCount = 0;
  52 + uint64_t receivedMessageCountPrevious = 0;
  53 + std::chrono::time_point<std::chrono::steady_clock> receivedMessagePreviousTime = std::chrono::steady_clock::now();
  54 +
  55 + uint64_t sentMessageCount = 0;
  56 + uint64_t sentMessageCountPrevious = 0;
  57 + std::chrono::time_point<std::chrono::steady_clock> sentMessagePreviousTime = std::chrono::steady_clock::now();
  58 +
  59 +
50 void reload(std::shared_ptr<Settings> settings); 60 void reload(std::shared_ptr<Settings> settings);
51 void wakeUpThread(); 61 void wakeUpThread();
52 void doKeepAliveCheck(); 62 void doKeepAliveCheck();
@@ -81,6 +91,16 @@ public: @@ -81,6 +91,16 @@ public:
81 void queueQuit(); 91 void queueQuit();
82 void waitForQuit(); 92 void waitForQuit();
83 void queuePasswdFileReload(); 93 void queuePasswdFileReload();
  94 +
  95 + int getNrOfClients() const;
  96 +
  97 + void incrementReceivedMessageCount();
  98 + uint64_t getReceivedMessageCount() const;
  99 + uint64_t getReceivedMessagePerSecond();
  100 +
  101 + void incrementSentMessageCount(uint64_t n);
  102 + uint64_t getSentMessageCount() const;
  103 + uint64_t getSentMessagePerSecond();
84 }; 104 };
85 105
86 #endif // THREADDATA_H 106 #endif // THREADDATA_H
threadlocalutils.cpp
@@ -60,6 +60,9 @@ bool Utils::isValidUtf8(const std::string &amp;s, bool alsoCheckInvalidPublishChars) @@ -60,6 +60,9 @@ bool Utils::isValidUtf8(const std::string &amp;s, bool alsoCheckInvalidPublishChars)
60 std::memcpy(topicCopy.data(), s.c_str(), len); 60 std::memcpy(topicCopy.data(), s.c_str(), len);
61 std::memset(&topicCopy.data()[len], 0x20, 16); // I fill out with spaces, as valid chars 61 std::memset(&topicCopy.data()[len], 0x20, 16); // I fill out with spaces, as valid chars
62 62
  63 + if (alsoCheckInvalidPublishChars && len > 0 && s[0] == '$')
  64 + return false;
  65 +
63 int n = 0; 66 int n = 0;
64 const char *i = topicCopy.data(); 67 const char *i = topicCopy.data();
65 while (n < len) 68 while (n < len)
utils.cpp
@@ -57,6 +57,9 @@ bool topicsMatch(const std::string &amp;subscribeTopic, const std::string &amp;publishTo @@ -57,6 +57,9 @@ bool topicsMatch(const std::string &amp;subscribeTopic, const std::string &amp;publishTo
57 if (subscribeTopic.find("+") == std::string::npos && subscribeTopic.find("#") == std::string::npos) 57 if (subscribeTopic.find("+") == std::string::npos && subscribeTopic.find("#") == std::string::npos)
58 return subscribeTopic == publishTopic; 58 return subscribeTopic == publishTopic;
59 59
  60 + if (!subscribeTopic.empty() && !publishTopic.empty() && publishTopic[0] == '$' && subscribeTopic[0] != '$')
  61 + return false;
  62 +
60 const std::vector<std::string> subscribeParts = splitToVector(subscribeTopic, '/'); 63 const std::vector<std::string> subscribeParts = splitToVector(subscribeTopic, '/');
61 const std::vector<std::string> publishParts = splitToVector(publishTopic, '/'); 64 const std::vector<std::string> publishParts = splitToVector(publishTopic, '/');
62 65