Commit cffcf3cf20ebe5460d34d3f3aa25fbadf3a061eb

Authored by Wiebe Cazemier
1 parent f8e062bf

Weak pointers to clients in subscription store

subscriptionstore.cpp
@@ -49,13 +49,6 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top @@ -49,13 +49,6 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top
49 giveClientRetainedMessages(client, topic); 49 giveClientRetainedMessages(client, topic);
50 } 50 }
51 51
52 -void SubscriptionStore::removeClient(const Client_p &client)  
53 -{  
54 - RWLockGuard lock_guard(&subscriptionsRwlock);  
55 - lock_guard.wrlock();  
56 - clients_by_id.erase(client->getClientId());  
57 -}  
58 -  
59 // TODO: should I implement cache, this needs to be changed to returning a list of clients. 52 // TODO: should I implement cache, this needs to be changed to returning a list of clients.
60 void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::forward_list<std::string> &subscribers) const 53 void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::forward_list<std::string> &subscribers) const
61 { 54 {
@@ -64,7 +57,11 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st @@ -64,7 +57,11 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st
64 auto client_it = clients_by_id_const.find(client_id); 57 auto client_it = clients_by_id_const.find(client_id);
65 if (client_it != clients_by_id_const.end()) 58 if (client_it != clients_by_id_const.end())
66 { 59 {
67 - client_it->second->writeMqttPacketAndBlameThisClient(packet); 60 + if (!client_it->second.expired())
  61 + {
  62 + Client_p c = client_it->second.lock();
  63 + c->writeMqttPacketAndBlameThisClient(packet);
  64 + }
68 } 65 }
69 } 66 }
70 } 67 }
subscriptionstore.h
@@ -36,8 +36,8 @@ class SubscriptionStore @@ -36,8 +36,8 @@ class SubscriptionStore
36 { 36 {
37 std::unique_ptr<SubscriptionNode> root; 37 std::unique_ptr<SubscriptionNode> root;
38 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; 38 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
39 - std::unordered_map<std::string, Client_p> clients_by_id;  
40 - const std::unordered_map<std::string, Client_p> &clients_by_id_const; 39 + std::unordered_map<std::string, std::weak_ptr<Client>> clients_by_id;
  40 + const std::unordered_map<std::string, std::weak_ptr<Client>> &clients_by_id_const;
41 41
42 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 42 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
43 std::unordered_set<RetainedMessage> retainedMessages; 43 std::unordered_set<RetainedMessage> retainedMessages;
@@ -49,7 +49,6 @@ public: @@ -49,7 +49,6 @@ public:
49 SubscriptionStore(); 49 SubscriptionStore();
50 50
51 void addSubscription(Client_p &client, const std::string &topic); 51 void addSubscription(Client_p &client, const std::string &topic);
52 - void removeClient(const Client_p &client);  
53 52
54 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); 53 void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender);
55 void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic); 54 void giveClientRetainedMessages(Client_p &client, const std::string &subscribe_topic);
threaddata.cpp
@@ -52,7 +52,6 @@ void ThreadData::removeClient(Client_p client) @@ -52,7 +52,6 @@ void ThreadData::removeClient(Client_p client)
52 52
53 std::lock_guard<std::mutex> lck(clients_by_fd_mutex); 53 std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
54 clients_by_fd.erase(client->getFd()); 54 clients_by_fd.erase(client->getFd());
55 - subscriptionStore->removeClient(client);  
56 } 55 }
57 56
58 void ThreadData::removeClient(int fd) 57 void ThreadData::removeClient(int fd)
@@ -62,7 +61,6 @@ void ThreadData::removeClient(int fd) @@ -62,7 +61,6 @@ void ThreadData::removeClient(int fd)
62 if (client_it != this->clients_by_fd.end()) 61 if (client_it != this->clients_by_fd.end())
63 { 62 {
64 client_it->second->markAsDisconnecting(); 63 client_it->second->markAsDisconnecting();
65 - subscriptionStore->removeClient(client_it->second);  
66 this->clients_by_fd.erase(fd); 64 this->clients_by_fd.erase(fd);
67 } 65 }
68 } 66 }
@@ -85,7 +83,6 @@ bool ThreadData::doKeepAliveCheck() @@ -85,7 +83,6 @@ bool ThreadData::doKeepAliveCheck()
85 Client_p &client = it->second; 83 Client_p &client = it->second;
86 if (client->keepAliveExpired()) 84 if (client->keepAliveExpired())
87 { 85 {
88 - subscriptionStore->removeClient(client);  
89 it = clients_by_fd.erase(it); 86 it = clients_by_fd.erase(it);
90 } 87 }
91 else 88 else