diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 23ee222..6921f18 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -1,36 +1,94 @@ #include "subscriptionstore.h" -SubscriptionStore::SubscriptionStore() +#include "cassert" + + +SubscriptionNode::SubscriptionNode(const std::string &subtopic) : + subtopic(subtopic) { } -void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) +SubscriptionStore::SubscriptionStore() : + subscriptions2(new SubscriptionNode("root")) { - std::lock_guard lock(subscriptionsMutex); - this->subscriptions[topic].insert(client); + } -void SubscriptionStore::removeClient(const Client_p &client) +void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) { + const std::list subtopics = split(topic, '/'); std::lock_guard lock(subscriptionsMutex); - for(std::pair> &pair : subscriptions) + + SubscriptionNode *deepestNode = subscriptions2.get(); + for(const std::string &subtopic : subtopics) { - std::unordered_set &bla = pair.second; - bla.erase(client); + SubscriptionNode &nodeRef = *deepestNode; + std::unique_ptr &node = nodeRef.children[subtopic]; + + if (!node) + { + node.reset(new SubscriptionNode(subtopic)); + } + deepestNode = node.get(); } + + if (deepestNode) + { + deepestNode->subscribers.insert(client->getClientId()); + } +} + +void SubscriptionStore::removeClient(const Client_p &client) +{ + } void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) { + const std::list subtopics = split(topic, '/'); + // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention. std::lock_guard lock(subscriptionsMutex); - for(const Client_p &client : subscriptions[topic]) + SubscriptionNode *deepestNode = subscriptions2.get(); + for(const std::string &subtopic : subtopics) + { + SubscriptionNode &nodeRef = *deepestNode; + + if (nodeRef.children.count(subtopic) == 0) + return; + + std::unique_ptr &node = nodeRef.children[subtopic]; + + assert(node); // because any empty unique_ptr's is a bug + + for (const std::string &client_id : node->subscribers) + { + + } + + deepestNode = node.get(); + + } + + /* + for(const std::string &subtopic : subtopics) + { + std::unique_ptr &node = subscriptions2[subtopic]; + + if (!node) + { + subscriptions2 + } + }*/ + + /* + for(const Client_p &client : subscriptions2[topic]) { client->writeMqttPacket(packet); - /* + if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) { client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing. @@ -42,6 +100,8 @@ void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket //client->writeMqttPacketLocked(packet); //client->getThreadData()->addToReadyForDequeuing(client); //client->getThreadData()->wakeUpThread(); - }*/ - } + } + }*/ } + + diff --git a/subscriptionstore.h b/subscriptionstore.h index b8427ad..8910d26 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -2,16 +2,33 @@ #define SUBSCRIPTIONSTORE_H #include +#include #include #include #include "forward_declarations.h" #include "client.h" +#include "utils.h" + +class SubscriptionNode +{ + std::string subtopic; + +public: + SubscriptionNode(const std::string &subtopic); + SubscriptionNode(const SubscriptionNode &node) = delete; + SubscriptionNode(SubscriptionNode &&node) = delete; + + std::unordered_set subscribers; + std::unordered_map> children; + +}; + class SubscriptionStore { - std::unordered_map> subscriptions; + std::unique_ptr subscriptions2; std::mutex subscriptionsMutex; public: SubscriptionStore();