#ifndef SUBSCRIPTIONSTORE_H #define SUBSCRIPTIONSTORE_H #include #include #include #include #include #include "forward_declarations.h" #include "client.h" #include "session.h" #include "utils.h" #include "retainedmessage.h" #include "logger.h" struct RetainedPayload { std::string payload; char qos; }; struct Subscription { std::weak_ptr session; // Weak pointer expires when session has been cleaned by 'clean session' connect or when it was remove because it expired char qos; bool operator==(const Subscription &rhs) const; void reset(); bool sessionGone() const; }; class SubscriptionNode { std::string subtopic; std::vector subscribers; public: SubscriptionNode(const std::string &subtopic); SubscriptionNode(const SubscriptionNode &node) = delete; SubscriptionNode(SubscriptionNode &&node) = delete; std::vector &getSubscribers(); void addSubscriber(const std::shared_ptr &subscriber, char qos); std::unordered_map> children; std::unique_ptr childrenPlus; std::unique_ptr childrenPound; int cleanSubscriptions(); }; class SubscriptionStore { std::unique_ptr root; pthread_rwlock_t subscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER; std::unordered_map> sessionsById; const std::unordered_map> &sessionsByIdConst; pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER; std::unordered_set retainedMessages; Logger *logger = Logger::getInstance(); void publishNonRecursively(const MqttPacket &packet, const std::vector &subscribers) const; void publishRecursively(std::vector::const_iterator cur_subtopic_it, std::vector::const_iterator end, std::unique_ptr &next, const MqttPacket &packet) const; public: SubscriptionStore(); void addSubscription(Client_p &client, const std::string &topic, char qos); void registerClientAndKickExistingOne(Client_p &client); void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet, const Client_p &sender); void giveClientRetainedMessages(const std::shared_ptr &ses, const std::string &subscribe_topic, char max_qos); void setRetainedMessage(const std::string &topic, const std::string &payload, char qos); void removeExpiredSessionsClients(); }; #endif // SUBSCRIPTIONSTORE_H