From cffcf3cf20ebe5460d34d3f3aa25fbadf3a061eb Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Mon, 28 Dec 2020 14:41:33 +0100 Subject: [PATCH] Weak pointers to clients in subscription store --- subscriptionstore.cpp | 13 +++++-------- subscriptionstore.h | 5 ++--- threaddata.cpp | 3 --- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 31cba2f..337e4a9 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -49,13 +49,6 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top giveClientRetainedMessages(client, topic); } -void SubscriptionStore::removeClient(const Client_p &client) -{ - RWLockGuard lock_guard(&subscriptionsRwlock); - lock_guard.wrlock(); - clients_by_id.erase(client->getClientId()); -} - // TODO: should I implement cache, this needs to be changed to returning a list of clients. void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::forward_list &subscribers) const { @@ -64,7 +57,11 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const st auto client_it = clients_by_id_const.find(client_id); if (client_it != clients_by_id_const.end()) { - client_it->second->writeMqttPacketAndBlameThisClient(packet); + if (!client_it->second.expired()) + { + Client_p c = client_it->second.lock(); + c->writeMqttPacketAndBlameThisClient(packet); + } } } } diff --git a/subscriptionstore.h b/subscriptionstore.h index 4a8392e..9f5fc61 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -36,8 +36,8 @@ class SubscriptionStore { std::unique_ptr root; pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; - std::unordered_map clients_by_id; - const std::unordered_map &clients_by_id_const; + std::unordered_map> clients_by_id; + const std::unordered_map> &clients_by_id_const; pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; std::unordered_set retainedMessages; @@ -49,7 +49,6 @@ public: SubscriptionStore(); void addSubscription(Client_p &client, const std::string &topic); - void removeClient(const Client_p &client); void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic); diff --git a/threaddata.cpp b/threaddata.cpp index 7bbc939..939cad1 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -52,7 +52,6 @@ void ThreadData::removeClient(Client_p client) std::lock_guard lck(clients_by_fd_mutex); clients_by_fd.erase(client->getFd()); - subscriptionStore->removeClient(client); } void ThreadData::removeClient(int fd) @@ -62,7 +61,6 @@ void ThreadData::removeClient(int fd) if (client_it != this->clients_by_fd.end()) { client_it->second->markAsDisconnecting(); - subscriptionStore->removeClient(client_it->second); this->clients_by_fd.erase(fd); } } @@ -85,7 +83,6 @@ bool ThreadData::doKeepAliveCheck() Client_p &client = it->second; if (client->keepAliveExpired()) { - subscriptionStore->removeClient(client); it = clients_by_fd.erase(it); } else -- libgit2 0.21.4