client.h
4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#ifndef CLIENT_H
#define CLIENT_H
#include <fcntl.h>
#include <unistd.h>
#include <vector>
#include <mutex>
#include <iostream>
#include <time.h>
#include <openssl/ssl.h>
#include "forward_declarations.h"
#include "threaddata.h"
#include "mqttpacket.h"
#include "exceptions.h"
#include "cirbuf.h"
#include <openssl/ssl.h>
#include <openssl/err.h>
#define CLIENT_BUFFER_SIZE 1024 // Must be power of 2
#define MAX_PACKET_SIZE 268435461 // 256 MB + 5
#define MQTT_HEADER_LENGH 2
#define OPENSSL_ERROR_STRING_SIZE 256 // OpenSSL requires at least 256.
#define OPENSSL_WRONG_VERSION_NUMBER 336130315
enum class IoWrapResult
{
Success = 0,
Interrupted = 1,
Wouldblock = 2,
Disconnected = 3,
Error = 4
};
/*
* OpenSSL doc: "When a write function call has to be repeated because SSL_get_error(3) returned
* SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE, it must be repeated with the same arguments"
*/
struct IncompleteSslWrite
{
const void *buf = nullptr;
size_t nbytes = 0;
IncompleteSslWrite() = default;
IncompleteSslWrite(const void *buf, size_t nbytes);
bool hasPendingWrite();
void reset();
};
// TODO: give accepted addr, for showing in logs
class Client
{
int fd;
SSL *ssl = nullptr;
bool sslAccepted = false;
IncompleteSslWrite incompleteSslWrite;
bool sslReadWantsWrite = false;
bool sslWriteWantsRead = false;
CirBuf readbuf;
uint8_t readBufIsZeroCount = 0;
CirBuf writebuf;
uint8_t writeBufIsZeroCount = 0;
bool authenticated = false;
bool connectPacketSeen = false;
bool readyForWriting = false;
bool readyForReading = true;
bool disconnectWhenBytesWritten = false;
bool disconnecting = false;
std::string disconnectReason;
time_t lastActivity = time(NULL);
std::string clientid;
std::string username;
uint16_t keepalive = 0;
bool cleanSession = false;
std::string will_topic;
std::string will_payload;
bool will_retain = false;
char will_qos = 0;
ThreadData_p threadData;
std::mutex writeBufMutex;
std::shared_ptr<Session> session;
Logger *logger = Logger::getInstance();
void setReadyForWriting(bool val);
void setReadyForReading(bool val);
public:
Client(int fd, ThreadData_p threadData, SSL *ssl);
Client(const Client &other) = delete;
Client(Client &&other) = delete;
~Client();
int getFd() { return fd;}
bool isSslAccepted() const;
bool isSsl() const;
bool getSslReadWantsWrite() const;
bool getSslWriteWantsRead() const;
void startOrContinueSslAccept();
void markAsDisconnecting();
ssize_t readWrap(int fd, void *buf, size_t nbytes, IoWrapResult *error);
bool readFdIntoBuffer();
bool bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, Client_p &sender);
void setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, bool cleanSession);
void setWill(const std::string &topic, const std::string &payload, bool retain, char qos);
void setAuthenticated(bool value) { authenticated = value;}
bool getAuthenticated() { return authenticated; }
bool hasConnectPacketSeen() { return connectPacketSeen; }
ThreadData_p getThreadData() { return threadData; }
std::string &getClientId() { return this->clientid; }
bool getCleanSession() { return cleanSession; }
void assignSession(std::shared_ptr<Session> &session);
std::shared_ptr<Session> getSession();
void setDisconnectReason(const std::string &reason);
void writePingResp();
void writeMqttPacket(const MqttPacket &packet);
void writeMqttPacketAndBlameThisClient(const MqttPacket &packet);
ssize_t writeWrap(int fd, const void *buf, size_t nbytes, IoWrapResult *error);
bool writeBufIntoFd();
bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; }
// Do this before calling an action that makes this client ready for writing, so that the EPOLLOUT will handle it.
void setReadyForDisconnect() { disconnectWhenBytesWritten = true; }
std::string repr();
bool keepAliveExpired();
std::string getKeepAliveInfoString() const;
};
#endif // CLIENT_H