Commit 51127c3d7603ef932ab662217d4f63bd118a3698

Authored by Wiebe Cazemier
1 parent aa87dfb6

Remove session of client with clean session on disconnect

Mandatory normative statement MQTT-3.1.2-6.
client.cpp
@@ -49,11 +49,11 @@ Client::Client(int fd, std::shared_ptr<ThreadData> threadData, SSL *ssl, bool we @@ -49,11 +49,11 @@ Client::Client(int fd, std::shared_ptr<ThreadData> threadData, SSL *ssl, bool we
49 49
50 Client::~Client() 50 Client::~Client()
51 { 51 {
  52 + std::shared_ptr<SubscriptionStore> &store = getThreadData()->getSubscriptionStore();
  53 +
52 // Will payload can be empty, apparently. 54 // Will payload can be empty, apparently.
53 if (!will_topic.empty()) 55 if (!will_topic.empty())
54 { 56 {
55 - std::shared_ptr<SubscriptionStore> &store = getThreadData()->getSubscriptionStore();  
56 -  
57 Publish will(will_topic, will_payload, will_qos); 57 Publish will(will_topic, will_payload, will_qos);
58 will.retain = will_retain; 58 will.retain = will_retain;
59 const MqttPacket willPacket(will); 59 const MqttPacket willPacket(will);
@@ -69,6 +69,12 @@ Client::~Client() @@ -69,6 +69,12 @@ Client::~Client()
69 if (epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL) != 0) 69 if (epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL) != 0)
70 logger->logf(LOG_ERR, "Removing fd %d of client '%s' from epoll produced error: %s", fd, repr().c_str(), strerror(errno)); 70 logger->logf(LOG_ERR, "Removing fd %d of client '%s' from epoll produced error: %s", fd, repr().c_str(), strerror(errno));
71 close(fd); 71 close(fd);
  72 +
  73 + // MQTT-3.1.2-6
  74 + if (cleanSession)
  75 + {
  76 + store->removeSession(clientid);
  77 + }
72 } 78 }
73 79
74 bool Client::isSslAccepted() const 80 bool Client::isSslAccepted() const
subscriptionstore.cpp
@@ -492,6 +492,20 @@ int SubscriptionNode::cleanSubscriptions() @@ -492,6 +492,20 @@ int SubscriptionNode::cleanSubscriptions()
492 return subscribers.size() + subscribersLeftInChildren; 492 return subscribers.size() + subscribersLeftInChildren;
493 } 493 }
494 494
  495 +void SubscriptionStore::removeSession(const std::string &clientid)
  496 +{
  497 + RWLockGuard lock_guard(&subscriptionsRwlock);
  498 + lock_guard.wrlock();
  499 +
  500 + logger->logf(LOG_DEBUG, "Removing session of client '%s'.", clientid.c_str());
  501 +
  502 + auto session_it = sessionsById.begin();
  503 + if (session_it != sessionsById.end())
  504 + {
  505 + sessionsById.erase(session_it);
  506 + }
  507 +}
  508 +
495 // This is not MQTT compliant, but the standard doesn't keep real world constraints into account. 509 // This is not MQTT compliant, but the standard doesn't keep real world constraints into account.
496 void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeconds) 510 void SubscriptionStore::removeExpiredSessionsClients(int expireSessionsAfterSeconds)
497 { 511 {
subscriptionstore.h
@@ -118,6 +118,7 @@ public: @@ -118,6 +118,7 @@ public:
118 118
119 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos); 119 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos);
120 120
  121 + void removeSession(const std::string &clientid);
121 void removeExpiredSessionsClients(int expireSessionsAfterSeconds); 122 void removeExpiredSessionsClients(int expireSessionsAfterSeconds);
122 123
123 int64_t getRetainedMessageCount() const; 124 int64_t getRetainedMessageCount() const;