diff --git a/CMakeLists.txt b/CMakeLists.txt
index eb057a8..517499f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,6 +12,7 @@ SET(CMAKE_CXX_FLAGS "-rdynamic")
add_compile_options(-Wall)
add_executable(FlashMQ
+ forward_declarations.h
mainapp.h
utils.h
threaddata.h
diff --git a/client.cpp b/client.cpp
index 829cccc..f2f9c32 100644
--- a/client.cpp
+++ b/client.cpp
@@ -24,7 +24,7 @@ License along with FlashMQ. If not, see .
#include "logger.h"
-Client::Client(int fd, ThreadData_p threadData, SSL *ssl, bool websocket, std::shared_ptr settings, bool fuzzMode) :
+Client::Client(int fd, std::shared_ptr threadData, SSL *ssl, bool websocket, std::shared_ptr settings, bool fuzzMode) :
fd(fd),
fuzzMode(fuzzMode),
initialBufferSize(settings->clientInitialBufferSize), // The client is constructed in the main thread, so we need to use its settings copy
@@ -341,7 +341,7 @@ void Client::setReadyForReading(bool val)
check(epoll_ctl(threadData->epollfd, EPOLL_CTL_MOD, fd, &ev));
}
-bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_p &sender)
+bool Client::bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender)
{
while (readbuf.usedBytes() >= MQTT_HEADER_LENGH)
{
diff --git a/client.h b/client.h
index b988196..a730a8e 100644
--- a/client.h
+++ b/client.h
@@ -77,7 +77,7 @@ class Client
bool will_retain = false;
char will_qos = 0;
- ThreadData_p threadData;
+ std::shared_ptr threadData;
std::mutex writeBufMutex;
std::shared_ptr session;
@@ -88,7 +88,7 @@ class Client
void setReadyForReading(bool val);
public:
- Client(int fd, ThreadData_p threadData, SSL *ssl, bool websocket, std::shared_ptr settings, bool fuzzMode=false);
+ Client(int fd, std::shared_ptr threadData, SSL *ssl, bool websocket, std::shared_ptr settings, bool fuzzMode=false);
Client(const Client &other) = delete;
Client(Client &&other) = delete;
~Client();
@@ -102,14 +102,14 @@ public:
void startOrContinueSslAccept();
void markAsDisconnecting();
bool readFdIntoBuffer();
- bool bufferToMqttPackets(std::vector &packetQueueIn, Client_p &sender);
+ bool bufferToMqttPackets(std::vector &packetQueueIn, std::shared_ptr &sender);
void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive, bool cleanSession);
void setWill(const std::string &topic, const std::string &payload, bool retain, char qos);
void clearWill();
void setAuthenticated(bool value) { authenticated = value;}
bool getAuthenticated() { return authenticated; }
bool hasConnectPacketSeen() { return connectPacketSeen; }
- ThreadData_p getThreadData() { return threadData; }
+ std::shared_ptr getThreadData() { return threadData; }
std::string &getClientId() { return this->clientid; }
const std::string &getUsername() const { return this->username; }
bool getCleanSession() { return cleanSession; }
diff --git a/forward_declarations.h b/forward_declarations.h
index dbc0a0c..755dc38 100644
--- a/forward_declarations.h
+++ b/forward_declarations.h
@@ -21,9 +21,7 @@ License along with FlashMQ. If not, see .
#include
class Client;
-typedef std::shared_ptr Client_p;
class ThreadData;
-typedef std::shared_ptr ThreadData_p;
class MqttPacket;
class SubscriptionStore;
class Session;
diff --git a/mainapp.cpp b/mainapp.cpp
index dfbb384..e49975f 100644
--- a/mainapp.cpp
+++ b/mainapp.cpp
@@ -91,7 +91,7 @@ void do_thread_work(ThreadData *threadData)
continue;
}
- Client_p client = threadData->getClient(fd);
+ std::shared_ptr client = threadData->getClient(fd);
if (client)
{
@@ -437,7 +437,7 @@ void MainApp::start()
std::shared_ptr threaddata(new ThreadData(0, subscriptionStore, settings));
- Client_p client(new Client(fd, threaddata, nullptr, fuzzWebsockets, settings, true));
+ std::shared_ptr client(new Client(fd, threaddata, nullptr, fuzzWebsockets, settings, true));
if (fuzzWebsockets && strContains(fuzzFilePathLower, "upgrade"))
client->setFakeUpgraded();
@@ -515,7 +515,7 @@ void MainApp::start()
SSL_set_fd(clientSSL, fd);
}
- Client_p client(new Client(fd, thread_data, clientSSL, listener->websocket, settings));
+ std::shared_ptr client(new Client(fd, thread_data, clientSSL, listener->websocket, settings));
thread_data->giveClient(client);
}
else
diff --git a/mqttpacket.cpp b/mqttpacket.cpp
index 6c84154..48384df 100644
--- a/mqttpacket.cpp
+++ b/mqttpacket.cpp
@@ -29,7 +29,7 @@ RemainingLength::RemainingLength()
}
// constructor for parsing incoming packets
-MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_length, Client_p &sender) :
+MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_length, std::shared_ptr &sender) :
bites(packet_len),
fixed_header_length(fixed_header_length),
sender(sender)
@@ -574,12 +574,12 @@ const std::string &MqttPacket::getTopic() const
}
-Client_p MqttPacket::getSender() const
+std::shared_ptr MqttPacket::getSender() const
{
return sender;
}
-void MqttPacket::setSender(const Client_p &value)
+void MqttPacket::setSender(const std::shared_ptr &value)
{
sender = value;
}
diff --git a/mqttpacket.h b/mqttpacket.h
index 9550a71..40eff69 100644
--- a/mqttpacket.h
+++ b/mqttpacket.h
@@ -48,7 +48,7 @@ class MqttPacket
size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header.
RemainingLength remainingLength;
char qos = 0;
- Client_p sender;
+ std::shared_ptr sender;
char first_byte = 0;
size_t pos = 0;
size_t packet_id_pos = 0;
@@ -67,7 +67,7 @@ class MqttPacket
MqttPacket(const MqttPacket &other) = default;
public:
PacketType packetType = PacketType::Reserved;
- MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_length, Client_p &sender); // Constructor for parsing incoming packets.
+ MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_length, std::shared_ptr &sender); // Constructor for parsing incoming packets.
MqttPacket(MqttPacket &&other) = default;
@@ -93,8 +93,8 @@ public:
const std::vector &getBites() const { return bites; }
char getQos() const { return qos; }
const std::string &getTopic() const;
- Client_p getSender() const;
- void setSender(const Client_p &value);
+ std::shared_ptr getSender() const;
+ void setSender(const std::shared_ptr &value);
bool containsFixedHeader() const;
char getFirstByte() const;
RemainingLength getRemainingLength() const;
diff --git a/session.cpp b/session.cpp
index 9163b5b..c00da97 100644
--- a/session.cpp
+++ b/session.cpp
@@ -60,7 +60,7 @@ void Session::writePacket(const MqttPacket &packet, char max_qos)
{
if (!clientDisconnected())
{
- Client_p c = makeSharedClient();
+ std::shared_ptr c = makeSharedClient();
c->writeMqttPacketAndBlameThisClient(packet);
}
}
@@ -84,7 +84,7 @@ void Session::writePacket(const MqttPacket &packet, char max_qos)
if (!clientDisconnected())
{
- Client_p c = makeSharedClient();
+ std::shared_ptr c = makeSharedClient();
c->writeMqttPacketAndBlameThisClient(*copyPacket.get());
copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
}
@@ -130,7 +130,7 @@ void Session::sendPendingQosMessages()
{
if (!clientDisconnected())
{
- Client_p c = makeSharedClient();
+ std::shared_ptr c = makeSharedClient();
std::lock_guard locker(qosQueueMutex);
for (QueuedQosPacket &qosMessage : qosPacketQueue)
{
diff --git a/session.h b/session.h
index d9e27f9..6bfa1d9 100644
--- a/session.h
+++ b/session.h
@@ -41,7 +41,7 @@ struct QueuedQosPacket
class Session
{
std::weak_ptr client;
- ThreadData_p thread;
+ std::shared_ptr thread;
std::string client_id;
std::string username;
std::list qosPacketQueue; // Using list because it's easiest to maintain order [MQTT-4.6.0-6]
diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp
index 611ecec..f2138ba 100644
--- a/subscriptionstore.cpp
+++ b/subscriptionstore.cpp
@@ -69,7 +69,7 @@ SubscriptionStore::SubscriptionStore() :
}
-void SubscriptionStore::addSubscription(Client_p &client, const std::string &topic, char qos)
+void SubscriptionStore::addSubscription(std::shared_ptr &client, const std::string &topic, char qos)
{
const std::list subtopics = split(topic, '/');
@@ -115,7 +115,7 @@ void SubscriptionStore::addSubscription(Client_p &client, const std::string &top
}
-void SubscriptionStore::removeSubscription(Client_p &client, const std::string &topic)
+void SubscriptionStore::removeSubscription(std::shared_ptr &client, const std::string &topic)
{
const std::list subtopics = split(topic, '/');
@@ -162,7 +162,7 @@ void SubscriptionStore::removeSubscription(Client_p &client, const std::string &
}
// Removes an existing client when it already exists [MQTT-3.1.4-2].
-void SubscriptionStore::registerClientAndKickExistingOne(Client_p &client)
+void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr &client)
{
RWLockGuard lock_guard(&subscriptionsRwlock);
lock_guard.wrlock();
diff --git a/subscriptionstore.h b/subscriptionstore.h
index 4d4469a..3cd60ba 100644
--- a/subscriptionstore.h
+++ b/subscriptionstore.h
@@ -86,9 +86,9 @@ class SubscriptionStore
public:
SubscriptionStore();
- void addSubscription(Client_p &client, const std::string &topic, char qos);
- void removeSubscription(Client_p &client, const std::string &topic);
- void registerClientAndKickExistingOne(Client_p &client);
+ void addSubscription(std::shared_ptr &client, const std::string &topic, char qos);
+ void removeSubscription(std::shared_ptr &client, const std::string &topic);
+ void registerClientAndKickExistingOne(std::shared_ptr &client);
bool sessionPresent(const std::string &clientid);
void queuePacketAtSubscribers(const std::string &topic, const MqttPacket &packet);
diff --git a/threaddata.cpp b/threaddata.cpp
index 43bdb17..eb5d1f6 100644
--- a/threaddata.cpp
+++ b/threaddata.cpp
@@ -71,7 +71,7 @@ void ThreadData::quit()
running = false;
}
-void ThreadData::giveClient(Client_p client)
+void ThreadData::giveClient(std::shared_ptr client)
{
clients_by_fd_mutex.lock();
int fd = client->getFd();
@@ -85,13 +85,13 @@ void ThreadData::giveClient(Client_p client)
check(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev));
}
-Client_p ThreadData::getClient(int fd)
+std::shared_ptr ThreadData::getClient(int fd)
{
std::lock_guard lck(clients_by_fd_mutex);
return this->clients_by_fd[fd];
}
-void ThreadData::removeClient(Client_p client)
+void ThreadData::removeClient(std::shared_ptr client)
{
client->markAsDisconnecting();
@@ -157,7 +157,7 @@ void ThreadData::doKeepAliveCheck()
auto it = clients_by_fd.begin();
while (it != clients_by_fd.end())
{
- Client_p &client = it->second;
+ std::shared_ptr &client = it->second;
if (client && client->keepAliveExpired())
{
client->setDisconnectReason("Keep-alive expired: " + client->getKeepAliveInfoString());
diff --git a/threaddata.h b/threaddata.h
index e304a80..02693ef 100644
--- a/threaddata.h
+++ b/threaddata.h
@@ -42,7 +42,7 @@ typedef void (*thread_f)(ThreadData *);
class ThreadData
{
- std::unordered_map clients_by_fd;
+ std::unordered_map> clients_by_fd;
std::mutex clients_by_fd_mutex;
std::shared_ptr subscriptionStore;
Logger *logger;
@@ -69,9 +69,9 @@ public:
void start(thread_f f);
- void giveClient(Client_p client);
- Client_p getClient(int fd);
- void removeClient(Client_p client);
+ void giveClient(std::shared_ptr client);
+ std::shared_ptr getClient(int fd);
+ void removeClient(std::shared_ptr client);
void removeClient(int fd);
std::shared_ptr &getSubscriptionStore();