diff --git a/client.cpp b/client.cpp index 9217dc6..8550518 100644 --- a/client.cpp +++ b/client.cpp @@ -54,23 +54,12 @@ bool Client::readFdIntoBuffer() if (disconnecting) return false; - if (wi > CLIENT_MAX_BUFFER_SIZE) - { - setReadyForReading(false); - return true; - } - - int n; - while ((n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0) + int n = 0; + while (getReadBufFreeSpace() > 0 && (n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0) { if (n > 0) { - wi += n; - - if (getReadBufMaxWriteSize() == 0) - { - growReadBuffer(); - } + wi = (wi + n) % readBufsize; } if (n < 0) @@ -82,11 +71,24 @@ bool Client::readFdIntoBuffer() else check(n); } + + // Make sure we either always have enough space for a next call of this method, or stop reading the fd. + if (getReadBufFreeSpace() == 0) + { + if (readBufsize * 2 < CLIENT_MAX_BUFFER_SIZE) + { + growReadBuffer(); + } + else + { + setReadyForReading(false); + break; + } + } } if (n == 0) // client disconnected. { - //std::cerr << "normal disconnect" << std::endl; return false; } @@ -197,6 +199,36 @@ std::string Client::repr() return a.str(); } +void Client::growReadBuffer() // TODO: refactor +{ + const size_t newBufSize = readBufsize * 2; + char *readbuf = (char*)realloc(this->readbuf, newBufSize); + if (readbuf == NULL) + throw std::runtime_error("Memory allocation failure in growReadBuffer()"); + this->readbuf = readbuf; + readBufsize = newBufSize; + + std::cout << "New read buf size: " << readBufsize << std::endl; +} + +void Client::growWriteBuffer(size_t add_size) +{ + if (add_size == 0) + return; + + const size_t grow_by = std::max(add_size, writeBufsize*2); + const size_t newBufSize = writeBufsize + grow_by; + char *writebuf = (char*)realloc(this->writebuf, newBufSize); + + if (writebuf == NULL) + throw std::runtime_error("Memory allocation failure in growWriteBuffer()"); + + this->writebuf = writebuf; + writeBufsize = newBufSize; + + std::cout << "New write buf size: " << writeBufsize << std::endl; +} + void Client::setReadyForWriting(bool val) { if (disconnecting) @@ -250,7 +282,7 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ fixed_header_length++; if (remaining_length_i >= wi) return false; - encodedByte = readbuf[remaining_length_i++]; + encodedByte = readbuf[remaining_length_i++ % readBufsize]; packet_length += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128*128*128) @@ -266,27 +298,28 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ if (packet_length <= getReadBufBytesUsed()) { + // TODO: deal with circularness here, or in the packet? MqttPacket packet(&readbuf[ri], packet_length, fixed_header_length, sender); packetQueueIn.push_back(std::move(packet)); - ri += packet_length; - assert(ri <= wi); + ri = (ri + packet_length) % readBufsize; } else break; } - if (ri == wi) + if (getReadBufMaxWriteSize() > 0) { - ri = 0; - wi = 0; setReadyForReading(true); } - return true; + if (getReadBufBytesUsed() == 0) + { + // TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space. + } - // TODO: reset buffer to normal size after a while of not needing it, or not needing the extra space. + return true; } void Client::setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive) diff --git a/client.h b/client.h index ff090e7..813c8a1 100644 --- a/client.h +++ b/client.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "forward_declarations.h" @@ -24,8 +25,8 @@ class Client char *readbuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around. size_t readBufsize = CLIENT_BUFFER_SIZE; - int wi = 0; - int ri = 0; + uint wi = 0; + uint ri = 0; char *writebuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around. size_t writeBufsize = CLIENT_BUFFER_SIZE; @@ -51,30 +52,38 @@ class Client ThreadData_p threadData; std::mutex writeBufMutex; - // Note: this is not the inverse of free space, because there can be non-used lead-in in the buffer! - size_t getReadBufBytesUsed() + inline size_t getReadBufBytesUsed() const { - return wi - ri; + size_t result; + if (wi >= ri) + result = wi - ri; + else + result = (readBufsize + wi) - ri; + return result; }; - size_t getReadBufMaxWriteSize() + inline size_t getReadBufFreeSpace() const { - size_t available = readBufsize - wi; - return available; + size_t result = readBufsize - getReadBufBytesUsed() - 1; + return result; } - void growReadBuffer() + inline size_t getReadBufMaxWriteSize() const { - const size_t newBufSize = readBufsize * 2; - char *readbuf = (char*)realloc(this->readbuf, newBufSize); - if (readbuf == NULL) - throw std::runtime_error("Memory allocation failure in growReadBuffer()"); - this->readbuf = readbuf; - readBufsize = newBufSize; - - //std::cout << "New read buf size: " << readBufsize << std::endl; + const size_t available_space = getReadBufFreeSpace(); + + size_t result = 0; + if (wi >= ri) + result = available_space - wi; + else + result = ri - wi - 1; + + return result; } + void growReadBuffer(); + + size_t getWriteBufMaxWriteSize() { size_t available = writeBufsize - wwi; @@ -87,23 +96,8 @@ class Client return wwi - wri; }; - void growWriteBuffer(size_t add_size) - { - if (add_size == 0) - return; - - const size_t grow_by = std::max(add_size, writeBufsize*2); - const size_t newBufSize = writeBufsize + grow_by; - char *writebuf = (char*)realloc(this->writebuf, newBufSize); - - if (writebuf == NULL) - throw std::runtime_error("Memory allocation failure in growWriteBuffer()"); + void growWriteBuffer(size_t add_size); - this->writebuf = writebuf; - writeBufsize = newBufSize; - - //std::cout << "New write buf size: " << writeBufsize << std::endl; - } void setReadyForWriting(bool val); void setReadyForReading(bool val);