Commit 4bfef9db006a58c8fc1a94a96db78f30b05a2d12

Authored by Wiebe Cazemier
1 parent af190a63

Fix several deadlocks

One fix is client destruction happening on the correct thread (when
kicking another one off with existing client ID). This caused deadlocks
on the subscriptions lock during a race condition when
doKeepAliveCheck() also ran.

A related deadlock was that the queued functions were executed while
holding the lock taskQueueMutex. Together with the subscriptions lock,
that was executed in the $SYS topic function, this also caused
deadlocks.
client.cpp
@@ -222,7 +222,7 @@ void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const c @@ -222,7 +222,7 @@ void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet, const c
222 } 222 }
223 catch (std::exception &ex) 223 catch (std::exception &ex)
224 { 224 {
225 - threadData->removeClient(fd); 225 + threadData->removeClientQueued(fd);
226 } 226 }
227 } 227 }
228 228
@@ -322,6 +322,11 @@ void Client::resetBuffersIfEligible() @@ -322,6 +322,11 @@ void Client::resetBuffersIfEligible()
322 writebuf.resetSizeIfEligable(initialBufferSize); 322 writebuf.resetSizeIfEligable(initialBufferSize);
323 } 323 }
324 324
  325 +void Client::setCleanSession(bool val)
  326 +{
  327 + this->cleanSession = val;
  328 +}
  329 +
325 #ifndef NDEBUG 330 #ifndef NDEBUG
326 /** 331 /**
327 * @brief IoWrapper::setFakeUpgraded(). 332 * @brief IoWrapper::setFakeUpgraded().
client.h
@@ -134,6 +134,8 @@ public: @@ -134,6 +134,8 @@ public:
134 std::string getKeepAliveInfoString() const; 134 std::string getKeepAliveInfoString() const;
135 void resetBuffersIfEligible(); 135 void resetBuffersIfEligible();
136 136
  137 + void setCleanSession(bool val);
  138 +
137 #ifndef NDEBUG 139 #ifndef NDEBUG
138 void setFakeUpgraded(); 140 void setFakeUpgraded();
139 #endif 141 #endif
mqttpacket.cpp
@@ -412,7 +412,7 @@ void MqttPacket::handleDisconnect() @@ -412,7 +412,7 @@ void MqttPacket::handleDisconnect()
412 sender->setDisconnectReason("MQTT Disconnect received."); 412 sender->setDisconnectReason("MQTT Disconnect received.");
413 sender->markAsDisconnecting(); 413 sender->markAsDisconnecting();
414 sender->clearWill(); 414 sender->clearWill();
415 - sender->getThreadData()->removeClient(sender); 415 + sender->getThreadData()->removeClientQueued(sender);
416 } 416 }
417 417
418 void MqttPacket::handleSubscribe() 418 void MqttPacket::handleSubscribe()
subscriptionstore.cpp
@@ -202,6 +202,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client> @@ -202,6 +202,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client>
202 if (client->getClientId().empty()) 202 if (client->getClientId().empty())
203 throw ProtocolError("Trying to store client without an ID."); 203 throw ProtocolError("Trying to store client without an ID.");
204 204
  205 + bool originalClientDemandsSessionDestruction = false;
205 std::shared_ptr<Session> session; 206 std::shared_ptr<Session> session;
206 auto session_it = sessionsById.find(client->getClientId()); 207 auto session_it = sessionsById.find(client->getClientId());
207 if (session_it != sessionsById.end()) 208 if (session_it != sessionsById.end())
@@ -215,14 +216,22 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt; @@ -215,14 +216,22 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr&lt;Client&gt;
215 if (cl) 216 if (cl)
216 { 217 {
217 logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str()); 218 logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str());
  219 + cl->setDisconnectReason("Another client with this ID connected");
  220 +
  221 + // We have to set session to false, because it's no longer up to the destruction of that client
  222 + // to destroy the session. We either do it in this function, or not at all.
  223 + originalClientDemandsSessionDestruction = cl->getCleanSession();
  224 + cl->setCleanSession(false);
  225 +
218 cl->setReadyForDisconnect(); 226 cl->setReadyForDisconnect();
219 - cl->getThreadData()->removeClient(cl); 227 + cl->getThreadData()->removeClientQueued(cl);
220 cl->markAsDisconnecting(); 228 cl->markAsDisconnecting();
221 } 229 }
  230 +
222 } 231 }
223 } 232 }
224 233
225 - if (!session || client->getCleanSession()) 234 + if (!session || client->getCleanSession() || originalClientDemandsSessionDestruction)
226 { 235 {
227 session.reset(new Session()); 236 session.reset(new Session());
228 237
threaddata.cpp
@@ -131,6 +131,36 @@ void ThreadData::publishStat(const std::string &amp;topic, uint64_t n) @@ -131,6 +131,36 @@ void ThreadData::publishStat(const std::string &amp;topic, uint64_t n)
131 subscriptionStore->setRetainedMessage(topic, subtopics, payload, 0); 131 subscriptionStore->setRetainedMessage(topic, subtopics, payload, 0);
132 } 132 }
133 133
  134 +void ThreadData::removeQueuedClients()
  135 +{
  136 + std::vector<int> fds;
  137 + fds.reserve(1024); // 1024 is arbitrary...
  138 +
  139 + {
  140 + std::lock_guard<std::mutex> lck2(clientsToRemoveMutex);
  141 +
  142 + for (const std::weak_ptr<Client> &c : clientsQueuedForRemoving)
  143 + {
  144 + std::shared_ptr<Client> client = c.lock();
  145 + if (client)
  146 + {
  147 + int fd = client->getFd();
  148 + fds.push_back(fd);
  149 + }
  150 + }
  151 +
  152 + clientsQueuedForRemoving.clear();
  153 + }
  154 +
  155 + {
  156 + std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
  157 + for(int fd : fds)
  158 + {
  159 + clients_by_fd.erase(fd);
  160 + }
  161 + }
  162 +}
  163 +
134 void ThreadData::giveClient(std::shared_ptr<Client> client) 164 void ThreadData::giveClient(std::shared_ptr<Client> client)
135 { 165 {
136 clients_by_fd_mutex.lock(); 166 clients_by_fd_mutex.lock();
@@ -151,25 +181,70 @@ std::shared_ptr&lt;Client&gt; ThreadData::getClient(int fd) @@ -151,25 +181,70 @@ std::shared_ptr&lt;Client&gt; ThreadData::getClient(int fd)
151 return this->clients_by_fd[fd]; 181 return this->clients_by_fd[fd];
152 } 182 }
153 183
154 -void ThreadData::removeClient(std::shared_ptr<Client> client) 184 +void ThreadData::removeClientQueued(const std::shared_ptr<Client> &client)
155 { 185 {
156 - client->markAsDisconnecting(); 186 + bool wakeUpNeeded = true;
157 187
158 - std::lock_guard<std::mutex> lck(clients_by_fd_mutex);  
159 - clients_by_fd.erase(client->getFd()); 188 + {
  189 + std::lock_guard<std::mutex> locker(clientsToRemoveMutex);
  190 + wakeUpNeeded = clientsQueuedForRemoving.empty();
  191 + clientsQueuedForRemoving.push_front(client);
  192 + }
  193 +
  194 + if (wakeUpNeeded)
  195 + {
  196 + auto f = std::bind(&ThreadData::removeQueuedClients, this);
  197 + std::lock_guard<std::mutex> lockertaskQueue(taskQueueMutex);
  198 + taskQueue.push_front(f);
  199 +
  200 + wakeUpThread();
  201 + }
160 } 202 }
161 203
162 -void ThreadData::removeClient(int fd) 204 +void ThreadData::removeClientQueued(int fd)
163 { 205 {
164 - std::lock_guard<std::mutex> lck(clients_by_fd_mutex);  
165 - auto client_it = this->clients_by_fd.find(fd);  
166 - if (client_it != this->clients_by_fd.end()) 206 + bool wakeUpNeeded = true;
  207 + std::shared_ptr<Client> clientFound;
  208 +
  209 + {
  210 + std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
  211 + auto client_it = this->clients_by_fd.find(fd);
  212 + if (client_it != this->clients_by_fd.end())
  213 + {
  214 + clientFound = client_it->second;
  215 + }
  216 + }
  217 +
  218 + if (clientFound)
167 { 219 {
168 - client_it->second->markAsDisconnecting();  
169 - this->clients_by_fd.erase(fd); 220 + {
  221 + std::lock_guard<std::mutex> locker(clientsToRemoveMutex);
  222 + wakeUpNeeded = clientsQueuedForRemoving.empty();
  223 + clientsQueuedForRemoving.push_front(clientFound);
  224 + }
  225 +
  226 + if (wakeUpNeeded)
  227 + {
  228 + auto f = std::bind(&ThreadData::removeQueuedClients, this);
  229 + std::lock_guard<std::mutex> lockertaskQueue(taskQueueMutex);
  230 + taskQueue.push_front(f);
  231 +
  232 + wakeUpThread();
  233 + }
170 } 234 }
171 } 235 }
172 236
  237 +void ThreadData::removeClient(std::shared_ptr<Client> client)
  238 +{
  239 + // This function is only for same-thread calling.
  240 + assert(pthread_self() == thread.native_handle());
  241 +
  242 + client->markAsDisconnecting();
  243 +
  244 + std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
  245 + clients_by_fd.erase(client->getFd());
  246 +}
  247 +
173 std::shared_ptr<SubscriptionStore> &ThreadData::getSubscriptionStore() 248 std::shared_ptr<SubscriptionStore> &ThreadData::getSubscriptionStore()
174 { 249 {
175 return subscriptionStore; 250 return subscriptionStore;
threaddata.h
@@ -56,6 +56,8 @@ class ThreadData @@ -56,6 +56,8 @@ class ThreadData
56 uint64_t sentMessageCountPrevious = 0; 56 uint64_t sentMessageCountPrevious = 0;
57 std::chrono::time_point<std::chrono::steady_clock> sentMessagePreviousTime = std::chrono::steady_clock::now(); 57 std::chrono::time_point<std::chrono::steady_clock> sentMessagePreviousTime = std::chrono::steady_clock::now();
58 58
  59 + std::mutex clientsToRemoveMutex;
  60 + std::forward_list<std::weak_ptr<Client>> clientsQueuedForRemoving;
59 61
60 void reload(std::shared_ptr<Settings> settings); 62 void reload(std::shared_ptr<Settings> settings);
61 void wakeUpThread(); 63 void wakeUpThread();
@@ -64,6 +66,8 @@ class ThreadData @@ -64,6 +66,8 @@ class ThreadData
64 void publishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadData>> &threads); 66 void publishStatsOnDollarTopic(std::vector<std::shared_ptr<ThreadData>> &threads);
65 void publishStat(const std::string &topic, uint64_t n); 67 void publishStat(const std::string &topic, uint64_t n);
66 68
  69 + void removeQueuedClients();
  70 +
67 public: 71 public:
68 Settings settingsLocalCopy; // Is updated on reload, within the thread loop. 72 Settings settingsLocalCopy; // Is updated on reload, within the thread loop.
69 Authentication authentication; 73 Authentication authentication;
@@ -84,8 +88,9 @@ public: @@ -84,8 +88,9 @@ public:
84 88
85 void giveClient(std::shared_ptr<Client> client); 89 void giveClient(std::shared_ptr<Client> client);
86 std::shared_ptr<Client> getClient(int fd); 90 std::shared_ptr<Client> getClient(int fd);
  91 + void removeClientQueued(const std::shared_ptr<Client> &client);
  92 + void removeClientQueued(int fd);
87 void removeClient(std::shared_ptr<Client> client); 93 void removeClient(std::shared_ptr<Client> client);
88 - void removeClient(int fd);  
89 std::shared_ptr<SubscriptionStore> &getSubscriptionStore(); 94 std::shared_ptr<SubscriptionStore> &getSubscriptionStore();
90 95
91 void initAuthPlugin(); 96 void initAuthPlugin();
threadloop.cpp
@@ -64,12 +64,21 @@ void do_thread_work(ThreadData *threadData) @@ -64,12 +64,21 @@ void do_thread_work(ThreadData *threadData)
64 uint64_t eventfd_value = 0; 64 uint64_t eventfd_value = 0;
65 check<std::runtime_error>(read(fd, &eventfd_value, sizeof(uint64_t))); 65 check<std::runtime_error>(read(fd, &eventfd_value, sizeof(uint64_t)));
66 66
67 - std::lock_guard<std::mutex> locker(threadData->taskQueueMutex);  
68 - for(auto &f : threadData->taskQueue) 67 + std::forward_list<std::function<void()>> copiedTasks;
  68 +
  69 + {
  70 + std::lock_guard<std::mutex> locker(threadData->taskQueueMutex);
  71 + for(auto &f : threadData->taskQueue)
  72 + {
  73 + copiedTasks.push_front(std::move(f));
  74 + }
  75 + threadData->taskQueue.clear();
  76 + }
  77 +
  78 + for(auto &f : copiedTasks)
69 { 79 {
70 f(); 80 f();
71 } 81 }
72 - threadData->taskQueue.clear();  
73 82
74 continue; 83 continue;
75 } 84 }