Commit e5746d7f030563e232c501d0e0a6836fa80fb2dc

Authored by Wiebe Cazemier
1 parent 468ca477

Use timer for resetting buffers

cirbuf.cpp
1 #include "cirbuf.h" 1 #include "cirbuf.h"
2 2
3 -#include <time.h>  
4 -  
5 #include <iostream> 3 #include <iostream>
6 #include <exception> 4 #include <exception>
7 #include <stdexcept> 5 #include <stdexcept>
@@ -140,7 +138,7 @@ void CirBuf::doubleSize(uint factor) @@ -140,7 +138,7 @@ void CirBuf::doubleSize(uint factor)
140 memset(&buf[head], 5, maxWriteSize() + 2); 138 memset(&buf[head], 5, maxWriteSize() + 2);
141 #endif 139 #endif
142 140
143 - resizedAt = time(NULL); 141 + primedForSizeReset = false;
144 } 142 }
145 143
146 uint32_t CirBuf::getSize() const 144 uint32_t CirBuf::getSize() const
@@ -148,14 +146,25 @@ uint32_t CirBuf::getSize() const @@ -148,14 +146,25 @@ uint32_t CirBuf::getSize() const
148 return size; 146 return size;
149 } 147 }
150 148
151 -time_t CirBuf::bufferLastResizedSecondsAgo() const 149 +void CirBuf::resetSizeIfEligable(size_t size)
152 { 150 {
153 - return time(NULL) - resizedAt; 151 + // Ensuring the reset will only happen the second time the timer event hits.
  152 + if (!primedForSizeReset)
  153 + {
  154 + primedForSizeReset = true;
  155 + return;
  156 + }
  157 +
  158 + if (usedBytes() > 0)
  159 + return;
  160 +
  161 + resetSize(size);
154 } 162 }
155 163
156 void CirBuf::resetSize(size_t newSize) 164 void CirBuf::resetSize(size_t newSize)
157 { 165 {
158 assert(usedBytes() == 0); 166 assert(usedBytes() == 0);
  167 + primedForSizeReset = false;
159 if (this->size == newSize) 168 if (this->size == newSize)
160 return; 169 return;
161 char *newBuf = (char*)malloc(newSize); 170 char *newBuf = (char*)malloc(newSize);
@@ -166,7 +175,6 @@ void CirBuf::resetSize(size_t newSize) @@ -166,7 +175,6 @@ void CirBuf::resetSize(size_t newSize)
166 this->size = newSize; 175 this->size = newSize;
167 head = 0; 176 head = 0;
168 tail = 0; 177 tail = 0;
169 - resizedAt = time(NULL);  
170 #ifndef NDEBUG 178 #ifndef NDEBUG
171 Logger *logger = Logger::getInstance(); 179 Logger *logger = Logger::getInstance();
172 logger->logf(LOG_DEBUG, "Reset buf size: %d", size); 180 logger->logf(LOG_DEBUG, "Reset buf size: %d", size);
cirbuf.h
@@ -18,7 +18,7 @@ class CirBuf @@ -18,7 +18,7 @@ class CirBuf
18 uint32_t tail = 0; 18 uint32_t tail = 0;
19 uint32_t size = 0; 19 uint32_t size = 0;
20 20
21 - time_t resizedAt = 0; 21 + bool primedForSizeReset = false;
22 public: 22 public:
23 23
24 CirBuf(size_t size); 24 CirBuf(size_t size);
@@ -37,7 +37,7 @@ public: @@ -37,7 +37,7 @@ public:
37 void doubleSize(uint factor = 2); 37 void doubleSize(uint factor = 2);
38 uint32_t getSize() const; 38 uint32_t getSize() const;
39 39
40 - time_t bufferLastResizedSecondsAgo() const; 40 + void resetSizeIfEligable(size_t size);
41 void resetSize(size_t size); 41 void resetSize(size_t size);
42 void reset(); 42 void reset();
43 43
client.cpp
@@ -228,19 +228,6 @@ bool Client::writeBufIntoFd() @@ -228,19 +228,6 @@ bool Client::writeBufIntoFd()
228 const bool bufferHasData = writebuf.usedBytes() > 0; 228 const bool bufferHasData = writebuf.usedBytes() > 0;
229 setReadyForWriting(bufferHasData || error == IoWrapResult::Wouldblock); 229 setReadyForWriting(bufferHasData || error == IoWrapResult::Wouldblock);
230 230
231 - if (!bufferHasData)  
232 - {  
233 - writeBufIsZeroCount++;  
234 - bool doReset = (writeBufIsZeroCount >= 10 && writebuf.getSize() > (maxPacketSize / 10) && writebuf.bufferLastResizedSecondsAgo() > 30);  
235 - doReset |= (writeBufIsZeroCount >= 100 && writebuf.bufferLastResizedSecondsAgo() > 300);  
236 -  
237 - if (doReset)  
238 - {  
239 - writeBufIsZeroCount = 0;  
240 - writebuf.resetSize(initialBufferSize);  
241 - }  
242 - }  
243 -  
244 return true; 231 return true;
245 } 232 }
246 233
@@ -268,6 +255,12 @@ std::string Client::getKeepAliveInfoString() const @@ -268,6 +255,12 @@ std::string Client::getKeepAliveInfoString() const
268 return s; 255 return s;
269 } 256 }
270 257
  258 +void Client::resetBuffersIfEligible()
  259 +{
  260 + readbuf.resetSizeIfEligable(initialBufferSize);
  261 + writebuf.resetSizeIfEligable(initialBufferSize);
  262 +}
  263 +
271 // Call this from a place you know the writeBufMutex is locked, or we're still only doing SSL accept. 264 // Call this from a place you know the writeBufMutex is locked, or we're still only doing SSL accept.
272 void Client::setReadyForWriting(bool val) 265 void Client::setReadyForWriting(bool val)
273 { 266 {
@@ -353,19 +346,6 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -353,19 +346,6 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
353 346
354 setReadyForReading(readbuf.freeSpace() > 0); 347 setReadyForReading(readbuf.freeSpace() > 0);
355 348
356 - if (readbuf.usedBytes() == 0)  
357 - {  
358 - readBufIsZeroCount++;  
359 - bool doReset = (readBufIsZeroCount >= 10 && readbuf.getSize() > (maxPacketSize / 10) && readbuf.bufferLastResizedSecondsAgo() > 30);  
360 - doReset |= (readBufIsZeroCount >= 100 && readbuf.bufferLastResizedSecondsAgo() > 300);  
361 -  
362 - if (doReset)  
363 - {  
364 - readBufIsZeroCount = 0;  
365 - readbuf.resetSize(initialBufferSize);  
366 - }  
367 - }  
368 -  
369 return true; 349 return true;
370 } 350 }
371 351
client.h
@@ -38,10 +38,7 @@ class Client @@ -38,10 +38,7 @@ class Client
38 IoWrapper ioWrapper; 38 IoWrapper ioWrapper;
39 39
40 CirBuf readbuf; 40 CirBuf readbuf;
41 - uint8_t readBufIsZeroCount = 0;  
42 -  
43 CirBuf writebuf; 41 CirBuf writebuf;
44 - uint8_t writeBufIsZeroCount = 0;  
45 42
46 bool authenticated = false; 43 bool authenticated = false;
47 bool connectPacketSeen = false; 44 bool connectPacketSeen = false;
@@ -114,6 +111,7 @@ public: @@ -114,6 +111,7 @@ public:
114 std::string repr(); 111 std::string repr();
115 bool keepAliveExpired(); 112 bool keepAliveExpired();
116 std::string getKeepAliveInfoString() const; 113 std::string getKeepAliveInfoString() const;
  114 + void resetBuffersIfEligible();
117 115
118 }; 116 };
119 117
threaddata.cpp
@@ -132,7 +132,10 @@ void ThreadData::doKeepAliveCheck() @@ -132,7 +132,10 @@ void ThreadData::doKeepAliveCheck()
132 it = clients_by_fd.erase(it); 132 it = clients_by_fd.erase(it);
133 } 133 }
134 else 134 else
  135 + {
  136 + client->resetBuffersIfEligible();
135 it++; 137 it++;
  138 + }
136 } 139 }
137 } 140 }
138 catch (std::exception &ex) 141 catch (std::exception &ex)