Commit e2ea3ea8765aa80177758b3401a9e9294235cb26

Authored by Wiebe Cazemier
Committed by Wiebe Cazemier
1 parent bf2193f9

Add stats about connection count in $SYS

This required adding a global stats object.

It also contains a bit of refactor to make a type out of the derived
counters.
CMakeLists.txt
... ... @@ -61,6 +61,8 @@ add_executable(FlashMQ
61 61 publishcopyfactory.h
62 62 variablebyteint.h
63 63 mqtt5properties.h
  64 + globalstats.h
  65 + derivablecounter.h
64 66  
65 67 mainapp.cpp
66 68 main.cpp
... ... @@ -101,6 +103,8 @@ add_executable(FlashMQ
101 103 publishcopyfactory.cpp
102 104 variablebyteint.cpp
103 105 mqtt5properties.cpp
  106 + globalstats.cpp
  107 + derivablecounter.cpp
104 108  
105 109 )
106 110  
... ...
FlashMQTests/FlashMQTests.pro
... ... @@ -52,6 +52,8 @@ SOURCES += tst_maintests.cpp \
52 52 ../publishcopyfactory.cpp \
53 53 ../variablebyteint.cpp \
54 54 ../mqtt5properties.cpp \
  55 + ../globalstats.cpp \
  56 + ../derivablecounter.cpp \
55 57 mainappthread.cpp \
56 58 twoclienttestcontext.cpp
57 59  
... ... @@ -96,6 +98,8 @@ HEADERS += \
96 98 ../publishcopyfactory.h \
97 99 ../variablebyteint.h \
98 100 ../mqtt5properties.h \
  101 + ../globalstats.h \
  102 + ../derivablecounter.h \
99 103 mainappthread.h \
100 104 twoclienttestcontext.h
101 105  
... ...
client.cpp
... ... @@ -223,7 +223,7 @@ void Client::writeMqttPacket(const MqttPacket &packet)
223 223 if (packet.packetType == PacketType::PUBLISH)
224 224 {
225 225 ThreadData *td = ThreadGlobals::getThreadData();
226   - td->incrementSentMessageCount(1);
  226 + td->sentMessageCounter.inc();
227 227 }
228 228 else if (packet.packetType == PacketType::DISCONNECT)
229 229 setReadyForDisconnect();
... ...
derivablecounter.cpp 0 → 100644
  1 +#include "derivablecounter.h"
  2 +
  3 +void DerivableCounter::inc(uint64_t n)
  4 +{
  5 + val += n;
  6 +}
  7 +
  8 +uint64_t DerivableCounter::get() const
  9 +{
  10 + return val;
  11 +}
  12 +
  13 +/**
  14 + * @brief DerivableCounter::getPerSecond returns the amount per second since last time this method was called.
  15 + * @return
  16 + *
  17 + * Even though the class it not meant to be thread-safe, this method does use a mutex, because obtaining the value can be
  18 + * scheduled in different threads.
  19 + */
  20 +uint64_t DerivableCounter::getPerSecond()
  21 +{
  22 + std::lock_guard<std::mutex> locker(timeMutex);
  23 +
  24 + std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
  25 + std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - timeOfPrevious);
  26 + uint64_t messagesTimes1000 = (val - valPrevious) * 1000;
  27 + uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0;
  28 + timeOfPrevious = now;
  29 + valPrevious = val;
  30 + return result;
  31 +}
... ...
derivablecounter.h 0 → 100644
  1 +#ifndef DERIVABLECOUNTER_H
  2 +#define DERIVABLECOUNTER_H
  3 +
  4 +#include <chrono>
  5 +#include <mutex>
  6 +
  7 +/**
  8 + * @brief The DerivableCounter is a counter which can derive val/dt.
  9 + *
  10 + * It's not thread-safe, to avoid unnecessary locking. You should have counters per thread.
  11 + */
  12 +class DerivableCounter
  13 +{
  14 + uint64_t val = 0;
  15 + uint64_t valPrevious = 0;
  16 + std::chrono::time_point<std::chrono::steady_clock> timeOfPrevious = std::chrono::steady_clock::now();
  17 + std::mutex timeMutex;
  18 +
  19 +public:
  20 +
  21 + void inc(uint64_t n = 1);
  22 + uint64_t get() const;
  23 + uint64_t getPerSecond();
  24 +};
  25 +
  26 +#endif // DERIVABLECOUNTER_H
... ...
globalstats.cpp 0 → 100644
  1 +#include "globalstats.h"
  2 +
  3 +GlobalStats *GlobalStats::instance = nullptr;
  4 +
  5 +GlobalStats::GlobalStats()
  6 +{
  7 +
  8 +}
  9 +
  10 +GlobalStats *GlobalStats::getInstance()
  11 +{
  12 + if (GlobalStats::instance == nullptr)
  13 + GlobalStats::instance = new GlobalStats();
  14 +
  15 + return GlobalStats::instance;
  16 +}
  17 +
... ...
globalstats.h 0 → 100644
  1 +#ifndef GLOBALSTATS_H
  2 +#define GLOBALSTATS_H
  3 +
  4 +#include "stdint.h"
  5 +#include "derivablecounter.h"
  6 +
  7 +class GlobalStats
  8 +{
  9 + static GlobalStats *instance;
  10 +
  11 + GlobalStats();
  12 +public:
  13 + static GlobalStats *getInstance();
  14 +
  15 + DerivableCounter socketConnects;
  16 +};
  17 +
  18 +#endif // GLOBALSTATS_H
... ...
mainapp.cpp
... ... @@ -33,6 +33,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
33 33 #include "threadloop.h"
34 34 #include "authplugin.h"
35 35 #include "threadglobals.h"
  36 +#include "globalstats.h"
36 37  
37 38 MainApp *MainApp::instance = nullptr;
38 39  
... ... @@ -529,6 +530,8 @@ void MainApp::start()
529 530 }
530 531 #endif
531 532  
  533 + GlobalStats *globalStats = GlobalStats::getInstance();
  534 +
532 535 for (int i = 0; i < num_threads; i++)
533 536 {
534 537 std::shared_ptr<ThreadData> t = std::make_shared<ThreadData>(i, subscriptionStore, settings);
... ... @@ -592,6 +595,8 @@ void MainApp::start()
592 595  
593 596 std::shared_ptr<Client> client = std::make_shared<Client>(fd, thread_data, clientSSL, listener->websocket, addr, settings.get());
594 597 thread_data->giveClient(client);
  598 +
  599 + globalStats->socketConnects.inc();
595 600 }
596 601 else
597 602 {
... ...
mqttpacket.cpp
... ... @@ -328,6 +328,8 @@ void MqttPacket::handleConnect()
328 328  
329 329 std::shared_ptr<SubscriptionStore> subscriptionStore = sender->getThreadData()->getSubscriptionStore();
330 330  
  331 + sender->getThreadData()->mqttConnectCounter.inc();
  332 +
331 333 uint16_t variable_header_length = readTwoBytesToUInt16();
332 334  
333 335 const Settings &settings = *ThreadGlobals::getSettings();
... ... @@ -1129,7 +1131,7 @@ void MqttPacket::handlePublish()
1129 1131  
1130 1132 ReasonCodes ackCode = ReasonCodes::Success;
1131 1133  
1132   - sender->getThreadData()->incrementReceivedMessageCount();
  1134 + sender->getThreadData()->receivedMessageCounter.inc();
1133 1135  
1134 1136 Authentication &authentication = *ThreadGlobals::getAuth();
1135 1137  
... ...
threaddata.cpp
... ... @@ -20,6 +20,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
20 20 #include <sstream>
21 21 #include <cassert>
22 22  
  23 +#include "globalstats.h"
  24 +
23 25 KeepAliveCheck::KeepAliveCheck(const std::shared_ptr<Client> client) :
24 26 client(client)
25 27 {
... ... @@ -143,17 +145,31 @@ void ThreadData::publishStatsOnDollarTopic(std::vector&lt;std::shared_ptr&lt;ThreadDat
143 145 uint64_t sentMessageCountPerSecond = 0;
144 146 uint64_t sentMessageCount = 0;
145 147  
  148 + uint64_t mqttConnectCountPerSecond = 0;
  149 + uint64_t mqttConnectCount = 0;
  150 +
146 151 for (const std::shared_ptr<ThreadData> &thread : threads)
147 152 {
148 153 nrOfClients += thread->getNrOfClients();
149 154  
150   - receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond();
151   - receivedMessageCount += thread->getReceivedMessageCount();
  155 + receivedMessageCountPerSecond += thread->receivedMessageCounter.getPerSecond();
  156 + receivedMessageCount += thread->receivedMessageCounter.get();
152 157  
153   - sentMessageCountPerSecond += thread->getSentMessagePerSecond();
154   - sentMessageCount += thread->getSentMessageCount();
  158 + sentMessageCountPerSecond += thread->sentMessageCounter.getPerSecond();
  159 + sentMessageCount += thread->sentMessageCounter.get();
  160 +
  161 + mqttConnectCountPerSecond += thread->mqttConnectCounter.getPerSecond();
  162 + mqttConnectCount += thread->mqttConnectCounter.get();
155 163 }
156 164  
  165 + GlobalStats *globalStats = GlobalStats::getInstance();
  166 +
  167 + publishStat("$SYS/broker/network/socketconnects/total", globalStats->socketConnects.get());
  168 + publishStat("$SYS/broker/network/socketconnects/persecond", globalStats->socketConnects.getPerSecond());
  169 +
  170 + publishStat("$SYS/broker/clients/mqttconnects/total", mqttConnectCount);
  171 + publishStat("$SYS/broker/clients/mqttconnects/persecond", mqttConnectCountPerSecond);
  172 +
157 173 publishStat("$SYS/broker/clients/total", nrOfClients);
158 174  
159 175 publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount);
... ... @@ -391,54 +407,6 @@ int ThreadData::getNrOfClients() const
391 407 return clients_by_fd.size();
392 408 }
393 409  
394   -void ThreadData::incrementReceivedMessageCount()
395   -{
396   - receivedMessageCount++;
397   -}
398   -
399   -uint64_t ThreadData::getReceivedMessageCount() const
400   -{
401   - return receivedMessageCount;
402   -}
403   -
404   -/**
405   - * @brief ThreadData::getReceivedMessagePerSecond gets the amount of seconds received, averaged over the last time this was called.
406   - * @return
407   - *
408   - * Locking is not required, because the counter is not written to from here.
409   - */
410   -uint64_t ThreadData::getReceivedMessagePerSecond()
411   -{
412   - std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
413   - std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - receivedMessagePreviousTime);
414   - uint64_t messagesTimes1000 = (receivedMessageCount - receivedMessageCountPrevious) * 1000;
415   - uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0;
416   - receivedMessagePreviousTime = now;
417   - receivedMessageCountPrevious = receivedMessageCount;
418   - return result;
419   -}
420   -
421   -void ThreadData::incrementSentMessageCount(uint64_t n)
422   -{
423   - sentMessageCount += n;
424   -}
425   -
426   -uint64_t ThreadData::getSentMessageCount() const
427   -{
428   - return sentMessageCount;
429   -}
430   -
431   -uint64_t ThreadData::getSentMessagePerSecond()
432   -{
433   - std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
434   - std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - sentMessagePreviousTime);
435   - uint64_t messagesTimes1000 = (sentMessageCount - sentMessageCountPrevious) * 1000;
436   - uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0;
437   - sentMessagePreviousTime = now;
438   - sentMessageCountPrevious = sentMessageCount;
439   - return result;
440   -}
441   -
442 410 void ThreadData::queueAuthPluginPeriodicEvent()
443 411 {
444 412 std::lock_guard<std::mutex> locker(taskQueueMutex);
... ...
threaddata.h
... ... @@ -37,6 +37,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
37 37 #include "configfileparser.h"
38 38 #include "authplugin.h"
39 39 #include "logger.h"
  40 +#include "derivablecounter.h"
40 41  
41 42 typedef void (*thread_f)(ThreadData *);
42 43  
... ... @@ -55,14 +56,6 @@ class ThreadData
55 56 std::shared_ptr<SubscriptionStore> subscriptionStore;
56 57 Logger *logger;
57 58  
58   - uint64_t receivedMessageCount = 0;
59   - uint64_t receivedMessageCountPrevious = 0;
60   - std::chrono::time_point<std::chrono::steady_clock> receivedMessagePreviousTime = std::chrono::steady_clock::now();
61   -
62   - uint64_t sentMessageCount = 0;
63   - uint64_t sentMessageCountPrevious = 0;
64   - std::chrono::time_point<std::chrono::steady_clock> sentMessagePreviousTime = std::chrono::steady_clock::now();
65   -
66 59 std::mutex clientsToRemoveMutex;
67 60 std::forward_list<std::weak_ptr<Client>> clientsQueuedForRemoving;
68 61  
... ... @@ -97,6 +90,10 @@ public:
97 90 std::mutex taskQueueMutex;
98 91 std::forward_list<std::function<void()>> taskQueue;
99 92  
  93 + DerivableCounter receivedMessageCounter;
  94 + DerivableCounter sentMessageCounter;
  95 + DerivableCounter mqttConnectCounter;
  96 +
100 97 ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore, std::shared_ptr<Settings> settings);
101 98 ThreadData(const ThreadData &other) = delete;
102 99 ThreadData(ThreadData &&other) = delete;
... ... @@ -124,14 +121,6 @@ public:
124 121  
125 122 int getNrOfClients() const;
126 123  
127   - void incrementReceivedMessageCount();
128   - uint64_t getReceivedMessageCount() const;
129   - uint64_t getReceivedMessagePerSecond();
130   -
131   - void incrementSentMessageCount(uint64_t n);
132   - uint64_t getSentMessageCount() const;
133   - uint64_t getSentMessagePerSecond();
134   -
135 124 void queueAuthPluginPeriodicEvent();
136 125 void authPluginPeriodicEvent();
137 126  
... ...