From d0333fb6092a291fa3dd114fa6a1dcf6f47c251b Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sun, 6 Jun 2021 09:46:35 +0200 Subject: [PATCH] Add configurable session expiration --- configfileparser.cpp | 11 +++++++++++ mainapp.cpp | 11 ++++++++--- session.cpp | 18 ++++++++++++------ session.h | 8 +++----- settings.h | 1 + subscriptionstore.cpp | 6 ++---- subscriptionstore.h | 2 +- timer.cpp | 2 +- 8 files changed, 39 insertions(+), 20 deletions(-) diff --git a/configfileparser.cpp b/configfileparser.cpp index f68f475..da46935 100644 --- a/configfileparser.cpp +++ b/configfileparser.cpp @@ -105,6 +105,7 @@ ConfigFileParser::ConfigFileParser(const std::string &path) : validKeys.insert("mosquitto_acl_file"); validKeys.insert("allow_anonymous"); validKeys.insert("rlimit_nofile"); + validKeys.insert("expire_sessions_after_seconds"); validListenKeys.insert("port"); validListenKeys.insert("protocol"); @@ -390,6 +391,16 @@ void ConfigFileParser::loadFile(bool test) } tmpSettings->rlimitNoFile = newVal; } + + if (key == "expire_sessions_after_seconds") + { + int64_t newVal = std::stoi(value); + if (newVal < 0 || (newVal > 0 && newVal <= 300)) // 0 means disable + { + throw ConfigFileException(formatString("expire_sessions_after_seconds value '%d' is invalid. Valid values are 0, or 300 or higher.", newVal)); + } + tmpSettings->expireSessionsAfterSeconds = newVal; + } } } catch (std::invalid_argument &ex) // catch for the stoi() diff --git a/mainapp.cpp b/mainapp.cpp index 2b00eae..4e1a9a8 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -179,8 +179,13 @@ MainApp::MainApp(const std::string &configFilePath) : // TODO: override in conf possibility. logger->logf(LOG_NOTICE, "%d CPUs are detected, making as many threads.", num_threads); - auto f = std::bind(&MainApp::queueCleanup, this); - timer.addCallback(f, 86400000, "session expiration"); + if (settings->expireSessionsAfterSeconds > 0) + { + auto f = std::bind(&MainApp::queueCleanup, this); + const uint64_t derrivedSessionCheckInterval = std::max((settings->expireSessionsAfterSeconds)*1000*2, 600000); + const uint64_t sessionCheckInterval = std::min(derrivedSessionCheckInterval, 86400000); + timer.addCallback(f, sessionCheckInterval, "session expiration"); + } auto fKeepAlive = std::bind(&MainApp::queueKeepAliveCheckAtAllThreads, this); timer.addCallback(fKeepAlive, 30000, "keep-alive check"); @@ -726,7 +731,7 @@ void MainApp::queueCleanup() { std::lock_guard locker(eventMutex); - auto f = std::bind(&SubscriptionStore::removeExpiredSessionsClients, subscriptionStore.get()); + auto f = std::bind(&SubscriptionStore::removeExpiredSessionsClients, subscriptionStore.get(), settings->expireSessionsAfterSeconds); taskQueue.push_front(f); wakeUpThread(); diff --git a/session.cpp b/session.cpp index 10e57a3..421b5a3 100644 --- a/session.cpp +++ b/session.cpp @@ -164,17 +164,23 @@ uint64_t Session::sendPendingQosMessages() return count; } -std::chrono::time_point zeroTime; -std::chrono::seconds expireAfter(EXPIRE_SESSION_AFTER); - -void Session::touch(std::chrono::time_point val) +/** + * @brief Session::touch with a time value allowed touching without causing another sys/lib call to get the time. + * @param newval + */ +void Session::touch(std::chrono::time_point newval) { - std::chrono::time_point newval = val > zeroTime ? val : std::chrono::steady_clock::now(); lastTouched = newval; } -bool Session::hasExpired() +void Session::touch() +{ + lastTouched = std::chrono::steady_clock::now(); +} + +bool Session::hasExpired(int expireAfterSeconds) { + std::chrono::seconds expireAfter(expireAfterSeconds); std::chrono::time_point now = std::chrono::steady_clock::now(); return clientDisconnected() && (lastTouched + expireAfter) < now; } diff --git a/session.h b/session.h index b9742f2..ad6d05c 100644 --- a/session.h +++ b/session.h @@ -30,9 +30,6 @@ License along with FlashMQ. If not, see . #define MAX_QOS_MSG_PENDING_PER_CLIENT 32 #define MAX_QOS_BYTES_PENDING_PER_CLIENT 4096 -// TODO make setting -#define EXPIRE_SESSION_AFTER 1209600 - struct QueuedQosPacket { uint16_t id; @@ -67,8 +64,9 @@ public: void writePacket(const MqttPacket &packet, char max_qos, uint64_t &count); void clearQosMessage(uint16_t packet_id); uint64_t sendPendingQosMessages(); - void touch(std::chrono::time_point val = std::chrono::time_point()); - bool hasExpired(); + void touch(std::chrono::time_point val); + void touch(); + bool hasExpired(int expireAfterSeconds); void addIncomingQoS2MessageId(uint16_t packet_id); bool incomingQoS2MessageIdInTransit(uint16_t packet_id) const; diff --git a/settings.h b/settings.h index 006a5d5..525508e 100644 --- a/settings.h +++ b/settings.h @@ -46,6 +46,7 @@ public: std::string mosquittoAclFile; bool allowAnonymous = false; int rlimitNoFile = 1000000; + uint64_t expireSessionsAfterSeconds = 1209600; std::list> listeners; // Default one is created later, when none are defined. AuthOptCompatWrap &getAuthOptsCompat(); diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 9243dd3..97579f3 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -417,7 +417,7 @@ int SubscriptionNode::cleanSubscriptions() } // This is not MQTT compliant, but the standard doesn't keep real world constraints into account. -void SubscriptionStore::removeExpiredSessionsClients() +void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeconds) { RWLockGuard lock_guard(&subscriptionsRwlock); lock_guard.wrlock(); @@ -429,11 +429,9 @@ void SubscriptionStore::removeExpiredSessionsClients() { std::shared_ptr &session = session_it->second; - if (session->hasExpired()) + if (session->hasExpired(expireSessionsAfterSeconds)) { -#ifndef NDEBUG logger->logf(LOG_DEBUG, "Removing expired session from store %s", session->getClientId().c_str()); -#endif session_it = sessionsById.erase(session_it); } else diff --git a/subscriptionstore.h b/subscriptionstore.h index 6f4eeba..e1bd106 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -99,7 +99,7 @@ public: void setRetainedMessage(const std::string &topic, const std::string &payload, char qos); - void removeExpiredSessionsClients(); + void removeExpiredSessionsClients(int expireSessionsAfterSeconds); }; #endif // SUBSCRIPTIONSTORE_H diff --git a/timer.cpp b/timer.cpp index 92add13..bff7e9f 100644 --- a/timer.cpp +++ b/timer.cpp @@ -83,7 +83,7 @@ void Timer::stop() void Timer::addCallback(std::function f, uint64_t interval_ms, const std::string &name) { - logger->logf(LOG_DEBUG, "Adding event '%s' to the timer.", name.c_str()); + logger->logf(LOG_DEBUG, "Adding event '%s' to the timer with an interval of %ld ms.", name.c_str(), interval_ms); CallbackEntry c; c.f = f; -- libgit2 0.21.4