diff --git a/cirbuf.cpp b/cirbuf.cpp index 0cf7593..1c0f958 100644 --- a/cirbuf.cpp +++ b/cirbuf.cpp @@ -1,7 +1,5 @@ #include "cirbuf.h" -#include - #include #include #include @@ -140,7 +138,7 @@ void CirBuf::doubleSize(uint factor) memset(&buf[head], 5, maxWriteSize() + 2); #endif - resizedAt = time(NULL); + primedForSizeReset = false; } uint32_t CirBuf::getSize() const @@ -148,14 +146,25 @@ uint32_t CirBuf::getSize() const return size; } -time_t CirBuf::bufferLastResizedSecondsAgo() const +void CirBuf::resetSizeIfEligable(size_t size) { - return time(NULL) - resizedAt; + // Ensuring the reset will only happen the second time the timer event hits. + if (!primedForSizeReset) + { + primedForSizeReset = true; + return; + } + + if (usedBytes() > 0) + return; + + resetSize(size); } void CirBuf::resetSize(size_t newSize) { assert(usedBytes() == 0); + primedForSizeReset = false; if (this->size == newSize) return; char *newBuf = (char*)malloc(newSize); @@ -166,7 +175,6 @@ void CirBuf::resetSize(size_t newSize) this->size = newSize; head = 0; tail = 0; - resizedAt = time(NULL); #ifndef NDEBUG Logger *logger = Logger::getInstance(); logger->logf(LOG_DEBUG, "Reset buf size: %d", size); diff --git a/cirbuf.h b/cirbuf.h index bd77060..e4c5d44 100644 --- a/cirbuf.h +++ b/cirbuf.h @@ -18,7 +18,7 @@ class CirBuf uint32_t tail = 0; uint32_t size = 0; - time_t resizedAt = 0; + bool primedForSizeReset = false; public: CirBuf(size_t size); @@ -37,7 +37,7 @@ public: void doubleSize(uint factor = 2); uint32_t getSize() const; - time_t bufferLastResizedSecondsAgo() const; + void resetSizeIfEligable(size_t size); void resetSize(size_t size); void reset(); diff --git a/client.cpp b/client.cpp index e7fb8d9..49718a2 100644 --- a/client.cpp +++ b/client.cpp @@ -228,19 +228,6 @@ bool Client::writeBufIntoFd() const bool bufferHasData = writebuf.usedBytes() > 0; setReadyForWriting(bufferHasData || error == IoWrapResult::Wouldblock); - if (!bufferHasData) - { - writeBufIsZeroCount++; - bool doReset = (writeBufIsZeroCount >= 10 && writebuf.getSize() > (maxPacketSize / 10) && writebuf.bufferLastResizedSecondsAgo() > 30); - doReset |= (writeBufIsZeroCount >= 100 && writebuf.bufferLastResizedSecondsAgo() > 300); - - if (doReset) - { - writeBufIsZeroCount = 0; - writebuf.resetSize(initialBufferSize); - } - } - return true; } @@ -268,6 +255,12 @@ std::string Client::getKeepAliveInfoString() const return s; } +void Client::resetBuffersIfEligible() +{ + readbuf.resetSizeIfEligable(initialBufferSize); + writebuf.resetSizeIfEligable(initialBufferSize); +} + // Call this from a place you know the writeBufMutex is locked, or we're still only doing SSL accept. void Client::setReadyForWriting(bool val) { @@ -353,19 +346,6 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ setReadyForReading(readbuf.freeSpace() > 0); - if (readbuf.usedBytes() == 0) - { - readBufIsZeroCount++; - bool doReset = (readBufIsZeroCount >= 10 && readbuf.getSize() > (maxPacketSize / 10) && readbuf.bufferLastResizedSecondsAgo() > 30); - doReset |= (readBufIsZeroCount >= 100 && readbuf.bufferLastResizedSecondsAgo() > 300); - - if (doReset) - { - readBufIsZeroCount = 0; - readbuf.resetSize(initialBufferSize); - } - } - return true; } diff --git a/client.h b/client.h index 27da06e..4e99b52 100644 --- a/client.h +++ b/client.h @@ -38,10 +38,7 @@ class Client IoWrapper ioWrapper; CirBuf readbuf; - uint8_t readBufIsZeroCount = 0; - CirBuf writebuf; - uint8_t writeBufIsZeroCount = 0; bool authenticated = false; bool connectPacketSeen = false; @@ -114,6 +111,7 @@ public: std::string repr(); bool keepAliveExpired(); std::string getKeepAliveInfoString() const; + void resetBuffersIfEligible(); }; diff --git a/threaddata.cpp b/threaddata.cpp index e2cd748..1c5b2dd 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -132,7 +132,10 @@ void ThreadData::doKeepAliveCheck() it = clients_by_fd.erase(it); } else + { + client->resetBuffersIfEligible(); it++; + } } } catch (std::exception &ex)