Commit fce0beff8381f2eec8fe184daf192a453c303c73

Authored by Wiebe Cazemier
1 parent 321015d4

Remove old eventfd on thread concept

We use epoll now to change readiness.
mainapp.cpp
@@ -16,21 +16,8 @@ void do_thread_work(ThreadData *threadData) @@ -16,21 +16,8 @@ void do_thread_work(ThreadData *threadData)
16 16
17 std::vector<MqttPacket> packetQueueIn; 17 std::vector<MqttPacket> packetQueueIn;
18 18
19 - uint64_t eventfd_value = 0;  
20 -  
21 while (threadData->running) 19 while (threadData->running)
22 { 20 {
23 - if (eventfd_value > 0)  
24 - {  
25 - for (Client_p client : threadData->getReadyForDequeueing())  
26 - {  
27 - //client->queuedMessagesToBuffer();  
28 - client->writeBufIntoFd();  
29 - }  
30 - threadData->clearReadyForDequeueing();  
31 - eventfd_value = 0;  
32 - }  
33 -  
34 int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100); 21 int fdcount = epoll_wait(epoll_fd, events, MAX_EVENTS, 100);
35 22
36 if (fdcount < 0) 23 if (fdcount < 0)
@@ -46,13 +33,6 @@ void do_thread_work(ThreadData *threadData) @@ -46,13 +33,6 @@ void do_thread_work(ThreadData *threadData)
46 struct epoll_event cur_ev = events[i]; 33 struct epoll_event cur_ev = events[i];
47 int fd = cur_ev.data.fd; 34 int fd = cur_ev.data.fd;
48 35
49 - // If this thread was actively woken up.  
50 - if (fd == threadData->event_fd)  
51 - {  
52 - read(fd, &eventfd_value, sizeof(uint64_t));  
53 - continue;  
54 - }  
55 -  
56 Client_p client = threadData->getClient(fd); 36 Client_p client = threadData->getClient(fd);
57 37
58 if (client) 38 if (client)
threaddata.cpp
@@ -7,13 +7,6 @@ ThreadData::ThreadData(int threadnr, std::shared_ptr&lt;SubscriptionStore&gt; &amp;subscri @@ -7,13 +7,6 @@ ThreadData::ThreadData(int threadnr, std::shared_ptr&lt;SubscriptionStore&gt; &amp;subscri
7 threadnr(threadnr) 7 threadnr(threadnr)
8 { 8 {
9 epollfd = check<std::runtime_error>(epoll_create(999)); 9 epollfd = check<std::runtime_error>(epoll_create(999));
10 - event_fd = eventfd(0, EFD_NONBLOCK);  
11 -  
12 - struct epoll_event ev;  
13 - memset(&ev, 0, sizeof (struct epoll_event));  
14 - ev.data.fd = event_fd;  
15 - ev.events = EPOLLIN;  
16 - check<std::runtime_error>(epoll_ctl(epollfd, EPOLL_CTL_ADD, event_fd, &ev));  
17 } 10 }
18 11
19 void ThreadData::moveThreadHere(std::thread &&thread) 12 void ThreadData::moveThreadHere(std::thread &&thread)
@@ -79,22 +72,5 @@ std::shared_ptr&lt;SubscriptionStore&gt; &amp;ThreadData::getSubscriptionStore() @@ -79,22 +72,5 @@ std::shared_ptr&lt;SubscriptionStore&gt; &amp;ThreadData::getSubscriptionStore()
79 return subscriptionStore; 72 return subscriptionStore;
80 } 73 }
81 74
82 -void ThreadData::wakeUpThread()  
83 -{  
84 - uint64_t one = 1;  
85 - write(event_fd, &one, sizeof(uint64_t));  
86 -}  
87 -  
88 -void ThreadData::addToReadyForDequeuing(Client_p &client)  
89 -{  
90 - std::lock_guard<std::mutex> lock(readForDequeuingMutex);  
91 - this->readyForDequeueing.insert(client);  
92 -}  
93 -  
94 -void ThreadData::clearReadyForDequeueing()  
95 -{  
96 - std::lock_guard<std::mutex> lock(readForDequeuingMutex);  
97 - this->readyForDequeueing.clear();  
98 -}  
99 75
100 76
threaddata.h
@@ -24,15 +24,12 @@ class ThreadData @@ -24,15 +24,12 @@ class ThreadData
24 std::unordered_map<int, Client_p> clients_by_fd; 24 std::unordered_map<int, Client_p> clients_by_fd;
25 std::mutex clients_by_fd_mutex; 25 std::mutex clients_by_fd_mutex;
26 std::shared_ptr<SubscriptionStore> subscriptionStore; 26 std::shared_ptr<SubscriptionStore> subscriptionStore;
27 - std::unordered_set<Client_p> readyForDequeueing;  
28 - std::mutex readForDequeuingMutex;  
29 27
30 public: 28 public:
31 bool running = true; 29 bool running = true;
32 std::thread thread; 30 std::thread thread;
33 int threadnr = 0; 31 int threadnr = 0;
34 int epollfd = 0; 32 int epollfd = 0;
35 - int event_fd = 0;  
36 33
37 ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore); 34 ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore);
38 35
@@ -43,10 +40,6 @@ public: @@ -43,10 +40,6 @@ public:
43 void removeClient(Client_p client); 40 void removeClient(Client_p client);
44 void removeClient(int fd); 41 void removeClient(int fd);
45 std::shared_ptr<SubscriptionStore> &getSubscriptionStore(); 42 std::shared_ptr<SubscriptionStore> &getSubscriptionStore();
46 - void wakeUpThread();  
47 - void addToReadyForDequeuing(Client_p &client);  
48 - std::unordered_set<Client_p> &getReadyForDequeueing() { return readyForDequeueing; }  
49 - void clearReadyForDequeueing();  
50 }; 43 };
51 44
52 #endif // THREADDATA_H 45 #endif // THREADDATA_H