/* This file is part of FlashMQ (https://www.flashmq.org) Copyright (C) 2021 Wiebe Cazemier FlashMQ is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. FlashMQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with FlashMQ. If not, see . */ #ifndef THREADDATA_H #define THREADDATA_H #include #include #include #include #include #include #include #include #include #include "forward_declarations.h" #include "client.h" #include "subscriptionstore.h" #include "utils.h" #include "configfileparser.h" #include "authplugin.h" #include "logger.h" typedef void (*thread_f)(ThreadData *); class ThreadData { std::unordered_map> clients_by_fd; std::mutex clients_by_fd_mutex; std::shared_ptr subscriptionStore; Logger *logger; uint64_t receivedMessageCount = 0; uint64_t receivedMessageCountPrevious = 0; std::chrono::time_point receivedMessagePreviousTime = std::chrono::steady_clock::now(); uint64_t sentMessageCount = 0; uint64_t sentMessageCountPrevious = 0; std::chrono::time_point sentMessagePreviousTime = std::chrono::steady_clock::now(); std::mutex clientsToRemoveMutex; std::forward_list> clientsQueuedForRemoving; void reload(std::shared_ptr settings); void wakeUpThread(); void doKeepAliveCheck(); void quit(); void publishStatsOnDollarTopic(std::vector> &threads); void publishStat(const std::string &topic, uint64_t n); void removeQueuedClients(); public: Settings settingsLocalCopy; // Is updated on reload, within the thread loop. Authentication authentication; bool running = true; bool finished = false; std::thread thread; int threadnr = 0; int epollfd = 0; int taskEventFd = 0; std::mutex taskQueueMutex; std::forward_list> taskQueue; ThreadData(int threadnr, std::shared_ptr &subscriptionStore, std::shared_ptr settings); ThreadData(const ThreadData &other) = delete; ThreadData(ThreadData &&other) = delete; void start(thread_f f); void giveClient(std::shared_ptr client); std::shared_ptr getClient(int fd); void removeClientQueued(const std::shared_ptr &client); void removeClientQueued(int fd); void removeClient(std::shared_ptr client); std::shared_ptr &getSubscriptionStore(); void initAuthPlugin(); void cleanupAuthPlugin(); void queueReload(std::shared_ptr settings); void queueDoKeepAliveCheck(); void queueQuit(); void waitForQuit(); void queuePasswdFileReload(); void queuePublishStatsOnDollarTopic(std::vector> &threads); int getNrOfClients() const; void incrementReceivedMessageCount(); uint64_t getReceivedMessageCount() const; uint64_t getReceivedMessagePerSecond(); void incrementSentMessageCount(uint64_t n); uint64_t getSentMessageCount() const; uint64_t getSentMessagePerSecond(); void queueAuthPluginPeriodicEvent(); void authPluginPeriodicEvent(); }; #endif // THREADDATA_H