Commit ce161b1f6b133d707c01707a7abfe5c5d57f7232
1 parent
3d565018
Incoming and outgoing max packet size
Showing
5 changed files
with
46 additions
and
16 deletions
client.cpp
| @@ -31,7 +31,8 @@ Client::Client(int fd, std::shared_ptr<ThreadData> threadData, SSL *ssl, bool we | @@ -31,7 +31,8 @@ Client::Client(int fd, std::shared_ptr<ThreadData> threadData, SSL *ssl, bool we | ||
| 31 | fd(fd), | 31 | fd(fd), |
| 32 | fuzzMode(fuzzMode), | 32 | fuzzMode(fuzzMode), |
| 33 | initialBufferSize(settings->clientInitialBufferSize), // The client is constructed in the main thread, so we need to use its settings copy | 33 | initialBufferSize(settings->clientInitialBufferSize), // The client is constructed in the main thread, so we need to use its settings copy |
| 34 | - maxPacketSize(settings->maxPacketSize), // Same as initialBufferSize comment. | 34 | + maxOutgoingPacketSize(settings->maxPacketSize), // Same as initialBufferSize comment. |
| 35 | + maxIncomingPacketSize(settings->maxPacketSize), | ||
| 35 | ioWrapper(ssl, websocket, initialBufferSize, this), | 36 | ioWrapper(ssl, websocket, initialBufferSize, this), |
| 36 | readbuf(initialBufferSize), | 37 | readbuf(initialBufferSize), |
| 37 | writebuf(initialBufferSize), | 38 | writebuf(initialBufferSize), |
| @@ -141,7 +142,7 @@ bool Client::readFdIntoBuffer() | @@ -141,7 +142,7 @@ bool Client::readFdIntoBuffer() | ||
| 141 | // Make sure we either always have enough space for a next call of this method, or stop reading the fd. | 142 | // Make sure we either always have enough space for a next call of this method, or stop reading the fd. |
| 142 | if (readbuf.freeSpace() == 0) | 143 | if (readbuf.freeSpace() == 0) |
| 143 | { | 144 | { |
| 144 | - if (readbuf.getSize() * 2 < maxPacketSize) | 145 | + if (readbuf.getSize() * 2 < this->maxIncomingPacketSize) |
| 145 | { | 146 | { |
| 146 | readbuf.doubleSize(); | 147 | readbuf.doubleSize(); |
| 147 | } | 148 | } |
| @@ -181,18 +182,27 @@ void Client::writeText(const std::string &text) | @@ -181,18 +182,27 @@ void Client::writeText(const std::string &text) | ||
| 181 | 182 | ||
| 182 | int Client::writeMqttPacket(const MqttPacket &packet) | 183 | int Client::writeMqttPacket(const MqttPacket &packet) |
| 183 | { | 184 | { |
| 185 | + const size_t packetSize = packet.getSizeIncludingNonPresentHeader(); | ||
| 186 | + | ||
| 187 | + // "Where a Packet is too large to send, the Server MUST discard it without sending it and then behave as if it had completed | ||
| 188 | + // sending that Application Message [MQTT-3.1.2-25]." | ||
| 189 | + if (packetSize > this->maxOutgoingPacketSize) | ||
| 190 | + { | ||
| 191 | + return 0; | ||
| 192 | + } | ||
| 193 | + | ||
| 184 | std::lock_guard<std::mutex> locker(writeBufMutex); | 194 | std::lock_guard<std::mutex> locker(writeBufMutex); |
| 185 | 195 | ||
| 186 | // We have to allow big packets, yet don't allow a slow loris subscriber to grow huge write buffers. This | 196 | // We have to allow big packets, yet don't allow a slow loris subscriber to grow huge write buffers. This |
| 187 | // could be enhanced a lot, but it's a start. | 197 | // could be enhanced a lot, but it's a start. |
| 188 | - const uint32_t growBufMaxTo = std::min<int>(packet.getSizeIncludingNonPresentHeader() * 1000, maxPacketSize); | 198 | + const uint32_t growBufMaxTo = std::min<int>(packetSize * 1000, this->maxOutgoingPacketSize); |
| 189 | 199 | ||
| 190 | // Grow as far as we can. We have to make room for one MQTT packet. | 200 | // Grow as far as we can. We have to make room for one MQTT packet. |
| 191 | - writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader(), growBufMaxTo); | 201 | + writebuf.ensureFreeSpace(packetSize, growBufMaxTo); |
| 192 | 202 | ||
| 193 | // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And | 203 | // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And |
| 194 | // QoS packet are queued and limited elsewhere. | 204 | // QoS packet are queued and limited elsewhere. |
| 195 | - if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) | 205 | + if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packetSize > writebuf.freeSpace()) |
| 196 | { | 206 | { |
| 197 | return 0; | 207 | return 0; |
| 198 | } | 208 | } |
| @@ -371,6 +381,16 @@ const std::string &Client::getTopicAlias(const uint16_t id) | @@ -371,6 +381,16 @@ const std::string &Client::getTopicAlias(const uint16_t id) | ||
| 371 | return this->incomingTopicAliases[id]; | 381 | return this->incomingTopicAliases[id]; |
| 372 | } | 382 | } |
| 373 | 383 | ||
| 384 | +/** | ||
| 385 | + * @brief We use this for doing the checks on client traffic, as opposed to using settings.maxPacketSize, because the latter than change on config reload, | ||
| 386 | + * possibly resulting in exceeding what the other side uses as maximum. | ||
| 387 | + * @return | ||
| 388 | + */ | ||
| 389 | +uint32_t Client::getMaxIncomingPacketSize() const | ||
| 390 | +{ | ||
| 391 | + return this->maxIncomingPacketSize; | ||
| 392 | +} | ||
| 393 | + | ||
| 374 | #ifndef NDEBUG | 394 | #ifndef NDEBUG |
| 375 | /** | 395 | /** |
| 376 | * @brief IoWrapper::setFakeUpgraded(). | 396 | * @brief IoWrapper::setFakeUpgraded(). |
| @@ -462,14 +482,14 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str | @@ -462,14 +482,14 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str | ||
| 462 | 482 | ||
| 463 | 483 | ||
| 464 | void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, | 484 | void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, |
| 465 | - uint32_t maxPacketSize, uint16_t maxOutgoingTopicAliasValue) | 485 | + uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue) |
| 466 | { | 486 | { |
| 467 | this->protocolVersion = protocolVersion; | 487 | this->protocolVersion = protocolVersion; |
| 468 | this->clientid = clientId; | 488 | this->clientid = clientId; |
| 469 | this->username = username; | 489 | this->username = username; |
| 470 | this->connectPacketSeen = connectPacketSeen; | 490 | this->connectPacketSeen = connectPacketSeen; |
| 471 | this->keepalive = keepalive; | 491 | this->keepalive = keepalive; |
| 472 | - this->maxPacketSize = maxPacketSize; | 492 | + this->maxOutgoingPacketSize = maxOutgoingPacketSize; |
| 473 | this->maxOutgoingTopicAliasValue = maxOutgoingTopicAliasValue; | 493 | this->maxOutgoingTopicAliasValue = maxOutgoingTopicAliasValue; |
| 474 | } | 494 | } |
| 475 | 495 |
client.h
| @@ -52,7 +52,9 @@ class Client | @@ -52,7 +52,9 @@ class Client | ||
| 52 | ProtocolVersion protocolVersion = ProtocolVersion::None; | 52 | ProtocolVersion protocolVersion = ProtocolVersion::None; |
| 53 | 53 | ||
| 54 | const size_t initialBufferSize = 0; | 54 | const size_t initialBufferSize = 0; |
| 55 | - uint32_t maxPacketSize = 0; | 55 | + uint32_t maxOutgoingPacketSize; |
| 56 | + const uint32_t maxIncomingPacketSize; | ||
| 57 | + | ||
| 56 | uint16_t maxOutgoingTopicAliasValue = 0; | 58 | uint16_t maxOutgoingTopicAliasValue = 0; |
| 57 | 59 | ||
| 58 | IoWrapper ioWrapper; | 60 | IoWrapper ioWrapper; |
| @@ -111,7 +113,7 @@ public: | @@ -111,7 +113,7 @@ public: | ||
| 111 | void bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender); | 113 | void bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender); |
| 112 | void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive); | 114 | void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive); |
| 113 | void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, | 115 | void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, |
| 114 | - uint32_t maxPacketSize, uint16_t maxOutgoingTopicAliasValue); | 116 | + uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue); |
| 115 | void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); | 117 | void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); |
| 116 | void setWill(Publish &&willPublish); | 118 | void setWill(Publish &&willPublish); |
| 117 | void clearWill(); | 119 | void clearWill(); |
| @@ -145,6 +147,8 @@ public: | @@ -145,6 +147,8 @@ public: | ||
| 145 | void setTopicAlias(const uint16_t alias_id, const std::string &topic); | 147 | void setTopicAlias(const uint16_t alias_id, const std::string &topic); |
| 146 | const std::string &getTopicAlias(const uint16_t id); | 148 | const std::string &getTopicAlias(const uint16_t id); |
| 147 | 149 | ||
| 150 | + uint32_t getMaxIncomingPacketSize() const; | ||
| 151 | + | ||
| 148 | #ifndef NDEBUG | 152 | #ifndef NDEBUG |
| 149 | void setFakeUpgraded(); | 153 | void setFakeUpgraded(); |
| 150 | #endif | 154 | #endif |
configfileparser.h
| @@ -30,8 +30,6 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. | @@ -30,8 +30,6 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. | ||
| 30 | #include "listener.h" | 30 | #include "listener.h" |
| 31 | #include "settings.h" | 31 | #include "settings.h" |
| 32 | 32 | ||
| 33 | -#define ABSOLUTE_MAX_PACKET_SIZE 268435461 // 256 MB + 5 | ||
| 34 | - | ||
| 35 | enum class ConfigParseLevel | 33 | enum class ConfigParseLevel |
| 36 | { | 34 | { |
| 37 | Root, | 35 | Root, |
mqttpacket.cpp
| @@ -31,6 +31,12 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt | @@ -31,6 +31,12 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt | ||
| 31 | sender(sender) | 31 | sender(sender) |
| 32 | { | 32 | { |
| 33 | assert(packet_len > 0); | 33 | assert(packet_len > 0); |
| 34 | + | ||
| 35 | + if (packet_len > sender->getMaxIncomingPacketSize()) | ||
| 36 | + { | ||
| 37 | + throw ProtocolError("Incoming packet size exceeded. TODO: DISCONNECT WITH CODE 0x95"); | ||
| 38 | + } | ||
| 39 | + | ||
| 34 | buf.read(bites.data(), packet_len); | 40 | buf.read(bites.data(), packet_len); |
| 35 | 41 | ||
| 36 | protocolVersion = sender->getProtocolVersion(); | 42 | protocolVersion = sender->getProtocolVersion(); |
| @@ -338,7 +344,7 @@ void MqttPacket::handleConnect() | @@ -338,7 +344,7 @@ void MqttPacket::handleConnect() | ||
| 338 | 344 | ||
| 339 | uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; | 345 | uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; |
| 340 | uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits<uint32_t>::max(); | 346 | uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits<uint32_t>::max(); |
| 341 | - uint32_t max_packet_size = settings.maxPacketSize; | 347 | + uint32_t max_outgoing_packet_size = settings.maxPacketSize; |
| 342 | uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases | 348 | uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases |
| 343 | bool request_response_information = false; | 349 | bool request_response_information = false; |
| 344 | bool request_problem_information = false; | 350 | bool request_problem_information = false; |
| @@ -361,7 +367,7 @@ void MqttPacket::handleConnect() | @@ -361,7 +367,7 @@ void MqttPacket::handleConnect() | ||
| 361 | max_qos_packets = std::min<int16_t>(readTwoBytesToUInt16(), max_qos_packets); | 367 | max_qos_packets = std::min<int16_t>(readTwoBytesToUInt16(), max_qos_packets); |
| 362 | break; | 368 | break; |
| 363 | case Mqtt5Properties::MaximumPacketSize: | 369 | case Mqtt5Properties::MaximumPacketSize: |
| 364 | - max_packet_size = std::min<uint32_t>(readFourBytesToUint32(), max_packet_size); | 370 | + max_outgoing_packet_size = std::min<uint32_t>(readFourBytesToUint32(), max_outgoing_packet_size); |
| 365 | break; | 371 | break; |
| 366 | case Mqtt5Properties::TopicAliasMaximum: | 372 | case Mqtt5Properties::TopicAliasMaximum: |
| 367 | max_outgoing_topic_aliases = std::min<uint16_t>(readTwoBytesToUInt16(), settings.maxOutgoingTopicAliasValue); | 373 | max_outgoing_topic_aliases = std::min<uint16_t>(readTwoBytesToUInt16(), settings.maxOutgoingTopicAliasValue); |
| @@ -535,7 +541,7 @@ void MqttPacket::handleConnect() | @@ -535,7 +541,7 @@ void MqttPacket::handleConnect() | ||
| 535 | clientIdGenerated = true; | 541 | clientIdGenerated = true; |
| 536 | } | 542 | } |
| 537 | 543 | ||
| 538 | - sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_packet_size, max_outgoing_topic_aliases); | 544 | + sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_outgoing_packet_size, max_outgoing_topic_aliases); |
| 539 | 545 | ||
| 540 | if (will_flag) | 546 | if (will_flag) |
| 541 | sender->setWill(std::move(willpublish)); | 547 | sender->setWill(std::move(willpublish)); |
| @@ -573,7 +579,7 @@ void MqttPacket::handleConnect() | @@ -573,7 +579,7 @@ void MqttPacket::handleConnect() | ||
| 573 | connAck.propertyBuilder->writeSessionExpiry(session_expire); | 579 | connAck.propertyBuilder->writeSessionExpiry(session_expire); |
| 574 | connAck.propertyBuilder->writeReceiveMax(max_qos_packets); | 580 | connAck.propertyBuilder->writeReceiveMax(max_qos_packets); |
| 575 | connAck.propertyBuilder->writeRetainAvailable(1); | 581 | connAck.propertyBuilder->writeRetainAvailable(1); |
| 576 | - connAck.propertyBuilder->writeMaxPacketSize(max_packet_size); | 582 | + connAck.propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize()); |
| 577 | if (clientIdGenerated) | 583 | if (clientIdGenerated) |
| 578 | connAck.propertyBuilder->writeAssignedClientId(client_id); | 584 | connAck.propertyBuilder->writeAssignedClientId(client_id); |
| 579 | connAck.propertyBuilder->writeMaxTopicAliases(settings.maxIncomingTopicAliasValue); | 585 | connAck.propertyBuilder->writeMaxTopicAliases(settings.maxIncomingTopicAliasValue); |
settings.h
| @@ -24,6 +24,8 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. | @@ -24,6 +24,8 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. | ||
| 24 | #include "mosquittoauthoptcompatwrap.h" | 24 | #include "mosquittoauthoptcompatwrap.h" |
| 25 | #include "listener.h" | 25 | #include "listener.h" |
| 26 | 26 | ||
| 27 | +#define ABSOLUTE_MAX_PACKET_SIZE 268435461 // 256 MB + 5 | ||
| 28 | + | ||
| 27 | class Settings | 29 | class Settings |
| 28 | { | 30 | { |
| 29 | friend class ConfigFileParser; | 31 | friend class ConfigFileParser; |
| @@ -41,7 +43,7 @@ public: | @@ -41,7 +43,7 @@ public: | ||
| 41 | bool authPluginSerializeInit = false; | 43 | bool authPluginSerializeInit = false; |
| 42 | bool authPluginSerializeAuthChecks = false; | 44 | bool authPluginSerializeAuthChecks = false; |
| 43 | int clientInitialBufferSize = 1024; // Must be power of 2 | 45 | int clientInitialBufferSize = 1024; // Must be power of 2 |
| 44 | - int maxPacketSize = 268435461; // 256 MB + 5 | 46 | + int maxPacketSize = ABSOLUTE_MAX_PACKET_SIZE; |
| 45 | uint16_t maxIncomingTopicAliasValue = 65535; | 47 | uint16_t maxIncomingTopicAliasValue = 65535; |
| 46 | uint16_t maxOutgoingTopicAliasValue = 65535; | 48 | uint16_t maxOutgoingTopicAliasValue = 65535; |
| 47 | #ifdef TESTING | 49 | #ifdef TESTING |