Commit 0b1af55520a32fd36c4cf5ba8db222d3d79637f0

Authored by Wiebe Cazemier
1 parent bad46416

Put threadloop function in separate file

CMakeLists.txt
@@ -57,6 +57,7 @@ add_executable(FlashMQ @@ -57,6 +57,7 @@ add_executable(FlashMQ
57 sessionsandsubscriptionsdb.h 57 sessionsandsubscriptionsdb.h
58 qospacketqueue.h 58 qospacketqueue.h
59 threadauth.h 59 threadauth.h
  60 + threadloop.h
60 61
61 mainapp.cpp 62 mainapp.cpp
62 main.cpp 63 main.cpp
@@ -93,6 +94,7 @@ add_executable(FlashMQ @@ -93,6 +94,7 @@ add_executable(FlashMQ
93 sessionsandsubscriptionsdb.cpp 94 sessionsandsubscriptionsdb.cpp
94 qospacketqueue.cpp 95 qospacketqueue.cpp
95 threadauth.cpp 96 threadauth.cpp
  97 + threadloop.cpp
96 98
97 ) 99 )
98 100
FlashMQTests/FlashMQTests.pro
@@ -48,6 +48,7 @@ SOURCES += tst_maintests.cpp \ @@ -48,6 +48,7 @@ SOURCES += tst_maintests.cpp \
48 ../sessionsandsubscriptionsdb.cpp \ 48 ../sessionsandsubscriptionsdb.cpp \
49 ../qospacketqueue.cpp \ 49 ../qospacketqueue.cpp \
50 ../threadauth.cpp \ 50 ../threadauth.cpp \
  51 + ../threadloop.cpp \
51 mainappthread.cpp \ 52 mainappthread.cpp \
52 twoclienttestcontext.cpp 53 twoclienttestcontext.cpp
53 54
@@ -88,6 +89,7 @@ HEADERS += \ @@ -88,6 +89,7 @@ HEADERS += \
88 ../sessionsandsubscriptionsdb.h \ 89 ../sessionsandsubscriptionsdb.h \
89 ../qospacketqueue.h \ 90 ../qospacketqueue.h \
90 ../threadauth.h \ 91 ../threadauth.h \
  92 + ../threadloop.h \
91 mainappthread.h \ 93 mainappthread.h \
92 twoclienttestcontext.h 94 twoclienttestcontext.h
93 95
mainapp.cpp
@@ -29,153 +29,10 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. @@ -29,153 +29,10 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
29 29
30 #include "logger.h" 30 #include "logger.h"
31 #include "threadauth.h" 31 #include "threadauth.h"
32 -  
33 -#define MAX_EVENTS 1024 32 +#include "threadloop.h"
34 33
35 MainApp *MainApp::instance = nullptr; 34 MainApp *MainApp::instance = nullptr;
36 35
37 -void do_thread_work(ThreadData *threadData)  
38 -{  
39 - int epoll_fd = threadData->epollfd;  
40 - ThreadAuth::assign(&threadData->authentication);  
41 -  
42 - struct epoll_event events[MAX_EVENTS];  
43 - memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);  
44 -  
45 - std::vector<MqttPacket> packetQueueIn;  
46 -  
47 - Logger *logger = Logger::getInstance();  
48 -  
49 - try  
50 - {  
51 - logger->logf(LOG_NOTICE, "Thread %d doing auth init.", threadData->threadnr);  
52 - threadData->initAuthPlugin();  
53 - }  
54 - catch(std::exception &ex)  
55 - {  
56 - logger->logf(LOG_ERR, "Error initializing auth back-end: %s", ex.what());  
57 - threadData->running = false;  
58 - MainApp *instance = MainApp::getMainApp();  
59 - instance->quit();  
60 - }  
61 -  
62 - while (threadData->running)  
63 - {  
64 - int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100);  
65 -  
66 - if (fdcount < 0)  
67 - {  
68 - if (errno == EINTR)  
69 - continue;  
70 - logger->logf(LOG_ERR, "Problem waiting for fd: %s", strerror(errno));  
71 - }  
72 - else if (fdcount > 0)  
73 - {  
74 - for (int i = 0; i < fdcount; i++)  
75 - {  
76 - struct epoll_event cur_ev = events[i];  
77 - int fd = cur_ev.data.fd;  
78 -  
79 - if (fd == threadData->taskEventFd)  
80 - {  
81 - uint64_t eventfd_value = 0;  
82 - check<std::runtime_error>(read(fd, &eventfd_value, sizeof(uint64_t)));  
83 -  
84 - std::lock_guard<std::mutex> locker(threadData->taskQueueMutex);  
85 - for(auto &f : threadData->taskQueue)  
86 - {  
87 - f();  
88 - }  
89 - threadData->taskQueue.clear();  
90 -  
91 - continue;  
92 - }  
93 -  
94 - std::shared_ptr<Client> client = threadData->getClient(fd);  
95 -  
96 - if (client)  
97 - {  
98 - try  
99 - {  
100 - if (cur_ev.events & (EPOLLERR | EPOLLHUP))  
101 - {  
102 - client->setDisconnectReason("epoll says socket is in ERR or HUP state.");  
103 - threadData->removeClient(client);  
104 - continue;  
105 - }  
106 - if (client->isSsl() && !client->isSslAccepted())  
107 - {  
108 - client->startOrContinueSslAccept();  
109 - continue;  
110 - }  
111 - if ((cur_ev.events & EPOLLIN) || ((cur_ev.events & EPOLLOUT) && client->getSslReadWantsWrite()))  
112 - {  
113 - bool readSuccess = client->readFdIntoBuffer();  
114 - client->bufferToMqttPackets(packetQueueIn, client);  
115 -  
116 - if (!readSuccess)  
117 - {  
118 - client->setDisconnectReason("socket disconnect detected");  
119 - threadData->removeClient(client);  
120 - continue;  
121 - }  
122 - }  
123 - if ((cur_ev.events & EPOLLOUT) || ((cur_ev.events & EPOLLIN) && client->getSslWriteWantsRead()))  
124 - {  
125 - if (!client->writeBufIntoFd())  
126 - {  
127 - threadData->removeClient(client);  
128 - continue;  
129 - }  
130 -  
131 - if (client->readyForDisconnecting())  
132 - {  
133 - threadData->removeClient(client);  
134 - continue;  
135 - }  
136 - }  
137 - }  
138 - catch(std::exception &ex)  
139 - {  
140 - client->setDisconnectReason(ex.what());  
141 - logger->logf(LOG_ERR, "Packet read/write error: %s. Removing client.", ex.what());  
142 - threadData->removeClient(client);  
143 - }  
144 - }  
145 - }  
146 - }  
147 -  
148 - for (MqttPacket &packet : packetQueueIn)  
149 - {  
150 - try  
151 - {  
152 - packet.handle();  
153 - }  
154 - catch (std::exception &ex)  
155 - {  
156 - packet.getSender()->setDisconnectReason(ex.what());  
157 - logger->logf(LOG_ERR, "MqttPacket handling error: %s. Removing client.", ex.what());  
158 - threadData->removeClient(packet.getSender());  
159 - }  
160 - }  
161 - packetQueueIn.clear();  
162 - }  
163 -  
164 - try  
165 - {  
166 - logger->logf(LOG_NOTICE, "Thread %d doing auth cleanup.", threadData->threadnr);  
167 - threadData->cleanupAuthPlugin();  
168 - }  
169 - catch(std::exception &ex)  
170 - {  
171 - logger->logf(LOG_ERR, "Error cleaning auth back-end: %s", ex.what());  
172 - }  
173 -  
174 - threadData->finished = true;  
175 -}  
176 -  
177 -  
178 -  
179 MainApp::MainApp(const std::string &configFilePath) : 36 MainApp::MainApp(const std::string &configFilePath) :
180 subscriptionStore(new SubscriptionStore()) 37 subscriptionStore(new SubscriptionStore())
181 { 38 {
threadloop.cpp 0 → 100644
  1 +/*
  2 +This file is part of FlashMQ (https://www.flashmq.org)
  3 +Copyright (C) 2021 Wiebe Cazemier
  4 +
  5 +FlashMQ is free software: you can redistribute it and/or modify
  6 +it under the terms of the GNU Affero General Public License as
  7 +published by the Free Software Foundation, version 3.
  8 +
  9 +FlashMQ is distributed in the hope that it will be useful,
  10 +but WITHOUT ANY WARRANTY; without even the implied warranty of
  11 +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12 +GNU Affero General Public License for more details.
  13 +
  14 +You should have received a copy of the GNU Affero General Public
  15 +License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
  16 +*/
  17 +
  18 +#include "threadloop.h"
  19 +
  20 +void do_thread_work(ThreadData *threadData)
  21 +{
  22 + int epoll_fd = threadData->epollfd;
  23 + ThreadAuth::assign(&threadData->authentication);
  24 +
  25 + struct epoll_event events[MAX_EVENTS];
  26 + memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
  27 +
  28 + std::vector<MqttPacket> packetQueueIn;
  29 +
  30 + Logger *logger = Logger::getInstance();
  31 +
  32 + try
  33 + {
  34 + logger->logf(LOG_NOTICE, "Thread %d doing auth init.", threadData->threadnr);
  35 + threadData->initAuthPlugin();
  36 + }
  37 + catch(std::exception &ex)
  38 + {
  39 + logger->logf(LOG_ERR, "Error initializing auth back-end: %s", ex.what());
  40 + threadData->running = false;
  41 + MainApp *instance = MainApp::getMainApp();
  42 + instance->quit();
  43 + }
  44 +
  45 + while (threadData->running)
  46 + {
  47 + int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100);
  48 +
  49 + if (fdcount < 0)
  50 + {
  51 + if (errno == EINTR)
  52 + continue;
  53 + logger->logf(LOG_ERR, "Problem waiting for fd: %s", strerror(errno));
  54 + }
  55 + else if (fdcount > 0)
  56 + {
  57 + for (int i = 0; i < fdcount; i++)
  58 + {
  59 + struct epoll_event cur_ev = events[i];
  60 + int fd = cur_ev.data.fd;
  61 +
  62 + if (fd == threadData->taskEventFd)
  63 + {
  64 + uint64_t eventfd_value = 0;
  65 + check<std::runtime_error>(read(fd, &eventfd_value, sizeof(uint64_t)));
  66 +
  67 + std::lock_guard<std::mutex> locker(threadData->taskQueueMutex);
  68 + for(auto &f : threadData->taskQueue)
  69 + {
  70 + f();
  71 + }
  72 + threadData->taskQueue.clear();
  73 +
  74 + continue;
  75 + }
  76 +
  77 + std::shared_ptr<Client> client = threadData->getClient(fd);
  78 +
  79 + if (client)
  80 + {
  81 + try
  82 + {
  83 + if (cur_ev.events & (EPOLLERR | EPOLLHUP))
  84 + {
  85 + client->setDisconnectReason("epoll says socket is in ERR or HUP state.");
  86 + threadData->removeClient(client);
  87 + continue;
  88 + }
  89 + if (client->isSsl() && !client->isSslAccepted())
  90 + {
  91 + client->startOrContinueSslAccept();
  92 + continue;
  93 + }
  94 + if ((cur_ev.events & EPOLLIN) || ((cur_ev.events & EPOLLOUT) && client->getSslReadWantsWrite()))
  95 + {
  96 + bool readSuccess = client->readFdIntoBuffer();
  97 + client->bufferToMqttPackets(packetQueueIn, client);
  98 +
  99 + if (!readSuccess)
  100 + {
  101 + client->setDisconnectReason("socket disconnect detected");
  102 + threadData->removeClient(client);
  103 + continue;
  104 + }
  105 + }
  106 + if ((cur_ev.events & EPOLLOUT) || ((cur_ev.events & EPOLLIN) && client->getSslWriteWantsRead()))
  107 + {
  108 + if (!client->writeBufIntoFd())
  109 + {
  110 + threadData->removeClient(client);
  111 + continue;
  112 + }
  113 +
  114 + if (client->readyForDisconnecting())
  115 + {
  116 + threadData->removeClient(client);
  117 + continue;
  118 + }
  119 + }
  120 + }
  121 + catch(std::exception &ex)
  122 + {
  123 + client->setDisconnectReason(ex.what());
  124 + logger->logf(LOG_ERR, "Packet read/write error: %s. Removing client.", ex.what());
  125 + threadData->removeClient(client);
  126 + }
  127 + }
  128 + }
  129 + }
  130 +
  131 + for (MqttPacket &packet : packetQueueIn)
  132 + {
  133 + try
  134 + {
  135 + packet.handle();
  136 + }
  137 + catch (std::exception &ex)
  138 + {
  139 + packet.getSender()->setDisconnectReason(ex.what());
  140 + logger->logf(LOG_ERR, "MqttPacket handling error: %s. Removing client.", ex.what());
  141 + threadData->removeClient(packet.getSender());
  142 + }
  143 + }
  144 + packetQueueIn.clear();
  145 + }
  146 +
  147 + try
  148 + {
  149 + logger->logf(LOG_NOTICE, "Thread %d doing auth cleanup.", threadData->threadnr);
  150 + threadData->cleanupAuthPlugin();
  151 + }
  152 + catch(std::exception &ex)
  153 + {
  154 + logger->logf(LOG_ERR, "Error cleaning auth back-end: %s", ex.what());
  155 + }
  156 +
  157 + threadData->finished = true;
  158 +}
threadloop.h 0 → 100644
  1 +/*
  2 +This file is part of FlashMQ (https://www.flashmq.org)
  3 +Copyright (C) 2021 Wiebe Cazemier
  4 +
  5 +FlashMQ is free software: you can redistribute it and/or modify
  6 +it under the terms of the GNU Affero General Public License as
  7 +published by the Free Software Foundation, version 3.
  8 +
  9 +FlashMQ is distributed in the hope that it will be useful,
  10 +but WITHOUT ANY WARRANTY; without even the implied warranty of
  11 +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12 +GNU Affero General Public License for more details.
  13 +
  14 +You should have received a copy of the GNU Affero General Public
  15 +License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
  16 +*/
  17 +
  18 +#ifndef THREADLOOP_H
  19 +#define THREADLOOP_H
  20 +
  21 +#include "threaddata.h"
  22 +#include "threadauth.h"
  23 +
  24 +#define MAX_EVENTS 1024
  25 +
  26 +void do_thread_work(ThreadData *threadData);
  27 +
  28 +
  29 +#endif // THREADLOOP_H