From e2ea3ea8765aa80177758b3401a9e9294235cb26 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Fri, 17 Jun 2022 06:53:31 +0200 Subject: [PATCH] Add stats about connection count in $SYS --- CMakeLists.txt | 4 ++++ FlashMQTests/FlashMQTests.pro | 4 ++++ client.cpp | 2 +- derivablecounter.cpp | 31 +++++++++++++++++++++++++++++++ derivablecounter.h | 26 ++++++++++++++++++++++++++ globalstats.cpp | 17 +++++++++++++++++ globalstats.h | 18 ++++++++++++++++++ mainapp.cpp | 5 +++++ mqttpacket.cpp | 4 +++- threaddata.cpp | 72 ++++++++++++++++++++---------------------------------------------------- threaddata.h | 21 +++++---------------- 11 files changed, 134 insertions(+), 70 deletions(-) create mode 100644 derivablecounter.cpp create mode 100644 derivablecounter.h create mode 100644 globalstats.cpp create mode 100644 globalstats.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 7a2dccb..bbb4b01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,6 +61,8 @@ add_executable(FlashMQ publishcopyfactory.h variablebyteint.h mqtt5properties.h + globalstats.h + derivablecounter.h mainapp.cpp main.cpp @@ -101,6 +103,8 @@ add_executable(FlashMQ publishcopyfactory.cpp variablebyteint.cpp mqtt5properties.cpp + globalstats.cpp + derivablecounter.cpp ) diff --git a/FlashMQTests/FlashMQTests.pro b/FlashMQTests/FlashMQTests.pro index 8c01f08..3ef852d 100644 --- a/FlashMQTests/FlashMQTests.pro +++ b/FlashMQTests/FlashMQTests.pro @@ -52,6 +52,8 @@ SOURCES += tst_maintests.cpp \ ../publishcopyfactory.cpp \ ../variablebyteint.cpp \ ../mqtt5properties.cpp \ + ../globalstats.cpp \ + ../derivablecounter.cpp \ mainappthread.cpp \ twoclienttestcontext.cpp @@ -96,6 +98,8 @@ HEADERS += \ ../publishcopyfactory.h \ ../variablebyteint.h \ ../mqtt5properties.h \ + ../globalstats.h \ + ../derivablecounter.h \ mainappthread.h \ twoclienttestcontext.h diff --git a/client.cpp b/client.cpp index 370e21f..3c6ed73 100644 --- a/client.cpp +++ b/client.cpp @@ -223,7 +223,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) if (packet.packetType == PacketType::PUBLISH) { ThreadData *td = ThreadGlobals::getThreadData(); - td->incrementSentMessageCount(1); + td->sentMessageCounter.inc(); } else if (packet.packetType == PacketType::DISCONNECT) setReadyForDisconnect(); diff --git a/derivablecounter.cpp b/derivablecounter.cpp new file mode 100644 index 0000000..615ff4c --- /dev/null +++ b/derivablecounter.cpp @@ -0,0 +1,31 @@ +#include "derivablecounter.h" + +void DerivableCounter::inc(uint64_t n) +{ + val += n; +} + +uint64_t DerivableCounter::get() const +{ + return val; +} + +/** + * @brief DerivableCounter::getPerSecond returns the amount per second since last time this method was called. + * @return + * + * Even though the class it not meant to be thread-safe, this method does use a mutex, because obtaining the value can be + * scheduled in different threads. + */ +uint64_t DerivableCounter::getPerSecond() +{ + std::lock_guard locker(timeMutex); + + std::chrono::time_point now = std::chrono::steady_clock::now(); + std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast(now - timeOfPrevious); + uint64_t messagesTimes1000 = (val - valPrevious) * 1000; + uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0; + timeOfPrevious = now; + valPrevious = val; + return result; +} diff --git a/derivablecounter.h b/derivablecounter.h new file mode 100644 index 0000000..f622062 --- /dev/null +++ b/derivablecounter.h @@ -0,0 +1,26 @@ +#ifndef DERIVABLECOUNTER_H +#define DERIVABLECOUNTER_H + +#include +#include + +/** + * @brief The DerivableCounter is a counter which can derive val/dt. + * + * It's not thread-safe, to avoid unnecessary locking. You should have counters per thread. + */ +class DerivableCounter +{ + uint64_t val = 0; + uint64_t valPrevious = 0; + std::chrono::time_point timeOfPrevious = std::chrono::steady_clock::now(); + std::mutex timeMutex; + +public: + + void inc(uint64_t n = 1); + uint64_t get() const; + uint64_t getPerSecond(); +}; + +#endif // DERIVABLECOUNTER_H diff --git a/globalstats.cpp b/globalstats.cpp new file mode 100644 index 0000000..65bbaa3 --- /dev/null +++ b/globalstats.cpp @@ -0,0 +1,17 @@ +#include "globalstats.h" + +GlobalStats *GlobalStats::instance = nullptr; + +GlobalStats::GlobalStats() +{ + +} + +GlobalStats *GlobalStats::getInstance() +{ + if (GlobalStats::instance == nullptr) + GlobalStats::instance = new GlobalStats(); + + return GlobalStats::instance; +} + diff --git a/globalstats.h b/globalstats.h new file mode 100644 index 0000000..3101a1f --- /dev/null +++ b/globalstats.h @@ -0,0 +1,18 @@ +#ifndef GLOBALSTATS_H +#define GLOBALSTATS_H + +#include "stdint.h" +#include "derivablecounter.h" + +class GlobalStats +{ + static GlobalStats *instance; + + GlobalStats(); +public: + static GlobalStats *getInstance(); + + DerivableCounter socketConnects; +}; + +#endif // GLOBALSTATS_H diff --git a/mainapp.cpp b/mainapp.cpp index 802a135..0db4703 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -33,6 +33,7 @@ License along with FlashMQ. If not, see . #include "threadloop.h" #include "authplugin.h" #include "threadglobals.h" +#include "globalstats.h" MainApp *MainApp::instance = nullptr; @@ -529,6 +530,8 @@ void MainApp::start() } #endif + GlobalStats *globalStats = GlobalStats::getInstance(); + for (int i = 0; i < num_threads; i++) { std::shared_ptr t = std::make_shared(i, subscriptionStore, settings); @@ -592,6 +595,8 @@ void MainApp::start() std::shared_ptr client = std::make_shared(fd, thread_data, clientSSL, listener->websocket, addr, settings.get()); thread_data->giveClient(client); + + globalStats->socketConnects.inc(); } else { diff --git a/mqttpacket.cpp b/mqttpacket.cpp index cc24e95..9aa5909 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -328,6 +328,8 @@ void MqttPacket::handleConnect() std::shared_ptr subscriptionStore = sender->getThreadData()->getSubscriptionStore(); + sender->getThreadData()->mqttConnectCounter.inc(); + uint16_t variable_header_length = readTwoBytesToUInt16(); const Settings &settings = *ThreadGlobals::getSettings(); @@ -1129,7 +1131,7 @@ void MqttPacket::handlePublish() ReasonCodes ackCode = ReasonCodes::Success; - sender->getThreadData()->incrementReceivedMessageCount(); + sender->getThreadData()->receivedMessageCounter.inc(); Authentication &authentication = *ThreadGlobals::getAuth(); diff --git a/threaddata.cpp b/threaddata.cpp index b021d59..1609d6f 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -20,6 +20,8 @@ License along with FlashMQ. If not, see . #include #include +#include "globalstats.h" + KeepAliveCheck::KeepAliveCheck(const std::shared_ptr client) : client(client) { @@ -143,17 +145,31 @@ void ThreadData::publishStatsOnDollarTopic(std::vector &thread : threads) { nrOfClients += thread->getNrOfClients(); - receivedMessageCountPerSecond += thread->getReceivedMessagePerSecond(); - receivedMessageCount += thread->getReceivedMessageCount(); + receivedMessageCountPerSecond += thread->receivedMessageCounter.getPerSecond(); + receivedMessageCount += thread->receivedMessageCounter.get(); - sentMessageCountPerSecond += thread->getSentMessagePerSecond(); - sentMessageCount += thread->getSentMessageCount(); + sentMessageCountPerSecond += thread->sentMessageCounter.getPerSecond(); + sentMessageCount += thread->sentMessageCounter.get(); + + mqttConnectCountPerSecond += thread->mqttConnectCounter.getPerSecond(); + mqttConnectCount += thread->mqttConnectCounter.get(); } + GlobalStats *globalStats = GlobalStats::getInstance(); + + publishStat("$SYS/broker/network/socketconnects/total", globalStats->socketConnects.get()); + publishStat("$SYS/broker/network/socketconnects/persecond", globalStats->socketConnects.getPerSecond()); + + publishStat("$SYS/broker/clients/mqttconnects/total", mqttConnectCount); + publishStat("$SYS/broker/clients/mqttconnects/persecond", mqttConnectCountPerSecond); + publishStat("$SYS/broker/clients/total", nrOfClients); publishStat("$SYS/broker/load/messages/received/total", receivedMessageCount); @@ -391,54 +407,6 @@ int ThreadData::getNrOfClients() const return clients_by_fd.size(); } -void ThreadData::incrementReceivedMessageCount() -{ - receivedMessageCount++; -} - -uint64_t ThreadData::getReceivedMessageCount() const -{ - return receivedMessageCount; -} - -/** - * @brief ThreadData::getReceivedMessagePerSecond gets the amount of seconds received, averaged over the last time this was called. - * @return - * - * Locking is not required, because the counter is not written to from here. - */ -uint64_t ThreadData::getReceivedMessagePerSecond() -{ - std::chrono::time_point now = std::chrono::steady_clock::now(); - std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast(now - receivedMessagePreviousTime); - uint64_t messagesTimes1000 = (receivedMessageCount - receivedMessageCountPrevious) * 1000; - uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0; - receivedMessagePreviousTime = now; - receivedMessageCountPrevious = receivedMessageCount; - return result; -} - -void ThreadData::incrementSentMessageCount(uint64_t n) -{ - sentMessageCount += n; -} - -uint64_t ThreadData::getSentMessageCount() const -{ - return sentMessageCount; -} - -uint64_t ThreadData::getSentMessagePerSecond() -{ - std::chrono::time_point now = std::chrono::steady_clock::now(); - std::chrono::milliseconds msSinceLastTime = std::chrono::duration_cast(now - sentMessagePreviousTime); - uint64_t messagesTimes1000 = (sentMessageCount - sentMessageCountPrevious) * 1000; - uint64_t result = messagesTimes1000 / (msSinceLastTime.count() + 1); // branchless avoidance of div by 0; - sentMessagePreviousTime = now; - sentMessageCountPrevious = sentMessageCount; - return result; -} - void ThreadData::queueAuthPluginPeriodicEvent() { std::lock_guard locker(taskQueueMutex); diff --git a/threaddata.h b/threaddata.h index 2c7a358..c466523 100644 --- a/threaddata.h +++ b/threaddata.h @@ -37,6 +37,7 @@ License along with FlashMQ. If not, see . #include "configfileparser.h" #include "authplugin.h" #include "logger.h" +#include "derivablecounter.h" typedef void (*thread_f)(ThreadData *); @@ -55,14 +56,6 @@ class ThreadData std::shared_ptr subscriptionStore; Logger *logger; - uint64_t receivedMessageCount = 0; - uint64_t receivedMessageCountPrevious = 0; - std::chrono::time_point receivedMessagePreviousTime = std::chrono::steady_clock::now(); - - uint64_t sentMessageCount = 0; - uint64_t sentMessageCountPrevious = 0; - std::chrono::time_point sentMessagePreviousTime = std::chrono::steady_clock::now(); - std::mutex clientsToRemoveMutex; std::forward_list> clientsQueuedForRemoving; @@ -97,6 +90,10 @@ public: std::mutex taskQueueMutex; std::forward_list> taskQueue; + DerivableCounter receivedMessageCounter; + DerivableCounter sentMessageCounter; + DerivableCounter mqttConnectCounter; + ThreadData(int threadnr, std::shared_ptr &subscriptionStore, std::shared_ptr settings); ThreadData(const ThreadData &other) = delete; ThreadData(ThreadData &&other) = delete; @@ -124,14 +121,6 @@ public: int getNrOfClients() const; - void incrementReceivedMessageCount(); - uint64_t getReceivedMessageCount() const; - uint64_t getReceivedMessagePerSecond(); - - void incrementSentMessageCount(uint64_t n); - uint64_t getSentMessageCount() const; - uint64_t getSentMessagePerSecond(); - void queueAuthPluginPeriodicEvent(); void authPluginPeriodicEvent(); -- libgit2 0.21.4