diff --git a/cirbuf.cpp b/cirbuf.cpp index 74d0cdf..658c690 100644 --- a/cirbuf.cpp +++ b/cirbuf.cpp @@ -1,5 +1,7 @@ #include "cirbuf.h" +#include + #include #include #include @@ -12,7 +14,7 @@ CirBuf::CirBuf(size_t size) : buf = (char*)malloc(size); if (buf == NULL) - throw std::runtime_error("Malloc error constructing client."); + throw std::runtime_error("Malloc error constructing buffer."); #ifndef NDEBUG memset(buf, 0, size); @@ -25,19 +27,19 @@ CirBuf::~CirBuf() free(buf); } -uint CirBuf::usedBytes() const +uint32_t CirBuf::usedBytes() const { int result = (head - tail) & (size-1); return result; } -uint CirBuf::freeSpace() const +uint32_t CirBuf::freeSpace() const { int result = (tail - (head + 1)) & (size-1); return result; } -int CirBuf::maxWriteSize() const +uint32_t CirBuf::maxWriteSize() const { int end = size - 1 - head; int n = (end + tail) & (size-1); @@ -45,7 +47,7 @@ int CirBuf::maxWriteSize() const return result; } -int CirBuf::maxReadSize() const +uint32_t CirBuf::maxReadSize() const { int end = size - tail; int n = (head + end) & (size-1); @@ -63,18 +65,18 @@ char *CirBuf::tailPtr() return &buf[tail]; } -void CirBuf::advanceHead(int n) +void CirBuf::advanceHead(uint32_t n) { head = (head + n) & (size -1); assert(tail != head); // Putting things in the buffer must never end on tail, because tail == head == empty. } -void CirBuf::advanceTail(int n) +void CirBuf::advanceTail(uint32_t n) { tail = (tail + n) & (size -1); } -int CirBuf::peakAhead(int offset) const +char CirBuf::peakAhead(uint32_t offset) const { int b = buf[(tail + offset) & (size - 1)]; return b; @@ -99,14 +101,43 @@ void CirBuf::doubleSize() head = tail + usedBytes(); size = newSize; +#ifndef NDEBUG std::cout << "New buf size: " << size << std::endl; +#endif #ifdef TESTING memset(&buf[head], 5, maxWriteSize() + 2); #endif + + resizedAt = time(NULL); } -uint CirBuf::getSize() const +uint32_t CirBuf::getSize() const { return size; } + +time_t CirBuf::bufferLastResizedSecondsAgo() const +{ + return time(NULL) - resizedAt; +} + +void CirBuf::resetSize(size_t newSize) +{ + assert(usedBytes() == 0); + if (this->size == newSize) + return; + char *newBuf = (char*)malloc(newSize); + if (newBuf == NULL) + throw std::runtime_error("Malloc error resizing buffer."); + free(buf); + buf = newBuf; + this->size = newSize; + head = 0; + tail = 0; + resizedAt = time(NULL); +#ifndef NDEBUG + std::cout << "Reset buf size: " << size << std::endl; + memset(buf, 0, newSize); +#endif +} diff --git a/cirbuf.h b/cirbuf.h index 4b9dc89..b01ee76 100644 --- a/cirbuf.h +++ b/cirbuf.h @@ -3,6 +3,7 @@ #include #include +#include // Optimized circular buffer, works only with sizes power of two. class CirBuf @@ -12,25 +13,30 @@ class CirBuf #endif char *buf = NULL; - uint head = 0; - uint tail = 0; - uint size = 0; + uint32_t head = 0; + uint32_t tail = 0; + uint32_t size = 0; + + time_t resizedAt = 0; public: CirBuf(size_t size); ~CirBuf(); - uint usedBytes() const; - uint freeSpace() const; - int maxWriteSize() const; - int maxReadSize() const; + uint32_t usedBytes() const; + uint32_t freeSpace() const; + uint32_t maxWriteSize() const; + uint32_t maxReadSize() const; char *headPtr(); char *tailPtr(); - void advanceHead(int n); - void advanceTail(int n); - int peakAhead(int offset) const; + void advanceHead(uint32_t n); + void advanceTail(uint32_t n); + char peakAhead(uint32_t offset) const; void doubleSize(); - uint getSize() const; + uint32_t getSize() const; + + time_t bufferLastResizedSecondsAgo() const; + void resetSize(size_t size); }; #endif // CIRBUF_H diff --git a/client.cpp b/client.cpp index 05f225c..306bbaa 100644 --- a/client.cpp +++ b/client.cpp @@ -191,7 +191,21 @@ bool Client::writeBufIntoFd() } } - setReadyForWriting(writebuf.usedBytes() > 0); + const bool bufferHasData = writebuf.usedBytes() > 0; + setReadyForWriting(bufferHasData); + + if (!bufferHasData) + { + writeBufIsZeroCount++; + bool doReset = (writeBufIsZeroCount >= 10 && writebuf.getSize() > (MAX_PACKET_SIZE / 10) && writebuf.bufferLastResizedSecondsAgo() > 30); + doReset |= (writeBufIsZeroCount >= 100 && writebuf.bufferLastResizedSecondsAgo() > 300); + + if (doReset) + { + writeBufIsZeroCount = 0; + writebuf.resetSize(CLIENT_BUFFER_SIZE); + } + } return true; } @@ -248,9 +262,9 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ { // Determine the packet length by decoding the variable length int remaining_length_i = 1; // index of 'remaining length' field is one after start. - int fixed_header_length = 1; + uint fixed_header_length = 1; int multiplier = 1; - int packet_length = 0; + uint packet_length = 0; unsigned char encodedByte = 0; do { @@ -287,7 +301,15 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ if (readbuf.usedBytes() == 0) { - // TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space. + readBufIsZeroCount++; + bool doReset = (readBufIsZeroCount >= 10 && readbuf.getSize() > (MAX_PACKET_SIZE / 10) && readbuf.bufferLastResizedSecondsAgo() > 30); + doReset |= (readBufIsZeroCount >= 100 && readbuf.bufferLastResizedSecondsAgo() > 300); + + if (doReset) + { + readBufIsZeroCount = 0; + readbuf.resetSize(CLIENT_BUFFER_SIZE); + } } return true; diff --git a/client.h b/client.h index bbbeec4..ce72ea6 100644 --- a/client.h +++ b/client.h @@ -24,8 +24,10 @@ class Client int fd; CirBuf readbuf; + uint8_t readBufIsZeroCount = 0; CirBuf writebuf; + uint8_t writeBufIsZeroCount = 0; bool authenticated = false; bool connectPacketSeen = false;