diff --git a/cirbuf.cpp b/cirbuf.cpp index aaf9931..74d0cdf 100644 --- a/cirbuf.cpp +++ b/cirbuf.cpp @@ -25,13 +25,13 @@ CirBuf::~CirBuf() free(buf); } -int CirBuf::usedBytes() const +uint CirBuf::usedBytes() const { int result = (head - tail) & (size-1); return result; } -int CirBuf::freeSpace() const +uint CirBuf::freeSpace() const { int result = (tail - (head + 1)) & (size-1); return result; @@ -99,7 +99,7 @@ void CirBuf::doubleSize() head = tail + usedBytes(); size = newSize; - std::cout << "New read buf size: " << size << std::endl; + std::cout << "New buf size: " << size << std::endl; #ifdef TESTING memset(&buf[head], 5, maxWriteSize() + 2); diff --git a/cirbuf.h b/cirbuf.h index 761d18c..4b9dc89 100644 --- a/cirbuf.h +++ b/cirbuf.h @@ -20,8 +20,8 @@ public: CirBuf(size_t size); ~CirBuf(); - int usedBytes() const; - int freeSpace() const; + uint usedBytes() const; + uint freeSpace() const; int maxWriteSize() const; int maxReadSize() const; char *headPtr(); diff --git a/client.cpp b/client.cpp index 37c9c94..6604690 100644 --- a/client.cpp +++ b/client.cpp @@ -8,24 +8,16 @@ Client::Client(int fd, ThreadData_p threadData) : fd(fd), readbuf(CLIENT_BUFFER_SIZE), + writebuf(CLIENT_BUFFER_SIZE), threadData(threadData) { int flags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, flags | O_NONBLOCK); - char *writebuf = (char*)malloc(CLIENT_BUFFER_SIZE); - - if (writebuf == NULL) - { - throw std::runtime_error("Malloc error constructing client."); - } - - this->writebuf = writebuf; } Client::~Client() { close(fd); - free(writebuf); } // Do this from a place you'll know ownwership of the shared_ptr is being given up everywhere, so the close happens when the last owner gives it up. @@ -89,25 +81,44 @@ void Client::writeMqttPacket(const MqttPacket &packet) { std::lock_guard locker(writeBufMutex); - if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE) - return; - - if (packet.getSizeIncludingNonPresentHeader() > getWriteBufMaxWriteSize()) - growWriteBuffer(packet.getSizeIncludingNonPresentHeader()); + if (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace()) + { + if (packet.packetType == PacketType::PUBLISH && writebuf.getSize() >= CLIENT_MAX_BUFFER_SIZE) + return; + writebuf.doubleSize(); + } if (!packet.containsFixedHeader()) { - writebuf[wwi++] = packet.getFirstByte(); + writebuf.headPtr()[0] = packet.getFirstByte(); + writebuf.advanceHead(1); RemainingLength r = packet.getRemainingLength(); - std::memcpy(&writebuf[wwi], r.bytes, r.len); - wwi += r.len; - } - std::memcpy(&writebuf[wwi], &packet.getBites()[0], packet.getBites().size()); - wwi += packet.getBites().size(); + ssize_t len_left = r.len; + int src_i = 0; + while (len_left > 0) + { + const size_t len = std::min(len_left, writebuf.maxWriteSize()); + std::memcpy(writebuf.headPtr(), &r.bytes[src_i], len); + writebuf.advanceHead(len); + src_i += len; + len_left -= len; + } + assert(len_left == 0); + assert(src_i == r.len); + } - assert(wwi >= static_cast(packet.getSizeIncludingNonPresentHeader())); - assert(wwi <= static_cast(writeBufsize)); + ssize_t len_left = packet.getBites().size(); + int src_i = 0; + while (len_left > 0) + { + const size_t len = std::min(len_left, writebuf.maxWriteSize()); + std::memcpy(writebuf.headPtr(), &packet.getBites()[src_i], len); + writebuf.advanceHead(len); + src_i += len; + len_left -= len; + } + assert(len_left == 0); if (packet.packetType == PacketType::DISCONNECT) setReadyForDisconnect(); @@ -135,11 +146,13 @@ void Client::writePingResp() std::cout << "Sending ping response to " << repr() << std::endl; - if (2 > getWriteBufMaxWriteSize()) - growWriteBuffer(CLIENT_BUFFER_SIZE); + if (2 > writebuf.freeSpace()) + writebuf.doubleSize(); - writebuf[wwi++] = 0b11010000; - writebuf[wwi++] = 0; + writebuf.headPtr()[0] = 0b11010000; + writebuf.advanceHead(1); + writebuf.headPtr()[0] = 0; + writebuf.advanceHead(1); setReadyForWriting(true); } @@ -155,10 +168,10 @@ bool Client::writeBufIntoFd() return false; int n; - while ((n = write(fd, &writebuf[wri], getWriteBufBytesUsed())) != 0) + while ((n = write(fd, writebuf.tailPtr(), writebuf.maxReadSize())) != 0) { if (n > 0) - wri += n; + writebuf.advanceTail(n); if (n < 0) { if (errno == EINTR) @@ -170,13 +183,7 @@ bool Client::writeBufIntoFd() } } - if (wri == wwi) - { - wri = 0; - wwi = 0; - - setReadyForWriting(false); - } + setReadyForWriting(writebuf.usedBytes() > 0); return true; } @@ -189,24 +196,6 @@ std::string Client::repr() return a.str(); } -void Client::growWriteBuffer(size_t add_size) -{ - if (add_size == 0) - return; - - const size_t grow_by = std::max(add_size, writeBufsize*2); - const size_t newBufSize = writeBufsize + grow_by; - char *writebuf = (char*)realloc(this->writebuf, newBufSize); - - if (writebuf == NULL) - throw std::runtime_error("Memory allocation failure in growWriteBuffer()"); - - this->writebuf = writebuf; - writeBufsize = newBufSize; - - std::cout << "New write buf size: " << writeBufsize << std::endl; -} - void Client::setReadyForWriting(bool val) { if (disconnecting) diff --git a/client.h b/client.h index 1d96c8f..b012d9b 100644 --- a/client.h +++ b/client.h @@ -25,10 +25,7 @@ class Client CirBuf readbuf; - char *writebuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around. - size_t writeBufsize = CLIENT_BUFFER_SIZE; - int wwi = 0; - int wri = 0; + CirBuf writebuf; bool authenticated = false; bool connectPacketSeen = false; @@ -50,21 +47,6 @@ class Client std::mutex writeBufMutex; - size_t getWriteBufMaxWriteSize() - { - size_t available = writeBufsize - wwi; - return available; - } - - // Note: this is not the inverse of free space, because there can be non-used lead-in in the buffer! - size_t getWriteBufBytesUsed() - { - return wwi - wri; - }; - - void growWriteBuffer(size_t add_size); - - void setReadyForWriting(bool val); void setReadyForReading(bool val); @@ -90,7 +72,7 @@ public: void writeMqttPacket(const MqttPacket &packet); void writeMqttPacketAndBlameThisClient(const MqttPacket &packet); bool writeBufIntoFd(); - bool readyForDisconnecting() const { return disconnectWhenBytesWritten && wwi == wri && wwi == 0; } + bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; } // Do this before calling an action that makes this client ready for writing, so that the EPOLLOUT will handle it. void setReadyForDisconnect() { disconnectWhenBytesWritten = true; }