From 3cb1fae8aa306954c140ca24be34d8931e9c424c Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Mon, 4 Jan 2021 22:16:08 +0100 Subject: [PATCH] Store clients as sessions --- CMakeLists.txt | 1 + session.cpp | 21 +++++++++++++++++++++ session.h | 20 ++++++++++++++++++++ subscriptionstore.cpp | 35 +++++++++++++++++++---------------- subscriptionstore.h | 5 +++-- 5 files changed, 64 insertions(+), 18 deletions(-) create mode 100644 session.cpp create mode 100644 session.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c24d452..ddbca1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ add_executable(FlashMQ utils.cpp threaddata.cpp client.cpp + session.cpp bytestopacketparser.cpp mqttpacket.cpp exceptions.cpp diff --git a/session.cpp b/session.cpp new file mode 100644 index 0000000..a228919 --- /dev/null +++ b/session.cpp @@ -0,0 +1,21 @@ +#include "session.h" + +Session::Session() +{ + +} + +Session::Session(std::shared_ptr &client) +{ + this->client = client; +} + +bool Session::clientDisconnected() const +{ + return client.expired(); +} + +std::shared_ptr Session::makeSharedClient() const +{ + return client.lock(); +} diff --git a/session.h b/session.h new file mode 100644 index 0000000..ef6cb60 --- /dev/null +++ b/session.h @@ -0,0 +1,20 @@ +#ifndef SESSION_H +#define SESSION_H + +#include + +class Client; + +class Session +{ + std::weak_ptr client; + // TODO: qos message queue, as some kind of movable pointer. +public: + Session(); + Session(std::shared_ptr &client); + + bool clientDisconnected() const; + std::shared_ptr makeSharedClient() const; +}; + +#endif // SESSION_H diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index b0db79f..b6b4d9e 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -13,7 +13,7 @@ SubscriptionNode::SubscriptionNode(const std::string &subtopic) : SubscriptionStore::SubscriptionStore() : root(new SubscriptionNode("root")), - clients_by_id_const(clients_by_id) + sessionsByIdConst(sessionsById) { } @@ -65,19 +65,21 @@ void SubscriptionStore::registerClientAndKickExistingOne(Client_p &client) if (client->getClientId().empty()) throw ProtocolError("Trying to store client without an ID."); - std::weak_ptr existingClient = clients_by_id[client->getClientId()]; - auto it = clients_by_id.find(client->getClientId()); - - if (it != clients_by_id.end() && !it->second.expired()) + auto session_it = sessionsById.find(client->getClientId()); + if (session_it != sessionsById.end()) { - std::shared_ptr cl = it->second.lock(); - logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str()); - cl->setReadyForDisconnect(); - cl->getThreadData()->removeClient(cl); - cl->markAsDisconnecting(); - } + Session &session = session_it->second; - clients_by_id[client->getClientId()] = client; + if (!session.clientDisconnected()) + { + std::shared_ptr cl = session.makeSharedClient(); + logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", cl->getClientId().c_str()); + cl->setReadyForDisconnect(); + cl->getThreadData()->removeClient(cl); + cl->markAsDisconnecting(); + } + } + sessionsById[client->getClientId()] = client; } // TODO: should I implement cache, this needs to be changed to returning a list of clients. @@ -85,12 +87,13 @@ void SubscriptionStore::publishNonRecursively(const MqttPacket &packet, const st { for (const std::string &client_id : subscribers) { - auto client_it = clients_by_id_const.find(client_id); - if (client_it != clients_by_id_const.end()) + auto session_it = sessionsByIdConst.find(client_id); + if (session_it != sessionsByIdConst.end()) { - if (!client_it->second.expired()) + const Session &session = session_it->second; + if (!session.clientDisconnected()) { - Client_p c = client_it->second.lock(); + Client_p c = session.makeSharedClient(); c->writeMqttPacketAndBlameThisClient(packet); } } diff --git a/subscriptionstore.h b/subscriptionstore.h index e2c7a43..ffd94cc 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -10,6 +10,7 @@ #include "forward_declarations.h" #include "client.h" +#include "session.h" #include "utils.h" #include "retainedmessage.h" #include "logger.h" @@ -39,8 +40,8 @@ class SubscriptionStore { std::unique_ptr root; pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; - std::unordered_map> clients_by_id; - const std::unordered_map> &clients_by_id_const; + std::unordered_map sessionsById; + const std::unordered_map &sessionsByIdConst; pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; std::unordered_set retainedMessages; -- libgit2 0.21.4