Commit acce8dfa21ad617ba788704d1b737e60b58e53a5

Authored by Wiebe Cazemier
1 parent 681eeb82

Get fresh QoS limits per write

This is an (insignificant) amount slower, but otherwise existing
sessions won't get the new limits when reloading the config.
session.cpp
@@ -23,9 +23,7 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. @@ -23,9 +23,7 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
23 23
24 std::chrono::time_point<std::chrono::steady_clock> appStartTime = std::chrono::steady_clock::now(); 24 std::chrono::time_point<std::chrono::steady_clock> appStartTime = std::chrono::steady_clock::now();
25 25
26 -Session::Session() :  
27 - maxQosMsgPending(ThreadGlobals::getSettings()->maxQosMsgPendingPerClient),  
28 - maxQosBytesPending(ThreadGlobals::getSettings()->maxQosBytesPendingPerClient) 26 +Session::Session()
29 { 27 {
30 28
31 } 29 }
@@ -152,6 +150,8 @@ void Session::writePacket(MqttPacket &amp;packet, char max_qos, std::shared_ptr&lt;Mqtt @@ -152,6 +150,8 @@ void Session::writePacket(MqttPacket &amp;packet, char max_qos, std::shared_ptr&lt;Mqtt
152 assert(max_qos <= 2); 150 assert(max_qos <= 2);
153 const char effectiveQos = std::min<char>(packet.getQos(), max_qos); 151 const char effectiveQos = std::min<char>(packet.getQos(), max_qos);
154 152
  153 + const Settings *settings = ThreadGlobals::getSettings();
  154 +
155 Authentication *_auth = ThreadGlobals::getAuth(); 155 Authentication *_auth = ThreadGlobals::getAuth();
156 assert(_auth); 156 assert(_auth);
157 Authentication &auth = *_auth; 157 Authentication &auth = *_auth;
@@ -183,7 +183,8 @@ void Session::writePacket(MqttPacket &amp;packet, char max_qos, std::shared_ptr&lt;Mqtt @@ -183,7 +183,8 @@ void Session::writePacket(MqttPacket &amp;packet, char max_qos, std::shared_ptr&lt;Mqtt
183 std::unique_lock<std::mutex> locker(qosQueueMutex); 183 std::unique_lock<std::mutex> locker(qosQueueMutex);
184 184
185 const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size(); 185 const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size();
186 - if (totalQosPacketsInTransit >= maxQosMsgPending || (qosPacketQueue.getByteSize() >= maxQosBytesPending && qosPacketQueue.size() > 0)) 186 + if (totalQosPacketsInTransit >= settings->maxQosMsgPendingPerClient
  187 + || (qosPacketQueue.getByteSize() >= settings->maxQosBytesPendingPerClient && qosPacketQueue.size() > 0))
187 { 188 {
188 if (QoSLogPrintedAtId != nextPacketId) 189 if (QoSLogPrintedAtId != nextPacketId)
189 { 190 {
session.h
@@ -48,8 +48,6 @@ class Session @@ -48,8 +48,6 @@ class Session
48 uint16_t QoSLogPrintedAtId = 0; 48 uint16_t QoSLogPrintedAtId = 0;
49 std::chrono::time_point<std::chrono::steady_clock> lastTouched = std::chrono::steady_clock::now(); 49 std::chrono::time_point<std::chrono::steady_clock> lastTouched = std::chrono::steady_clock::now();
50 Logger *logger = Logger::getInstance(); 50 Logger *logger = Logger::getInstance();
51 - const size_t maxQosMsgPending = 0;  
52 - const size_t maxQosBytesPending = 0;  
53 51
54 int64_t getSessionRelativeAgeInMs() const; 52 int64_t getSessionRelativeAgeInMs() const;
55 void setSessionTouch(int64_t ageInMs); 53 void setSessionTouch(int64_t ageInMs);
settings.h
@@ -56,8 +56,8 @@ public: @@ -56,8 +56,8 @@ public:
56 int authPluginTimerPeriod = 60; 56 int authPluginTimerPeriod = 60;
57 std::string storageDir; 57 std::string storageDir;
58 int threadCount = 0; 58 int threadCount = 0;
59 - int maxQosMsgPendingPerClient = 512;  
60 - int maxQosBytesPendingPerClient = 65536; 59 + uint maxQosMsgPendingPerClient = 512;
  60 + uint maxQosBytesPendingPerClient = 65536;
61 std::list<std::shared_ptr<Listener>> listeners; // Default one is created later, when none are defined. 61 std::list<std::shared_ptr<Listener>> listeners; // Default one is created later, when none are defined.
62 62
63 AuthOptCompatWrap &getAuthOptsCompat(); 63 AuthOptCompatWrap &getAuthOptsCompat();