Commit 3f53ee96a1e9915b326b3adb9a39d5b1009455cc
1 parent
ebe7c845
Fix resetting session expire timeout on reloading state
I didn't count the seconds it was already waiting. It does now.
Showing
4 changed files
with
30 additions
and
1 deletions
session.cpp
| @@ -72,6 +72,11 @@ Session::Session(const Session &other) | @@ -72,6 +72,11 @@ Session::Session(const Session &other) | ||
| 72 | this->nextPacketId = other.nextPacketId; | 72 | this->nextPacketId = other.nextPacketId; |
| 73 | this->sessionExpiryInterval = other.sessionExpiryInterval; | 73 | this->sessionExpiryInterval = other.sessionExpiryInterval; |
| 74 | this->willPublish = other.willPublish; | 74 | this->willPublish = other.willPublish; |
| 75 | + this->removalQueued = other.removalQueued; | ||
| 76 | + this->removalQueuedAt = other.removalQueuedAt; | ||
| 77 | + | ||
| 78 | + | ||
| 79 | + // TODO: perhaps this copy constructor is nonsense now. | ||
| 75 | 80 | ||
| 76 | // TODO: see git history for a change here. We now copy the whole queued publish. Do we want to address that? | 81 | // TODO: see git history for a change here. We now copy the whole queued publish. Do we want to address that? |
| 77 | this->qosPacketQueue = other.qosPacketQueue; | 82 | this->qosPacketQueue = other.qosPacketQueue; |
| @@ -107,6 +112,7 @@ void Session::assignActiveConnection(std::shared_ptr<Client> &client) | @@ -107,6 +112,7 @@ void Session::assignActiveConnection(std::shared_ptr<Client> &client) | ||
| 107 | this->client_id = client->getClientId(); | 112 | this->client_id = client->getClientId(); |
| 108 | this->username = client->getUsername(); | 113 | this->username = client->getUsername(); |
| 109 | this->willPublish = client->getWill(); | 114 | this->willPublish = client->getWill(); |
| 115 | + this->removalQueued = false; | ||
| 110 | } | 116 | } |
| 111 | 117 | ||
| 112 | /** | 118 | /** |
| @@ -371,8 +377,25 @@ void Session::setSessionExpiryInterval(uint32_t newVal) | @@ -371,8 +377,25 @@ void Session::setSessionExpiryInterval(uint32_t newVal) | ||
| 371 | this->sessionExpiryInterval = newVal; | 377 | this->sessionExpiryInterval = newVal; |
| 372 | } | 378 | } |
| 373 | 379 | ||
| 380 | +void Session::setQueuedRemovalAt() | ||
| 381 | +{ | ||
| 382 | + this->removalQueuedAt = std::chrono::steady_clock::now(); | ||
| 383 | + this->removalQueued = true; | ||
| 384 | +} | ||
| 385 | + | ||
| 374 | uint32_t Session::getSessionExpiryInterval() const | 386 | uint32_t Session::getSessionExpiryInterval() const |
| 375 | { | 387 | { |
| 376 | return this->sessionExpiryInterval; | 388 | return this->sessionExpiryInterval; |
| 377 | } | 389 | } |
| 378 | 390 | ||
| 391 | +uint32_t Session::getCurrentSessionExpiryInterval() const | ||
| 392 | +{ | ||
| 393 | + if (!this->removalQueued || hasActiveClient()) | ||
| 394 | + return this->sessionExpiryInterval; | ||
| 395 | + | ||
| 396 | + const std::chrono::seconds age = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - this->removalQueuedAt); | ||
| 397 | + const uint32_t ageInSeconds = age.count(); | ||
| 398 | + const uint32_t result = ageInSeconds <= this->sessionExpiryInterval ? this->sessionExpiryInterval - age.count() : 0; | ||
| 399 | + return result; | ||
| 400 | +} | ||
| 401 | + |
session.h
| @@ -51,6 +51,8 @@ class Session | @@ -51,6 +51,8 @@ class Session | ||
| 51 | uint16_t QoSLogPrintedAtId = 0; | 51 | uint16_t QoSLogPrintedAtId = 0; |
| 52 | bool destroyOnDisconnect = false; | 52 | bool destroyOnDisconnect = false; |
| 53 | std::shared_ptr<WillPublish> willPublish; | 53 | std::shared_ptr<WillPublish> willPublish; |
| 54 | + bool removalQueued = false; | ||
| 55 | + std::chrono::time_point<std::chrono::steady_clock> removalQueuedAt; | ||
| 54 | Logger *logger = Logger::getInstance(); | 56 | Logger *logger = Logger::getInstance(); |
| 55 | 57 | ||
| 56 | bool requiresPacketRetransmission() const; | 58 | bool requiresPacketRetransmission() const; |
| @@ -87,7 +89,9 @@ public: | @@ -87,7 +89,9 @@ public: | ||
| 87 | 89 | ||
| 88 | void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version); | 90 | void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version); |
| 89 | void setSessionExpiryInterval(uint32_t newVal); | 91 | void setSessionExpiryInterval(uint32_t newVal); |
| 92 | + void setQueuedRemovalAt(); | ||
| 90 | uint32_t getSessionExpiryInterval() const; | 93 | uint32_t getSessionExpiryInterval() const; |
| 94 | + uint32_t getCurrentSessionExpiryInterval() const; | ||
| 91 | }; | 95 | }; |
| 92 | 96 | ||
| 93 | #endif // SESSION_H | 97 | #endif // SESSION_H |
sessionsandsubscriptionsdb.cpp
| @@ -313,7 +313,7 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector<std::unique_ptr<Sess | @@ -313,7 +313,7 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector<std::unique_ptr<Sess | ||
| 313 | logger->logf(LOG_DEBUG, "Writing next packetid %d.", ses->nextPacketId); | 313 | logger->logf(LOG_DEBUG, "Writing next packetid %d.", ses->nextPacketId); |
| 314 | writeUint16(ses->nextPacketId); | 314 | writeUint16(ses->nextPacketId); |
| 315 | 315 | ||
| 316 | - writeUint32(ses->sessionExpiryInterval); | 316 | + writeUint32(ses->getCurrentSessionExpiryInterval()); |
| 317 | writeUint16(ses->maxQosMsgPending); | 317 | writeUint16(ses->maxQosMsgPending); |
| 318 | 318 | ||
| 319 | const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0; | 319 | const bool hasWillThatShouldSurviveRestart = ses->getWill().operator bool() && ses->getWill()->will_delay > 0; |
subscriptionstore.cpp
| @@ -695,6 +695,8 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &sess | @@ -695,6 +695,8 @@ void SubscriptionStore::queueSessionRemoval(const std::shared_ptr<Session> &sess | ||
| 695 | return a.getExpiresAt() < b.getExpiresAt(); | 695 | return a.getExpiresAt() < b.getExpiresAt(); |
| 696 | }; | 696 | }; |
| 697 | 697 | ||
| 698 | + session->setQueuedRemovalAt(); | ||
| 699 | + | ||
| 698 | std::lock_guard<std::mutex>(this->queuedSessionRemovalsMutex); | 700 | std::lock_guard<std::mutex>(this->queuedSessionRemovalsMutex); |
| 699 | auto pos = std::upper_bound(this->queuedSessionRemovals.begin(), this->queuedSessionRemovals.end(), qsr, comp); | 701 | auto pos = std::upper_bound(this->queuedSessionRemovals.begin(), this->queuedSessionRemovals.end(), qsr, comp); |
| 700 | this->queuedSessionRemovals.insert(pos, qsr); | 702 | this->queuedSessionRemovals.insert(pos, qsr); |