diff --git a/client.cpp b/client.cpp index 35fef94..64b11d1 100644 --- a/client.cpp +++ b/client.cpp @@ -195,17 +195,7 @@ int Client::writeMqttPacket(const MqttPacket &packet, const char qos) return 0; } - writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader()); - - if (!packet.containsFixedHeader()) - { - writebuf.headPtr()[0] = packet.getFirstByte(); - writebuf.advanceHead(1); - RemainingLength r = packet.getRemainingLength(); - writebuf.write(r.bytes, r.len); - } - - writebuf.write(packet.getBites().data(), packet.getBites().size()); + packet.readIntoBuf(writebuf); if (packet.packetType == PacketType::DISCONNECT) setReadyForDisconnect(); @@ -396,57 +386,10 @@ void Client::setReadyForReading(bool val) } } -bool Client::bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender) +void Client::bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender) { - while (readbuf.usedBytes() >= MQTT_HEADER_LENGH) - { - // Determine the packet length by decoding the variable length - int remaining_length_i = 1; // index of 'remaining length' field is one after start. - uint fixed_header_length = 1; - size_t multiplier = 1; - size_t packet_length = 0; - unsigned char encodedByte = 0; - do - { - fixed_header_length++; - - if (fixed_header_length > 5) - throw ProtocolError("Packet signifies more than 5 bytes in variable length header. Invalid."); - - // This happens when you only don't have all the bytes that specify the remaining length. - if (fixed_header_length > readbuf.usedBytes()) - return false; - - encodedByte = readbuf.peakAhead(remaining_length_i++); - packet_length += (encodedByte & 127) * multiplier; - multiplier *= 128; - if (multiplier > 128*128*128*128) - throw ProtocolError("Malformed Remaining Length."); - } - while ((encodedByte & 128) != 0); - packet_length += fixed_header_length; - - if (!authenticated && packet_length >= 1024*1024) - { - throw ProtocolError("An unauthenticated client sends a packet of 1 MB or bigger? Probably it's just random bytes."); - } - - if (packet_length > ABSOLUTE_MAX_PACKET_SIZE) - { - throw ProtocolError("A client sends a packet claiming to be bigger than the maximum MQTT allows."); - } - - if (packet_length <= readbuf.usedBytes()) - { - packetQueueIn.emplace_back(readbuf, packet_length, fixed_header_length, sender); - } - else - break; - } - + MqttPacket::bufferToMqttPackets(readbuf, packetQueueIn, sender); setReadyForReading(readbuf.freeSpace() > 0); - - return true; } void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, bool cleanSession) diff --git a/client.h b/client.h index 986fbe3..8510233 100644 --- a/client.h +++ b/client.h @@ -104,7 +104,7 @@ public: void startOrContinueSslAccept(); void markAsDisconnecting(); bool readFdIntoBuffer(); - bool bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender); + void bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender); void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, bool cleanSession); void setWill(const std::string &topic, const std::string &payload, bool retain, char qos); void clearWill(); diff --git a/mqttpacket.cpp b/mqttpacket.cpp index ef228a1..4c8dc6c 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -181,6 +181,55 @@ MqttPacket::MqttPacket(const PubRel &pubRel) : pubCommonConstruct(pubRel.packet_id, PacketType::PUBREL, 0b0010); } +void MqttPacket::bufferToMqttPackets(CirBuf &buf, std::vector &packetQueueIn, std::shared_ptr &sender) +{ + while (buf.usedBytes() >= MQTT_HEADER_LENGH) + { + // Determine the packet length by decoding the variable length + int remaining_length_i = 1; // index of 'remaining length' field is one after start. + uint fixed_header_length = 1; + size_t multiplier = 1; + size_t packet_length = 0; + unsigned char encodedByte = 0; + do + { + fixed_header_length++; + + if (fixed_header_length > 5) + throw ProtocolError("Packet signifies more than 5 bytes in variable length header. Invalid."); + + // This happens when you only don't have all the bytes that specify the remaining length. + if (fixed_header_length > buf.usedBytes()) + return; + + encodedByte = buf.peakAhead(remaining_length_i++); + packet_length += (encodedByte & 127) * multiplier; + multiplier *= 128; + if (multiplier > 128*128*128*128) + throw ProtocolError("Malformed Remaining Length."); + } + while ((encodedByte & 128) != 0); + packet_length += fixed_header_length; + + if (sender && !sender->getAuthenticated() && packet_length >= 1024*1024) + { + throw ProtocolError("An unauthenticated client sends a packet of 1 MB or bigger? Probably it's just random bytes."); + } + + if (packet_length > ABSOLUTE_MAX_PACKET_SIZE) + { + throw ProtocolError("A client sends a packet claiming to be bigger than the maximum MQTT allows."); + } + + if (packet_length <= buf.usedBytes()) + { + packetQueueIn.emplace_back(buf, packet_length, fixed_header_length, sender); + } + else + break; + } +} + void MqttPacket::handle() { if (packetType == PacketType::Reserved) @@ -838,6 +887,23 @@ size_t MqttPacket::remainingAfterPos() } +void MqttPacket::readIntoBuf(CirBuf &buf) const +{ + buf.ensureFreeSpace(getSizeIncludingNonPresentHeader()); + + if (!containsFixedHeader()) + { + assert(remainingLength.len > 0); + + buf.headPtr()[0] = getFirstByte(); + buf.advanceHead(1); + buf.write(remainingLength.bytes, remainingLength.len); + } + + buf.write(bites.data(), bites.size()); +} + + diff --git a/mqttpacket.h b/mqttpacket.h index cae205e..3eb4fd9 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -92,6 +92,8 @@ public: MqttPacket(const PubComp &pubComp); MqttPacket(const PubRel &pubRel); + static void bufferToMqttPackets(CirBuf &buf, std::vector &packetQueueIn, std::shared_ptr &sender); + void handle(); void handleConnect(); void handleDisconnect(); @@ -119,6 +121,7 @@ public: void setDuplicate(); size_t getTotalMemoryFootprint(); std::string getPayloadCopy(); + void readIntoBuf(CirBuf &buf) const; }; #endif // MQTTPACKET_H