Commit 60065fcb995682d5eb1ec7cacee3192a731b5d5b

Authored by Wiebe Cazemier
1 parent 65146ae2

Do 'session present' for MQTT 3.1.1

mqttpacket.cpp
@@ -54,9 +54,8 @@ MqttPacket::MqttPacket(const ConnAck &connAck) : @@ -54,9 +54,8 @@ MqttPacket::MqttPacket(const ConnAck &connAck) :
54 char first_byte = static_cast<char>(packetType) << 4; 54 char first_byte = static_cast<char>(packetType) << 4;
55 writeByte(first_byte); 55 writeByte(first_byte);
56 writeByte(2); // length is always 2. 56 writeByte(2); // length is always 2.
57 - writeByte(0); // all connect-ack flags are 0, except session-present, but we don't have that yet. TODO: make that 57 + writeByte(connAck.session_present & 0b00000001); // all connect-ack flags are 0, except session-present. [MQTT-3.2.2.1]
58 writeByte(static_cast<char>(connAck.return_code)); 58 writeByte(static_cast<char>(connAck.return_code));
59 -  
60 } 59 }
61 60
62 MqttPacket::MqttPacket(const SubAck &subAck) : 61 MqttPacket::MqttPacket(const SubAck &subAck) :
@@ -155,6 +154,8 @@ void MqttPacket::handleConnect() @@ -155,6 +154,8 @@ void MqttPacket::handleConnect()
155 154
156 GlobalSettings *settings = GlobalSettings::getInstance(); 155 GlobalSettings *settings = GlobalSettings::getInstance();
157 156
  157 + std::shared_ptr<SubscriptionStore> subscriptionStore = sender->getThreadData()->getSubscriptionStore();
  158 +
158 uint16_t variable_header_length = readTwoBytesToUInt16(); 159 uint16_t variable_header_length = readTwoBytesToUInt16();
159 160
160 if (variable_header_length == 4 || variable_header_length == 6) 161 if (variable_header_length == 4 || variable_header_length == 6)
@@ -278,17 +279,18 @@ void MqttPacket::handleConnect() @@ -278,17 +279,18 @@ void MqttPacket::handleConnect()
278 279
279 if (sender->getThreadData()->authPlugin.unPwdCheck(username, password) == AuthResult::success) 280 if (sender->getThreadData()->authPlugin.unPwdCheck(username, password) == AuthResult::success)
280 { 281 {
281 - sender->getThreadData()->getSubscriptionStore()->registerClientAndKickExistingOne(sender); 282 + bool sessionPresent = protocolVersion >= ProtocolVersion::Mqtt311 && !clean_session && subscriptionStore->sessionPresent(client_id);
  283 + subscriptionStore->registerClientAndKickExistingOne(sender);
282 284
283 sender->setAuthenticated(true); 285 sender->setAuthenticated(true);
284 - ConnAck connAck(ConnAckReturnCodes::Accepted); 286 + ConnAck connAck(ConnAckReturnCodes::Accepted, sessionPresent);
285 MqttPacket response(connAck); 287 MqttPacket response(connAck);
286 sender->writeMqttPacket(response); 288 sender->writeMqttPacket(response);
287 logger->logf(LOG_NOTICE, "User '%s' logged in successfully", username.c_str()); 289 logger->logf(LOG_NOTICE, "User '%s' logged in successfully", username.c_str());
288 } 290 }
289 else 291 else
290 { 292 {
291 - ConnAck connDeny(ConnAckReturnCodes::NotAuthorized); 293 + ConnAck connDeny(ConnAckReturnCodes::NotAuthorized, false);
292 MqttPacket response(connDeny); 294 MqttPacket response(connDeny);
293 sender->setDisconnectReason("Access denied"); 295 sender->setDisconnectReason("Access denied");
294 sender->setReadyForDisconnect(); 296 sender->setReadyForDisconnect();
subscriptionstore.cpp
@@ -121,6 +121,22 @@ void SubscriptionStore::registerClientAndKickExistingOne(Client_p &amp;client) @@ -121,6 +121,22 @@ void SubscriptionStore::registerClientAndKickExistingOne(Client_p &amp;client)
121 session->sendPendingQosMessages(); 121 session->sendPendingQosMessages();
122 } 122 }
123 123
  124 +bool SubscriptionStore::sessionPresent(const std::string &clientid)
  125 +{
  126 + RWLockGuard lock_guard(&subscriptionsRwlock);
  127 + lock_guard.rdlock();
  128 +
  129 + bool result = false;
  130 +
  131 + auto it = sessionsByIdConst.find(clientid);
  132 + if (it != sessionsByIdConst.end())
  133 + {
  134 + it->second->touch(); // Touching to avoid a race condition between using the session after this, and it expiring.
  135 + result = true;
  136 + }
  137 + return result;
  138 +}
  139 +
124 // TODO: should I implement cache, this needs to be changed to returning a list of clients. 140 // TODO: should I implement cache, this needs to be changed to returning a list of clients.
125 void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers) const 141 void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers) const
126 { 142 {
subscriptionstore.h
@@ -70,6 +70,7 @@ public: @@ -70,6 +70,7 @@ public:
70 70
71 void addSubscription(Client_p &client, const std::string &topic, char qos); 71 void addSubscription(Client_p &client, const std::string &topic, char qos);
72 void registerClientAndKickExistingOne(Client_p &client); 72 void registerClientAndKickExistingOne(Client_p &client);
  73 + bool sessionPresent(const std::string &clientid);
73 74
74 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); 75 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender);
75 void giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos); 76 void giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::string &subscribe_topic, char max_qos);
types.cpp
1 #include "types.h" 1 #include "types.h"
2 2
3 -ConnAck::ConnAck(ConnAckReturnCodes return_code) :  
4 - return_code(return_code) 3 +ConnAck::ConnAck(ConnAckReturnCodes return_code, bool session_present) :
  4 + return_code(return_code),
  5 + session_present(session_present)
5 { 6 {
6 - 7 + // [MQTT-3.2.2-4]
  8 + if (return_code > ConnAckReturnCodes::Accepted)
  9 + session_present = false;
7 } 10 }
8 11
9 SubAck::SubAck(uint16_t packet_id, const std::list<char> &subs_qos_reponses) : 12 SubAck::SubAck(uint16_t packet_id, const std::list<char> &subs_qos_reponses) :
@@ -46,8 +46,9 @@ enum class ConnAckReturnCodes @@ -46,8 +46,9 @@ enum class ConnAckReturnCodes
46 class ConnAck 46 class ConnAck
47 { 47 {
48 public: 48 public:
49 - ConnAck(ConnAckReturnCodes return_code); 49 + ConnAck(ConnAckReturnCodes return_code, bool session_present=false);
50 ConnAckReturnCodes return_code; 50 ConnAckReturnCodes return_code;
  51 + bool session_present = false;
51 size_t getLengthWithoutFixedHeader() const { return 2;} // size of connack is always the same 52 size_t getLengthWithoutFixedHeader() const { return 2;} // size of connack is always the same
52 }; 53 };
53 54