client.cpp
2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include "client.h"
Client::Client(int fd, ThreadData_p threadData) :
fd(fd),
threadData(threadData)
{
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
readbuf = (char*)malloc(CLIENT_BUFFER_SIZE);
}
Client::~Client()
{
epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL); // NOTE: the last NULL can cause crash on old kernels
close(fd);
free(readbuf);
}
// false means any kind of error we want to get rid of the client for.
bool Client::readFdIntoBuffer()
{
int read_size = getMaxWriteSize();
int n;
while ((n = read(fd, &readbuf[wi], read_size)) != 0)
{
if (n < 0)
{
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else
return false;
}
wi += n;
if (getBufBytesUsed() >= bufsize)
{
growBuffer();
}
read_size = getMaxWriteSize();
}
if (n == 0) // client disconnected.
{
return false;
}
return true;
}
bool Client::bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn)
{
while (getBufBytesUsed() >= MQTT_HEADER_LENGH)
{
// Determine the packet length by decoding the variable length
size_t remaining_length_i = 1;
int multiplier = 1;
size_t packet_length = 0;
unsigned char encodedByte = 0;
do
{
if (remaining_length_i >= getBufBytesUsed())
break;
encodedByte = readbuf[remaining_length_i++];
packet_length += (encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128*128*128)
return false;
}
while ((encodedByte & 128) != 0);
packet_length += remaining_length_i;
if (!authenticated && packet_length >= 1024*1024)
{
throw ProtocolError("An unauthenticated client sends a packet of 1 MB or bigger? Probably it's just random bytes.");
}
if (packet_length <= getBufBytesUsed())
{
MqttPacket packet(&readbuf[ri], packet_length, remaining_length_i, this);
packetQueueIn.push_back(std::move(packet));
ri += packet_length;
}
else
break;
}
if (ri == wi)
{
ri = 0;
wi = 0;
}
return true;
// TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space.
}
void Client::setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen)
{
this->clientid = clientId;
this->username = username;
this->connectPacketSeen = connectPacketSeen;
}