/* This file is part of FlashMQ (https://www.flashmq.org) Copyright (C) 2021 Wiebe Cazemier FlashMQ is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. FlashMQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with FlashMQ. If not, see . */ #ifndef THREADDATA_H #define THREADDATA_H #include #include #include #include #include #include #include #include #include #include "forward_declarations.h" #include "client.h" #include "subscriptionstore.h" #include "utils.h" #include "configfileparser.h" #include "authplugin.h" #include "logger.h" typedef void (*thread_f)(ThreadData *); struct KeepAliveCheck { std::weak_ptr client; bool recheck = true; KeepAliveCheck(const std::shared_ptr client); }; class ThreadData { std::unordered_map> clients_by_fd; std::mutex clients_by_fd_mutex; std::shared_ptr subscriptionStore; Logger *logger; uint64_t receivedMessageCount = 0; uint64_t receivedMessageCountPrevious = 0; std::chrono::time_point receivedMessagePreviousTime = std::chrono::steady_clock::now(); uint64_t sentMessageCount = 0; uint64_t sentMessageCountPrevious = 0; std::chrono::time_point sentMessagePreviousTime = std::chrono::steady_clock::now(); std::mutex clientsToRemoveMutex; std::forward_list> clientsQueuedForRemoving; std::mutex queuedKeepAliveMutex; std::map> queuedKeepAliveChecks; void reload(std::shared_ptr settings); void wakeUpThread(); void doKeepAliveCheck(); void quit(); void publishStatsOnDollarTopic(std::vector> &threads); void publishStat(const std::string &topic, uint64_t n); void sendQueuedWills(); void removeExpiredSessions(); void sendAllWills(); void sendAllDisconnects(); void queueClientNextKeepAliveCheck(std::shared_ptr &client, bool keepRechecking); void removeQueuedClients(); public: Settings settingsLocalCopy; // Is updated on reload, within the thread loop. Authentication authentication; bool running = true; bool finished = false; bool allWillsQueued = false; bool allDisconnectsSent = false; std::thread thread; int threadnr = 0; int epollfd = 0; int taskEventFd = 0; std::mutex taskQueueMutex; std::forward_list> taskQueue; ThreadData(int threadnr, std::shared_ptr &subscriptionStore, std::shared_ptr settings); ThreadData(const ThreadData &other) = delete; ThreadData(ThreadData &&other) = delete; void start(thread_f f); void giveClient(std::shared_ptr client); std::shared_ptr getClient(int fd); void removeClientQueued(const std::shared_ptr &client); void removeClientQueued(int fd); void removeClient(std::shared_ptr client); std::shared_ptr &getSubscriptionStore(); void initAuthPlugin(); void cleanupAuthPlugin(); void queueReload(std::shared_ptr settings); void queueDoKeepAliveCheck(); void queueQuit(); void waitForQuit(); void queuePasswdFileReload(); void queuePublishStatsOnDollarTopic(std::vector> &threads); void queueSendingQueuedWills(); void queueRemoveExpiredSessions(); void queueClientNextKeepAliveCheckLocked(std::shared_ptr &client, bool keepRechecking); int getNrOfClients() const; void incrementReceivedMessageCount(); uint64_t getReceivedMessageCount() const; uint64_t getReceivedMessagePerSecond(); void incrementSentMessageCount(uint64_t n); uint64_t getSentMessageCount() const; uint64_t getSentMessagePerSecond(); void queueAuthPluginPeriodicEvent(); void authPluginPeriodicEvent(); void queueSendWills(); void queueSendDisconnects(); }; #endif // THREADDATA_H