Commit 161199521d8d1d6692c09b89723c28dc827c2da2

Authored by Wiebe Cazemier
1 parent 41ee727a

Untested new format of saving sessions

But, this is a safe point before I will refactor it. I will remove the
appStartTime and session last touched. With the new queued removals,
this is no longer necessary.
mqttpacket.cpp
@@ -343,7 +343,7 @@ void MqttPacket::handleConnect() @@ -343,7 +343,7 @@ void MqttPacket::handleConnect()
343 uint16_t keep_alive = readTwoBytesToUInt16(); 343 uint16_t keep_alive = readTwoBytesToUInt16();
344 344
345 uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; 345 uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient;
346 - uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits<uint32_t>::max(); 346 + uint32_t session_expire = settings.getExpireSessionAfterSeconds();
347 uint32_t max_outgoing_packet_size = settings.maxPacketSize; 347 uint32_t max_outgoing_packet_size = settings.maxPacketSize;
348 uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases 348 uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases
349 bool request_response_information = false; 349 bool request_response_information = false;
session.cpp
@@ -49,7 +49,10 @@ void Session::setProgramStartedAtUnixTimestamp(const int64_t unix_timestamp) @@ -49,7 +49,10 @@ void Session::setProgramStartedAtUnixTimestamp(const int64_t unix_timestamp)
49 appStartTime = std::chrono::steady_clock::now() - age_in_s; 49 appStartTime = std::chrono::steady_clock::now() - age_in_s;
50 } 50 }
51 51
52 - 52 +/**
  53 + * @brief Session::getSessionRelativeAgeInMs is used to get the value to store on disk when saving sessions.
  54 + * @return
  55 + */
53 int64_t Session::getSessionRelativeAgeInMs() const 56 int64_t Session::getSessionRelativeAgeInMs() const
54 { 57 {
55 const std::chrono::milliseconds sessionAge = std::chrono::duration_cast<std::chrono::milliseconds>(lastTouched - appStartTime); 58 const std::chrono::milliseconds sessionAge = std::chrono::duration_cast<std::chrono::milliseconds>(lastTouched - appStartTime);
@@ -57,6 +60,10 @@ int64_t Session::getSessionRelativeAgeInMs() const @@ -57,6 +60,10 @@ int64_t Session::getSessionRelativeAgeInMs() const
57 return sInMs; 60 return sInMs;
58 } 61 }
59 62
  63 +/**
  64 + * @brief Session::setSessionTouch is the set 'lastTouched' value relative to the app start time when a session is loaded from disk.
  65 + * @param ageInMs
  66 + */
60 void Session::setSessionTouch(int64_t ageInMs) 67 void Session::setSessionTouch(int64_t ageInMs)
61 { 68 {
62 std::chrono::milliseconds ms(ageInMs); 69 std::chrono::milliseconds ms(ageInMs);
sessionsandsubscriptionsdb.cpp
@@ -17,6 +17,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -17,6 +17,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
17 17
18 #include "sessionsandsubscriptionsdb.h" 18 #include "sessionsandsubscriptionsdb.h"
19 #include "mqttpacket.h" 19 #include "mqttpacket.h"
  20 +#include "threadglobals.h"
20 21
21 #include "cassert" 22 #include "cassert"
22 23
@@ -58,18 +59,20 @@ void SessionsAndSubscriptionsDB::openRead() @@ -58,18 +59,20 @@ void SessionsAndSubscriptionsDB::openRead()
58 59
59 SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() 60 SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
60 { 61 {
  62 + const Settings *settings = ThreadGlobals::getSettings();
  63 +
61 SessionsAndSubscriptionsResult result; 64 SessionsAndSubscriptionsResult result;
62 65
63 while (!feof(f)) 66 while (!feof(f))
64 { 67 {
65 bool eofFound = false; 68 bool eofFound = false;
66 69
67 - const int64_t programStartAge = readInt64(eofFound); 70 + const int64_t programStartStamp = readInt64(eofFound);
68 if (eofFound) 71 if (eofFound)
69 continue; 72 continue;
70 73
71 - logger->logf(LOG_DEBUG, "Setting first app start time to timestamp %ld", programStartAge);  
72 - Session::setProgramStartedAtUnixTimestamp(programStartAge); 74 + logger->logf(LOG_DEBUG, "Setting first app start time to timestamp %ld", programStartStamp);
  75 + Session::setProgramStartedAtUnixTimestamp(programStartStamp);
73 76
74 const uint32_t nrOfSessions = readUint32(eofFound); 77 const uint32_t nrOfSessions = readUint32(eofFound);
75 78
@@ -146,6 +149,16 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -146,6 +149,16 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
146 int64_t sessionAge = readInt64(eofFound); 149 int64_t sessionAge = readInt64(eofFound);
147 logger->logf(LOG_DEBUG, "Loaded session age: %ld ms.", sessionAge); 150 logger->logf(LOG_DEBUG, "Loaded session age: %ld ms.", sessionAge);
148 ses->setSessionTouch(sessionAge); 151 ses->setSessionTouch(sessionAge);
  152 +
  153 + const uint32_t sessionExpiryInterval = std::min<uint32_t>(readUint32(eofFound), settings->getExpireSessionAfterSeconds());
  154 + const uint16_t maxQosPending = std::min<uint16_t>(readUint16(eofFound), settings->maxQosMsgPendingPerClient);
  155 +
  156 + // TODO: perhaps I should calculate a new sessionExpiryInterval, minus the time it was off?
  157 +
  158 + // Setting the sessionExpiryInterval back to what it was is somewhat naive, in that when you have the
  159 + // server off for a week, you basically suspended time and will delay all session destructions. But,
  160 + // I'm chosing that option versus kicking out all sessions if the server was off for a longer period.
  161 + ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); // The protocol version is just dummy, to get the behavior I want.
149 } 162 }
150 163
151 const uint32_t nrOfSubscriptions = readUint32(eofFound); 164 const uint32_t nrOfSubscriptions = readUint32(eofFound);
@@ -259,6 +272,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -259,6 +272,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
259 const int64_t sInMs = ses->getSessionRelativeAgeInMs(); 272 const int64_t sInMs = ses->getSessionRelativeAgeInMs();
260 logger->logf(LOG_DEBUG, "Writing session age: %ld ms.", sInMs); 273 logger->logf(LOG_DEBUG, "Writing session age: %ld ms.", sInMs);
261 writeInt64(sInMs); 274 writeInt64(sInMs);
  275 +
  276 + writeUint32(ses->sessionExpiryInterval);
  277 + writeUint16(ses->maxQosMsgPending);
262 } 278 }
263 279
264 writeUint32(subscriptions.size()); 280 writeUint32(subscriptions.size());
settings.cpp
@@ -45,3 +45,12 @@ std::string Settings::getSessionsDBFile() const @@ -45,3 +45,12 @@ std::string Settings::getSessionsDBFile() const
45 std::string path = formatString("%s/%s", storageDir.c_str(), "sessions.db"); 45 std::string path = formatString("%s/%s", storageDir.c_str(), "sessions.db");
46 return path; 46 return path;
47 } 47 }
  48 +
  49 +/**
  50 + * @brief because 0 means 'forever', we have to translate this.
  51 + * @return
  52 + */
  53 +uint32_t Settings::getExpireSessionAfterSeconds() const
  54 +{
  55 + return expireSessionsAfterSeconds > 0 ? expireSessionsAfterSeconds : std::numeric_limits<uint32_t>::max();
  56 +}
settings.h
@@ -69,6 +69,8 @@ public: @@ -69,6 +69,8 @@ public:
69 69
70 std::string getRetainedMessagesDBFile() const; 70 std::string getRetainedMessagesDBFile() const;
71 std::string getSessionsDBFile() const; 71 std::string getSessionsDBFile() const;
  72 +
  73 + uint32_t getExpireSessionAfterSeconds() const;
72 }; 74 };
73 75
74 #endif // SETTINGS_H 76 #endif // SETTINGS_H
subscriptionstore.cpp
@@ -855,6 +855,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &amp;filePath @@ -855,6 +855,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &amp;filePath
855 for (std::shared_ptr<Session> &session : loadedData.sessions) 855 for (std::shared_ptr<Session> &session : loadedData.sessions)
856 { 856 {
857 sessionsById[session->getClientId()] = session; 857 sessionsById[session->getClientId()] = session;
  858 + queueSessionRemoval(session);
858 } 859 }
859 860
860 std::vector<std::string> subtopics; 861 std::vector<std::string> subtopics;