Commit 1106e210f5a98ce3b3c7e11edfd72ec51a9ca967
1 parent
f3a45a2a
Buffer size and max packet size as setting
Showing
8 changed files
with
55 additions
and
14 deletions
client.cpp
| @@ -7,11 +7,13 @@ | @@ -7,11 +7,13 @@ | ||
| 7 | 7 | ||
| 8 | #include "logger.h" | 8 | #include "logger.h" |
| 9 | 9 | ||
| 10 | -Client::Client(int fd, ThreadData_p threadData, SSL *ssl) : | 10 | +Client::Client(int fd, ThreadData_p threadData, SSL *ssl, const GlobalSettings &settings) : |
| 11 | fd(fd), | 11 | fd(fd), |
| 12 | ssl(ssl), | 12 | ssl(ssl), |
| 13 | - readbuf(CLIENT_BUFFER_SIZE), | ||
| 14 | - writebuf(CLIENT_BUFFER_SIZE), | 13 | + initialBufferSize(settings.clientInitialBufferSize), // The client is constructed in the main thread, so we need to use its settings copy |
| 14 | + maxPacketSize(settings.maxPacketSize), // Same as initialBufferSize comment. | ||
| 15 | + readbuf(initialBufferSize), | ||
| 16 | + writebuf(initialBufferSize), | ||
| 15 | threadData(threadData) | 17 | threadData(threadData) |
| 16 | { | 18 | { |
| 17 | int flags = fcntl(fd, F_GETFL); | 19 | int flags = fcntl(fd, F_GETFL); |
| @@ -206,7 +208,7 @@ bool Client::readFdIntoBuffer() | @@ -206,7 +208,7 @@ bool Client::readFdIntoBuffer() | ||
| 206 | // Make sure we either always have enough space for a next call of this method, or stop reading the fd. | 208 | // Make sure we either always have enough space for a next call of this method, or stop reading the fd. |
| 207 | if (readbuf.freeSpace() == 0) | 209 | if (readbuf.freeSpace() == 0) |
| 208 | { | 210 | { |
| 209 | - if (readbuf.getSize() * 2 < MAX_PACKET_SIZE) | 211 | + if (readbuf.getSize() * 2 < maxPacketSize) |
| 210 | { | 212 | { |
| 211 | readbuf.doubleSize(); | 213 | readbuf.doubleSize(); |
| 212 | } | 214 | } |
| @@ -236,7 +238,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | @@ -236,7 +238,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) | ||
| 236 | 238 | ||
| 237 | // We have to allow big packets, yet don't allow a slow loris subscriber to grow huge write buffers. This | 239 | // We have to allow big packets, yet don't allow a slow loris subscriber to grow huge write buffers. This |
| 238 | // could be enhanced a lot, but it's a start. | 240 | // could be enhanced a lot, but it's a start. |
| 239 | - const uint32_t growBufMaxTo = std::min<int>(packet.getSizeIncludingNonPresentHeader() * 1000, MAX_PACKET_SIZE); | 241 | + const uint32_t growBufMaxTo = std::min<int>(packet.getSizeIncludingNonPresentHeader() * 1000, maxPacketSize); |
| 240 | 242 | ||
| 241 | // Grow as far as we can. We have to make room for one MQTT packet. | 243 | // Grow as far as we can. We have to make room for one MQTT packet. |
| 242 | while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace() && writebuf.getSize() < growBufMaxTo) | 244 | while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace() && writebuf.getSize() < growBufMaxTo) |
| @@ -439,13 +441,13 @@ bool Client::writeBufIntoFd() | @@ -439,13 +441,13 @@ bool Client::writeBufIntoFd() | ||
| 439 | if (!bufferHasData) | 441 | if (!bufferHasData) |
| 440 | { | 442 | { |
| 441 | writeBufIsZeroCount++; | 443 | writeBufIsZeroCount++; |
| 442 | - bool doReset = (writeBufIsZeroCount >= 10 && writebuf.getSize() > (MAX_PACKET_SIZE / 10) && writebuf.bufferLastResizedSecondsAgo() > 30); | 444 | + bool doReset = (writeBufIsZeroCount >= 10 && writebuf.getSize() > (maxPacketSize / 10) && writebuf.bufferLastResizedSecondsAgo() > 30); |
| 443 | doReset |= (writeBufIsZeroCount >= 100 && writebuf.bufferLastResizedSecondsAgo() > 300); | 445 | doReset |= (writeBufIsZeroCount >= 100 && writebuf.bufferLastResizedSecondsAgo() > 300); |
| 444 | 446 | ||
| 445 | if (doReset) | 447 | if (doReset) |
| 446 | { | 448 | { |
| 447 | writeBufIsZeroCount = 0; | 449 | writeBufIsZeroCount = 0; |
| 448 | - writebuf.resetSize(CLIENT_BUFFER_SIZE); | 450 | + writebuf.resetSize(initialBufferSize); |
| 449 | } | 451 | } |
| 450 | } | 452 | } |
| 451 | 453 | ||
| @@ -564,13 +566,13 @@ bool Client::bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, Client_ | @@ -564,13 +566,13 @@ bool Client::bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, Client_ | ||
| 564 | if (readbuf.usedBytes() == 0) | 566 | if (readbuf.usedBytes() == 0) |
| 565 | { | 567 | { |
| 566 | readBufIsZeroCount++; | 568 | readBufIsZeroCount++; |
| 567 | - bool doReset = (readBufIsZeroCount >= 10 && readbuf.getSize() > (MAX_PACKET_SIZE / 10) && readbuf.bufferLastResizedSecondsAgo() > 30); | 569 | + bool doReset = (readBufIsZeroCount >= 10 && readbuf.getSize() > (maxPacketSize / 10) && readbuf.bufferLastResizedSecondsAgo() > 30); |
| 568 | doReset |= (readBufIsZeroCount >= 100 && readbuf.bufferLastResizedSecondsAgo() > 300); | 570 | doReset |= (readBufIsZeroCount >= 100 && readbuf.bufferLastResizedSecondsAgo() > 300); |
| 569 | 571 | ||
| 570 | if (doReset) | 572 | if (doReset) |
| 571 | { | 573 | { |
| 572 | readBufIsZeroCount = 0; | 574 | readBufIsZeroCount = 0; |
| 573 | - readbuf.resetSize(CLIENT_BUFFER_SIZE); | 575 | + readbuf.resetSize(initialBufferSize); |
| 574 | } | 576 | } |
| 575 | } | 577 | } |
| 576 | 578 |
client.h
| @@ -21,8 +21,6 @@ | @@ -21,8 +21,6 @@ | ||
| 21 | #include <openssl/ssl.h> | 21 | #include <openssl/ssl.h> |
| 22 | #include <openssl/err.h> | 22 | #include <openssl/err.h> |
| 23 | 23 | ||
| 24 | -#define CLIENT_BUFFER_SIZE 1024 // Must be power of 2 | ||
| 25 | -#define MAX_PACKET_SIZE 268435461 // 256 MB + 5 | ||
| 26 | #define MQTT_HEADER_LENGH 2 | 24 | #define MQTT_HEADER_LENGH 2 |
| 27 | 25 | ||
| 28 | #define OPENSSL_ERROR_STRING_SIZE 256 // OpenSSL requires at least 256. | 26 | #define OPENSSL_ERROR_STRING_SIZE 256 // OpenSSL requires at least 256. |
| @@ -64,6 +62,9 @@ class Client | @@ -64,6 +62,9 @@ class Client | ||
| 64 | bool sslWriteWantsRead = false; | 62 | bool sslWriteWantsRead = false; |
| 65 | ProtocolVersion protocolVersion = ProtocolVersion::None; | 63 | ProtocolVersion protocolVersion = ProtocolVersion::None; |
| 66 | 64 | ||
| 65 | + const size_t initialBufferSize = 0; | ||
| 66 | + const size_t maxPacketSize = 0; | ||
| 67 | + | ||
| 67 | CirBuf readbuf; | 68 | CirBuf readbuf; |
| 68 | uint8_t readBufIsZeroCount = 0; | 69 | uint8_t readBufIsZeroCount = 0; |
| 69 | 70 | ||
| @@ -101,7 +102,7 @@ class Client | @@ -101,7 +102,7 @@ class Client | ||
| 101 | void setReadyForReading(bool val); | 102 | void setReadyForReading(bool val); |
| 102 | 103 | ||
| 103 | public: | 104 | public: |
| 104 | - Client(int fd, ThreadData_p threadData, SSL *ssl); | 105 | + Client(int fd, ThreadData_p threadData, SSL *ssl, const GlobalSettings &settings); |
| 105 | Client(const Client &other) = delete; | 106 | Client(const Client &other) = delete; |
| 106 | Client(Client &&other) = delete; | 107 | Client(Client &&other) = delete; |
| 107 | ~Client(); | 108 | ~Client(); |
configfileparser.cpp
| @@ -99,6 +99,8 @@ ConfigFileParser::ConfigFileParser(const std::string &path) : | @@ -99,6 +99,8 @@ ConfigFileParser::ConfigFileParser(const std::string &path) : | ||
| 99 | validKeys.insert("fullchain"); | 99 | validKeys.insert("fullchain"); |
| 100 | validKeys.insert("privkey"); | 100 | validKeys.insert("privkey"); |
| 101 | validKeys.insert("allow_unsafe_clientid_chars"); | 101 | validKeys.insert("allow_unsafe_clientid_chars"); |
| 102 | + validKeys.insert("client_initial_buffer_size"); | ||
| 103 | + validKeys.insert("max_packet_size"); | ||
| 102 | } | 104 | } |
| 103 | 105 | ||
| 104 | void ConfigFileParser::loadFile(bool test) | 106 | void ConfigFileParser::loadFile(bool test) |
| @@ -232,8 +234,30 @@ void ConfigFileParser::loadFile(bool test) | @@ -232,8 +234,30 @@ void ConfigFileParser::loadFile(bool test) | ||
| 232 | sslListenPort = sslListenPortNew; | 234 | sslListenPort = sslListenPortNew; |
| 233 | } | 235 | } |
| 234 | 236 | ||
| 237 | + if (key == "client_initial_buffer_size") | ||
| 238 | + { | ||
| 239 | + int newVal = std::stoi(value); | ||
| 240 | + if (!isPowerOfTwo(newVal)) | ||
| 241 | + throw ConfigFileException("client_initial_buffer_size value " + value + " is not a power of two."); | ||
| 242 | + if (!test) | ||
| 243 | + clientInitialBufferSize = newVal; | ||
| 244 | + } | ||
| 245 | + | ||
| 246 | + if (key == "max_packet_size") | ||
| 247 | + { | ||
| 248 | + int newVal = std::stoi(value); | ||
| 249 | + if (newVal > ABSOLUTE_MAX_PACKET_SIZE) | ||
| 250 | + { | ||
| 251 | + std::ostringstream oss; | ||
| 252 | + oss << "Value for max_packet_size " << newVal << "is higher than absolute maximum " << ABSOLUTE_MAX_PACKET_SIZE; | ||
| 253 | + throw ConfigFileException(oss.str()); | ||
| 254 | + } | ||
| 255 | + if (!test) | ||
| 256 | + maxPacketSize = newVal; | ||
| 257 | + } | ||
| 258 | + | ||
| 235 | } | 259 | } |
| 236 | - catch (std::invalid_argument &ex) | 260 | + catch (std::invalid_argument &ex) // catch for the stoi() |
| 237 | { | 261 | { |
| 238 | throw ConfigFileException(ex.what()); | 262 | throw ConfigFileException(ex.what()); |
| 239 | } | 263 | } |
configfileparser.h
| @@ -9,6 +9,8 @@ | @@ -9,6 +9,8 @@ | ||
| 9 | 9 | ||
| 10 | #include "sslctxmanager.h" | 10 | #include "sslctxmanager.h" |
| 11 | 11 | ||
| 12 | +#define ABSOLUTE_MAX_PACKET_SIZE 268435461 // 256 MB + 5 | ||
| 13 | + | ||
| 12 | struct mosquitto_auth_opt | 14 | struct mosquitto_auth_opt |
| 13 | { | 15 | { |
| 14 | char *key = nullptr; | 16 | char *key = nullptr; |
| @@ -55,6 +57,8 @@ public: | @@ -55,6 +57,8 @@ public: | ||
| 55 | uint listenPort = 1883; | 57 | uint listenPort = 1883; |
| 56 | uint sslListenPort = 0; | 58 | uint sslListenPort = 0; |
| 57 | bool allowUnsafeClientidChars = false; | 59 | bool allowUnsafeClientidChars = false; |
| 60 | + int clientInitialBufferSize = 1024; // Must be power of 2 | ||
| 61 | + int maxPacketSize = 268435461; // 256 MB + 5 | ||
| 58 | }; | 62 | }; |
| 59 | 63 | ||
| 60 | #endif // CONFIGFILEPARSER_H | 64 | #endif // CONFIGFILEPARSER_H |
globalsettings.h
| @@ -5,5 +5,7 @@ | @@ -5,5 +5,7 @@ | ||
| 5 | struct GlobalSettings | 5 | struct GlobalSettings |
| 6 | { | 6 | { |
| 7 | bool allow_unsafe_clientid_chars = false; | 7 | bool allow_unsafe_clientid_chars = false; |
| 8 | + int clientInitialBufferSize = 0; | ||
| 9 | + int maxPacketSize = 0; | ||
| 8 | }; | 10 | }; |
| 9 | #endif // GLOBALSETTINGS_H | 11 | #endif // GLOBALSETTINGS_H |
mainapp.cpp
| @@ -417,7 +417,7 @@ void MainApp::start() | @@ -417,7 +417,7 @@ void MainApp::start() | ||
| 417 | SSL_set_fd(clientSSL, fd); | 417 | SSL_set_fd(clientSSL, fd); |
| 418 | } | 418 | } |
| 419 | 419 | ||
| 420 | - Client_p client(new Client(fd, thread_data, clientSSL)); | 420 | + Client_p client(new Client(fd, thread_data, clientSSL, settings)); |
| 421 | thread_data->giveClient(client); | 421 | thread_data->giveClient(client); |
| 422 | } | 422 | } |
| 423 | else if (cur_fd == taskEventFd) | 423 | else if (cur_fd == taskEventFd) |
| @@ -485,6 +485,8 @@ void MainApp::loadConfig() | @@ -485,6 +485,8 @@ void MainApp::loadConfig() | ||
| 485 | } | 485 | } |
| 486 | 486 | ||
| 487 | settings.allow_unsafe_clientid_chars = confFileParser->allowUnsafeClientidChars; | 487 | settings.allow_unsafe_clientid_chars = confFileParser->allowUnsafeClientidChars; |
| 488 | + settings.clientInitialBufferSize = confFileParser->clientInitialBufferSize; | ||
| 489 | + settings.maxPacketSize = confFileParser->maxPacketSize; | ||
| 488 | 490 | ||
| 489 | setCertAndKeyFromConfig(); | 491 | setCertAndKeyFromConfig(); |
| 490 | 492 |
utils.cpp
| @@ -221,3 +221,8 @@ bool stringTruthiness(const std::string &val) | @@ -221,3 +221,8 @@ bool stringTruthiness(const std::string &val) | ||
| 221 | return false; | 221 | return false; |
| 222 | throw ConfigFileException("Value '" + val + "' can't be converted to boolean"); | 222 | throw ConfigFileException("Value '" + val + "' can't be converted to boolean"); |
| 223 | } | 223 | } |
| 224 | + | ||
| 225 | +bool isPowerOfTwo(int n) | ||
| 226 | +{ | ||
| 227 | + return (n != 0) && (n & (n - 1)) == 0; | ||
| 228 | +} |
utils.h
| @@ -41,5 +41,6 @@ int64_t currentMSecsSinceEpoch(); | @@ -41,5 +41,6 @@ int64_t currentMSecsSinceEpoch(); | ||
| 41 | std::string getSecureRandomString(const size_t len); | 41 | std::string getSecureRandomString(const size_t len); |
| 42 | std::string str_tolower(std::string s); | 42 | std::string str_tolower(std::string s); |
| 43 | bool stringTruthiness(const std::string &val); | 43 | bool stringTruthiness(const std::string &val); |
| 44 | +bool isPowerOfTwo(int val); | ||
| 44 | 45 | ||
| 45 | #endif // UTILS_H | 46 | #endif // UTILS_H |