Commit 43df4fc2433380f915f8ad77986d1b261935d4a2

Authored by Wiebe Cazemier
1 parent 68bd5080

Disconnect client with existing ID

mqttpacket.cpp
@@ -221,6 +221,8 @@ void MqttPacket::handleConnect() @@ -221,6 +221,8 @@ void MqttPacket::handleConnect()
221 221
222 if (sender->getThreadData()->authPlugin.unPwdCheck(username, password) == AuthResult::success) 222 if (sender->getThreadData()->authPlugin.unPwdCheck(username, password) == AuthResult::success)
223 { 223 {
  224 + sender->getThreadData()->getSubscriptionStore()->registerClientAndKickExistingOne(sender);
  225 +
224 sender->setAuthenticated(true); 226 sender->setAuthenticated(true);
225 ConnAck connAck(ConnAckReturnCodes::Accepted); 227 ConnAck connAck(ConnAckReturnCodes::Accepted);
226 MqttPacket response(connAck); 228 MqttPacket response(connAck);
subscriptionstore.cpp
@@ -51,12 +51,35 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top @@ -51,12 +51,35 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top
51 deepestNode->subscribers.push_front(client->getClientId()); 51 deepestNode->subscribers.push_front(client->getClientId());
52 } 52 }
53 53
54 - clients_by_id[client->getClientId()] = client;  
55 lock_guard.unlock(); 54 lock_guard.unlock();
56 55
57 giveClientRetainedMessages(client, topic); 56 giveClientRetainedMessages(client, topic);
58 } 57 }
59 58
  59 +// Removes an existing client when it already exists [MQTT-3.1.4-2].
  60 +void SubscriptionStore::registerClientAndKickExistingOne(Client_p &client)
  61 +{
  62 + RWLockGuard lock_guard(&subscriptionsRwlock);
  63 + lock_guard.wrlock();
  64 +
  65 + if (client->getClientId().empty())
  66 + throw ProtocolError("Trying to store client without an ID.");
  67 +
  68 + std::weak_ptr<Client> existingClient = clients_by_id[client->getClientId()];
  69 + auto it = clients_by_id.find(client->getClientId());
  70 +
  71 + if (it != clients_by_id.end() && !it->second.expired())
  72 + {
  73 + std::shared_ptr<Client> cl = it->second.lock();
  74 + logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str());
  75 + cl->setReadyForDisconnect();
  76 + cl->getThreadData()->removeClient(cl);
  77 + cl->markAsDisconnecting();
  78 + }
  79 +
  80 + clients_by_id[client->getClientId()] = client;
  81 +}
  82 +
60 // TODO: should I implement cache, this needs to be changed to returning a list of clients. 83 // TODO: should I implement cache, this needs to be changed to returning a list of clients.
61 void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::forward_list<std::string> &subscribers) const 84 void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::forward_list<std::string> &subscribers) const
62 { 85 {
subscriptionstore.h
@@ -12,6 +12,7 @@ @@ -12,6 +12,7 @@
12 #include "client.h" 12 #include "client.h"
13 #include "utils.h" 13 #include "utils.h"
14 #include "retainedmessage.h" 14 #include "retainedmessage.h"
  15 +#include "logger.h"
15 16
16 struct RetainedPayload 17 struct RetainedPayload
17 { 18 {
@@ -44,6 +45,8 @@ class SubscriptionStore @@ -44,6 +45,8 @@ class SubscriptionStore
44 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 45 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
45 std::unordered_set<RetainedMessage> retainedMessages; 46 std::unordered_set<RetainedMessage> retainedMessages;
46 47
  48 + Logger *logger = Logger::getInstance();
  49 +
47 void publishNonRecursively(const MqttPacket &packet, const std::forward_list<std::string> &subscribers) const; 50 void publishNonRecursively(const MqttPacket &packet, const std::forward_list<std::string> &subscribers) const;
48 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 51 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
49 std::unique_ptr<SubscriptionNode> &next, const MqttPacket &packet) const; 52 std::unique_ptr<SubscriptionNode> &next, const MqttPacket &packet) const;
@@ -51,6 +54,7 @@ public: @@ -51,6 +54,7 @@ public:
51 SubscriptionStore(); 54 SubscriptionStore();
52 55
53 void addSubscription(Client_p &client, const std::string &topic); 56 void addSubscription(Client_p &client, const std::string &topic);
  57 + void registerClientAndKickExistingOne(Client_p &client);
54 58
55 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); 59 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender);
56 void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic); 60 void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic);