From bc5029262e7294f75289fe2cb127d49108edf9b5 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sun, 21 Aug 2022 15:04:48 +0200 Subject: [PATCH] Reducing locking and memory overhead for saving sessions --- sessionsandsubscriptionsdb.cpp | 15 ++++++++++++--- sessionsandsubscriptionsdb.h | 2 +- subscriptionstore.cpp | 49 ++++++++++++++++++++++++++----------------------- subscriptionstore.h | 2 +- 4 files changed, 40 insertions(+), 28 deletions(-) diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 0d6ae88..a5108f8 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -242,7 +242,7 @@ void SessionsAndSubscriptionsDB::writeRowHeader() } -void SessionsAndSubscriptionsDB::saveData(const std::vector> &sessions, const std::unordered_map> &subscriptions) +void SessionsAndSubscriptionsDB::saveData(const std::vector> &sessions, const std::unordered_map> &subscriptions) { if (!f) return; @@ -254,12 +254,21 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorlogf(LOG_DEBUG, "Saving current time stamp %ld", now_epoch); writeInt64(now_epoch); - writeUint32(sessions.size()); + std::vector> sessionsToSave; + // Sessions created with clean session need to be destroyed when disconnecting, so no point in saving them. + std::copy_if(sessions.begin(), sessions.end(), std::back_inserter(sessionsToSave), [](const std::shared_ptr &ses) { + return !ses->destroyOnDisconnect; + }); + + writeUint32(sessionsToSave.size()); CirBuf cirbuf(1024); - for (const std::unique_ptr &ses : sessions) + for (const std::shared_ptr &_ses : sessionsToSave) { + // Takes care of locking, and working on the snapshot/copy prevents doing disk IO under lock. + std::unique_ptr ses = _ses->getCopy(); + logger->logf(LOG_DEBUG, "Saving session '%s'.", ses->getClientId().c_str()); writeRowHeader(); diff --git a/sessionsandsubscriptionsdb.h b/sessionsandsubscriptionsdb.h index 5b79156..a2756ef 100644 --- a/sessionsandsubscriptionsdb.h +++ b/sessionsandsubscriptionsdb.h @@ -68,7 +68,7 @@ public: void openWrite(); void openRead(); - void saveData(const std::vector> &sessions, const std::unordered_map> &subscriptions); + void saveData(const std::vector> &sessions, const std::unordered_map> &subscriptions); SessionsAndSubscriptionsResult readData(); }; diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 69bd20d..fe631b2 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -134,7 +134,7 @@ SubscriptionNode *SubscriptionStore::getDeepestNode(const std::string &topic, co void SubscriptionStore::addSubscription(std::shared_ptr &client, const std::string &topic, const std::vector &subtopics, char qos) { - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.wrlock(); SubscriptionNode *deepestNode = getDeepestNode(topic, subtopics); @@ -160,7 +160,7 @@ void SubscriptionStore::removeSubscription(std::shared_ptr &client, cons if (topic.length() > 0 && topic[0] == '$') deepestNode = &rootDollar; - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.wrlock(); // This code looks like that for addSubscription(), but it's specifically different in that we don't want to default-create non-existing @@ -227,7 +227,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr { ThreadGlobals::getThreadData()->queueClientNextKeepAliveCheckLocked(client, true); - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.wrlock(); if (client->getClientId().empty()) @@ -274,7 +274,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr */ std::shared_ptr SubscriptionStore::lockSession(const std::string &clientid) { - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.rdlock(); auto it = sessionsByIdConst.find(clientid); @@ -463,7 +463,7 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory ©Factory { const std::vector &subtopics = copyFactory.getSubtopics(); - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.rdlock(); publishRecursively(subtopics.begin(), subtopics.end(), startNode, subscriberSessions); } @@ -649,7 +649,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr &session) queueWillMessage(will, session, true); } - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.wrlock(); auto session_it = sessionsById.find(clientid); @@ -724,7 +724,7 @@ void SubscriptionStore::removeExpiredSessionsClients() if (lastTreeCleanup + std::chrono::minutes(30) < now) { - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.wrlock(); logger->logf(LOG_NOTICE, "Rebuilding subscription tree"); @@ -764,7 +764,7 @@ int64_t SubscriptionStore::getSubscriptionCount() { int64_t count = 0; - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.rdlock(); countSubscriptions(&root, count); @@ -902,38 +902,41 @@ void SubscriptionStore::loadRetainedMessages(const std::string &filePath) void SubscriptionStore::saveSessionsAndSubscriptions(const std::string &filePath) { - logger->logf(LOG_INFO, "Saving sessions and subscriptions to '%s'", filePath.c_str()); + logger->logf(LOG_INFO, "Saving sessions and subscriptions to '%s' in thread.", filePath.c_str()); - std::vector> sessionCopies; + const std::chrono::time_point start = std::chrono::steady_clock::now(); + + std::vector> sessionPointers; std::unordered_map> subscriptionCopies; { - RWLockGuard lock_guard(&subscriptionsRwlock); + RWLockGuard lock_guard(&sessionsAndSubscriptionsRwlock); lock_guard.rdlock(); - sessionCopies.reserve(sessionsByIdConst.size()); + sessionPointers.reserve(sessionsByIdConst.size()); for (const auto &pair : sessionsByIdConst) { - const Session &org = *pair.second.get(); - - // Sessions created with clean session need to be destroyed when disconnecting, so no point in saving them. - if (org.getDestroyOnDisconnect()) - continue; - - sessionCopies.push_back(org.getCopy()); + sessionPointers.push_back(pair.second); } getSubscriptions(&root, "", true, subscriptionCopies); } - // Then write the copies to disk, after having released the lock + const std::chrono::time_point doneCopying = std::chrono::steady_clock::now(); - logger->logf(LOG_DEBUG, "Collected %ld sessions and %ld subscriptions to save.", sessionCopies.size(), subscriptionCopies.size()); + const std::chrono::milliseconds copyDuration = std::chrono::duration_cast(doneCopying - start); + logger->logf(LOG_DEBUG, "Collected %ld sessions and %ld subscriptions to save in %ld ms.", sessionPointers.size(), subscriptionCopies.size(), copyDuration.count()); SessionsAndSubscriptionsDB db(filePath); db.openWrite(); - db.saveData(sessionCopies, subscriptionCopies); + db.saveData(sessionPointers, subscriptionCopies); + + const std::chrono::time_point doneSaving = std::chrono::steady_clock::now(); + + const std::chrono::milliseconds saveDuration = std::chrono::duration_cast(doneSaving - doneCopying); + logger->logf(LOG_INFO, "Saved %ld sessions and %ld subscriptions to '%s' in %ld ms.", + sessionPointers.size(), subscriptionCopies.size(), filePath.c_str(), saveDuration.count()); } void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath) @@ -946,7 +949,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath db.openRead(); SessionsAndSubscriptionsResult loadedData = db.readData(); - RWLockGuard locker(&subscriptionsRwlock); + RWLockGuard locker(&sessionsAndSubscriptionsRwlock); locker.wrlock(); for (std::shared_ptr &session : loadedData.sessions) diff --git a/subscriptionstore.h b/subscriptionstore.h index ca77405..efebf43 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -106,7 +106,7 @@ class SubscriptionStore SubscriptionNode root; SubscriptionNode rootDollar; - pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; + pthread_rwlock_t sessionsAndSubscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; std::unordered_map> sessionsById; const std::unordered_map> &sessionsByIdConst; -- libgit2 0.21.4