Commit fa805a9493eb14520a4015a4642143b94066d64d

Authored by Wiebe Cazemier
1 parent 4881eaf9

Continued with my tree-based subscriptions

CMakeLists.txt
@@ -17,6 +17,7 @@ add_executable(FlashMQ @@ -17,6 +17,7 @@ add_executable(FlashMQ
17 exceptions.cpp 17 exceptions.cpp
18 types.cpp 18 types.cpp
19 subscriptionstore.cpp 19 subscriptionstore.cpp
  20 + rwlockguard.cpp
20 ) 21 )
21 22
22 target_link_libraries(FlashMQ pthread) 23 target_link_libraries(FlashMQ pthread)
rwlockguard.cpp 0 → 100644
  1 +#include "rwlockguard.h"
  2 +
  3 +RWLockGuard::RWLockGuard(pthread_rwlock_t *rwlock) :
  4 + rwlock(rwlock)
  5 +{
  6 +
  7 +}
  8 +
  9 +RWLockGuard::~RWLockGuard()
  10 +{
  11 + pthread_rwlock_unlock(rwlock);
  12 +}
  13 +
  14 +void RWLockGuard::wrlock()
  15 +{
  16 + pthread_rwlock_wrlock(rwlock);
  17 +}
  18 +
  19 +void RWLockGuard::rdlock()
  20 +{
  21 + pthread_rwlock_wrlock(rwlock);
  22 +}
rwlockguard.h 0 → 100644
  1 +#ifndef RWLOCKGUARD_H
  2 +#define RWLOCKGUARD_H
  3 +
  4 +
  5 +#include <pthread.h>
  6 +
  7 +class RWLockGuard
  8 +{
  9 + pthread_rwlock_t *rwlock = NULL;
  10 +public:
  11 + RWLockGuard(pthread_rwlock_t *rwlock);
  12 + ~RWLockGuard();
  13 + void wrlock();
  14 + void rdlock();
  15 +};
  16 +
  17 +#endif // RWLOCKGUARD_H
subscriptionstore.cpp
@@ -2,6 +2,8 @@ @@ -2,6 +2,8 @@
2 2
3 #include "cassert" 3 #include "cassert"
4 4
  5 +#include "rwlockguard.h"
  6 +
5 7
6 SubscriptionNode::SubscriptionNode(const std::string &subtopic) : 8 SubscriptionNode::SubscriptionNode(const std::string &subtopic) :
7 subtopic(subtopic) 9 subtopic(subtopic)
@@ -18,7 +20,9 @@ SubscriptionStore::SubscriptionStore() : @@ -18,7 +20,9 @@ SubscriptionStore::SubscriptionStore() :
18 void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) 20 void SubscriptionStore::addSubscription(Client_p &client, std::string &topic)
19 { 21 {
20 const std::list<std::string> subtopics = split(topic, '/'); 22 const std::list<std::string> subtopics = split(topic, '/');
21 - std::lock_guard<std::mutex> lock(subscriptionsMutex); 23 +
  24 + RWLockGuard lock_guard(&subscriptionsRwlock);
  25 + lock_guard.wrlock();
22 26
23 SubscriptionNode *deepestNode = subscriptions2.get(); 27 SubscriptionNode *deepestNode = subscriptions2.get();
24 for(const std::string &subtopic : subtopics) 28 for(const std::string &subtopic : subtopics)
@@ -37,71 +41,44 @@ void SubscriptionStore::addSubscription(Client_p &amp;client, std::string &amp;topic) @@ -37,71 +41,44 @@ void SubscriptionStore::addSubscription(Client_p &amp;client, std::string &amp;topic)
37 { 41 {
38 deepestNode->subscribers.insert(client->getClientId()); 42 deepestNode->subscribers.insert(client->getClientId());
39 } 43 }
  44 +
  45 + clients_by_id[client->getClientId()] = client;
40 } 46 }
41 47
42 void SubscriptionStore::removeClient(const Client_p &client) 48 void SubscriptionStore::removeClient(const Client_p &client)
43 { 49 {
44 - 50 + RWLockGuard lock_guard(&subscriptionsRwlock);
  51 + lock_guard.wrlock();
  52 + clients_by_id.erase(client->getClientId());
45 } 53 }
46 54
47 void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) 55 void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender)
48 { 56 {
49 const std::list<std::string> subtopics = split(topic, '/'); 57 const std::list<std::string> subtopics = split(topic, '/');
  58 + const auto &clients = clients_by_id;
50 59
51 - // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention.  
52 - std::lock_guard<std::mutex> lock(subscriptionsMutex); 60 + RWLockGuard lock_guard(&subscriptionsRwlock);
  61 + lock_guard.rdlock();
53 62
54 - SubscriptionNode *deepestNode = subscriptions2.get(); 63 + const SubscriptionNode *deepestNode = subscriptions2.get();
55 for(const std::string &subtopic : subtopics) 64 for(const std::string &subtopic : subtopics)
56 { 65 {
57 - SubscriptionNode &nodeRef = *deepestNode;  
58 -  
59 - if (nodeRef.children.count(subtopic) == 0) 66 + auto sub_iter = deepestNode->children.find(subtopic);
  67 + if (sub_iter == deepestNode->children.end())
60 return; 68 return;
61 69
62 - std::unique_ptr<SubscriptionNode> &node = nodeRef.children[subtopic];  
63 -  
64 - assert(node); // because any empty unique_ptr's is a bug  
65 -  
66 - for (const std::string &client_id : node->subscribers)  
67 - {  
68 -  
69 - }  
70 -  
71 - deepestNode = node.get();  
72 - 70 + const std::unique_ptr<SubscriptionNode> &sub_node = sub_iter->second;
  71 + assert(sub_node); // because any empty unique_ptr's is a bug
  72 + deepestNode = sub_node.get();
73 } 73 }
74 74
75 - /*  
76 - for(const std::string &subtopic : subtopics)  
77 - {  
78 - std::unique_ptr<SubscriptionNode> &node = subscriptions2[subtopic];  
79 -  
80 - if (!node)  
81 - {  
82 - subscriptions2  
83 - }  
84 - }*/  
85 -  
86 - /*  
87 - for(const Client_p &client : subscriptions2[topic]) 75 + for (const std::string &client_id : deepestNode->subscribers)
88 { 76 {
89 - client->writeMqttPacket(packet);  
90 -  
91 -  
92 - if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr)  
93 - {  
94 - client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing.  
95 - }  
96 - else  
97 - {  
98 - // Or keep a list of queued messages in the store, per client?  
99 -  
100 - //client->writeMqttPacketLocked(packet);  
101 - //client->getThreadData()->addToReadyForDequeuing(client);  
102 - //client->getThreadData()->wakeUpThread();  
103 - }  
104 - }*/ 77 + std::cout << "Publishing to " << client_id << std::endl;
  78 + auto client_it = clients.find(client_id);
  79 + if (client_it != clients.end())
  80 + client_it->second->writeMqttPacket(packet);
  81 + }
105 } 82 }
106 83
107 84
subscriptionstore.h
@@ -5,6 +5,7 @@ @@ -5,6 +5,7 @@
5 #include <unordered_set> 5 #include <unordered_set>
6 #include <list> 6 #include <list>
7 #include <mutex> 7 #include <mutex>
  8 +#include <pthread.h>
8 9
9 #include "forward_declarations.h" 10 #include "forward_declarations.h"
10 11
@@ -20,7 +21,7 @@ public: @@ -20,7 +21,7 @@ public:
20 SubscriptionNode(const SubscriptionNode &node) = delete; 21 SubscriptionNode(const SubscriptionNode &node) = delete;
21 SubscriptionNode(SubscriptionNode &&node) = delete; 22 SubscriptionNode(SubscriptionNode &&node) = delete;
22 23
23 - std::unordered_set<std::string> subscribers; 24 + std::unordered_set<std::string> subscribers; // The idea is to store subscriptions by client id, to support persistent sessions.
24 std::unordered_map<std::string, std::unique_ptr<SubscriptionNode>> children; 25 std::unordered_map<std::string, std::unique_ptr<SubscriptionNode>> children;
25 26
26 }; 27 };
@@ -29,7 +30,8 @@ public: @@ -29,7 +30,8 @@ public:
29 class SubscriptionStore 30 class SubscriptionStore
30 { 31 {
31 std::unique_ptr<SubscriptionNode> subscriptions2; 32 std::unique_ptr<SubscriptionNode> subscriptions2;
32 - std::mutex subscriptionsMutex; 33 + pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
  34 + std::unordered_map<std::string, Client_p> clients_by_id;
33 public: 35 public:
34 SubscriptionStore(); 36 SubscriptionStore();
35 37