Commit bc5029262e7294f75289fe2cb127d49108edf9b5

Authored by Wiebe Cazemier
1 parent ef845a8c

Reducing locking and memory overhead for saving sessions

sessionsandsubscriptionsdb.cpp
@@ -242,7 +242,7 @@ void SessionsAndSubscriptionsDB::writeRowHeader() @@ -242,7 +242,7 @@ void SessionsAndSubscriptionsDB::writeRowHeader()
242 242
243 } 243 }
244 244
245 -void SessionsAndSubscriptionsDB::saveData(const std::vector<std::unique_ptr<Session>> &sessions, const std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &subscriptions) 245 +void SessionsAndSubscriptionsDB::saveData(const std::vector<std::shared_ptr<Session>> &sessions, const std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &subscriptions)
246 { 246 {
247 if (!f) 247 if (!f)
248 return; 248 return;
@@ -254,12 +254,21 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -254,12 +254,21 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
254 logger->logf(LOG_DEBUG, "Saving current time stamp %ld", now_epoch); 254 logger->logf(LOG_DEBUG, "Saving current time stamp %ld", now_epoch);
255 writeInt64(now_epoch); 255 writeInt64(now_epoch);
256 256
257 - writeUint32(sessions.size()); 257 + std::vector<std::shared_ptr<Session>> sessionsToSave;
  258 + // Sessions created with clean session need to be destroyed when disconnecting, so no point in saving them.
  259 + std::copy_if(sessions.begin(), sessions.end(), std::back_inserter(sessionsToSave), [](const std::shared_ptr<Session> &ses) {
  260 + return !ses->destroyOnDisconnect;
  261 + });
  262 +
  263 + writeUint32(sessionsToSave.size());
258 264
259 CirBuf cirbuf(1024); 265 CirBuf cirbuf(1024);
260 266
261 - for (const std::unique_ptr<Session> &ses : sessions) 267 + for (const std::shared_ptr<Session> &_ses : sessionsToSave)
262 { 268 {
  269 + // Takes care of locking, and working on the snapshot/copy prevents doing disk IO under lock.
  270 + std::unique_ptr<Session> ses = _ses->getCopy();
  271 +
263 logger->logf(LOG_DEBUG, "Saving session '%s'.", ses->getClientId().c_str()); 272 logger->logf(LOG_DEBUG, "Saving session '%s'.", ses->getClientId().c_str());
264 273
265 writeRowHeader(); 274 writeRowHeader();
sessionsandsubscriptionsdb.h
@@ -68,7 +68,7 @@ public: @@ -68,7 +68,7 @@ public:
68 void openWrite(); 68 void openWrite();
69 void openRead(); 69 void openRead();
70 70
71 - void saveData(const std::vector<std::unique_ptr<Session>> &sessions, const std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &subscriptions); 71 + void saveData(const std::vector<std::shared_ptr<Session>> &sessions, const std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &subscriptions);
72 SessionsAndSubscriptionsResult readData(); 72 SessionsAndSubscriptionsResult readData();
73 }; 73 };
74 74
subscriptionstore.cpp
@@ -134,7 +134,7 @@ SubscriptionNode *SubscriptionStore::getDeepestNode(const std::string &amp;topic, co @@ -134,7 +134,7 @@ SubscriptionNode *SubscriptionStore::getDeepestNode(const std::string &amp;topic, co
134 134
135 void SubscriptionStore::addSubscription(std::shared_ptr<Client> &client, const std::string &topic, const std::vector<std::string> &subtopics, char qos) 135 void SubscriptionStore::addSubscription(std::shared_ptr<Client> &client, const std::string &topic, const std::vector<std::string> &subtopics, char qos)
136 { 136 {
137 - RWLockGuard lock_guard(&subscriptionsRwlock); 137 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
138 lock_guard.wrlock(); 138 lock_guard.wrlock();
139 139
140 SubscriptionNode *deepestNode = getDeepestNode(topic, subtopics); 140 SubscriptionNode *deepestNode = getDeepestNode(topic, subtopics);
@@ -160,7 +160,7 @@ void SubscriptionStore::removeSubscription(std::shared_ptr&lt;Client&gt; &amp;client, cons @@ -160,7 +160,7 @@ void SubscriptionStore::removeSubscription(std::shared_ptr&lt;Client&gt; &amp;client, cons
160 if (topic.length() > 0 && topic[0] == '$') 160 if (topic.length() > 0 && topic[0] == '$')
161 deepestNode = &rootDollar; 161 deepestNode = &rootDollar;
162 162
163 - RWLockGuard lock_guard(&subscriptionsRwlock); 163 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
164 lock_guard.wrlock(); 164 lock_guard.wrlock();
165 165
166 // This code looks like that for addSubscription(), but it's specifically different in that we don't want to default-create non-existing 166 // This code looks like that for addSubscription(), but it's specifically different in that we don't want to default-create non-existing
@@ -227,7 +227,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -227,7 +227,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
227 { 227 {
228 ThreadGlobals::getThreadData()->queueClientNextKeepAliveCheckLocked(client, true); 228 ThreadGlobals::getThreadData()->queueClientNextKeepAliveCheckLocked(client, true);
229 229
230 - RWLockGuard lock_guard(&subscriptionsRwlock); 230 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
231 lock_guard.wrlock(); 231 lock_guard.wrlock();
232 232
233 if (client->getClientId().empty()) 233 if (client->getClientId().empty())
@@ -274,7 +274,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -274,7 +274,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
274 */ 274 */
275 std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clientid) 275 std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clientid)
276 { 276 {
277 - RWLockGuard lock_guard(&subscriptionsRwlock); 277 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
278 lock_guard.rdlock(); 278 lock_guard.rdlock();
279 279
280 auto it = sessionsByIdConst.find(clientid); 280 auto it = sessionsByIdConst.find(clientid);
@@ -463,7 +463,7 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory @@ -463,7 +463,7 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory
463 463
464 { 464 {
465 const std::vector<std::string> &subtopics = copyFactory.getSubtopics(); 465 const std::vector<std::string> &subtopics = copyFactory.getSubtopics();
466 - RWLockGuard lock_guard(&subscriptionsRwlock); 466 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
467 lock_guard.rdlock(); 467 lock_guard.rdlock();
468 publishRecursively(subtopics.begin(), subtopics.end(), startNode, subscriberSessions); 468 publishRecursively(subtopics.begin(), subtopics.end(), startNode, subscriberSessions);
469 } 469 }
@@ -649,7 +649,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr&lt;Session&gt; &amp;session) @@ -649,7 +649,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr&lt;Session&gt; &amp;session)
649 queueWillMessage(will, session, true); 649 queueWillMessage(will, session, true);
650 } 650 }
651 651
652 - RWLockGuard lock_guard(&subscriptionsRwlock); 652 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
653 lock_guard.wrlock(); 653 lock_guard.wrlock();
654 654
655 auto session_it = sessionsById.find(clientid); 655 auto session_it = sessionsById.find(clientid);
@@ -724,7 +724,7 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -724,7 +724,7 @@ void SubscriptionStore::removeExpiredSessionsClients()
724 724
725 if (lastTreeCleanup + std::chrono::minutes(30) < now) 725 if (lastTreeCleanup + std::chrono::minutes(30) < now)
726 { 726 {
727 - RWLockGuard lock_guard(&subscriptionsRwlock); 727 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
728 lock_guard.wrlock(); 728 lock_guard.wrlock();
729 729
730 logger->logf(LOG_NOTICE, "Rebuilding subscription tree"); 730 logger->logf(LOG_NOTICE, "Rebuilding subscription tree");
@@ -764,7 +764,7 @@ int64_t SubscriptionStore::getSubscriptionCount() @@ -764,7 +764,7 @@ int64_t SubscriptionStore::getSubscriptionCount()
764 { 764 {
765 int64_t count = 0; 765 int64_t count = 0;
766 766
767 - RWLockGuard lock_guard(&subscriptionsRwlock); 767 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
768 lock_guard.rdlock(); 768 lock_guard.rdlock();
769 769
770 countSubscriptions(&root, count); 770 countSubscriptions(&root, count);
@@ -902,38 +902,41 @@ void SubscriptionStore::loadRetainedMessages(const std::string &amp;filePath) @@ -902,38 +902,41 @@ void SubscriptionStore::loadRetainedMessages(const std::string &amp;filePath)
902 902
903 void SubscriptionStore::saveSessionsAndSubscriptions(const std::string &filePath) 903 void SubscriptionStore::saveSessionsAndSubscriptions(const std::string &filePath)
904 { 904 {
905 - logger->logf(LOG_INFO, "Saving sessions and subscriptions to '%s'", filePath.c_str()); 905 + logger->logf(LOG_INFO, "Saving sessions and subscriptions to '%s' in thread.", filePath.c_str());
906 906
907 - std::vector<std::unique_ptr<Session>> sessionCopies; 907 + const std::chrono::time_point<std::chrono::steady_clock> start = std::chrono::steady_clock::now();
  908 +
  909 + std::vector<std::shared_ptr<Session>> sessionPointers;
908 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> subscriptionCopies; 910 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> subscriptionCopies;
909 911
910 { 912 {
911 - RWLockGuard lock_guard(&subscriptionsRwlock); 913 + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock);
912 lock_guard.rdlock(); 914 lock_guard.rdlock();
913 915
914 - sessionCopies.reserve(sessionsByIdConst.size()); 916 + sessionPointers.reserve(sessionsByIdConst.size());
915 917
916 for (const auto &pair : sessionsByIdConst) 918 for (const auto &pair : sessionsByIdConst)
917 { 919 {
918 - const Session &org = *pair.second.get();  
919 -  
920 - // Sessions created with clean session need to be destroyed when disconnecting, so no point in saving them.  
921 - if (org.getDestroyOnDisconnect())  
922 - continue;  
923 -  
924 - sessionCopies.push_back(org.getCopy()); 920 + sessionPointers.push_back(pair.second);
925 } 921 }
926 922
927 getSubscriptions(&root, "", true, subscriptionCopies); 923 getSubscriptions(&root, "", true, subscriptionCopies);
928 } 924 }
929 925
930 - // Then write the copies to disk, after having released the lock 926 + const std::chrono::time_point<std::chrono::steady_clock> doneCopying = std::chrono::steady_clock::now();
931 927
932 - logger->logf(LOG_DEBUG, "Collected %ld sessions and %ld subscriptions to save.", sessionCopies.size(), subscriptionCopies.size()); 928 + const std::chrono::milliseconds copyDuration = std::chrono::duration_cast<std::chrono::milliseconds>(doneCopying - start);
  929 + logger->logf(LOG_DEBUG, "Collected %ld sessions and %ld subscriptions to save in %ld ms.", sessionPointers.size(), subscriptionCopies.size(), copyDuration.count());
933 930
934 SessionsAndSubscriptionsDB db(filePath); 931 SessionsAndSubscriptionsDB db(filePath);
935 db.openWrite(); 932 db.openWrite();
936 - db.saveData(sessionCopies, subscriptionCopies); 933 + db.saveData(sessionPointers, subscriptionCopies);
  934 +
  935 + const std::chrono::time_point<std::chrono::steady_clock> doneSaving = std::chrono::steady_clock::now();
  936 +
  937 + const std::chrono::milliseconds saveDuration = std::chrono::duration_cast<std::chrono::milliseconds>(doneSaving - doneCopying);
  938 + logger->logf(LOG_INFO, "Saved %ld sessions and %ld subscriptions to '%s' in %ld ms.",
  939 + sessionPointers.size(), subscriptionCopies.size(), filePath.c_str(), saveDuration.count());
937 } 940 }
938 941
939 void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath) 942 void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath)
@@ -946,7 +949,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &amp;filePath @@ -946,7 +949,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &amp;filePath
946 db.openRead(); 949 db.openRead();
947 SessionsAndSubscriptionsResult loadedData = db.readData(); 950 SessionsAndSubscriptionsResult loadedData = db.readData();
948 951
949 - RWLockGuard locker(&subscriptionsRwlock); 952 + RWLockGuard locker(&sessionsAndSubscriptionsRwlock);
950 locker.wrlock(); 953 locker.wrlock();
951 954
952 for (std::shared_ptr<Session> &session : loadedData.sessions) 955 for (std::shared_ptr<Session> &session : loadedData.sessions)
subscriptionstore.h
@@ -106,7 +106,7 @@ class SubscriptionStore @@ -106,7 +106,7 @@ class SubscriptionStore
106 106
107 SubscriptionNode root; 107 SubscriptionNode root;
108 SubscriptionNode rootDollar; 108 SubscriptionNode rootDollar;
109 - pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; 109 + pthread_rwlock_t sessionsAndSubscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
110 std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById; 110 std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById;
111 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst; 111 const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst;
112 112