Commit 3aecd638914b985c4a8372cdbeceb3807cdd2fec

Authored by Wiebe Cazemier
1 parent 81986902

Queue wills with second granularity

There's no point in keeping a vector per nanosecond.
subscriptionstore.cpp
@@ -298,14 +298,15 @@ std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clien @@ -298,14 +298,15 @@ std::shared_ptr<Session> SubscriptionStore::lockSession(const std::string &clien
298 void SubscriptionStore::sendQueuedWillMessages() 298 void SubscriptionStore::sendQueuedWillMessages()
299 { 299 {
300 const auto now = std::chrono::steady_clock::now(); 300 const auto now = std::chrono::steady_clock::now();
  301 + const std::chrono::seconds secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
301 std::lock_guard<std::mutex> locker(this->pendingWillsMutex); 302 std::lock_guard<std::mutex> locker(this->pendingWillsMutex);
302 303
303 auto it = pendingWillMessages.begin(); 304 auto it = pendingWillMessages.begin();
304 while (it != pendingWillMessages.end()) 305 while (it != pendingWillMessages.end())
305 { 306 {
306 - const std::chrono::time_point<std::chrono::steady_clock> &sendAt = it->first; 307 + const std::chrono::seconds &sendAt = it->first;
307 308
308 - if (sendAt > now) 309 + if (sendAt > secondsSinceEpoch)
309 break; 310 break;
310 311
311 std::vector<QueuedWill> &willsOfSlot = it->second; 312 std::vector<QueuedWill> &willsOfSlot = it->second;
@@ -373,9 +374,10 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;WillPublish&gt; &amp;wil @@ -373,9 +374,10 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr&lt;WillPublish&gt; &amp;wil
373 374
374 QueuedWill queuedWill(willMessage, session); 375 QueuedWill queuedWill(willMessage, session);
375 const std::chrono::time_point<std::chrono::steady_clock> sendWillAt = std::chrono::steady_clock::now() + std::chrono::seconds(willMessage->will_delay); 376 const std::chrono::time_point<std::chrono::steady_clock> sendWillAt = std::chrono::steady_clock::now() + std::chrono::seconds(willMessage->will_delay);
  377 + std::chrono::seconds secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(sendWillAt.time_since_epoch());
376 378
377 std::lock_guard<std::mutex> locker(this->pendingWillsMutex); 379 std::lock_guard<std::mutex> locker(this->pendingWillsMutex);
378 - this->pendingWillMessages[sendWillAt].push_back(queuedWill); 380 + this->pendingWillMessages[secondsSinceEpoch].push_back(queuedWill);
379 } 381 }
380 382
381 void SubscriptionStore::publishNonRecursively(const std::unordered_map<std::string, Subscription> &subscribers, 383 void SubscriptionStore::publishNonRecursively(const std::unordered_map<std::string, Subscription> &subscribers,
subscriptionstore.h
@@ -119,7 +119,7 @@ class SubscriptionStore @@ -119,7 +119,7 @@ class SubscriptionStore
119 int64_t retainedMessageCount = 0; 119 int64_t retainedMessageCount = 0;
120 120
121 std::mutex pendingWillsMutex; 121 std::mutex pendingWillsMutex;
122 - std::map<std::chrono::time_point<std::chrono::steady_clock>, std::vector<QueuedWill>> pendingWillMessages; 122 + std::map<std::chrono::seconds, std::vector<QueuedWill>> pendingWillMessages;
123 123
124 std::chrono::time_point<std::chrono::steady_clock> lastTreeCleanup; 124 std::chrono::time_point<std::chrono::steady_clock> lastTreeCleanup;
125 125