diff --git a/client.cpp b/client.cpp index d49f496..e4c84ee 100644 --- a/client.cpp +++ b/client.cpp @@ -31,7 +31,8 @@ Client::Client(int fd, std::shared_ptr threadData, SSL *ssl, bool we fd(fd), fuzzMode(fuzzMode), initialBufferSize(settings->clientInitialBufferSize), // The client is constructed in the main thread, so we need to use its settings copy - maxPacketSize(settings->maxPacketSize), // Same as initialBufferSize comment. + maxOutgoingPacketSize(settings->maxPacketSize), // Same as initialBufferSize comment. + maxIncomingPacketSize(settings->maxPacketSize), ioWrapper(ssl, websocket, initialBufferSize, this), readbuf(initialBufferSize), writebuf(initialBufferSize), @@ -141,7 +142,7 @@ bool Client::readFdIntoBuffer() // Make sure we either always have enough space for a next call of this method, or stop reading the fd. if (readbuf.freeSpace() == 0) { - if (readbuf.getSize() * 2 < maxPacketSize) + if (readbuf.getSize() * 2 < this->maxIncomingPacketSize) { readbuf.doubleSize(); } @@ -181,18 +182,27 @@ void Client::writeText(const std::string &text) int Client::writeMqttPacket(const MqttPacket &packet) { + const size_t packetSize = packet.getSizeIncludingNonPresentHeader(); + + // "Where a Packet is too large to send, the Server MUST discard it without sending it and then behave as if it had completed + // sending that Application Message [MQTT-3.1.2-25]." + if (packetSize > this->maxOutgoingPacketSize) + { + return 0; + } + std::lock_guard locker(writeBufMutex); // We have to allow big packets, yet don't allow a slow loris subscriber to grow huge write buffers. This // could be enhanced a lot, but it's a start. - const uint32_t growBufMaxTo = std::min(packet.getSizeIncludingNonPresentHeader() * 1000, maxPacketSize); + const uint32_t growBufMaxTo = std::min(packetSize * 1000, this->maxOutgoingPacketSize); // Grow as far as we can. We have to make room for one MQTT packet. - writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader(), growBufMaxTo); + writebuf.ensureFreeSpace(packetSize, growBufMaxTo); // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And // QoS packet are queued and limited elsewhere. - if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) + if (packet.packetType == PacketType::PUBLISH && packet.getQos() == 0 && packetSize > writebuf.freeSpace()) { return 0; } @@ -371,6 +381,16 @@ const std::string &Client::getTopicAlias(const uint16_t id) return this->incomingTopicAliases[id]; } +/** + * @brief We use this for doing the checks on client traffic, as opposed to using settings.maxPacketSize, because the latter than change on config reload, + * possibly resulting in exceeding what the other side uses as maximum. + * @return + */ +uint32_t Client::getMaxIncomingPacketSize() const +{ + return this->maxIncomingPacketSize; +} + #ifndef NDEBUG /** * @brief IoWrapper::setFakeUpgraded(). @@ -462,14 +482,14 @@ void Client::setClientProperties(ProtocolVersion protocolVersion, const std::str void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, - uint32_t maxPacketSize, uint16_t maxOutgoingTopicAliasValue) + uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue) { this->protocolVersion = protocolVersion; this->clientid = clientId; this->username = username; this->connectPacketSeen = connectPacketSeen; this->keepalive = keepalive; - this->maxPacketSize = maxPacketSize; + this->maxOutgoingPacketSize = maxOutgoingPacketSize; this->maxOutgoingTopicAliasValue = maxOutgoingTopicAliasValue; } diff --git a/client.h b/client.h index ece7ce7..30f1b2e 100644 --- a/client.h +++ b/client.h @@ -52,7 +52,9 @@ class Client ProtocolVersion protocolVersion = ProtocolVersion::None; const size_t initialBufferSize = 0; - uint32_t maxPacketSize = 0; + uint32_t maxOutgoingPacketSize; + const uint32_t maxIncomingPacketSize; + uint16_t maxOutgoingTopicAliasValue = 0; IoWrapper ioWrapper; @@ -111,7 +113,7 @@ public: void bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender); void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive); void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, - uint32_t maxPacketSize, uint16_t maxOutgoingTopicAliasValue); + uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue); void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); void setWill(Publish &&willPublish); void clearWill(); @@ -145,6 +147,8 @@ public: void setTopicAlias(const uint16_t alias_id, const std::string &topic); const std::string &getTopicAlias(const uint16_t id); + uint32_t getMaxIncomingPacketSize() const; + #ifndef NDEBUG void setFakeUpgraded(); #endif diff --git a/configfileparser.h b/configfileparser.h index 0cc9ac3..b534069 100644 --- a/configfileparser.h +++ b/configfileparser.h @@ -30,8 +30,6 @@ License along with FlashMQ. If not, see . #include "listener.h" #include "settings.h" -#define ABSOLUTE_MAX_PACKET_SIZE 268435461 // 256 MB + 5 - enum class ConfigParseLevel { Root, diff --git a/mqttpacket.cpp b/mqttpacket.cpp index ea2cb5a..f5d6762 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -31,6 +31,12 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt sender(sender) { assert(packet_len > 0); + + if (packet_len > sender->getMaxIncomingPacketSize()) + { + throw ProtocolError("Incoming packet size exceeded. TODO: DISCONNECT WITH CODE 0x95"); + } + buf.read(bites.data(), packet_len); protocolVersion = sender->getProtocolVersion(); @@ -338,7 +344,7 @@ void MqttPacket::handleConnect() uint16_t max_qos_packets = settings.maxQosMsgPendingPerClient; uint32_t session_expire = settings.expireSessionsAfterSeconds > 0 ? settings.expireSessionsAfterSeconds : std::numeric_limits::max(); - uint32_t max_packet_size = settings.maxPacketSize; + uint32_t max_outgoing_packet_size = settings.maxPacketSize; uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases bool request_response_information = false; bool request_problem_information = false; @@ -361,7 +367,7 @@ void MqttPacket::handleConnect() max_qos_packets = std::min(readTwoBytesToUInt16(), max_qos_packets); break; case Mqtt5Properties::MaximumPacketSize: - max_packet_size = std::min(readFourBytesToUint32(), max_packet_size); + max_outgoing_packet_size = std::min(readFourBytesToUint32(), max_outgoing_packet_size); break; case Mqtt5Properties::TopicAliasMaximum: max_outgoing_topic_aliases = std::min(readTwoBytesToUInt16(), settings.maxOutgoingTopicAliasValue); @@ -535,7 +541,7 @@ void MqttPacket::handleConnect() clientIdGenerated = true; } - sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_packet_size, max_outgoing_topic_aliases); + sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_outgoing_packet_size, max_outgoing_topic_aliases); if (will_flag) sender->setWill(std::move(willpublish)); @@ -573,7 +579,7 @@ void MqttPacket::handleConnect() connAck.propertyBuilder->writeSessionExpiry(session_expire); connAck.propertyBuilder->writeReceiveMax(max_qos_packets); connAck.propertyBuilder->writeRetainAvailable(1); - connAck.propertyBuilder->writeMaxPacketSize(max_packet_size); + connAck.propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize()); if (clientIdGenerated) connAck.propertyBuilder->writeAssignedClientId(client_id); connAck.propertyBuilder->writeMaxTopicAliases(settings.maxIncomingTopicAliasValue); diff --git a/settings.h b/settings.h index e32abce..6202d18 100644 --- a/settings.h +++ b/settings.h @@ -24,6 +24,8 @@ License along with FlashMQ. If not, see . #include "mosquittoauthoptcompatwrap.h" #include "listener.h" +#define ABSOLUTE_MAX_PACKET_SIZE 268435461 // 256 MB + 5 + class Settings { friend class ConfigFileParser; @@ -41,7 +43,7 @@ public: bool authPluginSerializeInit = false; bool authPluginSerializeAuthChecks = false; int clientInitialBufferSize = 1024; // Must be power of 2 - int maxPacketSize = 268435461; // 256 MB + 5 + int maxPacketSize = ABSOLUTE_MAX_PACKET_SIZE; uint16_t maxIncomingTopicAliasValue = 65535; uint16_t maxOutgoingTopicAliasValue = 65535; #ifdef TESTING