Commit 21bf9bf5eeceb2ce9be15d021eb6cb806e48a462

Authored by Wiebe Cazemier
1 parent 3e7072dd

Error handling and a bit of cleanup

client.cpp
@@ -3,6 +3,7 @@ @@ -3,6 +3,7 @@
3 #include <cstring> 3 #include <cstring>
4 #include <sstream> 4 #include <sstream>
5 #include <iostream> 5 #include <iostream>
  6 +#include <cassert>
6 7
7 Client::Client(int fd, ThreadData_p threadData) : 8 Client::Client(int fd, ThreadData_p threadData) :
8 fd(fd), 9 fd(fd),
@@ -12,6 +13,9 @@ Client::Client(int fd, ThreadData_p threadData) : @@ -12,6 +13,9 @@ Client::Client(int fd, ThreadData_p threadData) :
12 fcntl(fd, F_SETFL, flags | O_NONBLOCK); 13 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
13 readbuf = (char*)malloc(CLIENT_BUFFER_SIZE); 14 readbuf = (char*)malloc(CLIENT_BUFFER_SIZE);
14 writebuf = (char*)malloc(CLIENT_BUFFER_SIZE); 15 writebuf = (char*)malloc(CLIENT_BUFFER_SIZE);
  16 +
  17 + if (readbuf == NULL || writebuf == NULL)
  18 + throw std::runtime_error("Malloc error constructing client.");
15 } 19 }
16 20
17 Client::~Client() 21 Client::~Client()
@@ -25,7 +29,7 @@ void Client::closeConnection() @@ -25,7 +29,7 @@ void Client::closeConnection()
25 { 29 {
26 if (fd < 0) 30 if (fd < 0)
27 return; 31 return;
28 - epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL); 32 + check<std::runtime_error>(epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL));
29 close(fd); 33 close(fd);
30 fd = -1; 34 fd = -1;
31 } 35 }
@@ -59,10 +63,7 @@ bool Client::readFdIntoBuffer() @@ -59,10 +63,7 @@ bool Client::readFdIntoBuffer()
59 if (errno == EAGAIN || errno == EWOULDBLOCK) 63 if (errno == EAGAIN || errno == EWOULDBLOCK)
60 break; 64 break;
61 else 65 else
62 - {  
63 - std::cerr << strerror(errno) << std::endl;  
64 - return false;  
65 - } 66 + check<std::runtime_error>(n);
66 } 67 }
67 } 68 }
68 69
@@ -91,13 +92,6 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -91,13 +92,6 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet)
91 setReadyForWriting(true); 92 setReadyForWriting(true);
92 } 93 }
93 94
94 -// Not sure if this is the method I want to use  
95 -void Client::writeMqttPacketLocked(const MqttPacket &packet)  
96 -{  
97 - std::lock_guard<std::mutex> lock(writeBufMutex);  
98 - writeMqttPacket(packet);  
99 -}  
100 -  
101 // Ping responses are always the same, so hardcoding it for optimization. 95 // Ping responses are always the same, so hardcoding it for optimization.
102 void Client::writePingResp() 96 void Client::writePingResp()
103 { 97 {
@@ -131,7 +125,7 @@ bool Client::writeBufIntoFd() @@ -131,7 +125,7 @@ bool Client::writeBufIntoFd()
131 if (errno == EAGAIN || errno == EWOULDBLOCK) 125 if (errno == EAGAIN || errno == EWOULDBLOCK)
132 break; 126 break;
133 else 127 else
134 - return false; 128 + check<std::runtime_error>(n);
135 } 129 }
136 } 130 }
137 131
@@ -147,27 +141,14 @@ bool Client::writeBufIntoFd() @@ -147,27 +141,14 @@ bool Client::writeBufIntoFd()
147 return true; 141 return true;
148 } 142 }
149 143
150 -  
151 -  
152 std::string Client::repr() 144 std::string Client::repr()
153 { 145 {
154 std::ostringstream a; 146 std::ostringstream a;
155 a << "Client = " << clientid << ", user = " << username; 147 a << "Client = " << clientid << ", user = " << username;
  148 + a.flush();
156 return a.str(); 149 return a.str();
157 } 150 }
158 151
159 -void Client::queueMessage(const MqttPacket &packet)  
160 -{  
161 -  
162 -  
163 - // TODO: semaphores on stl containers?  
164 -}  
165 -  
166 -void Client::queuedMessagesToBuffer()  
167 -{  
168 -  
169 -}  
170 -  
171 void Client::setReadyForWriting(bool val) 152 void Client::setReadyForWriting(bool val)
172 { 153 {
173 if (val == this->readyForWriting) 154 if (val == this->readyForWriting)
@@ -233,9 +214,7 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -233,9 +214,7 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
233 packetQueueIn.push_back(std::move(packet)); 214 packetQueueIn.push_back(std::move(packet));
234 215
235 ri += packet_length; 216 ri += packet_length;
236 -  
237 - if (ri > wi)  
238 - throw std::runtime_error("hier"); 217 + assert(ri <= wi);
239 } 218 }
240 else 219 else
241 break; 220 break;
client.h
@@ -116,15 +116,10 @@ public: @@ -116,15 +116,10 @@ public:
116 116
117 void writePingResp(); 117 void writePingResp();
118 void writeMqttPacket(const MqttPacket &packet); 118 void writeMqttPacket(const MqttPacket &packet);
119 - void writeMqttPacketLocked(const MqttPacket &packet);  
120 bool writeBufIntoFd(); 119 bool writeBufIntoFd();
121 120
122 std::string repr(); 121 std::string repr();
123 122
124 - void queueMessage(const MqttPacket &packet);  
125 - void queuedMessagesToBuffer();  
126 -  
127 -  
128 }; 123 };
129 124
130 #endif // CLIENT_H 125 #endif // CLIENT_H
exceptions.h
@@ -10,5 +10,11 @@ public: @@ -10,5 +10,11 @@ public:
10 ProtocolError(const std::string &msg) : std::runtime_error(msg) {} 10 ProtocolError(const std::string &msg) : std::runtime_error(msg) {}
11 }; 11 };
12 12
  13 +class NotImplementedException : public std::runtime_error
  14 +{
  15 +public:
  16 + NotImplementedException(const std::string &msg) : std::runtime_error(msg) {}
  17 +};
  18 +
13 19
14 #endif // EXCEPTIONS_H 20 #endif // EXCEPTIONS_H
main.cpp
@@ -38,7 +38,7 @@ int register_signal_handers() @@ -38,7 +38,7 @@ int register_signal_handers()
38 if (sigaction(SIGHUP, &sa, nullptr) != 0 || sigaction(SIGTERM, &sa, nullptr) != 0 || sigaction(SIGINT, &sa, nullptr) != 0) 38 if (sigaction(SIGHUP, &sa, nullptr) != 0 || sigaction(SIGTERM, &sa, nullptr) != 0 || sigaction(SIGINT, &sa, nullptr) != 0)
39 { 39 {
40 std::cerr << "Error registering signals" << std::endl; 40 std::cerr << "Error registering signals" << std::endl;
41 - return 1; 41 + return -1;
42 } 42 }
43 43
44 sigset_t set; 44 sigset_t set;
mainapp.cpp
1 #include "mainapp.h" 1 #include "mainapp.h"
2 #include "cassert" 2 #include "cassert"
  3 +#include "exceptions.h"
3 4
4 #define MAX_EVENTS 1024 5 #define MAX_EVENTS 1024
5 #define NR_OF_THREADS 4 6 #define NR_OF_THREADS 4
@@ -32,7 +33,13 @@ void do_thread_work(ThreadData *threadData) @@ -32,7 +33,13 @@ void do_thread_work(ThreadData *threadData)
32 33
33 int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100); 34 int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100);
34 35
35 - if (fdcount > 0) 36 + if (fdcount < 0)
  37 + {
  38 + if (errno == EINTR)
  39 + continue;
  40 + std::cerr << "Problem waiting for fd: " << strerror(errno) << std::endl;
  41 + }
  42 + else if (fdcount > 0)
36 { 43 {
37 for (int i = 0; i < fdcount; i++) 44 for (int i = 0; i < fdcount; i++)
38 { 45 {
@@ -50,21 +57,29 @@ void do_thread_work(ThreadData *threadData) @@ -50,21 +57,29 @@ void do_thread_work(ThreadData *threadData)
50 57
51 if (client) 58 if (client)
52 { 59 {
53 - if (cur_ev.events & EPOLLIN) 60 + try
54 { 61 {
55 - bool readSuccess = client->readFdIntoBuffer();  
56 - client->bufferToMqttPackets(packetQueueIn, client);  
57 -  
58 - if (!readSuccess) 62 + if (cur_ev.events & EPOLLIN)
  63 + {
  64 + bool readSuccess = client->readFdIntoBuffer();
  65 + client->bufferToMqttPackets(packetQueueIn, client);
  66 +
  67 + if (!readSuccess)
  68 + {
  69 + std::cout << "Disconnect: " << client->repr() << std::endl;
  70 + threadData->removeClient(client);
  71 + }
  72 + }
  73 + if (cur_ev.events & EPOLLOUT)
59 { 74 {
60 - std::cout << "Disconnect: " << client->repr() << std::endl;  
61 - threadData->removeClient(client); 75 + if (!client->writeBufIntoFd())
  76 + threadData->removeClient(client);
62 } 77 }
63 } 78 }
64 - if (cur_ev.events & EPOLLOUT) 79 + catch(std::exception &ex)
65 { 80 {
66 - if (!client->writeBufIntoFd())  
67 - threadData->removeClient(client); 81 + std::cerr << ex.what() << std::endl;
  82 + threadData->removeClient(client);
68 } 83 }
69 } 84 }
70 else 85 else
@@ -76,7 +91,15 @@ void do_thread_work(ThreadData *threadData) @@ -76,7 +91,15 @@ void do_thread_work(ThreadData *threadData)
76 91
77 for (MqttPacket &packet : packetQueueIn) 92 for (MqttPacket &packet : packetQueueIn)
78 { 93 {
79 - packet.handle(threadData->getSubscriptionStore()); 94 + try
  95 + {
  96 + packet.handle(threadData->getSubscriptionStore());
  97 + }
  98 + catch (std::exception &ex)
  99 + {
  100 + std::cerr << ex.what() << std::endl;
  101 + threadData->removeClient(packet.getSender());
  102 + }
80 } 103 }
81 packetQueueIn.clear(); 104 packetQueueIn.clear();
82 } 105 }
@@ -142,26 +165,40 @@ void MainApp::start() @@ -142,26 +165,40 @@ void MainApp::start()
142 { 165 {
143 int num_fds = epoll_wait(epoll_fd_accept, events, MAX_EVENTS, 100); 166 int num_fds = epoll_wait(epoll_fd_accept, events, MAX_EVENTS, 100);
144 167
  168 + if (num_fds < 0)
  169 + {
  170 + if (errno == EINTR)
  171 + continue;
  172 + std::cerr << strerror(errno) << std::endl;
  173 + }
  174 +
145 for (int i = 0; i < num_fds; i++) 175 for (int i = 0; i < num_fds; i++)
146 { 176 {
147 int cur_fd = events[i].data.fd; 177 int cur_fd = events[i].data.fd;
148 - if (cur_fd == listen_fd) 178 + try
149 { 179 {
150 - std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % NR_OF_THREADS]; 180 + if (cur_fd == listen_fd)
  181 + {
  182 + std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % NR_OF_THREADS];
151 183
152 - std::cout << "Accepting connection on thread " << thread_data->threadnr << std::endl; 184 + std::cout << "Accepting connection on thread " << thread_data->threadnr << std::endl;
153 185
154 - struct sockaddr addr;  
155 - memset(&addr, 0, sizeof(struct sockaddr));  
156 - socklen_t len = sizeof(struct sockaddr);  
157 - int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len)); 186 + struct sockaddr addr;
  187 + memset(&addr, 0, sizeof(struct sockaddr));
  188 + socklen_t len = sizeof(struct sockaddr);
  189 + int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len));
158 190
159 - Client_p client(new Client(fd, thread_data));  
160 - thread_data->giveClient(client); 191 + Client_p client(new Client(fd, thread_data));
  192 + thread_data->giveClient(client);
  193 + }
  194 + else
  195 + {
  196 + throw std::runtime_error("The main thread had activity on an accepted socket?");
  197 + }
161 } 198 }
162 - else 199 + catch (std::exception &ex)
163 { 200 {
164 - throw std::runtime_error("The main thread had activity on an accepted socket?"); 201 + std::cerr << "Problem accepting connection: " << ex.what() << std::endl;
165 } 202 }
166 203
167 } 204 }
rwlockguard.cpp
1 #include "rwlockguard.h" 1 #include "rwlockguard.h"
  2 +#include "utils.h"
  3 +#include "stdexcept"
2 4
3 RWLockGuard::RWLockGuard(pthread_rwlock_t *rwlock) : 5 RWLockGuard::RWLockGuard(pthread_rwlock_t *rwlock) :
4 rwlock(rwlock) 6 rwlock(rwlock)
@@ -13,10 +15,12 @@ RWLockGuard::~RWLockGuard() @@ -13,10 +15,12 @@ RWLockGuard::~RWLockGuard()
13 15
14 void RWLockGuard::wrlock() 16 void RWLockGuard::wrlock()
15 { 17 {
16 - pthread_rwlock_wrlock(rwlock); 18 + if (pthread_rwlock_wrlock(rwlock) != 0)
  19 + throw std::runtime_error("wrlock failed.");
17 } 20 }
18 21
19 void RWLockGuard::rdlock() 22 void RWLockGuard::rdlock()
20 { 23 {
21 - pthread_rwlock_wrlock(rwlock); 24 + if (pthread_rwlock_wrlock(rwlock) != 0)
  25 + throw std::runtime_error("rdlock failed.");
22 } 26 }