Commit 4881eaf9493517f4d06bb1012882091b8a947af8

Authored by Wiebe Cazemier
1 parent daa339b0

An attempt at subscriptions that I'll probably abandon

subscriptionstore.cpp
1 #include "subscriptionstore.h" 1 #include "subscriptionstore.h"
2 2
3 -SubscriptionStore::SubscriptionStore() 3 +#include "cassert"
  4 +
  5 +
  6 +SubscriptionNode::SubscriptionNode(const std::string &subtopic) :
  7 + subtopic(subtopic)
4 { 8 {
5 9
6 } 10 }
7 11
8 -void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) 12 +SubscriptionStore::SubscriptionStore() :
  13 + subscriptions2(new SubscriptionNode("root"))
9 { 14 {
10 - std::lock_guard<std::mutex> lock(subscriptionsMutex);  
11 - this->subscriptions[topic].insert(client); 15 +
12 } 16 }
13 17
14 -void SubscriptionStore::removeClient(const Client_p &client) 18 +void SubscriptionStore::addSubscription(Client_p &client, std::string &topic)
15 { 19 {
  20 + const std::list<std::string> subtopics = split(topic, '/');
16 std::lock_guard<std::mutex> lock(subscriptionsMutex); 21 std::lock_guard<std::mutex> lock(subscriptionsMutex);
17 - for(std::pair<const std::string, std::unordered_set<Client_p>> &pair : subscriptions) 22 +
  23 + SubscriptionNode *deepestNode = subscriptions2.get();
  24 + for(const std::string &subtopic : subtopics)
18 { 25 {
19 - std::unordered_set<Client_p> &bla = pair.second;  
20 - bla.erase(client); 26 + SubscriptionNode &nodeRef = *deepestNode;
  27 + std::unique_ptr<SubscriptionNode> &node = nodeRef.children[subtopic];
  28 +
  29 + if (!node)
  30 + {
  31 + node.reset(new SubscriptionNode(subtopic));
  32 + }
  33 + deepestNode = node.get();
21 } 34 }
  35 +
  36 + if (deepestNode)
  37 + {
  38 + deepestNode->subscribers.insert(client->getClientId());
  39 + }
  40 +}
  41 +
  42 +void SubscriptionStore::removeClient(const Client_p &client)
  43 +{
  44 +
22 } 45 }
23 46
24 void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) 47 void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender)
25 { 48 {
  49 + const std::list<std::string> subtopics = split(topic, '/');
  50 +
26 // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention. 51 // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention.
27 std::lock_guard<std::mutex> lock(subscriptionsMutex); 52 std::lock_guard<std::mutex> lock(subscriptionsMutex);
28 53
29 - for(const Client_p &client : subscriptions[topic]) 54 + SubscriptionNode *deepestNode = subscriptions2.get();
  55 + for(const std::string &subtopic : subtopics)
  56 + {
  57 + SubscriptionNode &nodeRef = *deepestNode;
  58 +
  59 + if (nodeRef.children.count(subtopic) == 0)
  60 + return;
  61 +
  62 + std::unique_ptr<SubscriptionNode> &node = nodeRef.children[subtopic];
  63 +
  64 + assert(node); // because any empty unique_ptr's is a bug
  65 +
  66 + for (const std::string &client_id : node->subscribers)
  67 + {
  68 +
  69 + }
  70 +
  71 + deepestNode = node.get();
  72 +
  73 + }
  74 +
  75 + /*
  76 + for(const std::string &subtopic : subtopics)
  77 + {
  78 + std::unique_ptr<SubscriptionNode> &node = subscriptions2[subtopic];
  79 +
  80 + if (!node)
  81 + {
  82 + subscriptions2
  83 + }
  84 + }*/
  85 +
  86 + /*
  87 + for(const Client_p &client : subscriptions2[topic])
30 { 88 {
31 client->writeMqttPacket(packet); 89 client->writeMqttPacket(packet);
32 90
33 - /* 91 +
34 if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) 92 if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr)
35 { 93 {
36 client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing. 94 client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing.
@@ -42,6 +100,8 @@ void SubscriptionStore::queueAtClientsTemp(std::string &amp;topic, const MqttPacket @@ -42,6 +100,8 @@ void SubscriptionStore::queueAtClientsTemp(std::string &amp;topic, const MqttPacket
42 //client->writeMqttPacketLocked(packet); 100 //client->writeMqttPacketLocked(packet);
43 //client->getThreadData()->addToReadyForDequeuing(client); 101 //client->getThreadData()->addToReadyForDequeuing(client);
44 //client->getThreadData()->wakeUpThread(); 102 //client->getThreadData()->wakeUpThread();
45 - }*/  
46 - } 103 + }
  104 + }*/
47 } 105 }
  106 +
  107 +
subscriptionstore.h
@@ -2,16 +2,33 @@ @@ -2,16 +2,33 @@
2 #define SUBSCRIPTIONSTORE_H 2 #define SUBSCRIPTIONSTORE_H
3 3
4 #include <unordered_map> 4 #include <unordered_map>
  5 +#include <unordered_set>
5 #include <list> 6 #include <list>
6 #include <mutex> 7 #include <mutex>
7 8
8 #include "forward_declarations.h" 9 #include "forward_declarations.h"
9 10
10 #include "client.h" 11 #include "client.h"
  12 +#include "utils.h"
  13 +
  14 +class SubscriptionNode
  15 +{
  16 + std::string subtopic;
  17 +
  18 +public:
  19 + SubscriptionNode(const std::string &subtopic);
  20 + SubscriptionNode(const SubscriptionNode &node) = delete;
  21 + SubscriptionNode(SubscriptionNode &&node) = delete;
  22 +
  23 + std::unordered_set<std::string> subscribers;
  24 + std::unordered_map<std::string, std::unique_ptr<SubscriptionNode>> children;
  25 +
  26 +};
  27 +
11 28
12 class SubscriptionStore 29 class SubscriptionStore
13 { 30 {
14 - std::unordered_map<std::string, std::unordered_set<Client_p>> subscriptions; 31 + std::unique_ptr<SubscriptionNode> subscriptions2;
15 std::mutex subscriptionsMutex; 32 std::mutex subscriptionsMutex;
16 public: 33 public:
17 SubscriptionStore(); 34 SubscriptionStore();