From 161199521d8d1d6692c09b89723c28dc827c2da2 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Wed, 6 Apr 2022 20:03:17 +0200 Subject: [PATCH] Untested new format of saving sessions --- mqttpacket.cpp | 2 +- session.cpp | 9 ++++++++- sessionsandsubscriptionsdb.cpp | 22 +++++++++++++++++++--- settings.cpp | 9 +++++++++ settings.h | 2 ++ subscriptionstore.cpp | 1 + 6 files changed, 40 insertions(+), 5 deletions(-) diff --git a/mqttpacket.cpp b/mqttpacket.cpp index f5d6762..57f44de 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -343,7 +343,7 @@ void MqttPacket::handleConnect() uint16_t keep_alive = readTwoBytesToUInt16(); uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; - uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits::max(); + uint32_t session_expire = settings.getExpireSessionAfterSeconds(); uint32_t max_outgoing_packet_size = settings.maxPacketSize; uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases bool request_response_information = false; diff --git a/session.cpp b/session.cpp index 0a149dd..9828e08 100644 --- a/session.cpp +++ b/session.cpp @@ -49,7 +49,10 @@ void Session::setProgramStartedAtUnixTimestamp(const int64_t unix_timestamp) appStartTime = std::chrono::steady_clock::now() - age_in_s; } - +/** + * @brief Session::getSessionRelativeAgeInMs is used to get the value to store on disk when saving sessions. + * @return + */ int64_t Session::getSessionRelativeAgeInMs() const { const std::chrono::milliseconds sessionAge = std::chrono::duration_cast(lastTouched - appStartTime); @@ -57,6 +60,10 @@ int64_t Session::getSessionRelativeAgeInMs() const return sInMs; } +/** + * @brief Session::setSessionTouch is the set 'lastTouched' value relative to the app start time when a session is loaded from disk. + * @param ageInMs + */ void Session::setSessionTouch(int64_t ageInMs) { std::chrono::milliseconds ms(ageInMs); diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 3812c28..8c205f1 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -17,6 +17,7 @@ License along with FlashMQ. If not, see . #include "sessionsandsubscriptionsdb.h" #include "mqttpacket.h" +#include "threadglobals.h" #include "cassert" @@ -58,18 +59,20 @@ void SessionsAndSubscriptionsDB::openRead() SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() { + const Settings *settings = ThreadGlobals::getSettings(); + SessionsAndSubscriptionsResult result; while (!feof(f)) { bool eofFound = false; - const int64_t programStartAge = readInt64(eofFound); + const int64_t programStartStamp = readInt64(eofFound); if (eofFound) continue; - logger->logf(LOG_DEBUG, "Setting first app start time to timestamp %ld", programStartAge); - Session::setProgramStartedAtUnixTimestamp(programStartAge); + logger->logf(LOG_DEBUG, "Setting first app start time to timestamp %ld", programStartStamp); + Session::setProgramStartedAtUnixTimestamp(programStartStamp); const uint32_t nrOfSessions = readUint32(eofFound); @@ -146,6 +149,16 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() int64_t sessionAge = readInt64(eofFound); logger->logf(LOG_DEBUG, "Loaded session age: %ld ms.", sessionAge); ses->setSessionTouch(sessionAge); + + const uint32_t sessionExpiryInterval = std::min(readUint32(eofFound), settings->getExpireSessionAfterSeconds()); + const uint16_t maxQosPending = std::min(readUint16(eofFound), settings->maxQosMsgPendingPerClient); + + // TODO: perhaps I should calculate a new sessionExpiryInterval, minus the time it was off? + + // Setting the sessionExpiryInterval back to what it was is somewhat naive, in that when you have the + // server off for a week, you basically suspended time and will delay all session destructions. But, + // I'm chosing that option versus kicking out all sessions if the server was off for a longer period. + ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); // The protocol version is just dummy, to get the behavior I want. } const uint32_t nrOfSubscriptions = readUint32(eofFound); @@ -259,6 +272,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorgetSessionRelativeAgeInMs(); logger->logf(LOG_DEBUG, "Writing session age: %ld ms.", sInMs); writeInt64(sInMs); + + writeUint32(ses->sessionExpiryInterval); + writeUint16(ses->maxQosMsgPending); } writeUint32(subscriptions.size()); diff --git a/settings.cpp b/settings.cpp index 629ea52..d515f68 100644 --- a/settings.cpp +++ b/settings.cpp @@ -45,3 +45,12 @@ std::string Settings::getSessionsDBFile() const std::string path = formatString("%s/%s", storageDir.c_str(), "sessions.db"); return path; } + +/** + * @brief because 0 means 'forever', we have to translate this. + * @return + */ +uint32_t Settings::getExpireSessionAfterSeconds() const +{ + return expireSessionsAfterSeconds > 0 ? expireSessionsAfterSeconds : std::numeric_limits::max(); +} diff --git a/settings.h b/settings.h index 6202d18..adb1486 100644 --- a/settings.h +++ b/settings.h @@ -69,6 +69,8 @@ public: std::string getRetainedMessagesDBFile() const; std::string getSessionsDBFile() const; + + uint32_t getExpireSessionAfterSeconds() const; }; #endif // SETTINGS_H diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index bafd082..caaa3d7 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -855,6 +855,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath for (std::shared_ptr &session : loadedData.sessions) { sessionsById[session->getClientId()] = session; + queueSessionRemoval(session); } std::vector subtopics; -- libgit2 0.21.4