diff --git a/configfileparser.cpp b/configfileparser.cpp index c32a1ab..d3f8213 100644 --- a/configfileparser.cpp +++ b/configfileparser.cpp @@ -110,6 +110,8 @@ ConfigFileParser::ConfigFileParser(const std::string &path) : validKeys.insert("expire_sessions_after_seconds"); validKeys.insert("thread_count"); validKeys.insert("storage_dir"); + validKeys.insert("max_qos_msg_pending_per_client"); + validKeys.insert("max_qos_bytes_pending_per_client"); validListenKeys.insert("port"); validListenKeys.insert("protocol"); @@ -439,6 +441,26 @@ void ConfigFileParser::loadFile(bool test) } tmpSettings->threadCount = newVal; } + + if (key == "max_qos_msg_pending_per_client") + { + int newVal = std::stoi(value); + if (newVal < 32 || newVal > 65530) + { + throw ConfigFileException(formatString("max_qos_msg_pending_per_client value '%d' is invalid. Valid values between 32 and 65530.", newVal)); + } + tmpSettings->maxQosMsgPendingPerClient = newVal; + } + + if (key == "max_qos_bytes_pending_per_client") + { + int newVal = std::stoi(value); + if (newVal < 4096) + { + throw ConfigFileException(formatString("max_qos_bytes_pending_per_client value '%d' is invalid. Valid values are 4096 or higher.", newVal)); + } + tmpSettings->maxQosBytesPendingPerClient = newVal; + } } } catch (std::invalid_argument &ex) // catch for the stoi() diff --git a/mainapp.cpp b/mainapp.cpp index 7806fa9..53cccc9 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -644,6 +644,8 @@ void MainApp::loadConfig() confFileParser->loadFile(true); confFileParser->loadFile(false); settings = confFileParser->moveSettings(); + settingsLocalCopy = *settings.get(); + ThreadAuth::assignSettings(&settingsLocalCopy); if (settings->listeners.empty()) { diff --git a/mainapp.h b/mainapp.h index 616e53a..38c6c6b 100644 --- a/mainapp.h +++ b/mainapp.h @@ -61,7 +61,11 @@ class MainApp int taskEventFd = -1; std::mutex eventMutex; Timer timer; + + // We need to keep a settings copy as well as a shared pointer, depending on threads, queueing of config reloads, etc. std::shared_ptr settings; + Settings settingsLocalCopy; + std::list> listeners; std::mutex quitMutex; std::string fuzzFilePath; diff --git a/session.cpp b/session.cpp index 9aee6bb..b441940 100644 --- a/session.cpp +++ b/session.cpp @@ -23,7 +23,9 @@ License along with FlashMQ. If not, see . std::chrono::time_point appStartTime = std::chrono::steady_clock::now(); -Session::Session() +Session::Session() : + maxQosMsgPending(ThreadAuth::getSettings()->maxQosMsgPendingPerClient), + maxQosBytesPending(ThreadAuth::getSettings()->maxQosBytesPendingPerClient) { } @@ -181,7 +183,7 @@ void Session::writePacket(MqttPacket &packet, char max_qos, std::shared_ptr locker(qosQueueMutex); const size_t totalQosPacketsInTransit = qosPacketQueue.size() + incomingQoS2MessageIds.size() + outgoingQoS2MessageIds.size(); - if (totalQosPacketsInTransit >= MAX_QOS_MSG_PENDING_PER_CLIENT || (qosPacketQueue.getByteSize() >= MAX_QOS_BYTES_PENDING_PER_CLIENT && qosPacketQueue.size() > 0)) + if (totalQosPacketsInTransit >= maxQosMsgPending || (qosPacketQueue.getByteSize() >= maxQosBytesPending && qosPacketQueue.size() > 0)) { if (QoSLogPrintedAtId != nextPacketId) { diff --git a/session.h b/session.h index ae75728..0f75b5e 100644 --- a/session.h +++ b/session.h @@ -28,10 +28,6 @@ License along with FlashMQ. If not, see . #include "sessionsandsubscriptionsdb.h" #include "qospacketqueue.h" -// TODO make settings. But, num of packets can't exceed 65536, because the counter is 16 bit. -#define MAX_QOS_MSG_PENDING_PER_CLIENT 32 -#define MAX_QOS_BYTES_PENDING_PER_CLIENT 4096 - class Session { #ifdef TESTING @@ -52,6 +48,9 @@ class Session uint16_t QoSLogPrintedAtId = 0; std::chrono::time_point lastTouched = std::chrono::steady_clock::now(); Logger *logger = Logger::getInstance(); + const size_t maxQosMsgPending = 0; + const size_t maxQosBytesPending = 0; + int64_t getSessionRelativeAgeInMs() const; void setSessionTouch(int64_t ageInMs); bool requiresPacketRetransmission() const; diff --git a/settings.h b/settings.h index 1b323ab..37f9c6a 100644 --- a/settings.h +++ b/settings.h @@ -56,6 +56,8 @@ public: int authPluginTimerPeriod = 60; std::string storageDir; int threadCount = 0; + int maxQosMsgPendingPerClient = 512; + int maxQosBytesPendingPerClient = 65536; std::list> listeners; // Default one is created later, when none are defined. AuthOptCompatWrap &getAuthOptsCompat(); diff --git a/threadauth.cpp b/threadauth.cpp index 6e9d54e..44bda2b 100644 --- a/threadauth.cpp +++ b/threadauth.cpp @@ -1,6 +1,8 @@ #include "threadauth.h" thread_local Authentication *ThreadAuth::auth = nullptr; +thread_local ThreadData *ThreadAuth::threadData = nullptr; +thread_local Settings *ThreadAuth::settings = nullptr; void ThreadAuth::assign(Authentication *auth) { @@ -11,3 +13,23 @@ Authentication *ThreadAuth::getAuth() { return auth; } + +void ThreadAuth::assignThreadData(ThreadData *threadData) +{ + ThreadAuth::threadData = threadData; +} + +ThreadData *ThreadAuth::getThreadData() +{ + return threadData; +} + +void ThreadAuth::assignSettings(Settings *settings) +{ + ThreadAuth::settings = settings; +} + +Settings *ThreadAuth::getSettings() +{ + return settings; +} diff --git a/threadauth.h b/threadauth.h index 165cf4b..6f33444 100644 --- a/threadauth.h +++ b/threadauth.h @@ -1,14 +1,25 @@ #ifndef THREADAUTH_H #define THREADAUTH_H +#include "forward_declarations.h" + class Authentication; +// TODO: rename, this is no longer just auth, but thread local globals. class ThreadAuth { static thread_local Authentication *auth; + static thread_local ThreadData *threadData; + static thread_local Settings *settings; public: static void assign(Authentication *auth); static Authentication *getAuth(); + + static void assignThreadData(ThreadData *threadData); + static ThreadData *getThreadData(); + + static void assignSettings(Settings *settings); + static Settings *getSettings(); }; #endif // THREADAUTH_H diff --git a/threadloop.cpp b/threadloop.cpp index 371e939..39883f4 100644 --- a/threadloop.cpp +++ b/threadloop.cpp @@ -21,6 +21,8 @@ void do_thread_work(ThreadData *threadData) { int epoll_fd = threadData->epollfd; ThreadAuth::assign(&threadData->authentication); + ThreadAuth::assignThreadData(threadData); + ThreadAuth::assignSettings(&threadData->settingsLocalCopy); struct epoll_event events[MAX_EVENTS]; memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);