Commit d5625354feb7fe1fc60f8b64bec841b9f53e9abd

Authored by Wiebe Cazemier
1 parent 52efbdc7

Publish argument is now const again

mqttpacket.cpp
... ... @@ -61,7 +61,7 @@ std::shared_ptr<MqttPacket> MqttPacket::getCopy(char new_max_qos) const
61 61 // has to do with the Session::writePacket() and Session::sendPendingQosMessages() logic.
62 62 assert((first_byte & 0b00001000) == 0);
63 63  
64   - if (publish.qos > 0 && new_max_qos == 0)
  64 + if (publishData.qos > 0 && new_max_qos == 0)
65 65 {
66 66 // if shrinking the packet doesn't alter the amount of bytes in the 'remaining length' part of the header, we can
67 67 // just memmove+shrink the packet. This is because the packet id always is two bytes before the payload, so we just move the payload
... ... @@ -79,7 +79,7 @@ std::shared_ptr<MqttPacket> MqttPacket::getCopy(char new_max_qos) const
79 79 std::memmove(&p->bites[packet_id_pos], &p->bites[packet_id_pos+2], payloadLen);
80 80 p->bites.erase(p->bites.end() - 2, p->bites.end());
81 81 p->packet_id_pos = 0;
82   - p->publish.qos = 0;
  82 + p->publishData.qos = 0;
83 83 p->payloadStart -= 2;
84 84 if (pos > p->bites.size()) // pos can possible be set elsewhere, so we only set it back if it was after the payload.
85 85 p->pos -= 2;
... ... @@ -97,7 +97,7 @@ std::shared_ptr<MqttPacket> MqttPacket::getCopy(char new_max_qos) const
97 97 return p;
98 98 }
99 99  
100   - Publish pub(publish.topic, getPayloadCopy(), new_max_qos);
  100 + Publish pub(publishData.topic, getPayloadCopy(), new_max_qos);
101 101 pub.retain = getRetain();
102 102 std::shared_ptr<MqttPacket> copyPacket(new MqttPacket(ProtocolVersion::Mqtt311, pub)); // TODO: don't hard-code the protocol version.
103 103 return copyPacket;
... ... @@ -105,7 +105,7 @@ std::shared_ptr&lt;MqttPacket&gt; MqttPacket::getCopy(char new_max_qos) const
105 105  
106 106 std::shared_ptr<MqttPacket> copyPacket(new MqttPacket(*this));
107 107 copyPacket->sender.reset();
108   - if (publish.qos != new_max_qos)
  108 + if (publishData.qos != new_max_qos)
109 109 copyPacket->setQos(new_max_qos);
110 110 return copyPacket;
111 111 }
... ... @@ -163,29 +163,29 @@ size_t MqttPacket::getRequiredSizeForPublish(const ProtocolVersion protocolVersi
163 163 return result;
164 164 }
165 165  
166   -MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish) :
  166 +MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish) :
167 167 bites(getRequiredSizeForPublish(protocolVersion, _publish))
168 168 {
169   - if (publish.topic.length() > 0xFFFF)
  169 + if (publishData.topic.length() > 0xFFFF)
170 170 {
171 171 throw ProtocolError("Topic path too long.");
172 172 }
173 173  
174 174 this->protocolVersion = protocolVersion;
175 175  
176   - this->publish.topic = _publish.topic;
177   - splitTopic(this->publish.topic, this->publish.subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics.
  176 + this->publishData.topic = _publish.topic;
  177 + splitTopic(this->publishData.topic, this->publishData.subtopics); // TODO: I think I can make this conditional, because the (planned) use will already have used the subtopics.
178 178  
179 179 packetType = PacketType::PUBLISH;
180   - this->publish.qos = _publish.qos;
  180 + this->publishData.qos = _publish.qos;
181 181 first_byte = static_cast<char>(packetType) << 4;
182 182 first_byte |= (_publish.qos << 1);
183 183 first_byte |= (static_cast<char>(_publish.retain) & 0b00000001);
184 184  
185   - writeUint16(publish.topic.length());
186   - writeBytes(publish.topic.c_str(), publish.topic.length());
  185 + writeUint16(publishData.topic.length());
  186 + writeBytes(publishData.topic.c_str(), publishData.topic.length());
187 187  
188   - if (publish.qos)
  188 + if (publishData.qos)
189 189 {
190 190 // Reserve the space for the packet id, which will be assigned later.
191 191 packet_id_pos = pos;
... ... @@ -769,22 +769,22 @@ void MqttPacket::handlePublish()
769 769  
770 770 if (qos > 2)
771 771 throw ProtocolError("QoS 3 is a protocol violation.");
772   - this->publish.qos = qos;
  772 + this->publishData.qos = qos;
773 773  
774 774 if (qos == 0 && dup)
775 775 throw ProtocolError("Duplicate flag is set for QoS 0 packet. This is illegal.");
776 776  
777   - publish.topic = std::string(readBytes(variable_header_length), variable_header_length);
778   - splitTopic(publish.topic, publish.subtopics);
  777 + publishData.topic = std::string(readBytes(variable_header_length), variable_header_length);
  778 + splitTopic(publishData.topic, publishData.subtopics);
779 779  
780   - if (!isValidUtf8(publish.topic, true))
  780 + if (!isValidUtf8(publishData.topic, true))
781 781 {
782 782 logger->logf(LOG_WARNING, "Client '%s' published a message with invalid UTF8 or $/+/# in it. Dropping.", sender->repr().c_str());
783 783 return;
784 784 }
785 785  
786 786 #ifndef NDEBUG
787   - logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publish.topic.c_str(), qos, retain, dup);
  787 + logger->logf(LOG_DEBUG, "Publish received, topic '%s'. QoS=%d. Retain=%d, dup=%d", publishData.topic.c_str(), qos, retain, dup);
788 788 #endif
789 789  
790 790 sender->getThreadData()->incrementReceivedMessageCount();
... ... @@ -831,23 +831,23 @@ void MqttPacket::handlePublish()
831 831 while (pos < prop_end_at)
832 832 {
833 833 const Mqtt5Properties prop = static_cast<Mqtt5Properties>(readByte());
834   - publish.propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
  834 + publishData.propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
835 835  
836 836 switch (prop)
837 837 {
838 838 case Mqtt5Properties::PayloadFormatIndicator:
839   - publish.propertyBuilder->writePayloadFormatIndicator(readByte());
  839 + publishData.propertyBuilder->writePayloadFormatIndicator(readByte());
840 840 break;
841 841 case Mqtt5Properties::MessageExpiryInterval:
842   - publish.createdAt = std::chrono::steady_clock::now();
843   - publish.expiresAfter = std::chrono::seconds(readFourBytesToUint32());
  842 + publishData.createdAt = std::chrono::steady_clock::now();
  843 + publishData.expiresAfter = std::chrono::seconds(readFourBytesToUint32());
844 844 case Mqtt5Properties::TopicAlias:
845 845 break;
846 846 case Mqtt5Properties::ResponseTopic:
847 847 {
848 848 const uint16_t len = readTwoBytesToUInt16();
849 849 const std::string responseTopic(readBytes(len), len);
850   - publish.propertyBuilder->writeResponseTopic(responseTopic);
  850 + publishData.propertyBuilder->writeResponseTopic(responseTopic);
851 851 break;
852 852 }
853 853 case Mqtt5Properties::CorrelationData:
... ... @@ -860,7 +860,7 @@ void MqttPacket::handlePublish()
860 860 {
861 861 const uint16_t len = readTwoBytesToUInt16();
862 862 const std::string contentType(readBytes(len), len);
863   - publish.propertyBuilder->writeContentType(contentType);
  863 + publishData.propertyBuilder->writeContentType(contentType);
864 864 break;
865 865 }
866 866 default:
... ... @@ -873,12 +873,12 @@ void MqttPacket::handlePublish()
873 873 payloadStart = pos;
874 874  
875 875 Authentication &authentication = *ThreadGlobals::getAuth();
876   - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publish.topic, publish.subtopics, AclAccess::write, qos, retain) == AuthResult::success)
  876 + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.subtopics, AclAccess::write, qos, retain) == AuthResult::success)
877 877 {
878 878 if (retain)
879 879 {
880 880 std::string payload(readBytes(payloadLen), payloadLen);
881   - sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(publish.topic, publish.subtopics, payload, qos);
  881 + sender->getThreadData()->getSubscriptionStore()->setRetainedMessage(publishData.topic, publishData.subtopics, payload, qos);
882 882 }
883 883  
884 884 // Set dup flag to 0, because that must not be propagated [MQTT-3.3.1-3].
... ... @@ -948,7 +948,7 @@ void MqttPacket::setPacketId(uint16_t packet_id)
948 948 assert(fixed_header_length == 0 || first_byte == bites[0]);
949 949 assert(packet_id_pos > 0);
950 950 assert(packetType == PacketType::PUBLISH);
951   - assert(publish.qos > 0);
  951 + assert(publishData.qos > 0);
952 952  
953 953 this->packet_id = packet_id;
954 954 pos = packet_id_pos;
... ... @@ -957,7 +957,7 @@ void MqttPacket::setPacketId(uint16_t packet_id)
957 957  
958 958 uint16_t MqttPacket::getPacketId() const
959 959 {
960   - assert(publish.qos > 0);
  960 + assert(publishData.qos > 0);
961 961 return packet_id;
962 962 }
963 963  
... ... @@ -965,7 +965,7 @@ uint16_t MqttPacket::getPacketId() const
965 965 void MqttPacket::setDuplicate()
966 966 {
967 967 assert(packetType == PacketType::PUBLISH);
968   - assert(publish.qos > 0);
  968 + assert(publishData.qos > 0);
969 969 assert(fixed_header_length == 0 || first_byte == bites[0]);
970 970  
971 971 first_byte |= 0b00001000;
... ... @@ -1010,12 +1010,12 @@ size_t MqttPacket::getSizeIncludingNonPresentHeader() const
1010 1010 void MqttPacket::setQos(const char new_qos)
1011 1011 {
1012 1012 // You can't change to a QoS level that would remove the packet identifier.
1013   - assert((publish.qos == 0 && new_qos == 0) || (publish.qos > 0 && new_qos > 0));
  1013 + assert((publishData.qos == 0 && new_qos == 0) || (publishData.qos > 0 && new_qos > 0));
1014 1014 assert(new_qos > 0 && packet_id_pos > 0);
1015 1015  
1016   - publish.qos = new_qos;
  1016 + publishData.qos = new_qos;
1017 1017 first_byte &= 0b11111001;
1018   - first_byte |= (publish.qos << 1);
  1018 + first_byte |= (publishData.qos << 1);
1019 1019  
1020 1020 if (fixed_header_length > 0)
1021 1021 {
... ... @@ -1026,7 +1026,7 @@ void MqttPacket::setQos(const char new_qos)
1026 1026  
1027 1027 const std::string &MqttPacket::getTopic() const
1028 1028 {
1029   - return this->publish.topic;
  1029 + return this->publishData.topic;
1030 1030 }
1031 1031  
1032 1032 /**
... ... @@ -1035,7 +1035,7 @@ const std::string &amp;MqttPacket::getTopic() const
1035 1035 */
1036 1036 const std::vector<std::string> &MqttPacket::getSubtopics() const
1037 1037 {
1038   - return this->publish.subtopics;
  1038 + return this->publishData.subtopics;
1039 1039 }
1040 1040  
1041 1041  
... ... @@ -1200,15 +1200,15 @@ void MqttPacket::setRetain()
1200 1200  
1201 1201 Publish *MqttPacket::getPublish()
1202 1202 {
1203   - if (payloadLen > 0 && publish.payload.empty())
1204   - publish.payload = getPayloadCopy();
  1203 + if (payloadLen > 0 && publishData.payload.empty())
  1204 + publishData.payload = getPayloadCopy();
1205 1205  
1206   - return &publish;
  1206 + return &publishData;
1207 1207 }
1208 1208  
1209 1209 void MqttPacket::readIntoBuf(CirBuf &buf) const
1210 1210 {
1211   - assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publish.qos);
  1211 + assert(packetType != PacketType::PUBLISH || (first_byte & 0b00000110) >> 1 == publishData.qos);
1212 1212  
1213 1213 buf.ensureFreeSpace(getSizeIncludingNonPresentHeader());
1214 1214  
... ...
mqttpacket.h
... ... @@ -43,7 +43,7 @@ class MqttPacket
43 43 #endif
44 44  
45 45 std::vector<char> bites;
46   - Publish publish;
  46 + Publish publishData;
47 47 size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header.
48 48 VariableByteInt remainingLength;
49 49 std::shared_ptr<Client> sender;
... ... @@ -80,13 +80,13 @@ public:
80 80  
81 81 std::shared_ptr<MqttPacket> getCopy(char new_max_qos) const;
82 82  
83   - size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publish) const;
  83 + size_t getRequiredSizeForPublish(const ProtocolVersion protocolVersion, const Publish &publishData) const;
84 84  
85 85 // Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance.
86 86 MqttPacket(const ConnAck &connAck);
87 87 MqttPacket(const SubAck &subAck);
88 88 MqttPacket(const UnsubAck &unsubAck);
89   - MqttPacket(const ProtocolVersion protocolVersion, Publish &_publish);
  89 + MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish);
90 90 MqttPacket(const PubAck &pubAck);
91 91 MqttPacket(const PubRec &pubRec);
92 92 MqttPacket(const PubComp &pubComp);
... ... @@ -108,7 +108,7 @@ public:
108 108  
109 109 size_t getSizeIncludingNonPresentHeader() const;
110 110 const std::vector<char> &getBites() const { return bites; }
111   - char getQos() const { return publish.qos; }
  111 + char getQos() const { return publishData.qos; }
112 112 void setQos(const char new_qos);
113 113 ProtocolVersion getProtocolVersion() const { return protocolVersion;}
114 114 const std::string &getTopic() const;
... ...
qospacketqueue.cpp
... ... @@ -16,7 +16,7 @@ uint16_t QueuedPublish::getPacketId() const
16 16 return this->packet_id;
17 17 }
18 18  
19   -Publish &QueuedPublish::getPublish()
  19 +const Publish &QueuedPublish::getPublish() const
20 20 {
21 21 return publish;
22 22 }
... ... @@ -79,12 +79,12 @@ void QoSPublishQueue::queuePublish(Publish &amp;&amp;pub, uint16_t id)
79 79 qosQueueBytes += queue.back().getApproximateMemoryFootprint();
80 80 }
81 81  
82   -std::list<QueuedPublish>::iterator QoSPublishQueue::begin()
  82 +std::list<QueuedPublish>::const_iterator QoSPublishQueue::begin() const
83 83 {
84 84 return queue.begin();
85 85 }
86 86  
87   -std::list<QueuedPublish>::iterator QoSPublishQueue::end()
  87 +std::list<QueuedPublish>::const_iterator QoSPublishQueue::end() const
88 88 {
89 89 return queue.end();
90 90 }
... ...
qospacketqueue.h
... ... @@ -21,7 +21,7 @@ public:
21 21  
22 22 size_t getApproximateMemoryFootprint() const;
23 23 uint16_t getPacketId() const;
24   - Publish &getPublish();
  24 + const Publish &getPublish() const;
25 25 };
26 26  
27 27 class QoSPublishQueue
... ... @@ -36,8 +36,8 @@ public:
36 36 void queuePublish(PublishCopyFactory &copyFactory, uint16_t id, char new_max_qos);
37 37 void queuePublish(Publish &&pub, uint16_t id);
38 38  
39   - std::list<QueuedPublish>::iterator begin();
40   - std::list<QueuedPublish>::iterator end();
  39 + std::list<QueuedPublish>::const_iterator begin() const;
  40 + std::list<QueuedPublish>::const_iterator end() const;
41 41 };
42 42  
43 43 #endif // QOSPACKETQUEUE_H
... ...
session.cpp
... ... @@ -257,7 +257,7 @@ uint64_t Session::sendPendingQosMessages()
257 257 if (c)
258 258 {
259 259 std::lock_guard<std::mutex> locker(qosQueueMutex);
260   - for (QueuedPublish &queuedPublish : qosPacketQueue)
  260 + for (const QueuedPublish &queuedPublish : qosPacketQueue)
261 261 {
262 262 MqttPacket p(c->getProtocolVersion(), queuedPublish.getPublish());
263 263 p.setDuplicate();
... ...
sessionsandsubscriptionsdb.cpp
... ... @@ -215,9 +215,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
215 215 size_t qosPacketsCounted = 0;
216 216 writeUint32(qosPacketsExpected);
217 217  
218   - for (QueuedPublish &p: ses->qosPacketQueue)
  218 + for (const QueuedPublish &p: ses->qosPacketQueue)
219 219 {
220   - Publish &pub = p.getPublish();
  220 + const Publish &pub = p.getPublish();
221 221  
222 222 logger->logf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str());
223 223  
... ...