Commit a583c5f64c0553e66ad54565964df4ce2fabbc3c

Authored by Wiebe Cazemier
1 parent ee79271a

Improve subacks

mqttpacket.cpp
@@ -69,7 +69,8 @@ MqttPacket::MqttPacket(const SubAck &subAck) : @@ -69,7 +69,8 @@ MqttPacket::MqttPacket(const SubAck &subAck) :
69 } 69 }
70 70
71 std::vector<char> returnList; 71 std::vector<char> returnList;
72 - for (SubAckReturnCodes code : subAck.responses) 72 + returnList.reserve(subAck.responses.size());
  73 + for (ReasonCodes code : subAck.responses)
73 { 74 {
74 returnList.push_back(static_cast<char>(code)); 75 returnList.push_back(static_cast<char>(code));
75 } 76 }
@@ -605,7 +606,7 @@ void MqttPacket::handleSubscribe() @@ -605,7 +606,7 @@ void MqttPacket::handleSubscribe()
605 if (firstByteFirstNibble != 2) 606 if (firstByteFirstNibble != 2)
606 throw ProtocolError("First LSB of first byte is wrong value for subscribe packet."); 607 throw ProtocolError("First LSB of first byte is wrong value for subscribe packet.");
607 608
608 - uint16_t packet_id = readTwoBytesToUInt16(); 609 + const uint16_t packet_id = readTwoBytesToUInt16();
609 610
610 if (packet_id == 0) 611 if (packet_id == 0)
611 { 612 {
@@ -637,7 +638,7 @@ void MqttPacket::handleSubscribe() @@ -637,7 +638,7 @@ void MqttPacket::handleSubscribe()
637 638
638 Authentication &authentication = *ThreadGlobals::getAuth(); 639 Authentication &authentication = *ThreadGlobals::getAuth();
639 640
640 - std::list<char> subs_reponse_codes; 641 + std::list<ReasonCodes> subs_reponse_codes;
641 while (remainingAfterPos() > 0) 642 while (remainingAfterPos() > 0)
642 { 643 {
643 uint16_t topicLength = readTwoBytesToUInt16(); 644 uint16_t topicLength = readTwoBytesToUInt16();
@@ -649,7 +650,7 @@ void MqttPacket::handleSubscribe() @@ -649,7 +650,7 @@ void MqttPacket::handleSubscribe()
649 if (!isValidSubscribePath(topic)) 650 if (!isValidSubscribePath(topic))
650 throw ProtocolError(formatString("Invalid subscribe path: %s", topic.c_str())); 651 throw ProtocolError(formatString("Invalid subscribe path: %s", topic.c_str()));
651 652
652 - char qos = readByte(); 653 + uint8_t qos = readByte();
653 654
654 if (qos > 2) 655 if (qos > 2)
655 throw ProtocolError("QoS is greater than 2, and/or reserved bytes in QoS field are not 0."); 656 throw ProtocolError("QoS is greater than 2, and/or reserved bytes in QoS field are not 0.");
@@ -660,16 +661,14 @@ void MqttPacket::handleSubscribe() @@ -660,16 +661,14 @@ void MqttPacket::handleSubscribe()
660 { 661 {
661 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos); 662 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos);
662 sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos); 663 sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos);
663 - subs_reponse_codes.push_back(qos); 664 + subs_reponse_codes.push_back(static_cast<ReasonCodes>(qos));
664 } 665 }
665 else 666 else
666 { 667 {
667 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribe to '%s' denied or failed.", sender->repr().c_str(), topic.c_str()); 668 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribe to '%s' denied or failed.", sender->repr().c_str(), topic.c_str());
668 669
669 - // We can't not send an ack, because if there are multiple subscribes, you send fewer acks back, losing sync.  
670 - char return_code = qos;  
671 - if (sender->getProtocolVersion() >= ProtocolVersion::Mqtt311)  
672 - return_code = static_cast<char>(SubAckReturnCodes::Fail); 670 + // We can't not send an ack, because if there are multiple subscribes, you'd send fewer acks back, losing sync.
  671 + ReasonCodes return_code = sender->getProtocolVersion() >= ProtocolVersion::Mqtt311 ? ReasonCodes::NotAuthorized : static_cast<ReasonCodes>(qos);
673 subs_reponse_codes.push_back(return_code); 672 subs_reponse_codes.push_back(return_code);
674 } 673 }
675 } 674 }
types.cpp
@@ -82,15 +82,21 @@ size_t ConnAck::getLengthWithoutFixedHeader() const @@ -82,15 +82,21 @@ size_t ConnAck::getLengthWithoutFixedHeader() const
82 return result; 82 return result;
83 } 83 }
84 84
85 -SubAck::SubAck(const ProtocolVersion protVersion, uint16_t packet_id, const std::list<char> &subs_qos_reponses) : 85 +SubAck::SubAck(const ProtocolVersion protVersion, uint16_t packet_id, const std::list<ReasonCodes> &subs_qos_reponses) :
86 protocol_version(protVersion), 86 protocol_version(protVersion),
87 packet_id(packet_id) 87 packet_id(packet_id)
88 { 88 {
89 assert(!subs_qos_reponses.empty()); 89 assert(!subs_qos_reponses.empty());
90 90
91 - for (char ack_code : subs_qos_reponses) 91 + for (const ReasonCodes ack_code : subs_qos_reponses)
92 { 92 {
93 - responses.push_back(static_cast<SubAckReturnCodes>(ack_code)); 93 + assert(protVersion >= ProtocolVersion::Mqtt311 || ack_code <= ReasonCodes::GrantedQoS2);
  94 +
  95 + ReasonCodes _ack_code = ack_code;
  96 + if (protVersion < ProtocolVersion::Mqtt5 && ack_code >= ReasonCodes::UnspecifiedError)
  97 + _ack_code = ReasonCodes::UnspecifiedError; // Equals Mqtt 3.1.1 'suback failure'
  98 +
  99 + responses.push_back(static_cast<ReasonCodes>(_ack_code));
94 } 100 }
95 } 101 }
96 102
@@ -164,23 +164,15 @@ public: @@ -164,23 +164,15 @@ public:
164 size_t getLengthWithoutFixedHeader() const; 164 size_t getLengthWithoutFixedHeader() const;
165 }; 165 };
166 166
167 -enum class SubAckReturnCodes  
168 -{  
169 - MaxQoS0 = 0,  
170 - MaxQoS1 = 1,  
171 - MaxQoS2 = 2,  
172 - Fail = 0x80  
173 -};  
174 -  
175 class SubAck 167 class SubAck
176 { 168 {
177 public: 169 public:
178 const ProtocolVersion protocol_version; 170 const ProtocolVersion protocol_version;
179 - uint16_t packet_id;  
180 - std::list<SubAckReturnCodes> responses; 171 + const uint16_t packet_id;
  172 + std::list<ReasonCodes> responses;
181 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder; 173 std::shared_ptr<Mqtt5PropertyBuilder> propertyBuilder;
182 174
183 - SubAck(const ProtocolVersion protVersion, uint16_t packet_id, const std::list<char> &subs_qos_reponses); 175 + SubAck(const ProtocolVersion protVersion, uint16_t packet_id, const std::list<ReasonCodes> &subs_qos_reponses);
184 size_t getLengthWithoutFixedHeader() const; 176 size_t getLengthWithoutFixedHeader() const;
185 }; 177 };
186 178