From 54f45b2729fb0896ee558622a03504e2149bfb5d Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sun, 13 Dec 2020 13:17:16 +0100 Subject: [PATCH] Rudimentary lock based cross-thread publish --- client.cpp | 6 ++++++ subscriptionstore.cpp | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/client.cpp b/client.cpp index 2e82b78..5bfcfec 100644 --- a/client.cpp +++ b/client.cpp @@ -77,6 +77,8 @@ bool Client::readFdIntoBuffer() void Client::writeMqttPacket(const MqttPacket &packet) { + std::lock_guard locker(writeBufMutex); + if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE) return; @@ -99,6 +101,8 @@ void Client::writeMqttPacketLocked(const MqttPacket &packet) // Ping responses are always the same, so hardcoding it for optimization. void Client::writePingResp() { + std::lock_guard locker(writeBufMutex); + std::cout << "Sending ping response to " << repr() << std::endl; if (2 > getWriteBufMaxWriteSize()) @@ -112,6 +116,8 @@ void Client::writePingResp() bool Client::writeBufIntoFd() { + std::lock_guard locker(writeBufMutex); + int n; while ((n = write(fd, &writebuf[wri], getWriteBufBytesUsed())) != 0) { diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index ff83282..23ee222 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -28,6 +28,9 @@ void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket for(const Client_p &client : subscriptions[topic]) { + client->writeMqttPacket(packet); + + /* if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) { client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing. @@ -39,6 +42,6 @@ void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket //client->writeMqttPacketLocked(packet); //client->getThreadData()->addToReadyForDequeuing(client); //client->getThreadData()->wakeUpThread(); - } + }*/ } } -- libgit2 0.21.4