From 295cf8eeabc557a317b1cb30f102f30a6d16182f Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Wed, 29 Sep 2021 11:43:58 +0200 Subject: [PATCH] Lock (QoS data of) sessions during copying/saving --- session.cpp | 13 ++++++++++++- session.h | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/session.cpp b/session.cpp index d926494..8d6f45e 100644 --- a/session.cpp +++ b/session.cpp @@ -67,6 +67,10 @@ void Session::setSessionTouch(int64_t ageInMs) */ Session::Session(const Session &other) { + // Only the QoS data is modified by worker threads (vs (locked) timed events), so it could change during copying, because + // it gets called from a separate thread. + std::unique_lock locker(qosQueueMutex); + this->username = other.username; this->client_id = other.client_id; this->incomingQoS2MessageIds = other.incomingQoS2MessageIds; @@ -236,17 +240,21 @@ bool Session::hasExpired(int expireAfterSeconds) void Session::addIncomingQoS2MessageId(uint16_t packet_id) { + std::unique_lock locker(qosQueueMutex); incomingQoS2MessageIds.insert(packet_id); } -bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id) const +bool Session::incomingQoS2MessageIdInTransit(uint16_t packet_id) { + std::unique_lock locker(qosQueueMutex); const auto it = incomingQoS2MessageIds.find(packet_id); return it != incomingQoS2MessageIds.end(); } void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) { + std::unique_lock locker(qosQueueMutex); + #ifndef NDEBUG logger->logf(LOG_DEBUG, "As QoS 2 receiver: publish released (PUBREL) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, incomingQoS2MessageIds.size()); #endif @@ -258,11 +266,14 @@ void Session::removeIncomingQoS2MessageId(u_int16_t packet_id) void Session::addOutgoingQoS2MessageId(uint16_t packet_id) { + std::unique_lock locker(qosQueueMutex); outgoingQoS2MessageIds.insert(packet_id); } void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id) { + std::unique_lock locker(qosQueueMutex); + #ifndef NDEBUG logger->logf(LOG_DEBUG, "As QoS 2 sender: publish complete (PUBCOMP) for '%s', packet id '%d'. Left in queue: %d", client_id.c_str(), packet_id, outgoingQoS2MessageIds.size()); #endif diff --git a/session.h b/session.h index e713d22..3f2baf5 100644 --- a/session.h +++ b/session.h @@ -77,7 +77,7 @@ public: bool hasExpired(int expireAfterSeconds); void addIncomingQoS2MessageId(uint16_t packet_id); - bool incomingQoS2MessageIdInTransit(uint16_t packet_id) const; + bool incomingQoS2MessageIdInTransit(uint16_t packet_id); void removeIncomingQoS2MessageId(u_int16_t packet_id); void addOutgoingQoS2MessageId(uint16_t packet_id); -- libgit2 0.21.4