Commit ae9b206037466b4e1f5a962f27c133141bec8b60

Authored by Wiebe Cazemier
1 parent 7db8bdff

stowing circular buffer work

Showing 2 changed files with 83 additions and 56 deletions
client.cpp
@@ -54,23 +54,12 @@ bool Client::readFdIntoBuffer() @@ -54,23 +54,12 @@ bool Client::readFdIntoBuffer()
54 if (disconnecting) 54 if (disconnecting)
55 return false; 55 return false;
56 56
57 - if (wi > CLIENT_MAX_BUFFER_SIZE)  
58 - {  
59 - setReadyForReading(false);  
60 - return true;  
61 - }  
62 -  
63 - int n;  
64 - while ((n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0) 57 + int n = 0;
  58 + while (getReadBufFreeSpace() > 0 && (n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0)
65 { 59 {
66 if (n > 0) 60 if (n > 0)
67 { 61 {
68 - wi += n;  
69 -  
70 - if (getReadBufMaxWriteSize() == 0)  
71 - {  
72 - growReadBuffer();  
73 - } 62 + wi = (wi + n) % readBufsize;
74 } 63 }
75 64
76 if (n < 0) 65 if (n < 0)
@@ -82,11 +71,24 @@ bool Client::readFdIntoBuffer() @@ -82,11 +71,24 @@ bool Client::readFdIntoBuffer()
82 else 71 else
83 check<std::runtime_error>(n); 72 check<std::runtime_error>(n);
84 } 73 }
  74 +
  75 + // Make sure we either always have enough space for a next call of this method, or stop reading the fd.
  76 + if (getReadBufFreeSpace() == 0)
  77 + {
  78 + if (readBufsize * 2 < CLIENT_MAX_BUFFER_SIZE)
  79 + {
  80 + growReadBuffer();
  81 + }
  82 + else
  83 + {
  84 + setReadyForReading(false);
  85 + break;
  86 + }
  87 + }
85 } 88 }
86 89
87 if (n == 0) // client disconnected. 90 if (n == 0) // client disconnected.
88 { 91 {
89 - //std::cerr << "normal disconnect" << std::endl;  
90 return false; 92 return false;
91 } 93 }
92 94
@@ -197,6 +199,36 @@ std::string Client::repr() @@ -197,6 +199,36 @@ std::string Client::repr()
197 return a.str(); 199 return a.str();
198 } 200 }
199 201
  202 +void Client::growReadBuffer() // TODO: refactor
  203 +{
  204 + const size_t newBufSize = readBufsize * 2;
  205 + char *readbuf = (char*)realloc(this->readbuf, newBufSize);
  206 + if (readbuf == NULL)
  207 + throw std::runtime_error("Memory allocation failure in growReadBuffer()");
  208 + this->readbuf = readbuf;
  209 + readBufsize = newBufSize;
  210 +
  211 + std::cout << "New read buf size: " << readBufsize << std::endl;
  212 +}
  213 +
  214 +void Client::growWriteBuffer(size_t add_size)
  215 +{
  216 + if (add_size == 0)
  217 + return;
  218 +
  219 + const size_t grow_by = std::max<size_t>(add_size, writeBufsize*2);
  220 + const size_t newBufSize = writeBufsize + grow_by;
  221 + char *writebuf = (char*)realloc(this->writebuf, newBufSize);
  222 +
  223 + if (writebuf == NULL)
  224 + throw std::runtime_error("Memory allocation failure in growWriteBuffer()");
  225 +
  226 + this->writebuf = writebuf;
  227 + writeBufsize = newBufSize;
  228 +
  229 + std::cout << "New write buf size: " << writeBufsize << std::endl;
  230 +}
  231 +
200 void Client::setReadyForWriting(bool val) 232 void Client::setReadyForWriting(bool val)
201 { 233 {
202 if (disconnecting) 234 if (disconnecting)
@@ -250,7 +282,7 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -250,7 +282,7 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
250 fixed_header_length++; 282 fixed_header_length++;
251 if (remaining_length_i >= wi) 283 if (remaining_length_i >= wi)
252 return false; 284 return false;
253 - encodedByte = readbuf[remaining_length_i++]; 285 + encodedByte = readbuf[remaining_length_i++ % readBufsize];
254 packet_length += (encodedByte & 127) * multiplier; 286 packet_length += (encodedByte & 127) * multiplier;
255 multiplier *= 128; 287 multiplier *= 128;
256 if (multiplier > 128*128*128) 288 if (multiplier > 128*128*128)
@@ -266,27 +298,28 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -266,27 +298,28 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
266 298
267 if (packet_length <= getReadBufBytesUsed()) 299 if (packet_length <= getReadBufBytesUsed())
268 { 300 {
  301 + // TODO: deal with circularness here, or in the packet?
269 MqttPacket packet(&readbuf[ri], packet_length, fixed_header_length, sender); 302 MqttPacket packet(&readbuf[ri], packet_length, fixed_header_length, sender);
270 packetQueueIn.push_back(std::move(packet)); 303 packetQueueIn.push_back(std::move(packet));
271 304
272 - ri += packet_length;  
273 - assert(ri <= wi); 305 + ri = (ri + packet_length) % readBufsize;
274 } 306 }
275 else 307 else
276 break; 308 break;
277 309
278 } 310 }
279 311
280 - if (ri == wi) 312 + if (getReadBufMaxWriteSize() > 0)
281 { 313 {
282 - ri = 0;  
283 - wi = 0;  
284 setReadyForReading(true); 314 setReadyForReading(true);
285 } 315 }
286 316
287 - return true; 317 + if (getReadBufBytesUsed() == 0)
  318 + {
  319 + // TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space.
  320 + }
288 321
289 - // TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space. 322 + return true;
290 } 323 }
291 324
292 void Client::setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive) 325 void Client::setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive)
client.h
@@ -6,6 +6,7 @@ @@ -6,6 +6,7 @@
6 #include <vector> 6 #include <vector>
7 #include <mutex> 7 #include <mutex>
8 #include <iostream> 8 #include <iostream>
  9 +#include
9 10
10 #include "forward_declarations.h" 11 #include "forward_declarations.h"
11 12
@@ -24,8 +25,8 @@ class Client @@ -24,8 +25,8 @@ class Client
24 25
25 char *readbuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around. 26 char *readbuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around.
26 size_t readBufsize = CLIENT_BUFFER_SIZE; 27 size_t readBufsize = CLIENT_BUFFER_SIZE;
27 - int wi = 0;  
28 - int ri = 0; 28 + uint wi = 0;
  29 + uint ri = 0;
29 30
30 char *writebuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around. 31 char *writebuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around.
31 size_t writeBufsize = CLIENT_BUFFER_SIZE; 32 size_t writeBufsize = CLIENT_BUFFER_SIZE;
@@ -51,30 +52,38 @@ class Client @@ -51,30 +52,38 @@ class Client
51 ThreadData_p threadData; 52 ThreadData_p threadData;
52 std::mutex writeBufMutex; 53 std::mutex writeBufMutex;
53 54
54 - // Note: this is not the inverse of free space, because there can be non-used lead-in in the buffer!  
55 - size_t getReadBufBytesUsed() 55 + inline size_t getReadBufBytesUsed() const
56 { 56 {
57 - return wi - ri; 57 + size_t result;
  58 + if (wi >= ri)
  59 + result = wi - ri;
  60 + else
  61 + result = (readBufsize + wi) - ri;
  62 + return result;
58 }; 63 };
59 64
60 - size_t getReadBufMaxWriteSize() 65 + inline size_t getReadBufFreeSpace() const
61 { 66 {
62 - size_t available = readBufsize - wi;  
63 - return available; 67 + size_t result = readBufsize - getReadBufBytesUsed() - 1;
  68 + return result;
64 } 69 }
65 70
66 - void growReadBuffer() 71 + inline size_t getReadBufMaxWriteSize() const
67 { 72 {
68 - const size_t newBufSize = readBufsize * 2;  
69 - char *readbuf = (char*)realloc(this->readbuf, newBufSize);  
70 - if (readbuf == NULL)  
71 - throw std::runtime_error("Memory allocation failure in growReadBuffer()");  
72 - this->readbuf = readbuf;  
73 - readBufsize = newBufSize;  
74 -  
75 - //std::cout << "New read buf size: " << readBufsize << std::endl; 73 + const size_t available_space = getReadBufFreeSpace();
  74 +
  75 + size_t result = 0;
  76 + if (wi >= ri)
  77 + result = available_space - wi;
  78 + else
  79 + result = ri - wi - 1;
  80 +
  81 + return result;
76 } 82 }
77 83
  84 + void growReadBuffer();
  85 +
  86 +
78 size_t getWriteBufMaxWriteSize() 87 size_t getWriteBufMaxWriteSize()
79 { 88 {
80 size_t available = writeBufsize - wwi; 89 size_t available = writeBufsize - wwi;
@@ -87,23 +96,8 @@ class Client @@ -87,23 +96,8 @@ class Client
87 return wwi - wri; 96 return wwi - wri;
88 }; 97 };
89 98
90 - void growWriteBuffer(size_t add_size)  
91 - {  
92 - if (add_size == 0)  
93 - return;  
94 -  
95 - const size_t grow_by = std::max<size_t>(add_size, writeBufsize*2);  
96 - const size_t newBufSize = writeBufsize + grow_by;  
97 - char *writebuf = (char*)realloc(this->writebuf, newBufSize);  
98 -  
99 - if (writebuf == NULL)  
100 - throw std::runtime_error("Memory allocation failure in growWriteBuffer()"); 99 + void growWriteBuffer(size_t add_size);
101 100
102 - this->writebuf = writebuf;  
103 - writeBufsize = newBufSize;  
104 -  
105 - //std::cout << "New write buf size: " << writeBufsize << std::endl;  
106 - }  
107 101
108 void setReadyForWriting(bool val); 102 void setReadyForWriting(bool val);
109 void setReadyForReading(bool val); 103 void setReadyForReading(bool val);