Commit 3492968061ef89680b7186ac737d4aae2894a7ee

Authored by Wiebe Cazemier
1 parent 16119952

Replace app start time with queued session removal

FlashMQTests/tst_maintests.cpp
@@ -1007,14 +1007,12 @@ void MainTests::testSavingSessions() @@ -1007,14 +1007,12 @@ void MainTests::testSavingSessions()
1007 std::shared_ptr<Client> c1(new Client(0, t, nullptr, false, nullptr, settings, false)); 1007 std::shared_ptr<Client> c1(new Client(0, t, nullptr, false, nullptr, settings, false));
1008 c1->setClientProperties(ProtocolVersion::Mqtt311, "c1", "user1", true, 60); 1008 c1->setClientProperties(ProtocolVersion::Mqtt311, "c1", "user1", true, 60);
1009 store->registerClientAndKickExistingOne(c1, false, 512, 120); 1009 store->registerClientAndKickExistingOne(c1, false, 512, 120);
1010 - c1->getSession()->touch();  
1011 c1->getSession()->addIncomingQoS2MessageId(2); 1010 c1->getSession()->addIncomingQoS2MessageId(2);
1012 c1->getSession()->addIncomingQoS2MessageId(3); 1011 c1->getSession()->addIncomingQoS2MessageId(3);
1013 1012
1014 std::shared_ptr<Client> c2(new Client(0, t, nullptr, false, nullptr, settings, false)); 1013 std::shared_ptr<Client> c2(new Client(0, t, nullptr, false, nullptr, settings, false));
1015 c2->setClientProperties(ProtocolVersion::Mqtt311, "c2", "user2", true, 60); 1014 c2->setClientProperties(ProtocolVersion::Mqtt311, "c2", "user2", true, 60);
1016 store->registerClientAndKickExistingOne(c2, false, 512, 120); 1015 store->registerClientAndKickExistingOne(c2, false, 512, 120);
1017 - c2->getSession()->touch();  
1018 c2->getSession()->addOutgoingQoS2MessageId(55); 1016 c2->getSession()->addOutgoingQoS2MessageId(55);
1019 c2->getSession()->addOutgoingQoS2MessageId(66); 1017 c2->getSession()->addOutgoingQoS2MessageId(66);
1020 1018
client.cpp
@@ -154,8 +154,6 @@ bool Client::readFdIntoBuffer() @@ -154,8 +154,6 @@ bool Client::readFdIntoBuffer()
154 } 154 }
155 155
156 lastActivity = std::chrono::steady_clock::now(); 156 lastActivity = std::chrono::steady_clock::now();
157 - if (session)  
158 - session->touch(lastActivity);  
159 157
160 return true; 158 return true;
161 } 159 }
mqttpacket.cpp
@@ -568,7 +568,15 @@ void MqttPacket::handleConnect() @@ -568,7 +568,15 @@ void MqttPacket::handleConnect()
568 568
569 if (accessGranted) 569 if (accessGranted)
570 { 570 {
571 - bool sessionPresent = protocolVersion >= ProtocolVersion::Mqtt311 && !clean_start && subscriptionStore->sessionPresent(client_id); 571 + bool sessionPresent = false;
  572 + std::shared_ptr<Session> existingSession;
  573 +
  574 + if (protocolVersion >= ProtocolVersion::Mqtt311 && !clean_start)
  575 + {
  576 + existingSession = subscriptionStore->lockSession(client_id);
  577 + if (existingSession)
  578 + sessionPresent = true;
  579 + }
572 580
573 sender->setAuthenticated(true); 581 sender->setAuthenticated(true);
574 ConnAck connAck(protocolVersion, ReasonCodes::Success, sessionPresent); 582 ConnAck connAck(protocolVersion, ReasonCodes::Success, sessionPresent);
session.cpp
@@ -22,8 +22,6 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -22,8 +22,6 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
22 #include "threadglobals.h" 22 #include "threadglobals.h"
23 #include "threadglobals.h" 23 #include "threadglobals.h"
24 24
25 -std::chrono::time_point<std::chrono::steady_clock> appStartTime = std::chrono::steady_clock::now();  
26 -  
27 Session::Session() 25 Session::Session()
28 { 26 {
29 const Settings &settings = *ThreadGlobals::getSettings(); 27 const Settings &settings = *ThreadGlobals::getSettings();
@@ -33,44 +31,6 @@ Session::Session() @@ -33,44 +31,6 @@ Session::Session()
33 this->sessionExpiryInterval = settings.expireSessionsAfterSeconds; 31 this->sessionExpiryInterval = settings.expireSessionsAfterSeconds;
34 } 32 }
35 33
36 -int64_t Session::getProgramStartedAtUnixTimestamp()  
37 -{  
38 - auto secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();  
39 - const std::chrono::seconds age = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - appStartTime);  
40 - int64_t result = secondsSinceEpoch - age.count();  
41 - return result;  
42 -}  
43 -  
44 -void Session::setProgramStartedAtUnixTimestamp(const int64_t unix_timestamp)  
45 -{  
46 - auto secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch());  
47 - const std::chrono::seconds _unix_timestamp = std::chrono::seconds(unix_timestamp);  
48 - const std::chrono::seconds age_in_s = secondsSinceEpoch - _unix_timestamp;  
49 - appStartTime = std::chrono::steady_clock::now() - age_in_s;  
50 -}  
51 -  
52 -/**  
53 - * @brief Session::getSessionRelativeAgeInMs is used to get the value to store on disk when saving sessions.  
54 - * @return  
55 - */  
56 -int64_t Session::getSessionRelativeAgeInMs() const  
57 -{  
58 - const std::chrono::milliseconds sessionAge = std::chrono::duration_cast<std::chrono::milliseconds>(lastTouched - appStartTime);  
59 - const int64_t sInMs = sessionAge.count();  
60 - return sInMs;  
61 -}  
62 -  
63 -/**  
64 - * @brief Session::setSessionTouch is the set 'lastTouched' value relative to the app start time when a session is loaded from disk.  
65 - * @param ageInMs  
66 - */  
67 -void Session::setSessionTouch(int64_t ageInMs)  
68 -{  
69 - std::chrono::milliseconds ms(ageInMs);  
70 - std::chrono::time_point<std::chrono::steady_clock> point = appStartTime + ms;  
71 - lastTouched = point;  
72 -}  
73 -  
74 bool Session::requiresPacketRetransmission() const 34 bool Session::requiresPacketRetransmission() const
75 { 35 {
76 const std::shared_ptr<Client> client = makeSharedClient(); 36 const std::shared_ptr<Client> client = makeSharedClient();
@@ -110,7 +70,6 @@ Session::Session(const Session &amp;other) @@ -110,7 +70,6 @@ Session::Session(const Session &amp;other)
110 this->incomingQoS2MessageIds = other.incomingQoS2MessageIds; 70 this->incomingQoS2MessageIds = other.incomingQoS2MessageIds;
111 this->outgoingQoS2MessageIds = other.outgoingQoS2MessageIds; 71 this->outgoingQoS2MessageIds = other.outgoingQoS2MessageIds;
112 this->nextPacketId = other.nextPacketId; 72 this->nextPacketId = other.nextPacketId;
113 - this->lastTouched = other.lastTouched;  
114 73
115 // TODO: see git history for a change here. We now copy the whole queued publish. Do we want to address that? 74 // TODO: see git history for a change here. We now copy the whole queued publish. Do we want to address that?
116 this->qosPacketQueue = other.qosPacketQueue; 75 this->qosPacketQueue = other.qosPacketQueue;
@@ -284,28 +243,9 @@ uint64_t Session::sendPendingQosMessages() @@ -284,28 +243,9 @@ uint64_t Session::sendPendingQosMessages()
284 return count; 243 return count;
285 } 244 }
286 245
287 -/**  
288 - * @brief Session::touch with a time value allowed touching without causing another sys/lib call to get the time.  
289 - * @param newval  
290 - */  
291 -void Session::touch(std::chrono::time_point<std::chrono::steady_clock> newval) 246 +bool Session::hasActiveClient() const
292 { 247 {
293 - lastTouched = newval;  
294 -}  
295 -  
296 -void Session::touch()  
297 -{  
298 - lastTouched = std::chrono::steady_clock::now();  
299 -}  
300 -  
301 -bool Session::hasExpired() const  
302 -{  
303 - if (!client.expired())  
304 - return false;  
305 -  
306 - std::chrono::seconds expireAfter(sessionExpiryInterval);  
307 - std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();  
308 - return (lastTouched + expireAfter) < now; 248 + return !client.expired();
309 } 249 }
310 250
311 void Session::clearWill() 251 void Session::clearWill()
session.h
@@ -50,12 +50,9 @@ class Session @@ -50,12 +50,9 @@ class Session
50 uint16_t maxQosMsgPending; 50 uint16_t maxQosMsgPending;
51 uint16_t QoSLogPrintedAtId = 0; 51 uint16_t QoSLogPrintedAtId = 0;
52 bool destroyOnDisconnect = false; 52 bool destroyOnDisconnect = false;
53 - std::chrono::time_point<std::chrono::steady_clock> lastTouched = std::chrono::steady_clock::now();  
54 std::shared_ptr<Publish> willPublish; 53 std::shared_ptr<Publish> willPublish;
55 Logger *logger = Logger::getInstance(); 54 Logger *logger = Logger::getInstance();
56 55
57 - int64_t getSessionRelativeAgeInMs() const;  
58 - void setSessionTouch(int64_t ageInMs);  
59 bool requiresPacketRetransmission() const; 56 bool requiresPacketRetransmission() const;
60 void increasePacketId(); 57 void increasePacketId();
61 58
@@ -66,9 +63,6 @@ public: @@ -66,9 +63,6 @@ public:
66 Session(Session &&other) = delete; 63 Session(Session &&other) = delete;
67 ~Session(); 64 ~Session();
68 65
69 - static int64_t getProgramStartedAtUnixTimestamp();  
70 - static void setProgramStartedAtUnixTimestamp(const int64_t unix_timestamp);  
71 -  
72 std::unique_ptr<Session> getCopy() const; 66 std::unique_ptr<Session> getCopy() const;
73 67
74 const std::string &getClientId() const { return client_id; } 68 const std::string &getClientId() const { return client_id; }
@@ -77,9 +71,7 @@ public: @@ -77,9 +71,7 @@ public:
77 void writePacket(PublishCopyFactory &copyFactory, const char max_qos, uint64_t &count); 71 void writePacket(PublishCopyFactory &copyFactory, const char max_qos, uint64_t &count);
78 void clearQosMessage(uint16_t packet_id); 72 void clearQosMessage(uint16_t packet_id);
79 uint64_t sendPendingQosMessages(); 73 uint64_t sendPendingQosMessages();
80 - void touch(std::chrono::time_point<std::chrono::steady_clock> val);  
81 - void touch();  
82 - bool hasExpired() const; 74 + bool hasActiveClient() const;
83 void clearWill(); 75 void clearWill();
84 std::shared_ptr<Publish> &getWill(); 76 std::shared_ptr<Publish> &getWill();
85 77
sessionsandsubscriptionsdb.cpp
@@ -67,12 +67,14 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -67,12 +67,14 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
67 { 67 {
68 bool eofFound = false; 68 bool eofFound = false;
69 69
70 - const int64_t programStartStamp = readInt64(eofFound); 70 + const int64_t fileSavedAt = readInt64(eofFound);
71 if (eofFound) 71 if (eofFound)
72 continue; 72 continue;
73 73
74 - logger->logf(LOG_DEBUG, "Setting first app start time to timestamp %ld", programStartStamp);  
75 - Session::setProgramStartedAtUnixTimestamp(programStartStamp); 74 + const int64_t now_epoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  75 + const int64_t persistence_state_age = fileSavedAt > now_epoch ? 0 : now_epoch - fileSavedAt;
  76 +
  77 + logger->logf(LOG_DEBUG, "Session file was saved at %ld. That's %ld seconds ago.", fileSavedAt, persistence_state_age);
76 78
77 const uint32_t nrOfSessions = readUint32(eofFound); 79 const uint32_t nrOfSessions = readUint32(eofFound);
78 80
@@ -146,19 +148,16 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -146,19 +148,16 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
146 logger->logf(LOG_DEBUG, "Loaded next packetid %d.", ses->nextPacketId); 148 logger->logf(LOG_DEBUG, "Loaded next packetid %d.", ses->nextPacketId);
147 ses->nextPacketId = nextPacketId; 149 ses->nextPacketId = nextPacketId;
148 150
149 - int64_t sessionAge = readInt64(eofFound);  
150 - logger->logf(LOG_DEBUG, "Loaded session age: %ld ms.", sessionAge);  
151 - ses->setSessionTouch(sessionAge); 151 + const uint32_t originalSessionExpiryInterval = readUint32(eofFound);
  152 + const uint32_t compensatedSessionExpiry = persistence_state_age > originalSessionExpiryInterval ? 0 : originalSessionExpiryInterval - persistence_state_age;
  153 + const uint32_t sessionExpiryInterval = std::min<uint32_t>(compensatedSessionExpiry, settings->getExpireSessionAfterSeconds());
152 154
153 - const uint32_t sessionExpiryInterval = std::min<uint32_t>(readUint32(eofFound), settings->getExpireSessionAfterSeconds());  
154 const uint16_t maxQosPending = std::min<uint16_t>(readUint16(eofFound), settings->maxQosMsgPendingPerClient); 155 const uint16_t maxQosPending = std::min<uint16_t>(readUint16(eofFound), settings->maxQosMsgPendingPerClient);
155 156
156 - // TODO: perhaps I should calculate a new sessionExpiryInterval, minus the time it was off?  
157 -  
158 - // Setting the sessionExpiryInterval back to what it was is somewhat naive, in that when you have the  
159 - // server off for a week, you basically suspended time and will delay all session destructions. But,  
160 - // I'm chosing that option versus kicking out all sessions if the server was off for a longer period.  
161 - ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5); // The protocol version is just dummy, to get the behavior I want. 157 + // We will set the session expiry interval as it would have had time continued. If a connection picks up session, it will update
  158 + // it with a more relevant value.
  159 + // The protocol version 5 is just dummy, to get the behavior I want.
  160 + ses->setSessionProperties(maxQosPending, sessionExpiryInterval, 0, ProtocolVersion::Mqtt5);
162 } 161 }
163 162
164 const uint32_t nrOfSubscriptions = readUint32(eofFound); 163 const uint32_t nrOfSubscriptions = readUint32(eofFound);
@@ -206,9 +205,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -206,9 +205,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
206 char reserved[RESERVED_SPACE_SESSIONS_DB_V2]; 205 char reserved[RESERVED_SPACE_SESSIONS_DB_V2];
207 std::memset(reserved, 0, RESERVED_SPACE_SESSIONS_DB_V2); 206 std::memset(reserved, 0, RESERVED_SPACE_SESSIONS_DB_V2);
208 207
209 - const int64_t start_stamp = Session::getProgramStartedAtUnixTimestamp();  
210 - logger->logf(LOG_DEBUG, "Saving program first start time stamp as %ld", start_stamp);  
211 - writeInt64(start_stamp); 208 + const int64_t now_epoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  209 + logger->logf(LOG_DEBUG, "Saving current time stamp %ld", now_epoch);
  210 + writeInt64(now_epoch);
212 211
213 writeUint32(sessions.size()); 212 writeUint32(sessions.size());
214 213
@@ -269,10 +268,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -269,10 +268,6 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
269 logger->logf(LOG_DEBUG, "Writing next packetid %d.", ses->nextPacketId); 268 logger->logf(LOG_DEBUG, "Writing next packetid %d.", ses->nextPacketId);
270 writeUint16(ses->nextPacketId); 269 writeUint16(ses->nextPacketId);
271 270
272 - const int64_t sInMs = ses->getSessionRelativeAgeInMs();  
273 - logger->logf(LOG_DEBUG, "Writing session age: %ld ms.", sInMs);  
274 - writeInt64(sInMs);  
275 -  
276 writeUint32(ses->sessionExpiryInterval); 271 writeUint32(ses->sessionExpiryInterval);
277 writeUint16(ses->maxQosMsgPending); 272 writeUint16(ses->maxQosMsgPending);
278 } 273 }
subscriptionstore.cpp
@@ -253,20 +253,23 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -253,20 +253,23 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
253 client->getThreadData()->incrementSentMessageCount(count); 253 client->getThreadData()->incrementSentMessageCount(count);
254 } 254 }
255 255
256 -bool SubscriptionStore::sessionPresent(const std::string &clientid) 256 +/**
  257 + * @brief SubscriptionStore::lockSession returns the session if it exists. Returning is done keep the shared pointer active, to
  258 + * avoid race conditions with session removal.
  259 + * @param clientid
  260 + * @return
  261 + */
  262 +std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clientid)
257 { 263 {
258 RWLockGuard lock_guard(&subscriptionsRwlock); 264 RWLockGuard lock_guard(&subscriptionsRwlock);
259 lock_guard.rdlock(); 265 lock_guard.rdlock();
260 266
261 - bool result = false;  
262 -  
263 auto it = sessionsByIdConst.find(clientid); 267 auto it = sessionsByIdConst.find(clientid);
264 if (it != sessionsByIdConst.end()) 268 if (it != sessionsByIdConst.end())
265 { 269 {
266 - it->second->touch(); // Touching to avoid a race condition between using the session after this, and it expiring.  
267 - result = true; 270 + return it->second;
268 } 271 }
269 - return result; 272 + return std::shared_ptr<Session>();
270 } 273 }
271 274
272 void SubscriptionStore::sendQueuedWillMessages() 275 void SubscriptionStore::sendQueuedWillMessages()
@@ -612,7 +615,7 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -612,7 +615,7 @@ void SubscriptionStore::removeExpiredSessionsClients()
612 } 615 }
613 616
614 // A session could have been picked up again, so we have to verify its expiration status. 617 // A session could have been picked up again, so we have to verify its expiration status.
615 - if (session->hasExpired()) 618 + if (!session->hasActiveClient())
616 { 619 {
617 removeSession(session); 620 removeSession(session);
618 } 621 }
@@ -632,6 +635,10 @@ void SubscriptionStore::removeExpiredSessionsClients() @@ -632,6 +635,10 @@ void SubscriptionStore::removeExpiredSessionsClients()
632 } 635 }
633 } 636 }
634 637
  638 +/**
  639 + * @brief SubscriptionStore::queueSessionRemoval places session efficiently in a sorted list that is periodically dequeued.
  640 + * @param session
  641 + */
635 void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &session) 642 void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &session)
636 { 643 {
637 if (!session) 644 if (!session)
subscriptionstore.h
@@ -84,6 +84,12 @@ class RetainedMessageNode @@ -84,6 +84,12 @@ class RetainedMessageNode
84 RetainedMessageNode *getChildren(const std::string &subtopic) const; 84 RetainedMessageNode *getChildren(const std::string &subtopic) const;
85 }; 85 };
86 86
  87 +/**
  88 + * @brief A QueuedSessionRemoval is a sort of delayed request for removal. They are kept in a sorted list for fast insertion,
  89 + * and fast dequeueing of expired entries from the start.
  90 + *
  91 + * You can have multiple of these in the pending list. If a client has picked up the session again, the removal is not executed.
  92 + */
87 class QueuedSessionRemoval 93 class QueuedSessionRemoval
88 { 94 {
89 std::weak_ptr<Session> session; 95 std::weak_ptr<Session> session;
@@ -142,7 +148,7 @@ public: @@ -142,7 +148,7 @@ public:
142 void removeSubscription(std::shared_ptr<Client> &client, const std::string &topic); 148 void removeSubscription(std::shared_ptr<Client> &client, const std::string &topic);
143 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client); 149 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client);
144 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval); 150 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
145 - bool sessionPresent(const std::string &clientid); 151 + std::shared_ptr<Session> lockSession(const std::string &clientid);
146 152
147 void sendQueuedWillMessages(); 153 void sendQueuedWillMessages();
148 void queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow = false); 154 void queueWillMessage(std::shared_ptr<Publish> &willMessage, bool forceNow = false);