Commit af190a633b22066afe11697df3ed35a633e36445

Authored by Wiebe Cazemier
1 parent 84fe398d

Store subscribers as map

The initial concept of iterating over a vector being fast didn't help,
and made having many subscribers to one topic very slow.
subscriptionstore.cpp
@@ -28,7 +28,7 @@ SubscriptionNode::SubscriptionNode(const std::string &subtopic) : @@ -28,7 +28,7 @@ SubscriptionNode::SubscriptionNode(const std::string &subtopic) :
28 28
29 } 29 }
30 30
31 -std::vector<Subscription> &SubscriptionNode::getSubscribers() 31 +std::unordered_map<std::string, Subscription> &SubscriptionNode::getSubscribers()
32 { 32 {
33 return subscribers; 33 return subscribers;
34 } 34 }
@@ -44,18 +44,8 @@ void SubscriptionNode::addSubscriber(const std::shared_ptr&lt;Session&gt; &amp;subscriber, @@ -44,18 +44,8 @@ void SubscriptionNode::addSubscriber(const std::shared_ptr&lt;Session&gt; &amp;subscriber,
44 sub.session = subscriber; 44 sub.session = subscriber;
45 sub.qos = qos; 45 sub.qos = qos;
46 46
47 - // I'll have to decide whether to keep the subscriber as a vector. Vectors are  
48 - // fast, and relatively, you don't often add subscribers.  
49 - auto subscriber_it = std::find(subscribers.begin(), subscribers.end(), sub);  
50 - if (subscriber_it == subscribers.end())  
51 - {  
52 - subscribers.push_back(sub);  
53 - }  
54 - else  
55 - {  
56 - Subscription &existingSub = *subscriber_it;  
57 - existingSub = sub;  
58 - } 47 + const std::string &client_id = subscriber->getClientId();
  48 + subscribers[client_id] = sub;
59 } 49 }
60 50
61 void SubscriptionNode::removeSubscriber(const std::shared_ptr<Session> &subscriber) 51 void SubscriptionNode::removeSubscriber(const std::shared_ptr<Session> &subscriber)
@@ -64,7 +54,7 @@ void SubscriptionNode::removeSubscriber(const std::shared_ptr&lt;Session&gt; &amp;subscrib @@ -64,7 +54,7 @@ void SubscriptionNode::removeSubscriber(const std::shared_ptr&lt;Session&gt; &amp;subscrib
64 sub.session = subscriber; 54 sub.session = subscriber;
65 sub.qos = 0; 55 sub.qos = 0;
66 56
67 - auto it = std::find(subscribers.begin(), subscribers.end(), sub); 57 + auto it = subscribers.find(subscriber->getClientId());
68 58
69 if (it != subscribers.end()) 59 if (it != subscribers.end())
70 { 60 {
@@ -261,10 +251,12 @@ bool SubscriptionStore::sessionPresent(const std::string &amp;clientid) @@ -261,10 +251,12 @@ bool SubscriptionStore::sessionPresent(const std::string &amp;clientid)
261 return result; 251 return result;
262 } 252 }
263 253
264 -void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers, uint64_t &count) const 254 +void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const std::unordered_map<std::string, Subscription> &subscribers, uint64_t &count) const
265 { 255 {
266 - for (const Subscription &sub : subscribers) 256 + for (auto &pair : subscribers)
267 { 257 {
  258 + const Subscription &sub = pair.second;
  259 +
268 const std::shared_ptr<Session> session = sub.session.lock(); 260 const std::shared_ptr<Session> session = sub.session.lock();
269 if (session) // Shared pointer expires when session has been cleaned by 'clean session' connect. 261 if (session) // Shared pointer expires when session has been cleaned by 'clean session' connect.
270 { 262 {
@@ -479,10 +471,10 @@ int SubscriptionNode::cleanSubscriptions() @@ -479,10 +471,10 @@ int SubscriptionNode::cleanSubscriptions()
479 auto it = subscribers.begin(); 471 auto it = subscribers.begin();
480 while (it != subscribers.end()) 472 while (it != subscribers.end())
481 { 473 {
482 - std::shared_ptr<Session> ses = it->session.lock(); 474 + std::shared_ptr<Session> ses = it->second.session.lock();
483 if (!ses) 475 if (!ses)
484 { 476 {
485 - Logger::getInstance()->logf(LOG_DEBUG, "Removing empty spot in subscribers vector"); 477 + Logger::getInstance()->logf(LOG_DEBUG, "Removing empty spot in subscribers map");
486 it = subscribers.erase(it); 478 it = subscribers.erase(it);
487 } 479 }
488 else 480 else
@@ -567,8 +559,9 @@ void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std: @@ -567,8 +559,9 @@ void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std:
567 void SubscriptionStore::getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root, 559 void SubscriptionStore::getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root,
568 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const 560 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const
569 { 561 {
570 - for (const Subscription &node : this_node->getSubscribers()) 562 + for (auto &pair : this_node->getSubscribers())
571 { 563 {
  564 + const Subscription &node = pair.second;
572 std::shared_ptr<Session> ses = node.session.lock(); 565 std::shared_ptr<Session> ses = node.session.lock();
573 if (ses) 566 if (ses)
574 { 567 {
subscriptionstore.h
@@ -44,14 +44,14 @@ struct Subscription @@ -44,14 +44,14 @@ struct Subscription
44 class SubscriptionNode 44 class SubscriptionNode
45 { 45 {
46 std::string subtopic; 46 std::string subtopic;
47 - std::vector<Subscription> subscribers; 47 + std::unordered_map<std::string, Subscription> subscribers;
48 48
49 public: 49 public:
50 SubscriptionNode(const std::string &subtopic); 50 SubscriptionNode(const std::string &subtopic);
51 SubscriptionNode(const SubscriptionNode &node) = delete; 51 SubscriptionNode(const SubscriptionNode &node) = delete;
52 SubscriptionNode(SubscriptionNode &&node) = delete; 52 SubscriptionNode(SubscriptionNode &&node) = delete;
53 53
54 - std::vector<Subscription> &getSubscribers(); 54 + std::unordered_map<std::string, Subscription> &getSubscribers();
55 const std::string &getSubtopic() const; 55 const std::string &getSubtopic() const;
56 void addSubscriber(const std::shared_ptr<Session> &subscriber, char qos); 56 void addSubscriber(const std::shared_ptr<Session> &subscriber, char qos);
57 void removeSubscriber(const std::shared_ptr<Session> &subscriber); 57 void removeSubscriber(const std::shared_ptr<Session> &subscriber);
@@ -94,7 +94,7 @@ class SubscriptionStore @@ -94,7 +94,7 @@ class SubscriptionStore
94 94
95 Logger *logger = Logger::getInstance(); 95 Logger *logger = Logger::getInstance();
96 96
97 - void publishNonRecursively(const MqttPacket &packet, const std::vector<Subscription> &subscribers, uint64_t &count) const; 97 + void publishNonRecursively(const MqttPacket &packet, const std::unordered_map<std::string, Subscription> &subscribers, uint64_t &count) const;
98 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end, 98 void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
99 SubscriptionNode *this_node, const MqttPacket &packet, uint64_t &count) const; 99 SubscriptionNode *this_node, const MqttPacket &packet, uint64_t &count) const;
100 void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const; 100 void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const;