From fa805a9493eb14520a4015a4642143b94066d64d Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Mon, 14 Dec 2020 17:23:03 +0100 Subject: [PATCH] Continued with my tree-based subscriptions --- CMakeLists.txt | 1 + rwlockguard.cpp | 22 ++++++++++++++++++++++ rwlockguard.h | 17 +++++++++++++++++ subscriptionstore.cpp | 73 +++++++++++++++++++++++++------------------------------------------------ subscriptionstore.h | 6 ++++-- 5 files changed, 69 insertions(+), 50 deletions(-) create mode 100644 rwlockguard.cpp create mode 100644 rwlockguard.h diff --git a/CMakeLists.txt b/CMakeLists.txt index fe5ba53..52e74e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ add_executable(FlashMQ exceptions.cpp types.cpp subscriptionstore.cpp + rwlockguard.cpp ) target_link_libraries(FlashMQ pthread) diff --git a/rwlockguard.cpp b/rwlockguard.cpp new file mode 100644 index 0000000..89649f8 --- /dev/null +++ b/rwlockguard.cpp @@ -0,0 +1,22 @@ +#include "rwlockguard.h" + +RWLockGuard::RWLockGuard(pthread_rwlock_t *rwlock) : + rwlock(rwlock) +{ + +} + +RWLockGuard::~RWLockGuard() +{ + pthread_rwlock_unlock(rwlock); +} + +void RWLockGuard::wrlock() +{ + pthread_rwlock_wrlock(rwlock); +} + +void RWLockGuard::rdlock() +{ + pthread_rwlock_wrlock(rwlock); +} diff --git a/rwlockguard.h b/rwlockguard.h new file mode 100644 index 0000000..de0afb2 --- /dev/null +++ b/rwlockguard.h @@ -0,0 +1,17 @@ +#ifndef RWLOCKGUARD_H +#define RWLOCKGUARD_H + + +#include + +class RWLockGuard +{ + pthread_rwlock_t *rwlock = NULL; +public: + RWLockGuard(pthread_rwlock_t *rwlock); + ~RWLockGuard(); + void wrlock(); + void rdlock(); +}; + +#endif // RWLOCKGUARD_H diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 6921f18..3db2e6a 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -2,6 +2,8 @@ #include "cassert" +#include "rwlockguard.h" + SubscriptionNode::SubscriptionNode(const std::string &subtopic) : subtopic(subtopic) @@ -18,7 +20,9 @@ SubscriptionStore::SubscriptionStore() : void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) { const std::list subtopics = split(topic, '/'); - std::lock_guard lock(subscriptionsMutex); + + RWLockGuard lock_guard(&subscriptionsRwlock); + lock_guard.wrlock(); SubscriptionNode *deepestNode = subscriptions2.get(); for(const std::string &subtopic : subtopics) @@ -37,71 +41,44 @@ void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) { deepestNode->subscribers.insert(client->getClientId()); } + + clients_by_id[client->getClientId()] = client; } void SubscriptionStore::removeClient(const Client_p &client) { - + RWLockGuard lock_guard(&subscriptionsRwlock); + lock_guard.wrlock(); + clients_by_id.erase(client->getClientId()); } void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) { const std::list subtopics = split(topic, '/'); + const auto &clients = clients_by_id; - // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention. - std::lock_guard lock(subscriptionsMutex); + RWLockGuard lock_guard(&subscriptionsRwlock); + lock_guard.rdlock(); - SubscriptionNode *deepestNode = subscriptions2.get(); + const SubscriptionNode *deepestNode = subscriptions2.get(); for(const std::string &subtopic : subtopics) { - SubscriptionNode &nodeRef = *deepestNode; - - if (nodeRef.children.count(subtopic) == 0) + auto sub_iter = deepestNode->children.find(subtopic); + if (sub_iter == deepestNode->children.end()) return; - std::unique_ptr &node = nodeRef.children[subtopic]; - - assert(node); // because any empty unique_ptr's is a bug - - for (const std::string &client_id : node->subscribers) - { - - } - - deepestNode = node.get(); - + const std::unique_ptr &sub_node = sub_iter->second; + assert(sub_node); // because any empty unique_ptr's is a bug + deepestNode = sub_node.get(); } - /* - for(const std::string &subtopic : subtopics) - { - std::unique_ptr &node = subscriptions2[subtopic]; - - if (!node) - { - subscriptions2 - } - }*/ - - /* - for(const Client_p &client : subscriptions2[topic]) + for (const std::string &client_id : deepestNode->subscribers) { - client->writeMqttPacket(packet); - - - if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) - { - client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing. - } - else - { - // Or keep a list of queued messages in the store, per client? - - //client->writeMqttPacketLocked(packet); - //client->getThreadData()->addToReadyForDequeuing(client); - //client->getThreadData()->wakeUpThread(); - } - }*/ + std::cout << "Publishing to " << client_id << std::endl; + auto client_it = clients.find(client_id); + if (client_it != clients.end()) + client_it->second->writeMqttPacket(packet); + } } diff --git a/subscriptionstore.h b/subscriptionstore.h index 8910d26..46222d0 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "forward_declarations.h" @@ -20,7 +21,7 @@ public: SubscriptionNode(const SubscriptionNode &node) = delete; SubscriptionNode(SubscriptionNode &&node) = delete; - std::unordered_set subscribers; + std::unordered_set subscribers; // The idea is to store subscriptions by client id, to support persistent sessions. std::unordered_map> children; }; @@ -29,7 +30,8 @@ public: class SubscriptionStore { std::unique_ptr subscriptions2; - std::mutex subscriptionsMutex; + pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; + std::unordered_map clients_by_id; public: SubscriptionStore(); -- libgit2 0.21.4