Commit 4d4e0ec51d126ba62ef2f5384e759408b4024d4d

Authored by Wiebe Cazemier
1 parent 9dc1e6ed

Publish retained messages outside of retainedMessagesRwlock

subscriptionstore.cpp
@@ -359,8 +359,8 @@ void SubscriptionStore::queuePacketAtSubscribers(const std::vector<std::string> @@ -359,8 +359,8 @@ void SubscriptionStore::queuePacketAtSubscribers(const std::vector<std::string>
359 } 359 }
360 360
361 void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 361 void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
362 - RetainedMessageNode *this_node, char max_qos, const std::shared_ptr<Session> &ses,  
363 - bool poundMode, uint64_t &count) const 362 + RetainedMessageNode *this_node,
  363 + bool poundMode, std::forward_list<MqttPacket> &packetList) const
364 { 364 {
365 if (cur_subtopic_it == end) 365 if (cur_subtopic_it == end)
366 { 366 {
@@ -368,15 +368,14 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s @@ -368,15 +368,14 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s
368 { 368 {
369 Publish publish(rm.topic, rm.payload, rm.qos); 369 Publish publish(rm.topic, rm.payload, rm.qos);
370 publish.retain = true; 370 publish.retain = true;
371 - const MqttPacket packet(publish);  
372 - ses->writePacket(packet, max_qos, true, count); 371 + packetList.emplace_front(publish);
373 } 372 }
374 if (poundMode) 373 if (poundMode)
375 { 374 {
376 for (auto &pair : this_node->children) 375 for (auto &pair : this_node->children)
377 { 376 {
378 std::unique_ptr<RetainedMessageNode> &child = pair.second; 377 std::unique_ptr<RetainedMessageNode> &child = pair.second;
379 - giveClientRetainedMessagesRecursively(cur_subtopic_it, end, child.get(), max_qos, ses, poundMode, count); 378 + giveClientRetainedMessagesRecursively(cur_subtopic_it, end, child.get(), poundMode, packetList);
380 } 379 }
381 } 380 }
382 381
@@ -393,7 +392,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s @@ -393,7 +392,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s
393 { 392 {
394 std::unique_ptr<RetainedMessageNode> &child = pair.second; 393 std::unique_ptr<RetainedMessageNode> &child = pair.second;
395 if (child) // I don't think it can ever be unset, but I'd rather avoid a crash. 394 if (child) // I don't think it can ever be unset, but I'd rather avoid a crash.
396 - giveClientRetainedMessagesRecursively(next_subtopic, end, child.get(), max_qos, ses, poundFound, count); 395 + giveClientRetainedMessagesRecursively(next_subtopic, end, child.get(), poundFound, packetList);
397 } 396 }
398 } 397 }
399 else 398 else
@@ -402,7 +401,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s @@ -402,7 +401,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector&lt;std::s
402 401
403 if (children) 402 if (children)
404 { 403 {
405 - giveClientRetainedMessagesRecursively(next_subtopic, end, children, max_qos, ses, false, count); 404 + giveClientRetainedMessagesRecursively(next_subtopic, end, children, false, packetList);
406 } 405 }
407 } 406 }
408 } 407 }
@@ -415,10 +414,18 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Ses @@ -415,10 +414,18 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Ses
415 if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$') 414 if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$')
416 startNode = &retainedMessagesRootDollar; 415 startNode = &retainedMessagesRootDollar;
417 416
418 - RWLockGuard locker(&retainedMessagesRwlock);  
419 - locker.rdlock(); 417 + std::forward_list<MqttPacket> packetList;
  418 +
  419 + {
  420 + RWLockGuard locker(&retainedMessagesRwlock);
  421 + locker.rdlock();
  422 + giveClientRetainedMessagesRecursively(subscribeSubtopics.begin(), subscribeSubtopics.end(), startNode, false, packetList);
  423 + }
420 424
421 - giveClientRetainedMessagesRecursively(subscribeSubtopics.begin(), subscribeSubtopics.end(), startNode, max_qos, ses, false, count); 425 + for(const MqttPacket &packet : packetList)
  426 + {
  427 + ses->writePacket(packet, max_qos, true, count);
  428 + }
422 429
423 return count; 430 return count;
424 } 431 }
subscriptionstore.h
@@ -122,8 +122,7 @@ public: @@ -122,8 +122,7 @@ public:
122 122
123 void queuePacketAtSubscribers(const std::vector<std::string> &subtopics, const MqttPacket &packet, bool dollar = false); 123 void queuePacketAtSubscribers(const std::vector<std::string> &subtopics, const MqttPacket &packet, bool dollar = false);
124 void giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 124 void giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
125 - RetainedMessageNode *this_node, char max_qos, const std::shared_ptr<Session> &ses,  
126 - bool poundMode, uint64_t &count) const; 125 + RetainedMessageNode *this_node, bool poundMode, std::forward_list<MqttPacket> &packetList) const;
127 uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::vector<std::string> &subscribeSubtopics, char max_qos); 126 uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses, const std::vector<std::string> &subscribeSubtopics, char max_qos);
128 127
129 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos); 128 void setRetainedMessage(const std::string &topic, const std::vector<std::string> &subtopics, const std::string &payload, char qos);