Commit a633388177049e700950fb1ff74b18be2ba27ec8

Authored by Wiebe Cazemier
1 parent 40f8e5e5

quick hack to get all threads to work

client.cpp
@@ -66,6 +66,13 @@ void Client::writeMqttPacket(const MqttPacket &packet) @@ -66,6 +66,13 @@ void Client::writeMqttPacket(const MqttPacket &packet)
66 wwi += packet.getSize(); 66 wwi += packet.getSize();
67 } 67 }
68 68
  69 +// Not sure if this is the method I want to use
  70 +void Client::writeMqttPacketLocked(const MqttPacket &packet)
  71 +{
  72 + std::lock_guard<std::mutex> lock(writeBufMutex);
  73 + writeMqttPacket(packet);
  74 +}
  75 +
69 // Ping responses are always the same, so hardcoding it for optimization. 76 // Ping responses are always the same, so hardcoding it for optimization.
70 void Client::writePingResp() 77 void Client::writePingResp()
71 { 78 {
client.h
@@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
4 #include <fcntl.h> 4 #include <fcntl.h>
5 #include <unistd.h> 5 #include <unistd.h>
6 #include <vector> 6 #include <vector>
  7 +#include <mutex>
7 8
8 #include "forward_declarations.h" 9 #include "forward_declarations.h"
9 10
@@ -11,6 +12,7 @@ @@ -11,6 +12,7 @@
11 #include "mqttpacket.h" 12 #include "mqttpacket.h"
12 #include "exceptions.h" 13 #include "exceptions.h"
13 14
  15 +
14 #define CLIENT_BUFFER_SIZE 1024 16 #define CLIENT_BUFFER_SIZE 1024
15 #define MQTT_HEADER_LENGH 2 17 #define MQTT_HEADER_LENGH 2
16 18
@@ -35,6 +37,7 @@ class Client @@ -35,6 +37,7 @@ class Client
35 uint16_t keepalive = 0; 37 uint16_t keepalive = 0;
36 38
37 ThreadData_p threadData; 39 ThreadData_p threadData;
  40 + std::mutex writeBufMutex;
38 41
39 size_t getReadBufBytesUsed() 42 size_t getReadBufBytesUsed()
40 { 43 {
@@ -96,6 +99,7 @@ public: @@ -96,6 +99,7 @@ public:
96 99
97 void writePingResp(); 100 void writePingResp();
98 void writeMqttPacket(const MqttPacket &packet); 101 void writeMqttPacket(const MqttPacket &packet);
  102 + void writeMqttPacketLocked(const MqttPacket &packet);
99 bool writeBufIntoFd(); 103 bool writeBufIntoFd();
100 104
101 std::string repr(); 105 std::string repr();
main.cpp
@@ -36,7 +36,8 @@ void do_thread_work(ThreadData *threadData) @@ -36,7 +36,8 @@ void do_thread_work(ThreadData *threadData)
36 { 36 {
37 for (Client_p client : threadData->getReadyForDequeueing()) 37 for (Client_p client : threadData->getReadyForDequeueing())
38 { 38 {
39 - client->queuedMessagesToBuffer(); 39 + //client->queuedMessagesToBuffer();
  40 + client->writeBufIntoFd();
40 } 41 }
41 threadData->clearReadyForDequeueing(); 42 threadData->clearReadyForDequeueing();
42 eventfd_value = 0; 43 eventfd_value = 0;
subscriptionstore.cpp
@@ -7,22 +7,27 @@ SubscriptionStore::SubscriptionStore() @@ -7,22 +7,27 @@ SubscriptionStore::SubscriptionStore()
7 7
8 void SubscriptionStore::addSubscription(Client_p &client, std::string &topic) 8 void SubscriptionStore::addSubscription(Client_p &client, std::string &topic)
9 { 9 {
  10 + std::lock_guard<std::mutex> lock(subscriptionsMutex);
10 this->subscriptions[topic].push_back(client); 11 this->subscriptions[topic].push_back(client);
11 } 12 }
12 13
13 void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender) 14 void SubscriptionStore::queueAtClientsTemp(std::string &topic, const MqttPacket &packet, const Client_p &sender)
14 { 15 {
  16 + // TODO: temp. I want to work with read copies of the subscription store, to avoid frequent lock contention.
  17 + std::lock_guard<std::mutex> lock(subscriptionsMutex);
  18 +
15 for(Client_p &client : subscriptions[topic]) 19 for(Client_p &client : subscriptions[topic])
16 { 20 {
17 if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr) 21 if (client->getThreadData()->threadnr == sender->getThreadData()->threadnr)
18 { 22 {
19 - client->writeMqttPacket(packet); 23 + client->writeMqttPacket(packet); // TODO: with my current hack way, this is wrong. Not using a lock only works with my previous idea of queueing.
20 client->writeBufIntoFd(); 24 client->writeBufIntoFd();
21 } 25 }
22 else 26 else
23 { 27 {
24 - client->queueMessage(packet); 28 + client->writeMqttPacketLocked(packet);
25 client->getThreadData()->addToReadyForDequeuing(client); 29 client->getThreadData()->addToReadyForDequeuing(client);
  30 + client->getThreadData()->wakeUpThread();
26 } 31 }
27 } 32 }
28 } 33 }
subscriptionstore.h
@@ -3,6 +3,7 @@ @@ -3,6 +3,7 @@
3 3
4 #include <unordered_map> 4 #include <unordered_map>
5 #include <list> 5 #include <list>
  6 +#include <mutex>
6 7
7 #include "forward_declarations.h" 8 #include "forward_declarations.h"
8 9
@@ -11,6 +12,7 @@ @@ -11,6 +12,7 @@
11 class SubscriptionStore 12 class SubscriptionStore
12 { 13 {
13 std::unordered_map<std::string, std::list<Client_p>> subscriptions; 14 std::unordered_map<std::string, std::list<Client_p>> subscriptions;
  15 + std::mutex subscriptionsMutex;
14 public: 16 public:
15 SubscriptionStore(); 17 SubscriptionStore();
16 18
threaddata.cpp
@@ -7,6 +7,12 @@ ThreadData::ThreadData(int threadnr, std::shared_ptr&lt;SubscriptionStore&gt; &amp;subscri @@ -7,6 +7,12 @@ ThreadData::ThreadData(int threadnr, std::shared_ptr&lt;SubscriptionStore&gt; &amp;subscri
7 { 7 {
8 epollfd = check<std::runtime_error>(epoll_create(999)); 8 epollfd = check<std::runtime_error>(epoll_create(999));
9 event_fd = eventfd(0, EFD_NONBLOCK); 9 event_fd = eventfd(0, EFD_NONBLOCK);
  10 +
  11 + struct epoll_event ev;
  12 + memset(&ev, 0, sizeof (struct epoll_event));
  13 + ev.data.fd = event_fd;
  14 + ev.events = EPOLLIN;
  15 + check<std::runtime_error>(epoll_ctl(epollfd, EPOLL_CTL_ADD, event_fd, &ev));
10 } 16 }
11 17
12 void ThreadData::giveClient(Client_p client) 18 void ThreadData::giveClient(Client_p client)
@@ -44,11 +50,13 @@ void ThreadData::wakeUpThread() @@ -44,11 +50,13 @@ void ThreadData::wakeUpThread()
44 50
45 void ThreadData::addToReadyForDequeuing(Client_p &client) 51 void ThreadData::addToReadyForDequeuing(Client_p &client)
46 { 52 {
  53 + std::lock_guard<std::mutex> lock(readForDequeuingMutex);
47 this->readyForDequeueing.insert(client); 54 this->readyForDequeueing.insert(client);
48 } 55 }
49 56
50 void ThreadData::clearReadyForDequeueing() 57 void ThreadData::clearReadyForDequeueing()
51 { 58 {
  59 + std::lock_guard<std::mutex> lock(readForDequeuingMutex);
52 this->readyForDequeueing.clear(); 60 this->readyForDequeueing.clear();
53 } 61 }
54 62
threaddata.h
@@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
8 #include <map> 8 #include <map>
9 #include <unordered_set> 9 #include <unordered_set>
10 #include <unordered_map> 10 #include <unordered_map>
  11 +#include <mutex>
11 12
12 #include "forward_declarations.h" 13 #include "forward_declarations.h"
13 14
@@ -22,6 +23,7 @@ class ThreadData @@ -22,6 +23,7 @@ class ThreadData
22 std::unordered_map<int, Client_p> clients_by_fd; 23 std::unordered_map<int, Client_p> clients_by_fd;
23 std::shared_ptr<SubscriptionStore> subscriptionStore; 24 std::shared_ptr<SubscriptionStore> subscriptionStore;
24 std::unordered_set<Client_p> readyForDequeueing; 25 std::unordered_set<Client_p> readyForDequeueing;
  26 + std::mutex readForDequeuingMutex;
25 27
26 public: 28 public:
27 std::thread thread; 29 std::thread thread;