diff --git a/mqttpacket.cpp b/mqttpacket.cpp index f72eacf..25ee333 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -61,7 +61,7 @@ std::shared_ptr MqttPacket::getCopy(char new_max_qos) const // has to do with the Session::writePacket() and Session::sendPendingQosMessages() logic. assert((first_byte & 0b00001000) == 0); - if (publish.qos > 0 && new_max_qos == 0) + if (publishData.qos > 0 && new_max_qos == 0) { // if shrinking the packet doesn't alter the amount of bytes in the 'remaining length' part of the header, we can // just memmove+shrink the packet. This is because the packet id always is two bytes before the payload, so we just move the payload @@ -79,7 +79,7 @@ std::shared_ptr MqttPacket::getCopy(char new_max_qos) const std::memmove(&p->bites[packet_id_pos], &p->bites[packet_id_pos+2], payloadLen); p->bites.erase(p->bites.end() - 2, p->bites.end()); p->packet_id_pos = 0; - p->publish.qos = 0; + p->publishData.qos = 0; p->payloadStart -= 2; if (pos > p->bites.size()) // pos can possible be set elsewhere, so we only set it back if it was after the payload. p->pos -= 2; @@ -97,7 +97,7 @@ std::shared_ptr MqttPacket::getCopy(char new_max_qos) const return p; } - Publish pub(publish.topic, getPayloadCopy(), new_max_qos); + Publish pub(publishData.topic, getPayloadCopy(), new_max_qos); pub.retain = getRetain(); std::shared_ptr copyPacket(new MqttPacket(ProtocolVersion::Mqtt311, pub)); // TODO: don't hard-code the protocol version. return copyPacket; @@ -105,7 +105,7 @@ std::shared_ptr MqttPacket::getCopy(char new_max_qos) const std::shared_ptr copyPacket(new MqttPacket(*this)); copyPacket->sender.reset(); - if (publish.qos != new_max_qos) + if (publishData.qos != new_max_qos) copyPacket->setQos(new_max_qos); return copyPacket; } @@ -163,29 +163,29 @@ size_t MqttPacket::getRequiredSizeForPublish(const ProtocolVersion protocolVersi return result; } -MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish) : +MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish) : bites(getRequiredSizeForPublish(protocolVersion, _publish)) { - if (publish.topic.length() > 0xFFFF) + if (publishData.topic.length() > 0xFFFF) { throw ProtocolError("Topic path too long."); } this->protocolVersion = protocolVersion; - this->publish.topic = _publish.topic; - splitTopic(this->publish.topic, this->publish.subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics. + this->publishData.topic = _publish.topic; + splitTopic(this->publishData.topic, this->publishData.subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics. packetType = PacketType::PUBLISH; - this->publish.qos = _publish.qos; + this->publishData.qos = _publish.qos; first_byte = static_cast(packetType) << 4; first_byte |= (_publish.qos << 1); first_byte |= (static_cast(_publish.retain) & 0b00000001); - writeUint16(publish.topic.length()); - writeBytes(publish.topic.c_str(), publish.topic.length()); + writeUint16(publishData.topic.length()); + writeBytes(publishData.topic.c_str(), publishData.topic.length()); - if (publish.qos) + if (publishData.qos) { // Reserve the space for the packet id, which will be assigned later. packet_id_pos = pos; @@ -769,22 +769,22 @@ void MqttPacket::handlePublish() if (qos > 2) throw ProtocolError("QoS 3 is a protocol violation."); - this->publish.qos = qos; + this->publishData.qos = qos; if (qos == 0 && dup) throw ProtocolError("Duplicate flag is set for QoS 0 packet. This is illegal."); - publish.topic = std::string(readBytes(variable_header_length), variable_header_length); - splitTopic(publish.topic, publish.subtopics); + publishData.topic = std::string(readBytes(variable_header_length), variable_header_length); + splitTopic(publishData.topic, publishData.subtopics); - if (!isValidUtf8(publish.topic, true)) + if (!isValidUtf8(publishData.topic, true)) { logger->logf(LOG_WARNING, "Client '%s' published a message with invalid UTF8 or $/+/# in it. Dropping.", sender->repr().c_str()); return; } #ifndef NDEBUG - logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publish.topic.c_str(), qos, retain, dup); + logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publishData.topic.c_str(), qos, retain, dup); #endif sender->getThreadData()->incrementReceivedMessageCount(); @@ -831,23 +831,23 @@ void MqttPacket::handlePublish() while (pos < prop_end_at) { const Mqtt5Properties prop = static_cast(readByte()); - publish.propertyBuilder = std::make_shared(); + publishData.propertyBuilder = std::make_shared(); switch (prop) { case Mqtt5Properties::PayloadFormatIndicator: - publish.propertyBuilder->writePayloadFormatIndicator(readByte()); + publishData.propertyBuilder->writePayloadFormatIndicator(readByte()); break; case Mqtt5Properties::MessageExpiryInterval: - publish.createdAt = std::chrono::steady_clock::now(); - publish.expiresAfter = std::chrono::seconds(readFourBytesToUint32()); + publishData.createdAt = std::chrono::steady_clock::now(); + publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32()); case Mqtt5Properties::TopicAlias: break; case Mqtt5Properties::ResponseTopic: { const uint16_t len = readTwoBytesToUInt16(); const std::string responseTopic(readBytes(len), len); - publish.propertyBuilder->writeResponseTopic(responseTopic); + publishData.propertyBuilder->writeResponseTopic(responseTopic); break; } case Mqtt5Properties::CorrelationData: @@ -860,7 +860,7 @@ void MqttPacket::handlePublish() { const uint16_t len = readTwoBytesToUInt16(); const std::string contentType(readBytes(len), len); - publish.propertyBuilder->writeContentType(contentType); + publishData.propertyBuilder->writeContentType(contentType); break; } default: @@ -873,12 +873,12 @@ void MqttPacket::handlePublish() payloadStart = pos; Authentication &authentication = *ThreadGlobals::getAuth(); - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publish.topic, publish.subtopics, AclAccess::write, qos, retain) == AuthResult::success) + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.subtopics, AclAccess::write, qos, retain) == AuthResult::success) { if (retain) { std::string payload(readBytes(payloadLen), payloadLen); - sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(publish.topic, publish.subtopics, payload, qos); + sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(publishData.topic, publishData.subtopics, payload, qos); } // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3]. @@ -948,7 +948,7 @@ void MqttPacket::setPacketId(uint16_t packet_id) assert(fixed_header_length == 0 || first_byte == bites[0]); assert(packet_id_pos > 0); assert(packetType == PacketType::PUBLISH); - assert(publish.qos > 0); + assert(publishData.qos > 0); this->packet_id = packet_id; pos = packet_id_pos; @@ -957,7 +957,7 @@ void MqttPacket::setPacketId(uint16_t packet_id) uint16_t MqttPacket::getPacketId() const { - assert(publish.qos > 0); + assert(publishData.qos > 0); return packet_id; } @@ -965,7 +965,7 @@ uint16_t MqttPacket::getPacketId() const void MqttPacket::setDuplicate() { assert(packetType == PacketType::PUBLISH); - assert(publish.qos > 0); + assert(publishData.qos > 0); assert(fixed_header_length == 0 || first_byte == bites[0]); first_byte |= 0b00001000; @@ -1010,12 +1010,12 @@ size_t MqttPacket::getSizeIncludingNonPresentHeader() const void MqttPacket::setQos(const char new_qos) { // You can't change to a QoS level that would remove the packet identifier. - assert((publish.qos == 0 && new_qos == 0) || (publish.qos > 0 && new_qos > 0)); + assert((publishData.qos == 0 && new_qos == 0) || (publishData.qos > 0 && new_qos > 0)); assert(new_qos > 0 && packet_id_pos > 0); - publish.qos = new_qos; + publishData.qos = new_qos; first_byte &= 0b11111001; - first_byte |= (publish.qos << 1); + first_byte |= (publishData.qos << 1); if (fixed_header_length > 0) { @@ -1026,7 +1026,7 @@ void MqttPacket::setQos(const char new_qos) const std::string &MqttPacket::getTopic() const { - return this->publish.topic; + return this->publishData.topic; } /** @@ -1035,7 +1035,7 @@ const std::string &MqttPacket::getTopic() const */ const std::vector &MqttPacket::getSubtopics() const { - return this->publish.subtopics; + return this->publishData.subtopics; } @@ -1200,15 +1200,15 @@ void MqttPacket::setRetain() Publish *MqttPacket::getPublish() { - if (payloadLen > 0 && publish.payload.empty()) - publish.payload = getPayloadCopy(); + if (payloadLen > 0 && publishData.payload.empty()) + publishData.payload = getPayloadCopy(); - return &publish; + return &publishData; } void MqttPacket::readIntoBuf(CirBuf &buf) const { - assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publish.qos); + assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publishData.qos); buf.ensureFreeSpace(getSizeIncludingNonPresentHeader()); diff --git a/mqttpacket.h b/mqttpacket.h index 55bebf0..97d827f 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -43,7 +43,7 @@ class MqttPacket #endif std::vector bites; - Publish publish; + Publish publishData; size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header. VariableByteInt remainingLength; std::shared_ptr sender; @@ -80,13 +80,13 @@ public: std::shared_ptr getCopy(char new_max_qos) const; - size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publish) const; + size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publishData) const; // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance. MqttPacket(const ConnAck &connAck); MqttPacket(const SubAck &subAck); MqttPacket(const UnsubAck &unsubAck); - MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish); + MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish); MqttPacket(const PubAck &pubAck); MqttPacket(const PubRec &pubRec); MqttPacket(const PubComp &pubComp); @@ -108,7 +108,7 @@ public: size_t getSizeIncludingNonPresentHeader() const; const std::vector &getBites() const { return bites; } - char getQos() const { return publish.qos; } + char getQos() const { return publishData.qos; } void setQos(const char new_qos); ProtocolVersion getProtocolVersion() const { return protocolVersion;} const std::string &getTopic() const; diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp index 17ee641..aecec36 100644 --- a/qospacketqueue.cpp +++ b/qospacketqueue.cpp @@ -16,7 +16,7 @@ uint16_t QueuedPublish::getPacketId() const return this->packet_id; } -Publish &QueuedPublish::getPublish() +const Publish &QueuedPublish::getPublish() const { return publish; } @@ -79,12 +79,12 @@ void QoSPublishQueue::queuePublish(Publish &&pub, uint16_t id) qosQueueBytes += queue.back().getApproximateMemoryFootprint(); } -std::list::iterator QoSPublishQueue::begin() +std::list::const_iterator QoSPublishQueue::begin() const { return queue.begin(); } -std::list::iterator QoSPublishQueue::end() +std::list::const_iterator QoSPublishQueue::end() const { return queue.end(); } diff --git a/qospacketqueue.h b/qospacketqueue.h index 2b812e1..df950bd 100644 --- a/qospacketqueue.h +++ b/qospacketqueue.h @@ -21,7 +21,7 @@ public: size_t getApproximateMemoryFootprint() const; uint16_t getPacketId() const; - Publish &getPublish(); + const Publish &getPublish() const; }; class QoSPublishQueue @@ -36,8 +36,8 @@ public: void queuePublish(PublishCopyFactory ©Factory, uint16_t id, char new_max_qos); void queuePublish(Publish &&pub, uint16_t id); - std::list::iterator begin(); - std::list::iterator end(); + std::list::const_iterator begin() const; + std::list::const_iterator end() const; }; #endif // QOSPACKETQUEUE_H diff --git a/session.cpp b/session.cpp index 9bf7c8e..21d9821 100644 --- a/session.cpp +++ b/session.cpp @@ -257,7 +257,7 @@ uint64_t Session::sendPendingQosMessages() if (c) { std::lock_guard locker(qosQueueMutex); - for (QueuedPublish &queuedPublish : qosPacketQueue) + for (const QueuedPublish &queuedPublish : qosPacketQueue) { MqttPacket p(c->getProtocolVersion(), queuedPublish.getPublish()); p.setDuplicate(); diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 441c00b..03ae120 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -215,9 +215,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorqosPacketQueue) + for (const QueuedPublish &p: ses->qosPacketQueue) { - Publish &pub = p.getPublish(); + const Publish &pub = p.getPublish(); logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());