Commit 3f3708fd4e81449d0e0005801274018afe2a068d

Authored by Wiebe Cazemier
1 parent f67238c5

Make QoS limits configurable

configfileparser.cpp
@@ -110,6 +110,8 @@ ConfigFileParser::ConfigFileParser(const std::string &path) : @@ -110,6 +110,8 @@ ConfigFileParser::ConfigFileParser(const std::string &path) :
110 validKeys.insert("expire_sessions_after_seconds"); 110 validKeys.insert("expire_sessions_after_seconds");
111 validKeys.insert("thread_count"); 111 validKeys.insert("thread_count");
112 validKeys.insert("storage_dir"); 112 validKeys.insert("storage_dir");
  113 + validKeys.insert("max_qos_msg_pending_per_client");
  114 + validKeys.insert("max_qos_bytes_pending_per_client");
113 115
114 validListenKeys.insert("port"); 116 validListenKeys.insert("port");
115 validListenKeys.insert("protocol"); 117 validListenKeys.insert("protocol");
@@ -439,6 +441,26 @@ void ConfigFileParser::loadFile(bool test) @@ -439,6 +441,26 @@ void ConfigFileParser::loadFile(bool test)
439 } 441 }
440 tmpSettings->threadCount = newVal; 442 tmpSettings->threadCount = newVal;
441 } 443 }
  444 +
  445 + if (key == "max_qos_msg_pending_per_client")
  446 + {
  447 + int newVal = std::stoi(value);
  448 + if (newVal < 32 || newVal > 65530)
  449 + {
  450 + throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65530.", newVal));
  451 + }
  452 + tmpSettings->maxQosMsgPendingPerClient = newVal;
  453 + }
  454 +
  455 + if (key == "max_qos_bytes_pending_per_client")
  456 + {
  457 + int newVal = std::stoi(value);
  458 + if (newVal < 4096)
  459 + {
  460 + throw ConfigFileException(formatString("max_qos_bytes_pending_per_client value '%d' is invalid. Valid values are 4096 or higher.", newVal));
  461 + }
  462 + tmpSettings->maxQosBytesPendingPerClient = newVal;
  463 + }
442 } 464 }
443 } 465 }
444 catch (std::invalid_argument &ex) // catch for the stoi() 466 catch (std::invalid_argument &ex) // catch for the stoi()
mainapp.cpp
@@ -644,6 +644,8 @@ void MainApp::loadConfig() @@ -644,6 +644,8 @@ void MainApp::loadConfig()
644 confFileParser->loadFile(true); 644 confFileParser->loadFile(true);
645 confFileParser->loadFile(false); 645 confFileParser->loadFile(false);
646 settings = confFileParser->moveSettings(); 646 settings = confFileParser->moveSettings();
  647 + settingsLocalCopy = *settings.get();
  648 + ThreadAuth::assignSettings(&settingsLocalCopy);
647 649
648 if (settings->listeners.empty()) 650 if (settings->listeners.empty())
649 { 651 {
mainapp.h
@@ -61,7 +61,11 @@ class MainApp @@ -61,7 +61,11 @@ class MainApp
61 int taskEventFd = -1; 61 int taskEventFd = -1;
62 std::mutex eventMutex; 62 std::mutex eventMutex;
63 Timer timer; 63 Timer timer;
  64 +
  65 + // We need to keep a settings copy as well as a shared pointer, depending on threads, queueing of config reloads, etc.
64 std::shared_ptr<Settings> settings; 66 std::shared_ptr<Settings> settings;
  67 + Settings settingsLocalCopy;
  68 +
65 std::list<std::shared_ptr<Listener>> listeners; 69 std::list<std::shared_ptr<Listener>> listeners;
66 std::mutex quitMutex; 70 std::mutex quitMutex;
67 std::string fuzzFilePath; 71 std::string fuzzFilePath;
session.cpp
@@ -23,7 +23,9 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -23,7 +23,9 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
23 23
24 std::chrono::time_point<std::chrono::steady_clock> appStartTime = std::chrono::steady_clock::now(); 24 std::chrono::time_point<std::chrono::steady_clock> appStartTime = std::chrono::steady_clock::now();
25 25
26 -Session::Session() 26 +Session::Session() :
  27 + maxQosMsgPending(ThreadAuth::getSettings()->maxQosMsgPendingPerClient),
  28 + maxQosBytesPending(ThreadAuth::getSettings()->maxQosBytesPendingPerClient)
27 { 29 {
28 30
29 } 31 }
@@ -181,7 +183,7 @@ void Session::writePacket(MqttPacket &amp;packet, char max_qos, std::shared_ptr&lt;Mqtt @@ -181,7 +183,7 @@ void Session::writePacket(MqttPacket &amp;packet, char max_qos, std::shared_ptr&lt;Mqtt
181 std::unique_lock<std::mutex> locker(qosQueueMutex); 183 std::unique_lock<std::mutex> locker(qosQueueMutex);
182 184
183 const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size(); 185 const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size();
184 - if (totalQosPacketsInTransit >= MAX_QOS_MSG_PENDING_PER_CLIENT || (qosPacketQueue.getByteSize() >= MAX_QOS_BYTES_PENDING_PER_CLIENT && qosPacketQueue.size() > 0)) 186 + if (totalQosPacketsInTransit >= maxQosMsgPending || (qosPacketQueue.getByteSize() >= maxQosBytesPending && qosPacketQueue.size() > 0))
185 { 187 {
186 if (QoSLogPrintedAtId != nextPacketId) 188 if (QoSLogPrintedAtId != nextPacketId)
187 { 189 {
session.h
@@ -28,10 +28,6 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -28,10 +28,6 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
28 #include "sessionsandsubscriptionsdb.h" 28 #include "sessionsandsubscriptionsdb.h"
29 #include "qospacketqueue.h" 29 #include "qospacketqueue.h"
30 30
31 -// TODO make settings. But, num of packets can't exceed 65536, because the counter is 16 bit.  
32 -#define MAX_QOS_MSG_PENDING_PER_CLIENT 32  
33 -#define MAX_QOS_BYTES_PENDING_PER_CLIENT 4096  
34 -  
35 class Session 31 class Session
36 { 32 {
37 #ifdef TESTING 33 #ifdef TESTING
@@ -52,6 +48,9 @@ class Session @@ -52,6 +48,9 @@ class Session
52 uint16_t QoSLogPrintedAtId = 0; 48 uint16_t QoSLogPrintedAtId = 0;
53 std::chrono::time_point<std::chrono::steady_clock> lastTouched = std::chrono::steady_clock::now(); 49 std::chrono::time_point<std::chrono::steady_clock> lastTouched = std::chrono::steady_clock::now();
54 Logger *logger = Logger::getInstance(); 50 Logger *logger = Logger::getInstance();
  51 + const size_t maxQosMsgPending = 0;
  52 + const size_t maxQosBytesPending = 0;
  53 +
55 int64_t getSessionRelativeAgeInMs() const; 54 int64_t getSessionRelativeAgeInMs() const;
56 void setSessionTouch(int64_t ageInMs); 55 void setSessionTouch(int64_t ageInMs);
57 bool requiresPacketRetransmission() const; 56 bool requiresPacketRetransmission() const;
settings.h
@@ -56,6 +56,8 @@ public: @@ -56,6 +56,8 @@ public:
56 int authPluginTimerPeriod = 60; 56 int authPluginTimerPeriod = 60;
57 std::string storageDir; 57 std::string storageDir;
58 int threadCount = 0; 58 int threadCount = 0;
  59 + int maxQosMsgPendingPerClient = 512;
  60 + int maxQosBytesPendingPerClient = 65536;
59 std::list<std::shared_ptr<Listener>> listeners; // Default one is created later, when none are defined. 61 std::list<std::shared_ptr<Listener>> listeners; // Default one is created later, when none are defined.
60 62
61 AuthOptCompatWrap &getAuthOptsCompat(); 63 AuthOptCompatWrap &getAuthOptsCompat();
threadauth.cpp
1 #include "threadauth.h" 1 #include "threadauth.h"
2 2
3 thread_local Authentication *ThreadAuth::auth = nullptr; 3 thread_local Authentication *ThreadAuth::auth = nullptr;
  4 +thread_local ThreadData *ThreadAuth::threadData = nullptr;
  5 +thread_local Settings *ThreadAuth::settings = nullptr;
4 6
5 void ThreadAuth::assign(Authentication *auth) 7 void ThreadAuth::assign(Authentication *auth)
6 { 8 {
@@ -11,3 +13,23 @@ Authentication *ThreadAuth::getAuth() @@ -11,3 +13,23 @@ Authentication *ThreadAuth::getAuth()
11 { 13 {
12 return auth; 14 return auth;
13 } 15 }
  16 +
  17 +void ThreadAuth::assignThreadData(ThreadData *threadData)
  18 +{
  19 + ThreadAuth::threadData = threadData;
  20 +}
  21 +
  22 +ThreadData *ThreadAuth::getThreadData()
  23 +{
  24 + return threadData;
  25 +}
  26 +
  27 +void ThreadAuth::assignSettings(Settings *settings)
  28 +{
  29 + ThreadAuth::settings = settings;
  30 +}
  31 +
  32 +Settings *ThreadAuth::getSettings()
  33 +{
  34 + return settings;
  35 +}
threadauth.h
1 #ifndef THREADAUTH_H 1 #ifndef THREADAUTH_H
2 #define THREADAUTH_H 2 #define THREADAUTH_H
3 3
  4 +#include "forward_declarations.h"
  5 +
4 class Authentication; 6 class Authentication;
5 7
  8 +// TODO: rename, this is no longer just auth, but thread local globals.
6 class ThreadAuth 9 class ThreadAuth
7 { 10 {
8 static thread_local Authentication *auth; 11 static thread_local Authentication *auth;
  12 + static thread_local ThreadData *threadData;
  13 + static thread_local Settings *settings;
9 public: 14 public:
10 static void assign(Authentication *auth); 15 static void assign(Authentication *auth);
11 static Authentication *getAuth(); 16 static Authentication *getAuth();
  17 +
  18 + static void assignThreadData(ThreadData *threadData);
  19 + static ThreadData *getThreadData();
  20 +
  21 + static void assignSettings(Settings *settings);
  22 + static Settings *getSettings();
12 }; 23 };
13 24
14 #endif // THREADAUTH_H 25 #endif // THREADAUTH_H
threadloop.cpp
@@ -21,6 +21,8 @@ void do_thread_work(ThreadData *threadData) @@ -21,6 +21,8 @@ void do_thread_work(ThreadData *threadData)
21 { 21 {
22 int epoll_fd = threadData->epollfd; 22 int epoll_fd = threadData->epollfd;
23 ThreadAuth::assign(&threadData->authentication); 23 ThreadAuth::assign(&threadData->authentication);
  24 + ThreadAuth::assignThreadData(threadData);
  25 + ThreadAuth::assignSettings(&threadData->settingsLocalCopy);
24 26
25 struct epoll_event events[MAX_EVENTS]; 27 struct epoll_event events[MAX_EVENTS];
26 memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS); 28 memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);