Commit be212624ea551e3270762acb604c161473268ced

Authored by Wiebe Cazemier
1 parent a6333881

Add some signal handling

I put the main app in a seperate class for it, because it was easier.
CMakeLists.txt
... ... @@ -7,6 +7,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
7 7  
8 8  
9 9 add_executable(FlashMQ
  10 + mainapp.cpp
10 11 main.cpp
11 12 utils.cpp
12 13 threaddata.cpp
... ...
main.cpp
1 1 #include <iostream>
2   -#include <sys/socket.h>
3   -#include <stdexcept>
4   -#include <netinet/in.h>
5   -#include <sys/epoll.h>
6   -#include <fcntl.h>
7   -#include <thread>
8   -#include <vector>
  2 +#include <signal.h>
  3 +#include <memory>
  4 +#include <string.h>
9 5  
  6 +#include "mainapp.h"
10 7  
11   -#include "utils.h"
12   -#include "threaddata.h"
13   -#include "client.h"
14   -#include "mqttpacket.h"
15   -#include "subscriptionstore.h"
  8 +static MainApp mainApp;
16 9  
17   -#define MAX_EVENTS 1024
18   -#define NR_OF_THREADS 4
19   -
20   -
21   -
22   -void do_thread_work(ThreadData *threadData)
  10 +static void signal_handler(int signal)
23 11 {
24   - int epoll_fd = threadData->epollfd;
25   -
26   - struct epoll_event events[MAX_EVENTS];
27   - memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
28   -
29   - std::vector<MqttPacket> packetQueueIn;
30   -
31   - uint64_t eventfd_value = 0;
32   -
33   - while (1)
  12 + if (signal == SIGPIPE)
  13 + {
  14 + return;
  15 + }
  16 + if (signal == SIGHUP)
34 17 {
35   - if (eventfd_value > 0)
36   - {
37   - for (Client_p client : threadData->getReadyForDequeueing())
38   - {
39   - //client->queuedMessagesToBuffer();
40   - client->writeBufIntoFd();
41   - }
42   - threadData->clearReadyForDequeueing();
43   - eventfd_value = 0;
44   - }
45   -
46   - // TODO: do all the buftofd here, not spread out over
47   -
48   - int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100);
49   -
50   - if (fdcount > 0)
51   - {
52   - for (int i = 0; i < fdcount; i++)
53   - {
54   - struct epoll_event cur_ev = events[i];
55   - int fd = cur_ev.data.fd;
56   -
57   - // If this thread was actively woken up.
58   - if (fd == threadData->event_fd)
59   - {
60   - read(fd, &eventfd_value, sizeof(uint64_t));
61   - continue;
62   - }
63   -
64   - Client_p client = threadData->getClient(fd);
65   -
66   - if (client)
67   - {
68   - if (cur_ev.events & EPOLLIN)
69   - {
70   - bool readSuccess = client->readFdIntoBuffer();
71   - client->bufferToMqttPackets(packetQueueIn, client);
72   -
73   - if (!readSuccess)
74   - {
75   - std::cout << "Disconnect: " << client->repr() << std::endl;
76   - threadData->removeClient(client);
77   - }
78   - }
79   - if (cur_ev.events & EPOLLOUT)
80   - {
81   - if (!client->writeBufIntoFd())
82   - threadData->removeClient(client);
83   - }
84   - }
85   - }
86   - }
87 18  
88   - for (MqttPacket &packet : packetQueueIn)
89   - {
90   - packet.handle(threadData->getSubscriptionStore());
91   - }
92   - packetQueueIn.clear();
  19 + }
  20 + else if (signal == SIGTERM || signal == SIGINT)
  21 + {
  22 + mainApp.quit();
  23 + }
  24 + else
  25 + {
  26 + std::cerr << "Received signal " << signal << std::endl;
93 27 }
94 28 }
95 29  
96   -int main()
  30 +int register_signal_handers()
97 31 {
98   - int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
99   -
100   - // Not needed for now. Maybe I will make multiple accept threads later, with SO_REUSEPORT.
101   - //int optval = 1;
102   - //check<std::runtime_error>(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &optval, sizeof(optval)));
  32 + struct sigaction sa;
  33 + memset(&sa, 0, sizeof (struct sigaction));
  34 + sa.sa_handler = &signal_handler;
  35 + sigemptyset(&sa.sa_mask);
  36 + sa.sa_flags = SA_RESTART;
103 37  
104   - int flags = fcntl(listen_fd, F_GETFL);
105   - check<std::runtime_error>(fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK ));
106   -
107   - struct sockaddr_in in_addr;
108   - in_addr.sin_family = AF_INET;
109   - in_addr.sin_addr.s_addr = INADDR_ANY;
110   - in_addr.sin_port = htons(1883);
111   -
112   - check<std::runtime_error>(bind(listen_fd, (struct sockaddr *)(&in_addr), sizeof(struct sockaddr_in)));
113   - check<std::runtime_error>(listen(listen_fd, 1024));
114   -
115   - int epoll_fd_accept = check<std::runtime_error>(epoll_create(999));
116   -
117   - struct epoll_event events[MAX_EVENTS];
118   - struct epoll_event ev;
119   - memset(&ev, 0, sizeof (struct epoll_event));
120   - memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
121   -
122   - ev.data.fd = listen_fd;
123   - ev.events = EPOLLIN;
124   - check<std::runtime_error>(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev));
125   -
126   - std::shared_ptr<SubscriptionStore> subscriptionStore(new SubscriptionStore());
127   -
128   - std::vector<std::shared_ptr<ThreadData>> threads;
129   -
130   - for (int i = 0; i < NR_OF_THREADS; i++)
  38 + if (sigaction(SIGHUP, &sa, nullptr) != 0 || sigaction(SIGTERM, &sa, nullptr) != 0 || sigaction(SIGINT, &sa, nullptr) != 0)
131 39 {
132   - std::shared_ptr<ThreadData> t(new ThreadData(i, subscriptionStore));
133   - std::thread thread(do_thread_work, t.get());
134   - t->thread = std::move(thread);
135   - threads.push_back(t);
  40 + std::cerr << "Error registering signals" << std::endl;
  41 + return 1;
136 42 }
137 43  
138   - std::cout << "Listening..." << std::endl;
139   -
140   - uint next_thread_index = 0;
  44 + sigset_t set;
  45 + sigemptyset(&set);
  46 + sigaddset(&set,SIGPIPE);
141 47  
142   - while (1)
  48 + int r;
  49 + if ((r = sigprocmask(SIG_BLOCK, &set, NULL) != 0))
143 50 {
144   - int num_fds = epoll_wait(epoll_fd_accept, events, MAX_EVENTS, 100);
145   -
146   - for (int i = 0; i < num_fds; i++)
147   - {
148   - int cur_fd = events[i].data.fd;
149   - if (cur_fd == listen_fd)
150   - {
151   - std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % NR_OF_THREADS];
152   -
153   - std::cout << "Accepting connection on thread " << thread_data->threadnr << std::endl;
154   -
155   - struct sockaddr addr;
156   - memset(&addr, 0, sizeof(struct sockaddr));
157   - socklen_t len = sizeof(struct sockaddr);
158   - int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len));
159   -
160   - Client_p client(new Client(fd, thread_data));
161   - thread_data->giveClient(client);
162   - }
163   - else
164   - {
165   - throw std::runtime_error("The main thread had activity on an accepted socket?");
166   - }
  51 + return r;
  52 + }
167 53  
168   - }
  54 + return 0;
  55 +}
169 56  
  57 +int main()
  58 +{
  59 + try
  60 + {
  61 + check<std::runtime_error>(register_signal_handers());
  62 + mainApp.start();
  63 + }
  64 + catch (std::exception &ex)
  65 + {
  66 + std::cerr << ex.what() << std::endl;
  67 + return 1;
170 68 }
171 69  
172 70 return 0;
... ...
mainapp.cpp 0 → 100644
  1 +#include "mainapp.h"
  2 +
  3 +#define MAX_EVENTS 1024
  4 +#define NR_OF_THREADS 4
  5 +
  6 +void do_thread_work(ThreadData *threadData)
  7 +{
  8 + int epoll_fd = threadData->epollfd;
  9 +
  10 + struct epoll_event events[MAX_EVENTS];
  11 + memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
  12 +
  13 + std::vector<MqttPacket> packetQueueIn;
  14 +
  15 + uint64_t eventfd_value = 0;
  16 +
  17 + while (threadData->running)
  18 + {
  19 + if (eventfd_value > 0)
  20 + {
  21 + for (Client_p client : threadData->getReadyForDequeueing())
  22 + {
  23 + //client->queuedMessagesToBuffer();
  24 + client->writeBufIntoFd();
  25 + }
  26 + threadData->clearReadyForDequeueing();
  27 + eventfd_value = 0;
  28 + }
  29 +
  30 + // TODO: do all the buftofd here, not spread out over
  31 +
  32 + int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100);
  33 +
  34 + if (fdcount > 0)
  35 + {
  36 + for (int i = 0; i < fdcount; i++)
  37 + {
  38 + struct epoll_event cur_ev = events[i];
  39 + int fd = cur_ev.data.fd;
  40 +
  41 + // If this thread was actively woken up.
  42 + if (fd == threadData->event_fd)
  43 + {
  44 + read(fd, &eventfd_value, sizeof(uint64_t));
  45 + continue;
  46 + }
  47 +
  48 + Client_p client = threadData->getClient(fd);
  49 +
  50 + if (client)
  51 + {
  52 + if (cur_ev.events & EPOLLIN)
  53 + {
  54 + bool readSuccess = client->readFdIntoBuffer();
  55 + client->bufferToMqttPackets(packetQueueIn, client);
  56 +
  57 + if (!readSuccess)
  58 + {
  59 + std::cout << "Disconnect: " << client->repr() << std::endl;
  60 + threadData->removeClient(client);
  61 + }
  62 + }
  63 + if (cur_ev.events & EPOLLOUT)
  64 + {
  65 + if (!client->writeBufIntoFd())
  66 + threadData->removeClient(client);
  67 + }
  68 + }
  69 + }
  70 + }
  71 +
  72 + for (MqttPacket &packet : packetQueueIn)
  73 + {
  74 + packet.handle(threadData->getSubscriptionStore());
  75 + }
  76 + packetQueueIn.clear();
  77 + }
  78 +}
  79 +
  80 +MainApp::MainApp() :
  81 + subscriptionStore(new SubscriptionStore())
  82 +{
  83 +
  84 +}
  85 +
  86 +void MainApp::start()
  87 +{
  88 + int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  89 +
  90 + // Not needed for now. Maybe I will make multiple accept threads later, with SO_REUSEPORT.
  91 + //int optval = 1;
  92 + //check<std::runtime_error>(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &optval, sizeof(optval)));
  93 +
  94 + int flags = fcntl(listen_fd, F_GETFL);
  95 + check<std::runtime_error>(fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK ));
  96 +
  97 + struct sockaddr_in in_addr;
  98 + in_addr.sin_family = AF_INET;
  99 + in_addr.sin_addr.s_addr = INADDR_ANY;
  100 + in_addr.sin_port = htons(1883);
  101 +
  102 + check<std::runtime_error>(bind(listen_fd, (struct sockaddr *)(&in_addr), sizeof(struct sockaddr_in)));
  103 + check<std::runtime_error>(listen(listen_fd, 1024));
  104 +
  105 + int epoll_fd_accept = check<std::runtime_error>(epoll_create(999));
  106 +
  107 + struct epoll_event events[MAX_EVENTS];
  108 + struct epoll_event ev;
  109 + memset(&ev, 0, sizeof (struct epoll_event));
  110 + memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
  111 +
  112 + ev.data.fd = listen_fd;
  113 + ev.events = EPOLLIN;
  114 + check<std::runtime_error>(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev));
  115 +
  116 + for (int i = 0; i < NR_OF_THREADS; i++)
  117 + {
  118 + std::shared_ptr<ThreadData> t(new ThreadData(i, subscriptionStore));
  119 + std::thread thread(do_thread_work, t.get());
  120 + t->thread = std::move(thread);
  121 + threads.push_back(t);
  122 + }
  123 +
  124 + std::cout << "Listening..." << std::endl;
  125 +
  126 + uint next_thread_index = 0;
  127 +
  128 + while (running)
  129 + {
  130 + int num_fds = epoll_wait(epoll_fd_accept, events, MAX_EVENTS, 100);
  131 +
  132 + for (int i = 0; i < num_fds; i++)
  133 + {
  134 + int cur_fd = events[i].data.fd;
  135 + if (cur_fd == listen_fd)
  136 + {
  137 + std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % NR_OF_THREADS];
  138 +
  139 + std::cout << "Accepting connection on thread " << thread_data->threadnr << std::endl;
  140 +
  141 + struct sockaddr addr;
  142 + memset(&addr, 0, sizeof(struct sockaddr));
  143 + socklen_t len = sizeof(struct sockaddr);
  144 + int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len));
  145 +
  146 + Client_p client(new Client(fd, thread_data));
  147 + thread_data->giveClient(client);
  148 + }
  149 + else
  150 + {
  151 + throw std::runtime_error("The main thread had activity on an accepted socket?");
  152 + }
  153 +
  154 + }
  155 + }
  156 +}
  157 +
  158 +void MainApp::quit()
  159 +{
  160 + std::cout << "Quitting application" << std::endl;
  161 +
  162 + running = false;
  163 +
  164 + for(std::shared_ptr<ThreadData> &thread : threads)
  165 + {
  166 + thread->quit();
  167 + }
  168 +}
... ...
mainapp.h 0 → 100644
  1 +#ifndef MAINAPP_H
  2 +#define MAINAPP_H
  3 +
  4 +#include <iostream>
  5 +#include <sys/socket.h>
  6 +#include <stdexcept>
  7 +#include <netinet/in.h>
  8 +#include <fcntl.h>
  9 +#include <thread>
  10 +#include <vector>
  11 +
  12 +#include "forward_declarations.h"
  13 +
  14 +#include "utils.h"
  15 +#include "threaddata.h"
  16 +#include "client.h"
  17 +#include "mqttpacket.h"
  18 +#include "subscriptionstore.h"
  19 +
  20 +
  21 +class MainApp
  22 +{
  23 + bool running = true;
  24 + std::vector<std::shared_ptr<ThreadData>> threads;
  25 + std::shared_ptr<SubscriptionStore> subscriptionStore;
  26 +
  27 +public:
  28 + MainApp();
  29 + void start();
  30 + void quit();
  31 +};
  32 +
  33 +#endif // MAINAPP_H
... ...
threaddata.cpp
... ... @@ -15,6 +15,12 @@ ThreadData::ThreadData(int threadnr, std::shared_ptr&lt;SubscriptionStore&gt; &amp;subscri
15 15 check<std::runtime_error>(epoll_ctl(epollfd, EPOLL_CTL_ADD, event_fd, &ev));
16 16 }
17 17  
  18 +void ThreadData::quit()
  19 +{
  20 + running = false;
  21 + thread.join();
  22 +}
  23 +
18 24 void ThreadData::giveClient(Client_p client)
19 25 {
20 26 int fd = client->getFd();
... ...
threaddata.h
... ... @@ -26,6 +26,7 @@ class ThreadData
26 26 std::mutex readForDequeuingMutex;
27 27  
28 28 public:
  29 + bool running = true;
29 30 std::thread thread;
30 31 int threadnr = 0;
31 32 int epollfd = 0;
... ... @@ -33,6 +34,7 @@ public:
33 34  
34 35 ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore);
35 36  
  37 + void quit();
36 38 void giveClient(Client_p client);
37 39 Client_p getClient(int fd);
38 40 void removeClient(Client_p client);
... ...