diff --git a/client.cpp b/client.cpp index 8b1aa3b..2e82b78 100644 --- a/client.cpp +++ b/client.cpp @@ -33,6 +33,12 @@ void Client::closeConnection() // false means any kind of error we want to get rid of the client for. bool Client::readFdIntoBuffer() { + if (wi > CLIENT_MAX_BUFFER_SIZE) + { + setReadyForReading(false); + return true; + } + int n; while ((n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0) { @@ -71,7 +77,7 @@ bool Client::readFdIntoBuffer() void Client::writeMqttPacket(const MqttPacket &packet) { - if (packet.packetType == PacketType::PUBLISH && getWriteBufBytesUsed() > CLIENT_MAX_BUFFER_SIZE) + if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE) return; if (packet.getSize() > getWriteBufMaxWriteSize()) @@ -163,8 +169,25 @@ void Client::setReadyForWriting(bool val) struct epoll_event ev; memset(&ev, 0, sizeof (struct epoll_event)); ev.data.fd = fd; - ev.events = EPOLLIN; - if (val) + if (readyForReading) + ev.events |= EPOLLIN; + if (readyForWriting) + ev.events |= EPOLLOUT; + check(epoll_ctl(threadData->epollfd, EPOLL_CTL_MOD, fd, &ev)); +} + +void Client::setReadyForReading(bool val) +{ + if (val == this->readyForReading) + return; + + readyForReading = val; + struct epoll_event ev; + memset(&ev, 0, sizeof (struct epoll_event)); + ev.data.fd = fd; + if (readyForReading) + ev.events |= EPOLLIN; + if (readyForWriting) ev.events |= EPOLLOUT; check(epoll_ctl(threadData->epollfd, EPOLL_CTL_MOD, fd, &ev)); } @@ -215,6 +238,7 @@ bool Client::bufferToMqttPackets(std::vector &packetQueueIn, Client_ { ri = 0; wi = 0; + setReadyForReading(true); } return true; diff --git a/client.h b/client.h index e1a37cd..a374017 100644 --- a/client.h +++ b/client.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "forward_declarations.h" @@ -34,6 +35,7 @@ class Client bool authenticated = false; bool connectPacketSeen = false; bool readyForWriting = false; + bool readyForReading = true; std::string clientid; std::string username; @@ -61,6 +63,8 @@ class Client if (readbuf == NULL) throw std::runtime_error("Memory allocation failure in growReadBuffer()"); readBufsize = newBufSize; + + std::cout << "New read buf size: " << readBufsize << std::endl; } size_t getWriteBufMaxWriteSize() @@ -80,16 +84,20 @@ class Client if (add_size == 0) return; - const size_t newBufSize = writeBufsize + add_size; + const size_t grow_by = std::max(add_size, writeBufsize*2); + const size_t newBufSize = writeBufsize + grow_by; writebuf = (char*)realloc(writebuf, newBufSize); if (writebuf == NULL) throw std::runtime_error("Memory allocation failure in growWriteBuffer()"); writeBufsize = newBufSize; - } + std::cout << "New write buf size: " << writeBufsize << std::endl; + } + void setReadyForWriting(bool val); + void setReadyForReading(bool val); public: Client(int fd, ThreadData_p threadData); @@ -116,7 +124,7 @@ public: void queueMessage(const MqttPacket &packet); void queuedMessagesToBuffer(); - void setReadyForWriting(bool val); + }; #endif // CLIENT_H