Commit 295cf8eeabc557a317b1cb30f102f30a6d16182f

Authored by Wiebe Cazemier
Committed by Wiebe Cazemier
1 parent 6672b8c9

Lock (QoS data of) sessions during copying/saving

The only mutable session data of a client is QoS related, so when we're
copying sessions (for saving them), we need to lock the QoS data,
because that gets modified from active client traffic in worker threads.

Note: not super well tested at this point, nor was I ever able to
trigger actual errors despite long stress testing, so it's a theoretical
fix.
Showing 2 changed files with 13 additions and 2 deletions
session.cpp
@@ -67,6 +67,10 @@ void Session::setSessionTouch(int64_t ageInMs) @@ -67,6 +67,10 @@ void Session::setSessionTouch(int64_t ageInMs)
67 */ 67 */
68 Session::Session(const Session &other) 68 Session::Session(const Session &other)
69 { 69 {
  70 + // Only the QoS data is modified by worker threads (vs (locked) timed events), so it could change during copying, because
  71 + // it gets called from a separate thread.
  72 + std::unique_lock<std::mutex> locker(qosQueueMutex);
  73 +
70 this->username = other.username; 74 this->username = other.username;
71 this->client_id = other.client_id; 75 this->client_id = other.client_id;
72 this->incomingQoS2MessageIds = other.incomingQoS2MessageIds; 76 this->incomingQoS2MessageIds = other.incomingQoS2MessageIds;
@@ -236,17 +240,21 @@ bool Session::hasExpired(int expireAfterSeconds) @@ -236,17 +240,21 @@ bool Session::hasExpired(int expireAfterSeconds)
236 240
237 void Session::addIncomingQoS2MessageId(uint16_t packet_id) 241 void Session::addIncomingQoS2MessageId(uint16_t packet_id)
238 { 242 {
  243 + std::unique_lock<std::mutex> locker(qosQueueMutex);
239 incomingQoS2MessageIds.insert(packet_id); 244 incomingQoS2MessageIds.insert(packet_id);
240 } 245 }
241 246
242 -bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id) const 247 +bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id)
243 { 248 {
  249 + std::unique_lock<std::mutex> locker(qosQueueMutex);
244 const auto it = incomingQoS2MessageIds.find(packet_id); 250 const auto it = incomingQoS2MessageIds.find(packet_id);
245 return it != incomingQoS2MessageIds.end(); 251 return it != incomingQoS2MessageIds.end();
246 } 252 }
247 253
248 void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) 254 void Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
249 { 255 {
  256 + std::unique_lock<std::mutex> locker(qosQueueMutex);
  257 +
250 #ifndef NDEBUG 258 #ifndef NDEBUG
251 logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size()); 259 logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size());
252 #endif 260 #endif
@@ -258,11 +266,14 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) @@ -258,11 +266,14 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id)
258 266
259 void Session::addOutgoingQoS2MessageId(uint16_t packet_id) 267 void Session::addOutgoingQoS2MessageId(uint16_t packet_id)
260 { 268 {
  269 + std::unique_lock<std::mutex> locker(qosQueueMutex);
261 outgoingQoS2MessageIds.insert(packet_id); 270 outgoingQoS2MessageIds.insert(packet_id);
262 } 271 }
263 272
264 void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id) 273 void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id)
265 { 274 {
  275 + std::unique_lock<std::mutex> locker(qosQueueMutex);
  276 +
266 #ifndef NDEBUG 277 #ifndef NDEBUG
267 logger->logf(LOG_DEBUG, "As QoS 2 sender: publish complete (PUBCOMP) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, outgoingQoS2MessageIds.size()); 278 logger->logf(LOG_DEBUG, "As QoS 2 sender: publish complete (PUBCOMP) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, outgoingQoS2MessageIds.size());
268 #endif 279 #endif
session.h
@@ -77,7 +77,7 @@ public: @@ -77,7 +77,7 @@ public:
77 bool hasExpired(int expireAfterSeconds); 77 bool hasExpired(int expireAfterSeconds);
78 78
79 void addIncomingQoS2MessageId(uint16_t packet_id); 79 void addIncomingQoS2MessageId(uint16_t packet_id);
80 - bool incomingQoS2MessageIdInTransit(uint16_t packet_id) const; 80 + bool incomingQoS2MessageIdInTransit(uint16_t packet_id);
81 void removeIncomingQoS2MessageId(u_int16_t packet_id); 81 void removeIncomingQoS2MessageId(u_int16_t packet_id);
82 82
83 void addOutgoingQoS2MessageId(uint16_t packet_id); 83 void addOutgoingQoS2MessageId(uint16_t packet_id);