Commit 1bca4cbbbcfd4c528d20c697830d06d54681e482

Authored by Wiebe Cazemier
1 parent 6158d91b

Use 'Publish' for retained instead of 'MqttPacket'

This is better in line with how the copy factory is meant to be used. It
actually broke on the assert on 'externallyReceived' before.
subscriptionstore.cpp
@@ -147,7 +147,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr<Client> &client, const s @@ -147,7 +147,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr<Client> &client, const s
147 const std::shared_ptr<Session> &ses = session_it->second; 147 const std::shared_ptr<Session> &ses = session_it->second;
148 deepestNode->addSubscriber(ses, qos); 148 deepestNode->addSubscriber(ses, qos);
149 lock_guard.unlock(); 149 lock_guard.unlock();
150 - uint64_t count = giveClientRetainedMessages(client, ses, subtopics, qos); 150 + uint64_t count = giveClientRetainedMessages(ses, subtopics, qos);
151 client->getThreadData()->incrementSentMessageCount(count); 151 client->getThreadData()->incrementSentMessageCount(count);
152 } 152 }
153 } 153 }
@@ -447,25 +447,23 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory @@ -447,25 +447,23 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory &amp;copyFactory
447 } 447 }
448 } 448 }
449 449
450 -void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion protocolVersion, std::vector<std::string>::const_iterator cur_subtopic_it, 450 +void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it,
451 std::vector<std::string>::const_iterator end, RetainedMessageNode *this_node, 451 std::vector<std::string>::const_iterator end, RetainedMessageNode *this_node,
452 - bool poundMode, std::forward_list<MqttPacket> &packetList) const 452 + bool poundMode, std::forward_list<Publish> &packetList) const
453 { 453 {
454 if (cur_subtopic_it == end) 454 if (cur_subtopic_it == end)
455 { 455 {
456 for(const RetainedMessage &rm : this_node->retainedMessages) 456 for(const RetainedMessage &rm : this_node->retainedMessages)
457 { 457 {
458 - // TODO: hmm, const stuff forces me to make copy  
459 - Publish pubcopy(rm.publish);  
460 - pubcopy.splitTopic = true;  
461 - packetList.emplace_front(protocolVersion, pubcopy); 458 + // TODO: hmm, const stuff forces me/it to make copy
  459 + packetList.emplace_front(rm.publish);
462 } 460 }
463 if (poundMode) 461 if (poundMode)
464 { 462 {
465 for (auto &pair : this_node->children) 463 for (auto &pair : this_node->children)
466 { 464 {
467 std::unique_ptr<RetainedMessageNode> &child = pair.second; 465 std::unique_ptr<RetainedMessageNode> &child = pair.second;
468 - giveClientRetainedMessagesRecursively(protocolVersion, cur_subtopic_it, end, child.get(), poundMode, packetList); 466 + giveClientRetainedMessagesRecursively(cur_subtopic_it, end, child.get(), poundMode, packetList);
469 } 467 }
470 } 468 }
471 469
@@ -482,7 +480,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion pr @@ -482,7 +480,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion pr
482 { 480 {
483 std::unique_ptr<RetainedMessageNode> &child = pair.second; 481 std::unique_ptr<RetainedMessageNode> &child = pair.second;
484 if (child) // I don't think it can ever be unset, but I'd rather avoid a crash. 482 if (child) // I don't think it can ever be unset, but I'd rather avoid a crash.
485 - giveClientRetainedMessagesRecursively(protocolVersion, next_subtopic, end, child.get(), poundFound, packetList); 483 + giveClientRetainedMessagesRecursively(next_subtopic, end, child.get(), poundFound, packetList);
486 } 484 }
487 } 485 }
488 else 486 else
@@ -491,12 +489,12 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion pr @@ -491,12 +489,12 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion pr
491 489
492 if (children) 490 if (children)
493 { 491 {
494 - giveClientRetainedMessagesRecursively(protocolVersion, next_subtopic, end, children, false, packetList); 492 + giveClientRetainedMessagesRecursively(next_subtopic, end, children, false, packetList);
495 } 493 }
496 } 494 }
497 } 495 }
498 496
499 -uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Client> &client, const std::shared_ptr<Session> &ses, 497 +uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
500 const std::vector<std::string> &subscribeSubtopics, char max_qos) 498 const std::vector<std::string> &subscribeSubtopics, char max_qos)
501 { 499 {
502 uint64_t count = 0; 500 uint64_t count = 0;
@@ -505,17 +503,17 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Cli @@ -505,17 +503,17 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr&lt;Cli
505 if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$') 503 if (!subscribeSubtopics.empty() && !subscribeSubtopics[0].empty() > 0 && subscribeSubtopics[0][0] == '$')
506 startNode = &retainedMessagesRootDollar; 504 startNode = &retainedMessagesRootDollar;
507 505
508 - std::forward_list<MqttPacket> packetList; 506 + std::forward_list<Publish> packetList;
509 507
510 { 508 {
511 RWLockGuard locker(&retainedMessagesRwlock); 509 RWLockGuard locker(&retainedMessagesRwlock);
512 locker.rdlock(); 510 locker.rdlock();
513 - giveClientRetainedMessagesRecursively(client->getProtocolVersion(), subscribeSubtopics.begin(), subscribeSubtopics.end(), startNode, false, packetList); 511 + giveClientRetainedMessagesRecursively(subscribeSubtopics.begin(), subscribeSubtopics.end(), startNode, false, packetList);
514 } 512 }
515 513
516 - for(MqttPacket &packet : packetList) 514 + for(Publish &publish : packetList)
517 { 515 {
518 - PublishCopyFactory copyFactory(&packet); 516 + PublishCopyFactory copyFactory(&publish);
519 ses->writePacket(copyFactory, max_qos, count); 517 ses->writePacket(copyFactory, max_qos, count);
520 } 518 }
521 519
subscriptionstore.h
@@ -146,9 +146,9 @@ class SubscriptionStore @@ -146,9 +146,9 @@ class SubscriptionStore
146 std::forward_list<ReceivingSubscriber> &targetSessions) const; 146 std::forward_list<ReceivingSubscriber> &targetSessions) const;
147 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 147 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
148 SubscriptionNode *this_node, std::forward_list<ReceivingSubscriber> &targetSessions) const; 148 SubscriptionNode *this_node, std::forward_list<ReceivingSubscriber> &targetSessions) const;
149 - void giveClientRetainedMessagesRecursively(ProtocolVersion protocolVersion, std::vector<std::string>::const_iterator cur_subtopic_it, 149 + void giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it,
150 std::vector<std::string>::const_iterator end, RetainedMessageNode *this_node, bool poundMode, 150 std::vector<std::string>::const_iterator end, RetainedMessageNode *this_node, bool poundMode,
151 - std::forward_list<MqttPacket> &packetList) const; 151 + std::forward_list<Publish> &packetList) const;
152 void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const; 152 void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const;
153 void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root, 153 void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root,
154 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const; 154 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const;
@@ -167,7 +167,7 @@ public: @@ -167,7 +167,7 @@ public:
167 void sendQueuedWillMessages(); 167 void sendQueuedWillMessages();
168 void queueWillMessage(const std::shared_ptr<Publish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false); 168 void queueWillMessage(const std::shared_ptr<Publish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false);
169 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false); 169 void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, bool dollar = false);
170 - uint64_t giveClientRetainedMessages(const std::shared_ptr<Client> &client, const std::shared_ptr<Session> &ses, 170 + uint64_t giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
171 const std::vector<std::string> &subscribeSubtopics, char max_qos); 171 const std::vector<std::string> &subscribeSubtopics, char max_qos);
172 172
173 void setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics); 173 void setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics);