diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 9e99af9..e3052f2 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -147,7 +147,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr &client, const s const std::shared_ptr &ses = session_it->second; deepestNode->addSubscriber(ses, qos); lock_guard.unlock(); - uint64_t count = giveClientRetainedMessages(client, ses, subtopics, qos); + uint64_t count = giveClientRetainedMessages(ses, subtopics, qos); client->getThreadData()->incrementSentMessageCount(count); } } @@ -447,25 +447,23 @@ void SubscriptionStore::queuePacketAtSubscribers(PublishCopyFactory ©Factory } } -void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion protocolVersion, std::vector::const_iterator cur_subtopic_it, +void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector::const_iterator cur_subtopic_it, std::vector::const_iterator end, RetainedMessageNode *this_node, - bool poundMode, std::forward_list &packetList) const + bool poundMode, std::forward_list &packetList) const { if (cur_subtopic_it == end) { for(const RetainedMessage &rm : this_node->retainedMessages) { - // TODO: hmm, const stuff forces me to make copy - Publish pubcopy(rm.publish); - pubcopy.splitTopic = true; - packetList.emplace_front(protocolVersion, pubcopy); + // TODO: hmm, const stuff forces me/it to make copy + packetList.emplace_front(rm.publish); } if (poundMode) { for (auto &pair : this_node->children) { std::unique_ptr &child = pair.second; - giveClientRetainedMessagesRecursively(protocolVersion, cur_subtopic_it, end, child.get(), poundMode, packetList); + giveClientRetainedMessagesRecursively(cur_subtopic_it, end, child.get(), poundMode, packetList); } } @@ -482,7 +480,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion pr { std::unique_ptr &child = pair.second; if (child) // I don't think it can ever be unset, but I'd rather avoid a crash. - giveClientRetainedMessagesRecursively(protocolVersion, next_subtopic, end, child.get(), poundFound, packetList); + giveClientRetainedMessagesRecursively(next_subtopic, end, child.get(), poundFound, packetList); } } else @@ -491,12 +489,12 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(ProtocolVersion pr if (children) { - giveClientRetainedMessagesRecursively(protocolVersion, next_subtopic, end, children, false, packetList); + giveClientRetainedMessagesRecursively(next_subtopic, end, children, false, packetList); } } } -uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr &client, const std::shared_ptr &ses, +uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr &ses, const std::vector &subscribeSubtopics, char max_qos) { uint64_t count = 0; @@ -505,17 +503,17 @@ uint64_t SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr 0 && subscribeSubtopics[0][0] == '$') startNode = &retainedMessagesRootDollar; - std::forward_list packetList; + std::forward_list packetList; { RWLockGuard locker(&retainedMessagesRwlock); locker.rdlock(); - giveClientRetainedMessagesRecursively(client->getProtocolVersion(), subscribeSubtopics.begin(), subscribeSubtopics.end(), startNode, false, packetList); + giveClientRetainedMessagesRecursively(subscribeSubtopics.begin(), subscribeSubtopics.end(), startNode, false, packetList); } - for(MqttPacket &packet : packetList) + for(Publish &publish : packetList) { - PublishCopyFactory copyFactory(&packet); + PublishCopyFactory copyFactory(&publish); ses->writePacket(copyFactory, max_qos, count); } diff --git a/subscriptionstore.h b/subscriptionstore.h index d1a7f12..3124d3d 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -146,9 +146,9 @@ class SubscriptionStore std::forward_list &targetSessions) const; void publishRecursively(std::vector::const_iterator cur_subtopic_it, std::vector::const_iterator end, SubscriptionNode *this_node, std::forward_list &targetSessions) const; - void giveClientRetainedMessagesRecursively(ProtocolVersion protocolVersion, std::vector::const_iterator cur_subtopic_it, + void giveClientRetainedMessagesRecursively(std::vector::const_iterator cur_subtopic_it, std::vector::const_iterator end, RetainedMessageNode *this_node, bool poundMode, - std::forward_list &packetList) const; + std::forward_list &packetList) const; void getRetainedMessages(RetainedMessageNode *this_node, std::vector &outputList) const; void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root, std::unordered_map> &outputList) const; @@ -167,7 +167,7 @@ public: void sendQueuedWillMessages(); void queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow = false); void queuePacketAtSubscribers(PublishCopyFactory ©Factory, bool dollar = false); - uint64_t giveClientRetainedMessages(const std::shared_ptr &client, const std::shared_ptr &ses, + uint64_t giveClientRetainedMessages(const std::shared_ptr &ses, const std::vector &subscribeSubtopics, char max_qos); void setRetainedMessage(const Publish &publish, const std::vector &subtopics);