From 6cdda45217cc2f788aaae1a6291692bab8bfd820 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Wed, 29 Jun 2022 20:27:19 +0200 Subject: [PATCH] Retrieve store from app instance --- FlashMQTests/tst_maintests.cpp | 4 ++-- client.cpp | 4 ++-- mainapp.cpp | 9 +++++++-- mainapp.h | 2 ++ mqttpacket.cpp | 12 ++++++------ threaddata.cpp | 13 ++++++------- threaddata.h | 6 ++---- 7 files changed, 27 insertions(+), 23 deletions(-) diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index e83843c..4d44f98 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -997,7 +997,7 @@ void MainTests::testSavingSessions() { std::shared_ptr settings(new Settings()); std::shared_ptr store(new SubscriptionStore()); - std::shared_ptr t(new ThreadData(0, store, settings)); + std::shared_ptr t(new ThreadData(0, settings)); // Kind of a hack... Authentication auth(*settings.get()); @@ -1110,7 +1110,7 @@ void MainTests::testParsePacketHelper(const std::string &topic, char from_qos, b std::shared_ptr settings(new Settings()); settings->logDebug = false; std::shared_ptr store(new SubscriptionStore()); - std::shared_ptr t(new ThreadData(0, store, settings)); + std::shared_ptr t(new ThreadData(0, settings)); // Kind of a hack... Authentication auth(*settings.get()); diff --git a/client.cpp b/client.cpp index 3c6ed73..069e83e 100644 --- a/client.cpp +++ b/client.cpp @@ -69,7 +69,7 @@ Client::~Client() logger->logf(LOG_NOTICE, "Removing client '%s'. Reason(s): %s", repr().c_str(), disconnectReason.c_str()); - std::shared_ptr &store = this->threadData->getSubscriptionStore(); + std::shared_ptr store = MainApp::getMainApp()->getSubscriptionStore(); if (willPublish) { @@ -420,7 +420,7 @@ void Client::sendOrQueueWill() if (!this->willPublish) return; - std::shared_ptr &store = this->threadData->getSubscriptionStore(); + std::shared_ptr store = MainApp::getMainApp()->getSubscriptionStore(); store->queueWillMessage(willPublish, session); this->willPublish.reset(); } diff --git a/mainapp.cpp b/mainapp.cpp index 0db4703..d80f93e 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -486,7 +486,7 @@ void MainApp::start() Authentication auth(settingsLocalCopy); ThreadGlobals::assign(&auth); - std::shared_ptr threaddata = std::make_shared(0, subscriptionStore, settings); + std::shared_ptr threaddata = std::make_shared(0, settings); std::shared_ptr client = std::make_shared(fd, threaddata, nullptr, fuzzWebsockets, nullptr, settings.get(), true); std::shared_ptr subscriber = std::make_shared(fdnull, threaddata, nullptr, fuzzWebsockets, nullptr, settings.get(), true); @@ -534,7 +534,7 @@ void MainApp::start() for (int i = 0; i < num_threads; i++) { - std::shared_ptr t = std::make_shared(i, subscriptionStore, settings); + std::shared_ptr t = std::make_shared(i, settings); t->start(&do_thread_work); threads.push_back(t); } @@ -785,3 +785,8 @@ void MainApp::queueCleanup() wakeUpThread(); } +std::shared_ptr MainApp::getSubscriptionStore() +{ + return this->subscriptionStore; +} + diff --git a/mainapp.h b/mainapp.h index 4e2da90..140332b 100644 --- a/mainapp.h +++ b/mainapp.h @@ -110,6 +110,8 @@ public: void queueConfigReload(); void queueCleanup(); + + std::shared_ptr getSubscriptionStore(); }; #endif // MAINAPP_H diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 713207d..984f814 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -326,7 +326,7 @@ void MqttPacket::handleConnect() if (sender->hasConnectPacketSeen()) throw ProtocolError("Client already sent a CONNECT.", ReasonCodes::ProtocolError); - std::shared_ptr subscriptionStore = sender->getThreadData()->getSubscriptionStore(); + std::shared_ptr subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); sender->getThreadData()->mqttConnectCounter.inc(); @@ -775,7 +775,7 @@ void MqttPacket::handleExtendedAuth() if (finalResult == ReasonCodes::Success) { sender->sendConnackSuccess(); - std::shared_ptr subscriptionStore = sender->getThreadData()->getSubscriptionStore(); + std::shared_ptr subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); subscriptionStore->registerClientAndKickExistingOne(sender); } else @@ -926,7 +926,7 @@ void MqttPacket::handleSubscribe() if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false, getUserProperties()) == AuthResult::success) { logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos); - sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos); + MainApp::getMainApp()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos); subs_reponse_codes.push_back(static_cast(qos)); } else @@ -995,7 +995,7 @@ void MqttPacket::handleUnsubscribe() if (topic.empty()) throw ProtocolError("Subscribe topic is empty.", ReasonCodes::MalformedPacket); - sender->getThreadData()->getSubscriptionStore()->removeSubscription(sender, topic); + MainApp::getMainApp()->getSubscriptionStore()->removeSubscription(sender, topic); logger->logf(LOG_UNSUBSCRIBE, "Client '%s' unsubscribed from '%s'", sender->repr().c_str(), topic.c_str()); } @@ -1155,7 +1155,7 @@ void MqttPacket::handlePublish() if (publishData.retain) { publishData.payload = getPayloadCopy(); - sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.subtopics); + MainApp::getMainApp()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.subtopics); } // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3]. @@ -1164,7 +1164,7 @@ void MqttPacket::handlePublish() first_byte = bites[0]; PublishCopyFactory factory(this); - sender->getThreadData()->getSubscriptionStore()->queuePacketAtSubscribers(factory); + MainApp::getMainApp()->getSubscriptionStore()->queuePacketAtSubscribers(factory); } else { diff --git a/threaddata.cpp b/threaddata.cpp index 1609d6f..75dadde 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -28,8 +28,7 @@ KeepAliveCheck::KeepAliveCheck(const std::shared_ptr client) : } -ThreadData::ThreadData(int threadnr, std::shared_ptr &subscriptionStore, std::shared_ptr settings) : - subscriptionStore(subscriptionStore), +ThreadData::ThreadData(int threadnr, std::shared_ptr settings) : settingsLocalCopy(*settings.get()), authentication(settingsLocalCopy), threadnr(threadnr) @@ -178,6 +177,8 @@ void ThreadData::publishStatsOnDollarTopic(std::vector subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); + publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); publishStat("$SYS/broker/sessions/total", subscriptionStore->getSessionCount()); @@ -190,17 +191,20 @@ void ThreadData::publishStat(const std::string &topic, uint64_t n) const std::string payload = std::to_string(n); Publish p(topic, payload, 0); PublishCopyFactory factory(&p); + std::shared_ptr subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); subscriptionStore->queuePacketAtSubscribers(factory, true); subscriptionStore->setRetainedMessage(p, factory.getSubtopics()); } void ThreadData::sendQueuedWills() { + std::shared_ptr subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); subscriptionStore->sendQueuedWillMessages(); } void ThreadData::removeExpiredSessions() { + std::shared_ptr subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); subscriptionStore->removeExpiredSessionsClients(); } @@ -357,11 +361,6 @@ void ThreadData::removeClient(std::shared_ptr client) clients_by_fd.erase(client->getFd()); } -std::shared_ptr &ThreadData::getSubscriptionStore() -{ - return subscriptionStore; -} - void ThreadData::queueDoKeepAliveCheck() { std::lock_guard locker(taskQueueMutex); diff --git a/threaddata.h b/threaddata.h index c466523..d844ca0 100644 --- a/threaddata.h +++ b/threaddata.h @@ -28,11 +28,11 @@ License along with FlashMQ. If not, see . #include #include #include +#include #include "forward_declarations.h" #include "client.h" -#include "subscriptionstore.h" #include "utils.h" #include "configfileparser.h" #include "authplugin.h" @@ -53,7 +53,6 @@ class ThreadData { std::unordered_map> clients_by_fd; std::mutex clients_by_fd_mutex; - std::shared_ptr subscriptionStore; Logger *logger; std::mutex clientsToRemoveMutex; @@ -94,7 +93,7 @@ public: DerivableCounter sentMessageCounter; DerivableCounter mqttConnectCounter; - ThreadData(int threadnr, std::shared_ptr &subscriptionStore, std::shared_ptr settings); + ThreadData(int threadnr, std::shared_ptr settings); ThreadData(const ThreadData &other) = delete; ThreadData(ThreadData &&other) = delete; @@ -105,7 +104,6 @@ public: void removeClientQueued(const std::shared_ptr &client); void removeClientQueued(int fd); void removeClient(std::shared_ptr client); - std::shared_ptr &getSubscriptionStore(); void initAuthPlugin(); void cleanupAuthPlugin(); -- libgit2 0.21.4