Commit 6cdda45217cc2f788aaae1a6291692bab8bfd820

Authored by Wiebe Cazemier
1 parent b62854b1

Retrieve store from app instance

Instead of the thread data, which didn't make sense.
FlashMQTests/tst_maintests.cpp
@@ -997,7 +997,7 @@ void MainTests::testSavingSessions() @@ -997,7 +997,7 @@ void MainTests::testSavingSessions()
997 { 997 {
998 std::shared_ptr<Settings> settings(new Settings()); 998 std::shared_ptr<Settings> settings(new Settings());
999 std::shared_ptr<SubscriptionStore> store(new SubscriptionStore()); 999 std::shared_ptr<SubscriptionStore> store(new SubscriptionStore());
1000 - std::shared_ptr<ThreadData> t(new ThreadData(0, store, settings)); 1000 + std::shared_ptr<ThreadData> t(new ThreadData(0, settings));
1001 1001
1002 // Kind of a hack... 1002 // Kind of a hack...
1003 Authentication auth(*settings.get()); 1003 Authentication auth(*settings.get());
@@ -1110,7 +1110,7 @@ void MainTests::testParsePacketHelper(const std::string &amp;topic, char from_qos, b @@ -1110,7 +1110,7 @@ void MainTests::testParsePacketHelper(const std::string &amp;topic, char from_qos, b
1110 std::shared_ptr<Settings> settings(new Settings()); 1110 std::shared_ptr<Settings> settings(new Settings());
1111 settings->logDebug = false; 1111 settings->logDebug = false;
1112 std::shared_ptr<SubscriptionStore> store(new SubscriptionStore()); 1112 std::shared_ptr<SubscriptionStore> store(new SubscriptionStore());
1113 - std::shared_ptr<ThreadData> t(new ThreadData(0, store, settings)); 1113 + std::shared_ptr<ThreadData> t(new ThreadData(0, settings));
1114 1114
1115 // Kind of a hack... 1115 // Kind of a hack...
1116 Authentication auth(*settings.get()); 1116 Authentication auth(*settings.get());
client.cpp
@@ -69,7 +69,7 @@ Client::~Client() @@ -69,7 +69,7 @@ Client::~Client()
69 69
70 logger->logf(LOG_NOTICE, "Removing client '%s'. Reason(s): %s", repr().c_str(), disconnectReason.c_str()); 70 logger->logf(LOG_NOTICE, "Removing client '%s'. Reason(s): %s", repr().c_str(), disconnectReason.c_str());
71 71
72 - std::shared_ptr<SubscriptionStore> &store = this->threadData->getSubscriptionStore(); 72 + std::shared_ptr<SubscriptionStore> store = MainApp::getMainApp()->getSubscriptionStore();
73 73
74 if (willPublish) 74 if (willPublish)
75 { 75 {
@@ -420,7 +420,7 @@ void Client::sendOrQueueWill() @@ -420,7 +420,7 @@ void Client::sendOrQueueWill()
420 if (!this->willPublish) 420 if (!this->willPublish)
421 return; 421 return;
422 422
423 - std::shared_ptr<SubscriptionStore> &store = this->threadData->getSubscriptionStore(); 423 + std::shared_ptr<SubscriptionStore> store = MainApp::getMainApp()->getSubscriptionStore();
424 store->queueWillMessage(willPublish, session); 424 store->queueWillMessage(willPublish, session);
425 this->willPublish.reset(); 425 this->willPublish.reset();
426 } 426 }
mainapp.cpp
@@ -486,7 +486,7 @@ void MainApp::start() @@ -486,7 +486,7 @@ void MainApp::start()
486 Authentication auth(settingsLocalCopy); 486 Authentication auth(settingsLocalCopy);
487 ThreadGlobals::assign(&auth); 487 ThreadGlobals::assign(&auth);
488 488
489 - std::shared_ptr<ThreadData> threaddata = std::make_shared<ThreadData>(0, subscriptionStore, settings); 489 + std::shared_ptr<ThreadData> threaddata = std::make_shared<ThreadData>(0, settings);
490 490
491 std::shared_ptr<Client> client = std::make_shared<Client>(fd, threaddata, nullptr, fuzzWebsockets, nullptr, settings.get(), true); 491 std::shared_ptr<Client> client = std::make_shared<Client>(fd, threaddata, nullptr, fuzzWebsockets, nullptr, settings.get(), true);
492 std::shared_ptr<Client> subscriber = std::make_shared<Client>(fdnull, threaddata, nullptr, fuzzWebsockets, nullptr, settings.get(), true); 492 std::shared_ptr<Client> subscriber = std::make_shared<Client>(fdnull, threaddata, nullptr, fuzzWebsockets, nullptr, settings.get(), true);
@@ -534,7 +534,7 @@ void MainApp::start() @@ -534,7 +534,7 @@ void MainApp::start()
534 534
535 for (int i = 0; i < num_threads; i++) 535 for (int i = 0; i < num_threads; i++)
536 { 536 {
537 - std::shared_ptr<ThreadData> t = std::make_shared<ThreadData>(i, subscriptionStore, settings); 537 + std::shared_ptr<ThreadData> t = std::make_shared<ThreadData>(i, settings);
538 t->start(&do_thread_work); 538 t->start(&do_thread_work);
539 threads.push_back(t); 539 threads.push_back(t);
540 } 540 }
@@ -785,3 +785,8 @@ void MainApp::queueCleanup() @@ -785,3 +785,8 @@ void MainApp::queueCleanup()
785 wakeUpThread(); 785 wakeUpThread();
786 } 786 }
787 787
  788 +std::shared_ptr<SubscriptionStore> MainApp::getSubscriptionStore()
  789 +{
  790 + return this->subscriptionStore;
  791 +}
  792 +
mainapp.h
@@ -110,6 +110,8 @@ public: @@ -110,6 +110,8 @@ public:
110 110
111 void queueConfigReload(); 111 void queueConfigReload();
112 void queueCleanup(); 112 void queueCleanup();
  113 +
  114 + std::shared_ptr<SubscriptionStore> getSubscriptionStore();
113 }; 115 };
114 116
115 #endif // MAINAPP_H 117 #endif // MAINAPP_H
mqttpacket.cpp
@@ -326,7 +326,7 @@ void MqttPacket::handleConnect() @@ -326,7 +326,7 @@ void MqttPacket::handleConnect()
326 if (sender->hasConnectPacketSeen()) 326 if (sender->hasConnectPacketSeen())
327 throw ProtocolError("Client already sent a CONNECT.", ReasonCodes::ProtocolError); 327 throw ProtocolError("Client already sent a CONNECT.", ReasonCodes::ProtocolError);
328 328
329 - std::shared_ptr<SubscriptionStore> subscriptionStore = sender->getThreadData()->getSubscriptionStore(); 329 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
330 330
331 sender->getThreadData()->mqttConnectCounter.inc(); 331 sender->getThreadData()->mqttConnectCounter.inc();
332 332
@@ -775,7 +775,7 @@ void MqttPacket::handleExtendedAuth() @@ -775,7 +775,7 @@ void MqttPacket::handleExtendedAuth()
775 if (finalResult == ReasonCodes::Success) 775 if (finalResult == ReasonCodes::Success)
776 { 776 {
777 sender->sendConnackSuccess(); 777 sender->sendConnackSuccess();
778 - std::shared_ptr<SubscriptionStore> subscriptionStore = sender->getThreadData()->getSubscriptionStore(); 778 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
779 subscriptionStore->registerClientAndKickExistingOne(sender); 779 subscriptionStore->registerClientAndKickExistingOne(sender);
780 } 780 }
781 else 781 else
@@ -926,7 +926,7 @@ void MqttPacket::handleSubscribe() @@ -926,7 +926,7 @@ void MqttPacket::handleSubscribe()
926 if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false, getUserProperties()) == AuthResult::success) 926 if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false, getUserProperties()) == AuthResult::success)
927 { 927 {
928 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos); 928 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos);
929 - sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos); 929 + MainApp::getMainApp()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos);
930 subs_reponse_codes.push_back(static_cast<ReasonCodes>(qos)); 930 subs_reponse_codes.push_back(static_cast<ReasonCodes>(qos));
931 } 931 }
932 else 932 else
@@ -995,7 +995,7 @@ void MqttPacket::handleUnsubscribe() @@ -995,7 +995,7 @@ void MqttPacket::handleUnsubscribe()
995 if (topic.empty()) 995 if (topic.empty())
996 throw ProtocolError("Subscribe topic is empty.", ReasonCodes::MalformedPacket); 996 throw ProtocolError("Subscribe topic is empty.", ReasonCodes::MalformedPacket);
997 997
998 - sender->getThreadData()->getSubscriptionStore()->removeSubscription(sender, topic); 998 + MainApp::getMainApp()->getSubscriptionStore()->removeSubscription(sender, topic);
999 logger->logf(LOG_UNSUBSCRIBE, "Client '%s' unsubscribed from '%s'", sender->repr().c_str(), topic.c_str()); 999 logger->logf(LOG_UNSUBSCRIBE, "Client '%s' unsubscribed from '%s'", sender->repr().c_str(), topic.c_str());
1000 } 1000 }
1001 1001
@@ -1155,7 +1155,7 @@ void MqttPacket::handlePublish() @@ -1155,7 +1155,7 @@ void MqttPacket::handlePublish()
1155 if (publishData.retain) 1155 if (publishData.retain)
1156 { 1156 {
1157 publishData.payload = getPayloadCopy(); 1157 publishData.payload = getPayloadCopy();
1158 - sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.subtopics); 1158 + MainApp::getMainApp()->getSubscriptionStore()->setRetainedMessage(publishData, publishData.subtopics);
1159 } 1159 }
1160 1160
1161 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3]. 1161 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3].
@@ -1164,7 +1164,7 @@ void MqttPacket::handlePublish() @@ -1164,7 +1164,7 @@ void MqttPacket::handlePublish()
1164 first_byte = bites[0]; 1164 first_byte = bites[0];
1165 1165
1166 PublishCopyFactory factory(this); 1166 PublishCopyFactory factory(this);
1167 - sender->getThreadData()->getSubscriptionStore()->queuePacketAtSubscribers(factory); 1167 + MainApp::getMainApp()->getSubscriptionStore()->queuePacketAtSubscribers(factory);
1168 } 1168 }
1169 else 1169 else
1170 { 1170 {
threaddata.cpp
@@ -28,8 +28,7 @@ KeepAliveCheck::KeepAliveCheck(const std::shared_ptr&lt;Client&gt; client) : @@ -28,8 +28,7 @@ KeepAliveCheck::KeepAliveCheck(const std::shared_ptr&lt;Client&gt; client) :
28 28
29 } 29 }
30 30
31 -ThreadData::ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore, std::shared_ptr<Settings> settings) :  
32 - subscriptionStore(subscriptionStore), 31 +ThreadData::ThreadData(int threadnr, std::shared_ptr<Settings> settings) :
33 settingsLocalCopy(*settings.get()), 32 settingsLocalCopy(*settings.get()),
34 authentication(settingsLocalCopy), 33 authentication(settingsLocalCopy),
35 threadnr(threadnr) 34 threadnr(threadnr)
@@ -178,6 +177,8 @@ void ThreadData::publishStatsOnDollarTopic(std::vector&lt;std::shared_ptr&lt;ThreadDat @@ -178,6 +177,8 @@ void ThreadData::publishStatsOnDollarTopic(std::vector&lt;std::shared_ptr&lt;ThreadDat
178 publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount); 177 publishStat("$SYS/broker/load/messages/sent/total", sentMessageCount);
179 publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond); 178 publishStat("$SYS/broker/load/messages/sent/persecond", sentMessageCountPerSecond);
180 179
  180 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
  181 +
181 publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount()); 182 publishStat("$SYS/broker/retained messages/count", subscriptionStore->getRetainedMessageCount());
182 183
183 publishStat("$SYS/broker/sessions/total", subscriptionStore->getSessionCount()); 184 publishStat("$SYS/broker/sessions/total", subscriptionStore->getSessionCount());
@@ -190,17 +191,20 @@ void ThreadData::publishStat(const std::string &amp;topic, uint64_t n) @@ -190,17 +191,20 @@ void ThreadData::publishStat(const std::string &amp;topic, uint64_t n)
190 const std::string payload = std::to_string(n); 191 const std::string payload = std::to_string(n);
191 Publish p(topic, payload, 0); 192 Publish p(topic, payload, 0);
192 PublishCopyFactory factory(&p); 193 PublishCopyFactory factory(&p);
  194 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
193 subscriptionStore->queuePacketAtSubscribers(factory, true); 195 subscriptionStore->queuePacketAtSubscribers(factory, true);
194 subscriptionStore->setRetainedMessage(p, factory.getSubtopics()); 196 subscriptionStore->setRetainedMessage(p, factory.getSubtopics());
195 } 197 }
196 198
197 void ThreadData::sendQueuedWills() 199 void ThreadData::sendQueuedWills()
198 { 200 {
  201 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
199 subscriptionStore->sendQueuedWillMessages(); 202 subscriptionStore->sendQueuedWillMessages();
200 } 203 }
201 204
202 void ThreadData::removeExpiredSessions() 205 void ThreadData::removeExpiredSessions()
203 { 206 {
  207 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
204 subscriptionStore->removeExpiredSessionsClients(); 208 subscriptionStore->removeExpiredSessionsClients();
205 } 209 }
206 210
@@ -357,11 +361,6 @@ void ThreadData::removeClient(std::shared_ptr&lt;Client&gt; client) @@ -357,11 +361,6 @@ void ThreadData::removeClient(std::shared_ptr&lt;Client&gt; client)
357 clients_by_fd.erase(client->getFd()); 361 clients_by_fd.erase(client->getFd());
358 } 362 }
359 363
360 -std::shared_ptr<SubscriptionStore> &ThreadData::getSubscriptionStore()  
361 -{  
362 - return subscriptionStore;  
363 -}  
364 -  
365 void ThreadData::queueDoKeepAliveCheck() 364 void ThreadData::queueDoKeepAliveCheck()
366 { 365 {
367 std::lock_guard<std::mutex> locker(taskQueueMutex); 366 std::lock_guard<std::mutex> locker(taskQueueMutex);
threaddata.h
@@ -28,11 +28,11 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -28,11 +28,11 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
28 #include <mutex> 28 #include <mutex>
29 #include <functional> 29 #include <functional>
30 #include <chrono> 30 #include <chrono>
  31 +#include <forward_list>
31 32
32 #include "forward_declarations.h" 33 #include "forward_declarations.h"
33 34
34 #include "client.h" 35 #include "client.h"
35 -#include "subscriptionstore.h"  
36 #include "utils.h" 36 #include "utils.h"
37 #include "configfileparser.h" 37 #include "configfileparser.h"
38 #include "authplugin.h" 38 #include "authplugin.h"
@@ -53,7 +53,6 @@ class ThreadData @@ -53,7 +53,6 @@ class ThreadData
53 { 53 {
54 std::unordered_map<int, std::shared_ptr<Client>> clients_by_fd; 54 std::unordered_map<int, std::shared_ptr<Client>> clients_by_fd;
55 std::mutex clients_by_fd_mutex; 55 std::mutex clients_by_fd_mutex;
56 - std::shared_ptr<SubscriptionStore> subscriptionStore;  
57 Logger *logger; 56 Logger *logger;
58 57
59 std::mutex clientsToRemoveMutex; 58 std::mutex clientsToRemoveMutex;
@@ -94,7 +93,7 @@ public: @@ -94,7 +93,7 @@ public:
94 DerivableCounter sentMessageCounter; 93 DerivableCounter sentMessageCounter;
95 DerivableCounter mqttConnectCounter; 94 DerivableCounter mqttConnectCounter;
96 95
97 - ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore, std::shared_ptr<Settings> settings); 96 + ThreadData(int threadnr, std::shared_ptr<Settings> settings);
98 ThreadData(const ThreadData &other) = delete; 97 ThreadData(const ThreadData &other) = delete;
99 ThreadData(ThreadData &&other) = delete; 98 ThreadData(ThreadData &&other) = delete;
100 99
@@ -105,7 +104,6 @@ public: @@ -105,7 +104,6 @@ public:
105 void removeClientQueued(const std::shared_ptr<Client> &client); 104 void removeClientQueued(const std::shared_ptr<Client> &client);
106 void removeClientQueued(int fd); 105 void removeClientQueued(int fd);
107 void removeClient(std::shared_ptr<Client> client); 106 void removeClient(std::shared_ptr<Client> client);
108 - std::shared_ptr<SubscriptionStore> &getSubscriptionStore();  
109 107
110 void initAuthPlugin(); 108 void initAuthPlugin();
111 void cleanupAuthPlugin(); 109 void cleanupAuthPlugin();