diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index 958fbb5..13bf1c4 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -377,7 +377,7 @@ void MainTests::test_very_big_packet() testContext.publish(topic, payload); testContext.waitReceiverReceived(); - QVERIFY2(testContext.receivedMessages.count() == 1, "There must be one message in the received list"); + QCOMPARE(testContext.receivedMessages.count(), 1); QMQTT::Message msg = testContext.receivedMessages.first(); QCOMPARE(msg.payload(), payload); diff --git a/FlashMQTests/twoclienttestcontext.cpp b/FlashMQTests/twoclienttestcontext.cpp index 63b1be2..cd48472 100644 --- a/FlashMQTests/twoclienttestcontext.cpp +++ b/FlashMQTests/twoclienttestcontext.cpp @@ -3,12 +3,17 @@ #include #include +// TODO: port to QMqttClient that newer Qts now have? + TwoClientTestContext::TwoClientTestContext(QObject *parent) : QObject(parent) { QHostInfo targetHostInfo = QHostInfo::fromName("localhost"); QHostAddress targetHost(targetHostInfo.addresses().first()); sender.reset(new QMQTT::Client(targetHost)); receiver.reset(new QMQTT::Client(targetHost)); + + connect(sender.data(), &QMQTT::Client::error, this, &TwoClientTestContext::onClientError); + connect(receiver.data(), &QMQTT::Client::error, this, &TwoClientTestContext::onClientError); } void TwoClientTestContext::publish(const QString &topic, const QByteArray &payload, bool retain) @@ -64,6 +69,33 @@ void TwoClientTestContext::waitReceiverReceived() waiter.exec(); } +void TwoClientTestContext::onClientError(const QMQTT::ClientError error) +{ + const QMQTT::Client *_sender = sender.data(); + + // TODO: arg, doesn't qmqtt have a better way for this? + QString errStr = QString("unknown error"); + if (error == QMQTT::SocketConnectionRefusedError) + errStr = "Connection refused"; + if (error == QMQTT::SocketRemoteHostClosedError) + errStr = "Remote host closed"; + if (error == QMQTT::SocketHostNotFoundError) + errStr = "Remote host not found"; + if (error == QMQTT::MqttBadUserNameOrPasswordError) + errStr = "MQTT bad user or password"; + if (error == QMQTT::MqttNotAuthorizedError) + errStr = "MQTT not authorized"; + if (error == QMQTT::SocketResourceError) + errStr = "Socket resource error. Is your OS limiting you? Ulimit, etc?"; + if (error == QMQTT::SocketSslInternalError) + errStr = "Socket SSL internal error."; + if (error == QMQTT::SocketTimeoutError) + errStr = "Socket timeout"; + + QString msg = QString("Client %1 error code: %2 (%3). Initiated delayed reconnect.\n").arg(_sender->clientId()).arg(error).arg(errStr); + throw new std::runtime_error(msg.toStdString()); +} + void TwoClientTestContext::onReceiverReceived(const QMQTT::Message &message) { receivedMessages.append(message); diff --git a/FlashMQTests/twoclienttestcontext.h b/FlashMQTests/twoclienttestcontext.h index b440210..6052c8b 100644 --- a/FlashMQTests/twoclienttestcontext.h +++ b/FlashMQTests/twoclienttestcontext.h @@ -23,6 +23,7 @@ public: void disconnectReceiver(); void subscribeReceiver(const QString &topic); void waitReceiverReceived(); + void onClientError(const QMQTT::ClientError error); QList receivedMessages; diff --git a/client.cpp b/client.cpp index b76696e..05f225c 100644 --- a/client.cpp +++ b/client.cpp @@ -57,7 +57,7 @@ bool Client::readFdIntoBuffer() // Make sure we either always have enough space for a next call of this method, or stop reading the fd. if (readbuf.freeSpace() == 0) { - if (readbuf.getSize() * 2 < CLIENT_MAX_BUFFER_SIZE) + if (readbuf.getSize() * 2 < MAX_PACKET_SIZE) { readbuf.doubleSize(); } @@ -81,13 +81,19 @@ void Client::writeMqttPacket(const MqttPacket &packet) { std::lock_guard locker(writeBufMutex); - while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) + // Grow as far as we can. We have to make room for one MQTT packet. + while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace() && writebuf.getSize() < MAX_PACKET_SIZE) { - if (packet.packetType == PacketType::PUBLISH && writebuf.getSize() >= CLIENT_MAX_BUFFER_SIZE) - return; writebuf.doubleSize(); } + // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. + // TODO: when QoS is implemented, different filtering may be required. + if (packet.packetType == PacketType::PUBLISH && packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) + { + return; + } + if (!packet.containsFixedHeader()) { writebuf.headPtr()[0] = packet.getFirstByte(); @@ -257,8 +263,8 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ encodedByte = readbuf.peakAhead(remaining_length_i++); packet_length += (encodedByte & 127) * multiplier; multiplier *= 128; - if (multiplier > 128*128*128) - return false; + if (multiplier > 128*128*128*128) + throw ProtocolError("Malformed Remaining Length."); } while ((encodedByte & 128) != 0); packet_length += fixed_header_length; diff --git a/client.h b/client.h index b012d9b..bbbeec4 100644 --- a/client.h +++ b/client.h @@ -16,7 +16,7 @@ #define CLIENT_BUFFER_SIZE 1024 // Must be power of 2 -#define CLIENT_MAX_BUFFER_SIZE 65536 +#define MAX_PACKET_SIZE 268435461 // 256 MB + 5 #define MQTT_HEADER_LENGH 2 class Client