Commit 54f45b2729fb0896ee558622a03504e2149bfb5d

Authored by Wiebe Cazemier
1 parent e29e0f6d

Rudimentary lock based cross-thread publish

client.cpp
@@ -77,6 +77,8 @@ bool Client::readFdIntoBuffer() @@ -77,6 +77,8 @@ bool Client::readFdIntoBuffer()
77 77
78 void Client::writeMqttPacket(const MqttPacket &packet) 78 void Client::writeMqttPacket(const MqttPacket &packet)
79 { 79 {
  80 + std::lock_guard<std::mutex> locker(writeBufMutex);
  81 +
80 if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE) 82 if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE)
81 return; 83 return;
82 84
@@ -99,6 +101,8 @@ void Client::writeMqttPacketLocked(const MqttPacket &amp;packet) @@ -99,6 +101,8 @@ void Client::writeMqttPacketLocked(const MqttPacket &amp;packet)
99 // Ping responses are always the same, so hardcoding it for optimization. 101 // Ping responses are always the same, so hardcoding it for optimization.
100 void Client::writePingResp() 102 void Client::writePingResp()
101 { 103 {
  104 + std::lock_guard<std::mutex> locker(writeBufMutex);
  105 +
102 std::cout << "Sending ping response to " << repr() << std::endl; 106 std::cout << "Sending ping response to " << repr() << std::endl;
103 107
104 if (2 > getWriteBufMaxWriteSize()) 108 if (2 > getWriteBufMaxWriteSize())
@@ -112,6 +116,8 @@ void Client::writePingResp() @@ -112,6 +116,8 @@ void Client::writePingResp()
112 116
113 bool Client::writeBufIntoFd() 117 bool Client::writeBufIntoFd()
114 { 118 {
  119 + std::lock_guard<std::mutex> locker(writeBufMutex);
  120 +
115 int n; 121 int n;
116 while ((n = write(fd, &writebuf[wri], getWriteBufBytesUsed())) != 0) 122 while ((n = write(fd, &writebuf[wri], getWriteBufBytesUsed())) != 0)
117 { 123 {
subscriptionstore.cpp
@@ -28,6 +28,9 @@ void SubscriptionStore::queueAtClientsTemp(std::string &amp;topic, const MqttPacket @@ -28,6 +28,9 @@ void SubscriptionStore::queueAtClientsTemp(std::string &amp;topic, const MqttPacket
28 28
29 for(const Client_p &client : subscriptions[topic]) 29 for(const Client_p &client : subscriptions[topic])
30 { 30 {
  31 + client->writeMqttPacket(packet);
  32 +
  33 + /*
31 if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) 34 if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr)
32 { 35 {
33 client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing. 36 client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing.
@@ -39,6 +42,6 @@ void SubscriptionStore::queueAtClientsTemp(std::string &amp;topic, const MqttPacket @@ -39,6 +42,6 @@ void SubscriptionStore::queueAtClientsTemp(std::string &amp;topic, const MqttPacket
39 //client->writeMqttPacketLocked(packet); 42 //client->writeMqttPacketLocked(packet);
40 //client->getThreadData()->addToReadyForDequeuing(client); 43 //client->getThreadData()->addToReadyForDequeuing(client);
41 //client->getThreadData()->wakeUpThread(); 44 //client->getThreadData()->wakeUpThread();
42 - } 45 + }*/
43 } 46 }
44 } 47 }