Commit 497bc9eb3be87398e9f093389410696d2a7148da

Authored by Wiebe Cazemier
1 parent a0596b6e

Reduce client buffers periodically

The test cases give a bunch of warnings, but that's the COMPARE macro's
fault. Let's see what to do...
cirbuf.cpp
1 #include "cirbuf.h" 1 #include "cirbuf.h"
2 2
  3 +#include <time.h>
  4 +
3 #include <iostream> 5 #include <iostream>
4 #include <exception> 6 #include <exception>
5 #include <stdexcept> 7 #include <stdexcept>
@@ -12,7 +14,7 @@ CirBuf::CirBuf(size_t size) : @@ -12,7 +14,7 @@ CirBuf::CirBuf(size_t size) :
12 buf = (char*)malloc(size); 14 buf = (char*)malloc(size);
13 15
14 if (buf == NULL) 16 if (buf == NULL)
15 - throw std::runtime_error("Malloc error constructing client."); 17 + throw std::runtime_error("Malloc error constructing buffer.");
16 18
17 #ifndef NDEBUG 19 #ifndef NDEBUG
18 memset(buf, 0, size); 20 memset(buf, 0, size);
@@ -25,19 +27,19 @@ CirBuf::~CirBuf() @@ -25,19 +27,19 @@ CirBuf::~CirBuf()
25 free(buf); 27 free(buf);
26 } 28 }
27 29
28 -uint CirBuf::usedBytes() const 30 +uint32_t CirBuf::usedBytes() const
29 { 31 {
30 int result = (head - tail) & (size-1); 32 int result = (head - tail) & (size-1);
31 return result; 33 return result;
32 } 34 }
33 35
34 -uint CirBuf::freeSpace() const 36 +uint32_t CirBuf::freeSpace() const
35 { 37 {
36 int result = (tail - (head + 1)) & (size-1); 38 int result = (tail - (head + 1)) & (size-1);
37 return result; 39 return result;
38 } 40 }
39 41
40 -int CirBuf::maxWriteSize() const 42 +uint32_t CirBuf::maxWriteSize() const
41 { 43 {
42 int end = size - 1 - head; 44 int end = size - 1 - head;
43 int n = (end + tail) & (size-1); 45 int n = (end + tail) & (size-1);
@@ -45,7 +47,7 @@ int CirBuf::maxWriteSize() const @@ -45,7 +47,7 @@ int CirBuf::maxWriteSize() const
45 return result; 47 return result;
46 } 48 }
47 49
48 -int CirBuf::maxReadSize() const 50 +uint32_t CirBuf::maxReadSize() const
49 { 51 {
50 int end = size - tail; 52 int end = size - tail;
51 int n = (head + end) & (size-1); 53 int n = (head + end) & (size-1);
@@ -63,18 +65,18 @@ char *CirBuf::tailPtr() @@ -63,18 +65,18 @@ char *CirBuf::tailPtr()
63 return &buf[tail]; 65 return &buf[tail];
64 } 66 }
65 67
66 -void CirBuf::advanceHead(int n) 68 +void CirBuf::advanceHead(uint32_t n)
67 { 69 {
68 head = (head + n) & (size -1); 70 head = (head + n) & (size -1);
69 assert(tail != head); // Putting things in the buffer must never end on tail, because tail == head == empty. 71 assert(tail != head); // Putting things in the buffer must never end on tail, because tail == head == empty.
70 } 72 }
71 73
72 -void CirBuf::advanceTail(int n) 74 +void CirBuf::advanceTail(uint32_t n)
73 { 75 {
74 tail = (tail + n) & (size -1); 76 tail = (tail + n) & (size -1);
75 } 77 }
76 78
77 -int CirBuf::peakAhead(int offset) const 79 +char CirBuf::peakAhead(uint32_t offset) const
78 { 80 {
79 int b = buf[(tail + offset) & (size - 1)]; 81 int b = buf[(tail + offset) & (size - 1)];
80 return b; 82 return b;
@@ -99,14 +101,43 @@ void CirBuf::doubleSize() @@ -99,14 +101,43 @@ void CirBuf::doubleSize()
99 head = tail + usedBytes(); 101 head = tail + usedBytes();
100 size = newSize; 102 size = newSize;
101 103
  104 +#ifndef NDEBUG
102 std::cout << "New buf size: " << size << std::endl; 105 std::cout << "New buf size: " << size << std::endl;
  106 +#endif
103 107
104 #ifdef TESTING 108 #ifdef TESTING
105 memset(&buf[head], 5, maxWriteSize() + 2); 109 memset(&buf[head], 5, maxWriteSize() + 2);
106 #endif 110 #endif
  111 +
  112 + resizedAt = time(NULL);
107 } 113 }
108 114
109 -uint CirBuf::getSize() const 115 +uint32_t CirBuf::getSize() const
110 { 116 {
111 return size; 117 return size;
112 } 118 }
  119 +
  120 +time_t CirBuf::bufferLastResizedSecondsAgo() const
  121 +{
  122 + return time(NULL) - resizedAt;
  123 +}
  124 +
  125 +void CirBuf::resetSize(size_t newSize)
  126 +{
  127 + assert(usedBytes() == 0);
  128 + if (this->size == newSize)
  129 + return;
  130 + char *newBuf = (char*)malloc(newSize);
  131 + if (newBuf == NULL)
  132 + throw std::runtime_error("Malloc error resizing buffer.");
  133 + free(buf);
  134 + buf = newBuf;
  135 + this->size = newSize;
  136 + head = 0;
  137 + tail = 0;
  138 + resizedAt = time(NULL);
  139 +#ifndef NDEBUG
  140 + std::cout << "Reset buf size: " << size << std::endl;
  141 + memset(buf, 0, newSize);
  142 +#endif
  143 +}
cirbuf.h
@@ -3,6 +3,7 @@ @@ -3,6 +3,7 @@
3 3
4 #include <stddef.h> 4 #include <stddef.h>
5 #include <stdlib.h> 5 #include <stdlib.h>
  6 +#include <stdint.h>
6 7
7 // Optimized circular buffer, works only with sizes power of two. 8 // Optimized circular buffer, works only with sizes power of two.
8 class CirBuf 9 class CirBuf
@@ -12,25 +13,30 @@ class CirBuf @@ -12,25 +13,30 @@ class CirBuf
12 #endif 13 #endif
13 14
14 char *buf = NULL; 15 char *buf = NULL;
15 - uint head = 0;  
16 - uint tail = 0;  
17 - uint size = 0; 16 + uint32_t head = 0;
  17 + uint32_t tail = 0;
  18 + uint32_t size = 0;
  19 +
  20 + time_t resizedAt = 0;
18 public: 21 public:
19 22
20 CirBuf(size_t size); 23 CirBuf(size_t size);
21 ~CirBuf(); 24 ~CirBuf();
22 25
23 - uint usedBytes() const;  
24 - uint freeSpace() const;  
25 - int maxWriteSize() const;  
26 - int maxReadSize() const; 26 + uint32_t usedBytes() const;
  27 + uint32_t freeSpace() const;
  28 + uint32_t maxWriteSize() const;
  29 + uint32_t maxReadSize() const;
27 char *headPtr(); 30 char *headPtr();
28 char *tailPtr(); 31 char *tailPtr();
29 - void advanceHead(int n);  
30 - void advanceTail(int n);  
31 - int peakAhead(int offset) const; 32 + void advanceHead(uint32_t n);
  33 + void advanceTail(uint32_t n);
  34 + char peakAhead(uint32_t offset) const;
32 void doubleSize(); 35 void doubleSize();
33 - uint getSize() const; 36 + uint32_t getSize() const;
  37 +
  38 + time_t bufferLastResizedSecondsAgo() const;
  39 + void resetSize(size_t size);
34 }; 40 };
35 41
36 #endif // CIRBUF_H 42 #endif // CIRBUF_H
client.cpp
@@ -191,7 +191,21 @@ bool Client::writeBufIntoFd() @@ -191,7 +191,21 @@ bool Client::writeBufIntoFd()
191 } 191 }
192 } 192 }
193 193
194 - setReadyForWriting(writebuf.usedBytes() > 0); 194 + const bool bufferHasData = writebuf.usedBytes() > 0;
  195 + setReadyForWriting(bufferHasData);
  196 +
  197 + if (!bufferHasData)
  198 + {
  199 + writeBufIsZeroCount++;
  200 + bool doReset = (writeBufIsZeroCount >= 10 && writebuf.getSize() > (MAX_PACKET_SIZE / 10) && writebuf.bufferLastResizedSecondsAgo() > 30);
  201 + doReset |= (writeBufIsZeroCount >= 100 && writebuf.bufferLastResizedSecondsAgo() > 300);
  202 +
  203 + if (doReset)
  204 + {
  205 + writeBufIsZeroCount = 0;
  206 + writebuf.resetSize(CLIENT_BUFFER_SIZE);
  207 + }
  208 + }
195 209
196 return true; 210 return true;
197 } 211 }
@@ -248,9 +262,9 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -248,9 +262,9 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
248 { 262 {
249 // Determine the packet length by decoding the variable length 263 // Determine the packet length by decoding the variable length
250 int remaining_length_i = 1; // index of 'remaining length' field is one after start. 264 int remaining_length_i = 1; // index of 'remaining length' field is one after start.
251 - int fixed_header_length = 1; 265 + uint fixed_header_length = 1;
252 int multiplier = 1; 266 int multiplier = 1;
253 - int packet_length = 0; 267 + uint packet_length = 0;
254 unsigned char encodedByte = 0; 268 unsigned char encodedByte = 0;
255 do 269 do
256 { 270 {
@@ -287,7 +301,15 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -287,7 +301,15 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
287 301
288 if (readbuf.usedBytes() == 0) 302 if (readbuf.usedBytes() == 0)
289 { 303 {
290 - // TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space. 304 + readBufIsZeroCount++;
  305 + bool doReset = (readBufIsZeroCount >= 10 && readbuf.getSize() > (MAX_PACKET_SIZE / 10) && readbuf.bufferLastResizedSecondsAgo() > 30);
  306 + doReset |= (readBufIsZeroCount >= 100 && readbuf.bufferLastResizedSecondsAgo() > 300);
  307 +
  308 + if (doReset)
  309 + {
  310 + readBufIsZeroCount = 0;
  311 + readbuf.resetSize(CLIENT_BUFFER_SIZE);
  312 + }
291 } 313 }
292 314
293 return true; 315 return true;
client.h
@@ -24,8 +24,10 @@ class Client @@ -24,8 +24,10 @@ class Client
24 int fd; 24 int fd;
25 25
26 CirBuf readbuf; 26 CirBuf readbuf;
  27 + uint8_t readBufIsZeroCount = 0;
27 28
28 CirBuf writebuf; 29 CirBuf writebuf;
  30 + uint8_t writeBufIsZeroCount = 0;
29 31
30 bool authenticated = false; 32 bool authenticated = false;
31 bool connectPacketSeen = false; 33 bool connectPacketSeen = false;