diff --git a/client.cpp b/client.cpp index a50bd86..31d7d6c 100644 --- a/client.cpp +++ b/client.cpp @@ -66,6 +66,13 @@ void Client::writeMqttPacket(const MqttPacket &packet) wwi += packet.getSize(); } +// Not sure if this is the method I want to use +void Client::writeMqttPacketLocked(const MqttPacket &packet) +{ + std::lock_guard lock(writeBufMutex); + writeMqttPacket(packet); +} + // Ping responses are always the same, so hardcoding it for optimization. void Client::writePingResp() { diff --git a/client.h b/client.h index fc6ae46..1122aae 100644 --- a/client.h +++ b/client.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "forward_declarations.h" @@ -11,6 +12,7 @@ #include "mqttpacket.h" #include "exceptions.h" + #define CLIENT_BUFFER_SIZE 1024 #define MQTT_HEADER_LENGH 2 @@ -35,6 +37,7 @@ class Client uint16_t keepalive = 0; ThreadData_p threadData; + std::mutex writeBufMutex; size_t getReadBufBytesUsed() { @@ -96,6 +99,7 @@ public: void writePingResp(); void writeMqttPacket(const MqttPacket &packet); + void writeMqttPacketLocked(const MqttPacket &packet); bool writeBufIntoFd(); std::string repr(); diff --git a/main.cpp b/main.cpp index 3f8cbb5..423454d 100644 --- a/main.cpp +++ b/main.cpp @@ -36,7 +36,8 @@ void do_thread_work(ThreadData *threadData) { for (Client_p client : threadData->getReadyForDequeueing()) { - client->queuedMessagesToBuffer(); + //client->queuedMessagesToBuffer(); + client->writeBufIntoFd(); } threadData->clearReadyForDequeueing(); eventfd_value = 0; diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index fd25cd2..e0e5fc1 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -7,22 +7,27 @@ SubscriptionStore::SubscriptionStore() void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) { + std::lock_guard lock(subscriptionsMutex); this->subscriptions[topic].push_back(client); } void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) { + // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention. + std::lock_guard lock(subscriptionsMutex); + for(Client_p &client : subscriptions[topic]) { if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) { - client->writeMqttPacket(packet); + client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing. client->writeBufIntoFd(); } else { - client->queueMessage(packet); + client->writeMqttPacketLocked(packet); client->getThreadData()->addToReadyForDequeuing(client); + client->getThreadData()->wakeUpThread(); } } } diff --git a/subscriptionstore.h b/subscriptionstore.h index a661e36..7264bd8 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -3,6 +3,7 @@ #include #include +#include #include "forward_declarations.h" @@ -11,6 +12,7 @@ class SubscriptionStore { std::unordered_map> subscriptions; + std::mutex subscriptionsMutex; public: SubscriptionStore(); diff --git a/threaddata.cpp b/threaddata.cpp index 8e77fea..41ffd7f 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -7,6 +7,12 @@ ThreadData::ThreadData(int threadnr, std::shared_ptr &subscri { epollfd = check(epoll_create(999)); event_fd = eventfd(0, EFD_NONBLOCK); + + struct epoll_event ev; + memset(&ev, 0, sizeof (struct epoll_event)); + ev.data.fd = event_fd; + ev.events = EPOLLIN; + check(epoll_ctl(epollfd, EPOLL_CTL_ADD, event_fd, &ev)); } void ThreadData::giveClient(Client_p client) @@ -44,11 +50,13 @@ void ThreadData::wakeUpThread() void ThreadData::addToReadyForDequeuing(Client_p &client) { + std::lock_guard lock(readForDequeuingMutex); this->readyForDequeueing.insert(client); } void ThreadData::clearReadyForDequeueing() { + std::lock_guard lock(readForDequeuingMutex); this->readyForDequeueing.clear(); } diff --git a/threaddata.h b/threaddata.h index 15656f2..5d2a03f 100644 --- a/threaddata.h +++ b/threaddata.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "forward_declarations.h" @@ -22,6 +23,7 @@ class ThreadData std::unordered_map clients_by_fd; std::shared_ptr subscriptionStore; std::unordered_set readyForDequeueing; + std::mutex readForDequeuingMutex; public: std::thread thread;