Commit 9a34fc462fc342e97029fd886bf02111ffcb9c08

Authored by Wiebe Cazemier
1 parent a682f1e0

Change clean session into MQTT5 symantics

The behavior for MQTT3 clients in the same, but I replaced the term
'clean session' and described the behavior in MQTT5 terms, of 'clean
start' and an expiry interval.
FlashMQTests/tst_maintests.cpp
... ... @@ -1001,15 +1001,15 @@ void MainTests::testSavingSessions()
1001 1001 ThreadGlobals::assign(&auth);
1002 1002  
1003 1003 std::shared_ptr<Client> c1(new Client(0, t, nullptr, false, nullptr, settings, false));
1004   - c1->setClientProperties(ProtocolVersion::Mqtt311, "c1", "user1", true, 60, false);
1005   - store->registerClientAndKickExistingOne(c1);
  1004 + c1->setClientProperties(ProtocolVersion::Mqtt311, "c1", "user1", true, 60);
  1005 + store->registerClientAndKickExistingOne(c1, false, 512, 120);
1006 1006 c1->getSession()->touch();
1007 1007 c1->getSession()->addIncomingQoS2MessageId(2);
1008 1008 c1->getSession()->addIncomingQoS2MessageId(3);
1009 1009  
1010 1010 std::shared_ptr<Client> c2(new Client(0, t, nullptr, false, nullptr, settings, false));
1011   - c2->setClientProperties(ProtocolVersion::Mqtt311, "c2", "user2", true, 60, false);
1012   - store->registerClientAndKickExistingOne(c2);
  1011 + c2->setClientProperties(ProtocolVersion::Mqtt311, "c2", "user2", true, 60);
  1012 + store->registerClientAndKickExistingOne(c2, false, 512, 120);
1013 1013 c2->getSession()->touch();
1014 1014 c2->getSession()->addOutgoingQoS2MessageId(55);
1015 1015 c2->getSession()->addOutgoingQoS2MessageId(66);
... ... @@ -1120,8 +1120,8 @@ void testCopyPacketHelper(const std::string &amp;topic, char from_qos, char to_qos,
1120 1120 ThreadGlobals::assign(&auth);
1121 1121  
1122 1122 std::shared_ptr<Client> dummyClient(new Client(0, t, nullptr, false, nullptr, settings, false));
1123   - dummyClient->setClientProperties(ProtocolVersion::Mqtt311, "qostestclient", "user1", true, 60, false);
1124   - store->registerClientAndKickExistingOne(dummyClient);
  1123 + dummyClient->setClientProperties(ProtocolVersion::Mqtt311, "qostestclient", "user1", true, 60);
  1124 + store->registerClientAndKickExistingOne(dummyClient, false, 512, 120);
1125 1125  
1126 1126 uint16_t packetid = 66;
1127 1127 for (int len = 0; len < 150; len++ )
... ...
client.cpp
... ... @@ -75,7 +75,7 @@ Client::~Client()
75 75 }
76 76  
77 77 // MQTT-3.1.2-6
78   - if (cleanSession)
  78 + if (session->getDestroyOnDisconnect())
79 79 {
80 80 store->removeSession(clientid);
81 81 }
... ... @@ -286,9 +286,9 @@ bool Client::writeBufIntoFd()
286 286  
287 287 std::string Client::repr()
288 288 {
289   - std::string s = formatString("[ClientID='%s', username='%s', fd=%d, keepalive=%ds, transport='%s', address='%s', cleanses=%d, prot=%s]",
  289 + std::string s = formatString("[ClientID='%s', username='%s', fd=%d, keepalive=%ds, transport='%s', address='%s', prot=%s]",
290 290 clientid.c_str(), username.c_str(), fd, keepalive, this->transportStr.c_str(), this->address.c_str(),
291   - cleanSession, protocolVersionString(protocolVersion).c_str());
  291 + protocolVersionString(protocolVersion).c_str());
292 292 return s;
293 293 }
294 294  
... ... @@ -334,11 +334,6 @@ void Client::resetBuffersIfEligible()
334 334 writebuf.resetSizeIfEligable(initialBufferSize);
335 335 }
336 336  
337   -void Client::setCleanSession(bool val)
338   -{
339   - this->cleanSession = val;
340   -}
341   -
342 337 #ifndef NDEBUG
343 338 /**
344 339 * @brief IoWrapper::setFakeUpgraded().
... ... @@ -421,24 +416,22 @@ void Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, std::sh
421 416 setReadyForReading(readbuf.freeSpace() > 0);
422 417 }
423 418  
424   -void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, bool cleanSession)
  419 +void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive)
425 420 {
426 421 const Settings *settings = ThreadGlobals::getSettings();
427 422  
428   - setClientProperties(protocolVersion, clientId, username, connectPacketSeen, keepalive, cleanSession,
429   - settings->maxPacketSize, 0);
  423 + setClientProperties(protocolVersion, clientId, username, connectPacketSeen, keepalive, settings->maxPacketSize, 0);
430 424 }
431 425  
432 426  
433 427 void Client::setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive,
434   - bool cleanSession, uint32_t maxPacketSize, uint16_t maxTopicAliases)
  428 + uint32_t maxPacketSize, uint16_t maxTopicAliases)
435 429 {
436 430 this->protocolVersion = protocolVersion;
437 431 this->clientid = clientId;
438 432 this->username = username;
439 433 this->connectPacketSeen = connectPacketSeen;
440 434 this->keepalive = keepalive;
441   - this->cleanSession = cleanSession;
442 435 this->maxPacketSize = maxPacketSize;
443 436 this->maxTopicAliases = maxTopicAliases;
444 437 }
... ...
client.h
... ... @@ -74,7 +74,6 @@ class Client
74 74 std::string clientid;
75 75 std::string username;
76 76 uint16_t keepalive = 0;
77   - bool cleanSession = false;
78 77  
79 78 std::string will_topic;
80 79 std::string will_payload;
... ... @@ -108,9 +107,9 @@ public:
108 107 void markAsDisconnecting();
109 108 bool readFdIntoBuffer();
110 109 void bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender);
111   - void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, bool cleanSession);
  110 + void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive);
112 111 void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive,
113   - bool cleanSession, uint32_t maxPacketSize, uint16_t maxTopicAliases);
  112 + uint32_t maxPacketSize, uint16_t maxTopicAliases);
114 113 void setWill(const std::string &topic, const std::string &payload, bool retain, char qos);
115 114 void clearWill();
116 115 void setAuthenticated(bool value) { authenticated = value;}
... ... @@ -119,7 +118,6 @@ public:
119 118 std::shared_ptr<ThreadData> getThreadData() { return threadData; }
120 119 std::string &getClientId() { return this->clientid; }
121 120 const std::string &getUsername() const { return this->username; }
122   - bool getCleanSession() { return cleanSession; }
123 121 void assignSession(std::shared_ptr<Session> &session);
124 122 std::shared_ptr<Session> getSession();
125 123 void setDisconnectReason(const std::string &reason);
... ... @@ -140,8 +138,6 @@ public:
140 138 std::string getKeepAliveInfoString() const;
141 139 void resetBuffersIfEligible();
142 140  
143   - void setCleanSession(bool val);
144   -
145 141 #ifndef NDEBUG
146 142 void setFakeUpgraded();
147 143 #endif
... ...
mainapp.cpp
... ... @@ -442,11 +442,11 @@ void MainApp::start()
442 442  
443 443 std::shared_ptr<Client> client = std::make_shared<Client>(fd, threaddata, nullptr, fuzzWebsockets, nullptr, settings, true);
444 444 std::shared_ptr<Client> subscriber = std::make_shared<Client>(fdnull, threaddata, nullptr, fuzzWebsockets, nullptr, settings, true);
445   - subscriber->setClientProperties(ProtocolVersion::Mqtt311, "subscriber", "subuser", true, 60, true);
  445 + subscriber->setClientProperties(ProtocolVersion::Mqtt311, "subscriber", "subuser", true, 60);
446 446 subscriber->setAuthenticated(true);
447 447  
448 448 std::shared_ptr<Client> websocketsubscriber = std::make_shared<Client>(fdnull2, threaddata, nullptr, true, nullptr, settings, true);
449   - websocketsubscriber->setClientProperties(ProtocolVersion::Mqtt311, "websocketsubscriber", "websocksubuser", true, 60, true);
  449 + websocketsubscriber->setClientProperties(ProtocolVersion::Mqtt311, "websocketsubscriber", "websocksubuser", true, 60);
450 450 websocketsubscriber->setAuthenticated(true);
451 451 websocketsubscriber->setFakeUpgraded();
452 452 subscriptionStore->registerClientAndKickExistingOne(websocketsubscriber);
... ...
mqttpacket.cpp
... ... @@ -363,7 +363,7 @@ void MqttPacket::handleConnect()
363 363 bool will_retain = !!(flagByte & 0b00100000);
364 364 char will_qos = (flagByte & 0b00011000) >> 3;
365 365 bool will_flag = !!(flagByte & 0b00000100);
366   - bool clean_session = !!(flagByte & 0b00000010);
  366 + bool clean_start = !!(flagByte & 0b00000010);
367 367  
368 368 if (will_qos > 2)
369 369 throw ProtocolError("Invalid QoS for will.");
... ... @@ -467,9 +467,9 @@ void MqttPacket::handleConnect()
467 467 logger->logf(LOG_ERR, "ClientID '%s' has + or # in the id and 'allow_unsafe_clientid_chars' is false.", client_id.c_str());
468 468 validClientId = false;
469 469 }
470   - else if (!clean_session && client_id.empty())
  470 + else if (!clean_start && client_id.empty())
471 471 {
472   - logger->logf(LOG_ERR, "ClientID empty and clean session 0, which is incompatible");
  472 + logger->logf(LOG_ERR, "ClientID empty and clean start 0, which is incompatible");
473 473 validClientId = false;
474 474 }
475 475 else if (protocolVersion < ProtocolVersion::Mqtt311 && client_id.empty())
... ... @@ -493,7 +493,7 @@ void MqttPacket::handleConnect()
493 493 client_id = getSecureRandomString(23);
494 494 }
495 495  
496   - sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, clean_session, max_packet_size, max_topic_aliases);
  496 + sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_packet_size, max_topic_aliases);
497 497 sender->setWill(will_topic, will_payload, will_retain, will_qos);
498 498  
499 499 bool accessGranted = false;
... ... @@ -518,7 +518,7 @@ void MqttPacket::handleConnect()
518 518  
519 519 if (accessGranted)
520 520 {
521   - bool sessionPresent = protocolVersion >= ProtocolVersion::Mqtt311 && !clean_session && subscriptionStore->sessionPresent(client_id);
  521 + bool sessionPresent = protocolVersion >= ProtocolVersion::Mqtt311 && !clean_start && subscriptionStore->sessionPresent(client_id);
522 522  
523 523 sender->setAuthenticated(true);
524 524 ConnAck connAck(ConnAckReturnCodes::Accepted, sessionPresent);
... ... @@ -526,7 +526,7 @@ void MqttPacket::handleConnect()
526 526 sender->writeMqttPacket(response);
527 527 logger->logf(LOG_NOTICE, "Client '%s' logged in successfully", sender->repr().c_str());
528 528  
529   - subscriptionStore->registerClientAndKickExistingOne(sender, max_qos_packets, session_expire);
  529 + subscriptionStore->registerClientAndKickExistingOne(sender, clean_start, max_qos_packets, session_expire);
530 530 }
531 531 else
532 532 {
... ...
session.cpp
... ... @@ -76,8 +76,7 @@ bool Session::requiresPacketRetransmission() const
76 76 if (client->getProtocolVersion() < ProtocolVersion::Mqtt311)
77 77 return true;
78 78  
79   - // TODO: for MQTT5, the rules are different.
80   - return !client->getCleanSession();
  79 + return !destroyOnDisconnect;
81 80 }
82 81  
83 82 void Session::increasePacketId()
... ... @@ -343,18 +342,30 @@ void Session::removeOutgoingQoS2MessageId(u_int16_t packet_id)
343 342 outgoingQoS2MessageIds.erase(it);
344 343 }
345 344  
346   -bool Session::getCleanSession() const
  345 +/**
  346 + * @brief Session::getDestroyOnDisconnect
  347 + * @return
  348 + *
  349 + * MQTT5: Setting Clean Start to 1 and a Session Expiry Interval of 0, is equivalent to setting CleanSession to 1 in the MQTT Specification Version 3.1.1.
  350 + */
  351 +bool Session::getDestroyOnDisconnect() const
347 352 {
348   - auto c = client.lock();
349   -
350   - if (!c)
351   - return false;
352   -
353   - return c->getCleanSession();
  353 + return destroyOnDisconnect;
354 354 }
355 355  
356   -void Session::setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval)
  356 +void Session::setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version)
357 357 {
358 358 this->maxQosMsgPending = maxQosPackets;
359 359 this->sessionExpiryInterval = sessionExpiryInterval;
  360 +
  361 + if (protocol_version <= ProtocolVersion::Mqtt311 && clean_start)
  362 + destroyOnDisconnect = true;
  363 + else
  364 + destroyOnDisconnect = sessionExpiryInterval == 0;
360 365 }
  366 +
  367 +uint32_t Session::getSessionExpiryInterval() const
  368 +{
  369 + return this->sessionExpiryInterval;
  370 +}
  371 +
... ...
session.h
... ... @@ -49,6 +49,7 @@ class Session
49 49 uint32_t sessionExpiryInterval = 0;
50 50 uint16_t maxQosMsgPending;
51 51 uint16_t QoSLogPrintedAtId = 0;
  52 + bool destroyOnDisconnect = false;
52 53 std::chrono::time_point<std::chrono::steady_clock> lastTouched = std::chrono::steady_clock::now();
53 54 Logger *logger = Logger::getInstance();
54 55  
... ... @@ -86,9 +87,10 @@ public:
86 87 void addOutgoingQoS2MessageId(uint16_t packet_id);
87 88 void removeOutgoingQoS2MessageId(u_int16_t packet_id);
88 89  
89   - bool getCleanSession() const;
  90 + bool getDestroyOnDisconnect() const;
90 91  
91   - void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
  92 + void setSessionProperties(uint16_t maxQosPackets, uint32_t sessionExpiryInterval, bool clean_start, ProtocolVersion protocol_version);
  93 + uint32_t getSessionExpiryInterval() const;
92 94 };
93 95  
94 96 #endif // SESSION_H
... ...
subscriptionstore.cpp
... ... @@ -204,11 +204,11 @@ void SubscriptionStore::removeSubscription(std::shared_ptr&lt;Client&gt; &amp;client, cons
204 204 void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client> &client)
205 205 {
206 206 const Settings *settings = ThreadGlobals::getSettings();
207   - registerClientAndKickExistingOne(client, settings->maxQosMsgPendingPerClient, settings->expireSessionsAfterSeconds);
  207 + registerClientAndKickExistingOne(client, true, settings->maxQosMsgPendingPerClient, settings->expireSessionsAfterSeconds);
208 208 }
209 209  
210 210 // Removes an existing client when it already exists [MQTT-3.1.4-2].
211   -void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client> &client, uint16_t maxQosPackets, uint32_t sessionExpiryInterval)
  211 +void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval)
212 212 {
213 213 RWLockGuard lock_guard(&subscriptionsRwlock);
214 214 lock_guard.wrlock();
... ... @@ -216,7 +216,6 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
216 216 if (client->getClientId().empty())
217 217 throw ProtocolError("Trying to store client without an ID.");
218 218  
219   - bool originalClientDemandsSessionDestruction = false;
220 219 std::shared_ptr<Session> session;
221 220 auto session_it = sessionsById.find(client->getClientId());
222 221 if (session_it != sessionsById.end())
... ... @@ -232,11 +231,6 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
232 231 logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str());
233 232 cl->setDisconnectReason("Another client with this ID connected");
234 233  
235   - // We have to set session to false, because it's no longer up to the destruction of that client
236   - // to destroy the session. We either do it in this function, or not at all.
237   - originalClientDemandsSessionDestruction = cl->getCleanSession();
238   - cl->setCleanSession(false);
239   -
240 234 cl->setReadyForDisconnect();
241 235 cl->getThreadData()->removeClientQueued(cl);
242 236 cl->markAsDisconnecting();
... ... @@ -245,7 +239,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
245 239 }
246 240 }
247 241  
248   - if (!session || client->getCleanSession() || originalClientDemandsSessionDestruction)
  242 + if (!session || session->getDestroyOnDisconnect())
249 243 {
250 244 session = std::make_shared<Session>();
251 245  
... ... @@ -254,7 +248,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
254 248  
255 249 session->assignActiveConnection(client);
256 250 client->assignSession(session);
257   - session->setSessionProperties(maxQosPackets, sessionExpiryInterval);
  251 + session->setSessionProperties(maxQosPackets, sessionExpiryInterval, clean_start, client->getProtocolVersion());
258 252 uint64_t count = session->sendPendingQosMessages();
259 253 client->getThreadData()->incrementSentMessageCount(count);
260 254 }
... ... @@ -744,7 +738,7 @@ void SubscriptionStore::saveSessionsAndSubscriptions(const std::string &amp;filePath
744 738 const Session &org = *pair.second.get();
745 739  
746 740 // Sessions created with clean session need to be destroyed when disconnecting, so no point in saving them.
747   - if (org.getCleanSession())
  741 + if (org.getDestroyOnDisconnect())
748 742 continue;
749 743  
750 744 sessionCopies.push_back(org.getCopy());
... ...
subscriptionstore.h
... ... @@ -119,7 +119,7 @@ public:
119 119 void addSubscription(std::shared_ptr<Client> &client, const std::string &topic, const std::vector<std::string> &subtopics, char qos);
120 120 void removeSubscription(std::shared_ptr<Client> &client, const std::string &topic);
121 121 void registerClientAndKickExistingOne(std::shared_ptr<Client> &client);
122   - void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
  122 + void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t maxQosPackets, uint32_t sessionExpiryInterval);
123 123 bool sessionPresent(const std::string &clientid);
124 124  
125 125 void queuePacketAtSubscribers(const std::vector<std::string> &subtopics, MqttPacket &packet, bool dollar = false);
... ...