Commit 3cb1fae8aa306954c140ca24be34d8931e9c424c

Authored by Wiebe Cazemier
1 parent 43df4fc2

Store clients as sessions

In preparation for clean session and qos.
CMakeLists.txt
@@ -13,6 +13,7 @@ add_executable(FlashMQ @@ -13,6 +13,7 @@ add_executable(FlashMQ
13 utils.cpp 13 utils.cpp
14 threaddata.cpp 14 threaddata.cpp
15 client.cpp 15 client.cpp
  16 + session.cpp
16 bytestopacketparser.cpp 17 bytestopacketparser.cpp
17 mqttpacket.cpp 18 mqttpacket.cpp
18 exceptions.cpp 19 exceptions.cpp
session.cpp 0 → 100644
  1 +#include "session.h"
  2 +
  3 +Session::Session()
  4 +{
  5 +
  6 +}
  7 +
  8 +Session::Session(std::shared_ptr<Client> &client)
  9 +{
  10 + this->client = client;
  11 +}
  12 +
  13 +bool Session::clientDisconnected() const
  14 +{
  15 + return client.expired();
  16 +}
  17 +
  18 +std::shared_ptr<Client> Session::makeSharedClient() const
  19 +{
  20 + return client.lock();
  21 +}
session.h 0 → 100644
  1 +#ifndef SESSION_H
  2 +#define SESSION_H
  3 +
  4 +#include <memory>
  5 +
  6 +class Client;
  7 +
  8 +class Session
  9 +{
  10 + std::weak_ptr<Client> client;
  11 + // TODO: qos message queue, as some kind of movable pointer.
  12 +public:
  13 + Session();
  14 + Session(std::shared_ptr<Client> &client);
  15 +
  16 + bool clientDisconnected() const;
  17 + std::shared_ptr<Client> makeSharedClient() const;
  18 +};
  19 +
  20 +#endif // SESSION_H
subscriptionstore.cpp
@@ -13,7 +13,7 @@ SubscriptionNode::SubscriptionNode(const std::string &amp;subtopic) : @@ -13,7 +13,7 @@ SubscriptionNode::SubscriptionNode(const std::string &amp;subtopic) :
13 13
14 SubscriptionStore::SubscriptionStore() : 14 SubscriptionStore::SubscriptionStore() :
15 root(new SubscriptionNode("root")), 15 root(new SubscriptionNode("root")),
16 - clients_by_id_const(clients_by_id) 16 + sessionsByIdConst(sessionsById)
17 { 17 {
18 18
19 } 19 }
@@ -65,19 +65,21 @@ void SubscriptionStore::registerClientAndKickExistingOne(Client_p &amp;client) @@ -65,19 +65,21 @@ void SubscriptionStore::registerClientAndKickExistingOne(Client_p &amp;client)
65 if (client->getClientId().empty()) 65 if (client->getClientId().empty())
66 throw ProtocolError("Trying to store client without an ID."); 66 throw ProtocolError("Trying to store client without an ID.");
67 67
68 - std::weak_ptr<Client> existingClient = clients_by_id[client->getClientId()];  
69 - auto it = clients_by_id.find(client->getClientId());  
70 -  
71 - if (it != clients_by_id.end() && !it->second.expired()) 68 + auto session_it = sessionsById.find(client->getClientId());
  69 + if (session_it != sessionsById.end())
72 { 70 {
73 - std::shared_ptr<Client> cl = it->second.lock();  
74 - logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str());  
75 - cl->setReadyForDisconnect();  
76 - cl->getThreadData()->removeClient(cl);  
77 - cl->markAsDisconnecting();  
78 - } 71 + Session &session = session_it->second;
79 72
80 - clients_by_id[client->getClientId()] = client; 73 + if (!session.clientDisconnected())
  74 + {
  75 + std::shared_ptr<Client> cl = session.makeSharedClient();
  76 + logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str());
  77 + cl->setReadyForDisconnect();
  78 + cl->getThreadData()->removeClient(cl);
  79 + cl->markAsDisconnecting();
  80 + }
  81 + }
  82 + sessionsById[client->getClientId()] = client;
81 } 83 }
82 84
83 // TODO: should I implement cache, this needs to be changed to returning a list of clients. 85 // TODO: should I implement cache, this needs to be changed to returning a list of clients.
@@ -85,12 +87,13 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st @@ -85,12 +87,13 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st
85 { 87 {
86 for (const std::string &client_id : subscribers) 88 for (const std::string &client_id : subscribers)
87 { 89 {
88 - auto client_it = clients_by_id_const.find(client_id);  
89 - if (client_it != clients_by_id_const.end()) 90 + auto session_it = sessionsByIdConst.find(client_id);
  91 + if (session_it != sessionsByIdConst.end())
90 { 92 {
91 - if (!client_it->second.expired()) 93 + const Session &session = session_it->second;
  94 + if (!session.clientDisconnected())
92 { 95 {
93 - Client_p c = client_it->second.lock(); 96 + Client_p c = session.makeSharedClient();
94 c->writeMqttPacketAndBlameThisClient(packet); 97 c->writeMqttPacketAndBlameThisClient(packet);
95 } 98 }
96 } 99 }
subscriptionstore.h
@@ -10,6 +10,7 @@ @@ -10,6 +10,7 @@
10 #include "forward_declarations.h" 10 #include "forward_declarations.h"
11 11
12 #include "client.h" 12 #include "client.h"
  13 +#include "session.h"
13 #include "utils.h" 14 #include "utils.h"
14 #include "retainedmessage.h" 15 #include "retainedmessage.h"
15 #include "logger.h" 16 #include "logger.h"
@@ -39,8 +40,8 @@ class SubscriptionStore @@ -39,8 +40,8 @@ class SubscriptionStore
39 { 40 {
40 std::unique_ptr<SubscriptionNode> root; 41 std::unique_ptr<SubscriptionNode> root;
41 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; 42 pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
42 - std::unordered_map<std::string, std::weak_ptr<Client>> clients_by_id;  
43 - const std::unordered_map<std::string, std::weak_ptr<Client>> &clients_by_id_const; 43 + std::unordered_map<std::string, Session> sessionsById;
  44 + const std::unordered_map<std::string, Session> &sessionsByIdConst;
44 45
45 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; 46 pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
46 std::unordered_set<RetainedMessage> retainedMessages; 47 std::unordered_set<RetainedMessage> retainedMessages;