Commit d0333fb6092a291fa3dd114fa6a1dcf6f47c251b

Authored by Wiebe Cazemier
1 parent 436a5ffa

Add configurable session expiration

configfileparser.cpp
... ... @@ -105,6 +105,7 @@ ConfigFileParser::ConfigFileParser(const std::string &path) :
105 105 validKeys.insert("mosquitto_acl_file");
106 106 validKeys.insert("allow_anonymous");
107 107 validKeys.insert("rlimit_nofile");
  108 + validKeys.insert("expire_sessions_after_seconds");
108 109  
109 110 validListenKeys.insert("port");
110 111 validListenKeys.insert("protocol");
... ... @@ -390,6 +391,16 @@ void ConfigFileParser::loadFile(bool test)
390 391 }
391 392 tmpSettings->rlimitNoFile = newVal;
392 393 }
  394 +
  395 + if (key == "expire_sessions_after_seconds")
  396 + {
  397 + int64_t newVal = std::stoi(value);
  398 + if (newVal < 0 || (newVal > 0 && newVal <= 300)) // 0 means disable
  399 + {
  400 + throw ConfigFileException(formatString("expire_sessions_after_seconds value '%d' is invalid. Valid values are 0, or 300 or higher.", newVal));
  401 + }
  402 + tmpSettings->expireSessionsAfterSeconds = newVal;
  403 + }
393 404 }
394 405 }
395 406 catch (std::invalid_argument &ex) // catch for the stoi()
... ...
mainapp.cpp
... ... @@ -179,8 +179,13 @@ MainApp::MainApp(const std::string &amp;configFilePath) :
179 179 // TODO: override in conf possibility.
180 180 logger->logf(LOG_NOTICE, "%d CPUs are detected, making as many threads.", num_threads);
181 181  
182   - auto f = std::bind(&MainApp::queueCleanup, this);
183   - timer.addCallback(f, 86400000, "session expiration");
  182 + if (settings->expireSessionsAfterSeconds > 0)
  183 + {
  184 + auto f = std::bind(&MainApp::queueCleanup, this);
  185 + const uint64_t derrivedSessionCheckInterval = std::max<uint64_t>((settings->expireSessionsAfterSeconds)*1000*2, 600000);
  186 + const uint64_t sessionCheckInterval = std::min<uint64_t>(derrivedSessionCheckInterval, 86400000);
  187 + timer.addCallback(f, sessionCheckInterval, "session expiration");
  188 + }
184 189  
185 190 auto fKeepAlive = std::bind(&MainApp::queueKeepAliveCheckAtAllThreads, this);
186 191 timer.addCallback(fKeepAlive, 30000, "keep-alive check");
... ... @@ -726,7 +731,7 @@ void MainApp::queueCleanup()
726 731 {
727 732 std::lock_guard<std::mutex> locker(eventMutex);
728 733  
729   - auto f = std::bind(&SubscriptionStore::removeExpiredSessionsClients, subscriptionStore.get());
  734 + auto f = std::bind(&SubscriptionStore::removeExpiredSessionsClients, subscriptionStore.get(), settings->expireSessionsAfterSeconds);
730 735 taskQueue.push_front(f);
731 736  
732 737 wakeUpThread();
... ...
session.cpp
... ... @@ -164,17 +164,23 @@ uint64_t Session::sendPendingQosMessages()
164 164 return count;
165 165 }
166 166  
167   -std::chrono::time_point<std::chrono::steady_clock> zeroTime;
168   -std::chrono::seconds expireAfter(EXPIRE_SESSION_AFTER);
169   -
170   -void Session::touch(std::chrono::time_point<std::chrono::steady_clock> val)
  167 +/**
  168 + * @brief Session::touch with a time value allowed touching without causing another sys/lib call to get the time.
  169 + * @param newval
  170 + */
  171 +void Session::touch(std::chrono::time_point<std::chrono::steady_clock> newval)
171 172 {
172   - std::chrono::time_point<std::chrono::steady_clock> newval = val > zeroTime ? val : std::chrono::steady_clock::now();
173 173 lastTouched = newval;
174 174 }
175 175  
176   -bool Session::hasExpired()
  176 +void Session::touch()
  177 +{
  178 + lastTouched = std::chrono::steady_clock::now();
  179 +}
  180 +
  181 +bool Session::hasExpired(int expireAfterSeconds)
177 182 {
  183 + std::chrono::seconds expireAfter(expireAfterSeconds);
178 184 std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
179 185 return clientDisconnected() && (lastTouched + expireAfter) < now;
180 186 }
... ...
session.h
... ... @@ -30,9 +30,6 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
30 30 #define MAX_QOS_MSG_PENDING_PER_CLIENT 32
31 31 #define MAX_QOS_BYTES_PENDING_PER_CLIENT 4096
32 32  
33   -// TODO make setting
34   -#define EXPIRE_SESSION_AFTER 1209600
35   -
36 33 struct QueuedQosPacket
37 34 {
38 35 uint16_t id;
... ... @@ -67,8 +64,9 @@ public:
67 64 void writePacket(const MqttPacket &packet, char max_qos, uint64_t &count);
68 65 void clearQosMessage(uint16_t packet_id);
69 66 uint64_t sendPendingQosMessages();
70   - void touch(std::chrono::time_point<std::chrono::steady_clock> val = std::chrono::time_point<std::chrono::steady_clock>());
71   - bool hasExpired();
  67 + void touch(std::chrono::time_point<std::chrono::steady_clock> val);
  68 + void touch();
  69 + bool hasExpired(int expireAfterSeconds);
72 70  
73 71 void addIncomingQoS2MessageId(uint16_t packet_id);
74 72 bool incomingQoS2MessageIdInTransit(uint16_t packet_id) const;
... ...
settings.h
... ... @@ -46,6 +46,7 @@ public:
46 46 std::string mosquittoAclFile;
47 47 bool allowAnonymous = false;
48 48 int rlimitNoFile = 1000000;
  49 + uint64_t expireSessionsAfterSeconds = 1209600;
49 50 std::list<std::shared_ptr<Listener>> listeners; // Default one is created later, when none are defined.
50 51  
51 52 AuthOptCompatWrap &getAuthOptsCompat();
... ...
subscriptionstore.cpp
... ... @@ -417,7 +417,7 @@ int SubscriptionNode::cleanSubscriptions()
417 417 }
418 418  
419 419 // This is not MQTT compliant, but the standard doesn't keep real world constraints into account.
420   -void SubscriptionStore::removeExpiredSessionsClients()
  420 +void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeconds)
421 421 {
422 422 RWLockGuard lock_guard(&subscriptionsRwlock);
423 423 lock_guard.wrlock();
... ... @@ -429,11 +429,9 @@ void SubscriptionStore::removeExpiredSessionsClients()
429 429 {
430 430 std::shared_ptr<Session> &session = session_it->second;
431 431  
432   - if (session->hasExpired())
  432 + if (session->hasExpired(expireSessionsAfterSeconds))
433 433 {
434   -#ifndef NDEBUG
435 434 logger->logf(LOG_DEBUG, "Removing expired session from store %s", session->getClientId().c_str());
436   -#endif
437 435 session_it = sessionsById.erase(session_it);
438 436 }
439 437 else
... ...
subscriptionstore.h
... ... @@ -99,7 +99,7 @@ public:
99 99  
100 100 void setRetainedMessage(const std::string &topic, const std::string &payload, char qos);
101 101  
102   - void removeExpiredSessionsClients();
  102 + void removeExpiredSessionsClients(int expireSessionsAfterSeconds);
103 103 };
104 104  
105 105 #endif // SUBSCRIPTIONSTORE_H
... ...
timer.cpp
... ... @@ -83,7 +83,7 @@ void Timer::stop()
83 83  
84 84 void Timer::addCallback(std::function<void ()> f, uint64_t interval_ms, const std::string &name)
85 85 {
86   - logger->logf(LOG_DEBUG, "Adding event '%s' to the timer.", name.c_str());
  86 + logger->logf(LOG_DEBUG, "Adding event '%s' to the timer with an interval of %ld ms.", name.c_str(), interval_ms);
87 87  
88 88 CallbackEntry c;
89 89 c.f = f;
... ...