Commit a47a83064a89ea585874ec8404b14228123aa461

Authored by Wiebe Cazemier
1 parent b55a9894

Handle packets immediately

There was not really a benefit to the old method, and this prevents
packet pile up and keeps the code cleaner.
Showing 2 changed files with 24 additions and 15 deletions
threadloop.cpp
@@ -102,6 +102,7 @@ void do_thread_work(ThreadData *threadData) @@ -102,6 +102,7 @@ void do_thread_work(ThreadData *threadData)
102 } 102 }
103 if ((cur_ev.events & EPOLLIN) || ((cur_ev.events & EPOLLOUT) && client->getSslReadWantsWrite())) 103 if ((cur_ev.events & EPOLLIN) || ((cur_ev.events & EPOLLOUT) && client->getSslReadWantsWrite()))
104 { 104 {
  105 + VectorClearGuard vectorClear(packetQueueIn);
105 bool readSuccess = client->readFdIntoBuffer(); 106 bool readSuccess = client->readFdIntoBuffer();
106 client->bufferToMqttPackets(packetQueueIn, client); 107 client->bufferToMqttPackets(packetQueueIn, client);
107 108
@@ -111,6 +112,11 @@ void do_thread_work(ThreadData *threadData) @@ -111,6 +112,11 @@ void do_thread_work(ThreadData *threadData)
111 threadData->removeClient(client); 112 threadData->removeClient(client);
112 continue; 113 continue;
113 } 114 }
  115 +
  116 + for (MqttPacket &packet : packetQueueIn)
  117 + {
  118 + packet.handle();
  119 + }
114 } 120 }
115 if ((cur_ev.events & EPOLLOUT) || ((cur_ev.events & EPOLLIN) && client->getSslWriteWantsRead())) 121 if ((cur_ev.events & EPOLLOUT) || ((cur_ev.events & EPOLLIN) && client->getSslWriteWantsRead()))
116 { 122 {
@@ -136,21 +142,6 @@ void do_thread_work(ThreadData *threadData) @@ -136,21 +142,6 @@ void do_thread_work(ThreadData *threadData)
136 } 142 }
137 } 143 }
138 } 144 }
139 -  
140 - for (MqttPacket &packet : packetQueueIn)  
141 - {  
142 - try  
143 - {  
144 - packet.handle();  
145 - }  
146 - catch (std::exception &ex)  
147 - {  
148 - packet.getSender()->setDisconnectReason(ex.what());  
149 - logger->logf(LOG_ERR, "MqttPacket handling error: %s. Removing client.", ex.what());  
150 - threadData->removeClient(packet.getSender());  
151 - }  
152 - }  
153 - packetQueueIn.clear();  
154 } 145 }
155 146
156 try 147 try
threadloop.h
@@ -23,6 +23,24 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. @@ -23,6 +23,24 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
23 23
24 #define MAX_EVENTS 65536 24 #define MAX_EVENTS 65536
25 25
  26 +#include "forward_declarations.h"
  27 +
  28 +class VectorClearGuard
  29 +{
  30 + std::vector<MqttPacket> &v;
  31 +public:
  32 + VectorClearGuard(std::vector<MqttPacket> &v) :
  33 + v(v)
  34 + {
  35 +
  36 + }
  37 +
  38 + ~VectorClearGuard()
  39 + {
  40 + v.clear();
  41 + }
  42 +};
  43 +
26 void do_thread_work(ThreadData *threadData); 44 void do_thread_work(ThreadData *threadData);
27 45
28 46