Commit 163d14e82248f49daa3d47521d980db9d3a89752

Authored by Wiebe Cazemier
1 parent fa805a94

Recursive publish

mqttpacket.cpp
@@ -208,7 +208,7 @@ void MqttPacket::handlePublish(std::shared_ptr<SubscriptionStore> &subscriptionS @@ -208,7 +208,7 @@ void MqttPacket::handlePublish(std::shared_ptr<SubscriptionStore> &subscriptionS
208 size_t payload_length = remainingAfterPos(); 208 size_t payload_length = remainingAfterPos();
209 std::string payload(readBytes(payload_length), payload_length); 209 std::string payload(readBytes(payload_length), payload_length);
210 210
211 - subscriptionStore->queueAtClientsTemp(topic, *this, sender); 211 + subscriptionStore->queuePacketAtSubscribers(topic, *this, sender);
212 } 212 }
213 213
214 214
subscriptionstore.cpp
@@ -12,7 +12,8 @@ SubscriptionNode::SubscriptionNode(const std::string &subtopic) : @@ -12,7 +12,8 @@ SubscriptionNode::SubscriptionNode(const std::string &subtopic) :
12 } 12 }
13 13
14 SubscriptionStore::SubscriptionStore() : 14 SubscriptionStore::SubscriptionStore() :
15 - subscriptions2(new SubscriptionNode("root")) 15 + root(new SubscriptionNode("root")),
  16 + clients_by_id_const(clients_by_id)
16 { 17 {
17 18
18 } 19 }
@@ -24,7 +25,7 @@ void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) @@ -24,7 +25,7 @@ void SubscriptionStore::addSubscription(Client_p &client, std::string &topic)
24 RWLockGuard lock_guard(&subscriptionsRwlock); 25 RWLockGuard lock_guard(&subscriptionsRwlock);
25 lock_guard.wrlock(); 26 lock_guard.wrlock();
26 27
27 - SubscriptionNode *deepestNode = subscriptions2.get(); 28 + SubscriptionNode *deepestNode = root.get();
28 for(const std::string &subtopic : subtopics) 29 for(const std::string &subtopic : subtopics)
29 { 30 {
30 SubscriptionNode &nodeRef = *deepestNode; 31 SubscriptionNode &nodeRef = *deepestNode;
@@ -39,7 +40,7 @@ void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) @@ -39,7 +40,7 @@ void SubscriptionStore::addSubscription(Client_p &client, std::string &topic)
39 40
40 if (deepestNode) 41 if (deepestNode)
41 { 42 {
42 - deepestNode->subscribers.insert(client->getClientId()); 43 + deepestNode->subscribers.push_front(client->getClientId());
43 } 44 }
44 45
45 clients_by_id[client->getClientId()] = client; 46 clients_by_id[client->getClientId()] = client;
@@ -52,33 +53,51 @@ void SubscriptionStore::removeClient(const Client_p &client) @@ -52,33 +53,51 @@ void SubscriptionStore::removeClient(const Client_p &client)
52 clients_by_id.erase(client->getClientId()); 53 clients_by_id.erase(client->getClientId());
53 } 54 }
54 55
55 -void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) 56 +// TODO: keep a cache of topics vs clients
  57 +
  58 +bool SubscriptionStore::publishRecursively(std::list<std::string>::const_iterator cur_subtopic_it, std::list<std::string>::const_iterator end,
  59 + std::unique_ptr<SubscriptionNode> &this_node, const MqttPacket &packet) const
56 { 60 {
57 - const std::list<std::string> subtopics = split(topic, '/');  
58 - const auto &clients = clients_by_id; 61 + if (cur_subtopic_it == end) // This is the end of the topic path, so look for subscribers here.
  62 + {
  63 + for (const std::string &client_id : this_node->subscribers)
  64 + {
  65 + auto client_it = clients_by_id_const.find(client_id);
  66 + if (client_it != clients_by_id_const.end())
  67 + client_it->second->writeMqttPacket(packet);
  68 + }
59 69
60 - RWLockGuard lock_guard(&subscriptionsRwlock);  
61 - lock_guard.rdlock(); 70 + return true;
  71 + }
62 72
63 - const SubscriptionNode *deepestNode = subscriptions2.get();  
64 - for(const std::string &subtopic : subtopics)  
65 - {  
66 - auto sub_iter = deepestNode->children.find(subtopic);  
67 - if (sub_iter == deepestNode->children.end())  
68 - return; 73 + std::string cur_subtop = *cur_subtopic_it;
  74 + auto sub_node = this_node->children.find(cur_subtop);
  75 +
  76 + const auto next_subtopic = ++cur_subtopic_it;
69 77
70 - const std::unique_ptr<SubscriptionNode> &sub_node = sub_iter->second;  
71 - assert(sub_node); // because any empty unique_ptr's is a bug  
72 - deepestNode = sub_node.get(); 78 + if (sub_node != this_node->children.end())
  79 + {
  80 + publishRecursively(next_subtopic, end, sub_node->second, packet);
73 } 81 }
74 82
75 - for (const std::string &client_id : deepestNode->subscribers) 83 + const auto plus_sign_node = this_node->children.find("+");
  84 +
  85 + if (plus_sign_node != this_node->children.end())
76 { 86 {
77 - std::cout << "Publishing to " << client_id << std::endl;  
78 - auto client_it = clients.find(client_id);  
79 - if (client_it != clients.end())  
80 - client_it->second->writeMqttPacket(packet); 87 + publishRecursively(next_subtopic, end, plus_sign_node->second, packet);
81 } 88 }
  89 +
  90 + return false;
  91 +}
  92 +
  93 +void SubscriptionStore::queuePacketAtSubscribers(std::string &topic, const MqttPacket &packet, const Client_p &sender)
  94 +{
  95 + const std::list<std::string> subtopics = split(topic, '/');
  96 +
  97 + RWLockGuard lock_guard(&subscriptionsRwlock);
  98 + lock_guard.rdlock();
  99 +
  100 + publishRecursively(subtopics.begin(), subtopics.end(), root, packet);
82 } 101 }
83 102
84 103
subscriptionstore.h
@@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
2 #define SUBSCRIPTIONSTORE_H 2 #define SUBSCRIPTIONSTORE_H
3 3
4 #include <unordered_map> 4 #include <unordered_map>
5 -#include <unordered_set> 5 +#include <forward_list>
6 #include <list> 6 #include <list>
7 #include <mutex> 7 #include <mutex>
8 #include <pthread.h> 8 #include <pthread.h>
@@ -21,27 +21,27 @@ public: @@ -21,27 +21,27 @@ public:
21 SubscriptionNode(const SubscriptionNode &node) = delete; 21 SubscriptionNode(const SubscriptionNode &node) = delete;
22 SubscriptionNode(SubscriptionNode &&node) = delete; 22 SubscriptionNode(SubscriptionNode &&node) = delete;
23 23
24 - std::unordered_set<std::string> subscribers; // The idea is to store subscriptions by client id, to support persistent sessions. 24 + std::forward_list<std::string> subscribers; // The idea is to store subscriptions by client id, to support persistent sessions.
25 std::unordered_map<std::string, std::unique_ptr<SubscriptionNode>> children; 25 std::unordered_map<std::string, std::unique_ptr<SubscriptionNode>> children;
26 -  
27 }; 26 };
28 27
29 28
30 class SubscriptionStore 29 class SubscriptionStore
31 { 30 {
32 - std::unique_ptr<SubscriptionNode> subscriptions2; 31 + std::unique_ptr<SubscriptionNode> root;
33 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; 32 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
34 std::unordered_map<std::string, Client_p> clients_by_id; 33 std::unordered_map<std::string, Client_p> clients_by_id;
  34 + const std::unordered_map<std::string, Client_p> &clients_by_id_const;
  35 +
  36 + bool publishRecursively(std::list<std::string>::const_iterator cur_subtopic_it, std::list<std::string>::const_iterator end,
  37 + std::unique_ptr<SubscriptionNode> &next, const MqttPacket &packet) const;
35 public: 38 public:
36 SubscriptionStore(); 39 SubscriptionStore();
37 40
38 void addSubscription(Client_p &client, std::string &topic); 41 void addSubscription(Client_p &client, std::string &topic);
39 void removeClient(const Client_p &client); 42 void removeClient(const Client_p &client);
40 43
41 - // work with read copies intead of mutex/lock over the central store  
42 - void getReadCopy(); // TODO  
43 -  
44 - void queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender); 44 + void queuePacketAtSubscribers(std::string &topic, const MqttPacket &packet, const Client_p &sender);
45 }; 45 };
46 46
47 #endif // SUBSCRIPTIONSTORE_H 47 #endif // SUBSCRIPTIONSTORE_H