Commit 121e04dec421ee4f23dd91bcbf2abb035001daff

Authored by Wiebe Cazemier
1 parent ae95e6dc

Circbuf for write buffer

cirbuf.cpp
... ... @@ -25,13 +25,13 @@ CirBuf::~CirBuf()
25 25 free(buf);
26 26 }
27 27  
28   -int CirBuf::usedBytes() const
  28 +uint CirBuf::usedBytes() const
29 29 {
30 30 int result = (head - tail) & (size-1);
31 31 return result;
32 32 }
33 33  
34   -int CirBuf::freeSpace() const
  34 +uint CirBuf::freeSpace() const
35 35 {
36 36 int result = (tail - (head + 1)) & (size-1);
37 37 return result;
... ... @@ -99,7 +99,7 @@ void CirBuf::doubleSize()
99 99 head = tail + usedBytes();
100 100 size = newSize;
101 101  
102   - std::cout << "New read buf size: " << size << std::endl;
  102 + std::cout << "New buf size: " << size << std::endl;
103 103  
104 104 #ifdef TESTING
105 105 memset(&buf[head], 5, maxWriteSize() + 2);
... ...
cirbuf.h
... ... @@ -20,8 +20,8 @@ public:
20 20 CirBuf(size_t size);
21 21 ~CirBuf();
22 22  
23   - int usedBytes() const;
24   - int freeSpace() const;
  23 + uint usedBytes() const;
  24 + uint freeSpace() const;
25 25 int maxWriteSize() const;
26 26 int maxReadSize() const;
27 27 char *headPtr();
... ...
client.cpp
... ... @@ -8,24 +8,16 @@
8 8 Client::Client(int fd, ThreadData_p threadData) :
9 9 fd(fd),
10 10 readbuf(CLIENT_BUFFER_SIZE),
  11 + writebuf(CLIENT_BUFFER_SIZE),
11 12 threadData(threadData)
12 13 {
13 14 int flags = fcntl(fd, F_GETFL);
14 15 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
15   - char *writebuf = (char*)malloc(CLIENT_BUFFER_SIZE);
16   -
17   - if (writebuf == NULL)
18   - {
19   - throw std::runtime_error("Malloc error constructing client.");
20   - }
21   -
22   - this->writebuf = writebuf;
23 16 }
24 17  
25 18 Client::~Client()
26 19 {
27 20 close(fd);
28   - free(writebuf);
29 21 }
30 22  
31 23 // Do this from a place you'll know ownwership of the shared_ptr is being given up everywhere, so the close happens when the last owner gives it up.
... ... @@ -89,25 +81,44 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet)
89 81 {
90 82 std::lock_guard<std::mutex> locker(writeBufMutex);
91 83  
92   - if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE)
93   - return;
94   -
95   - if (packet.getSizeIncludingNonPresentHeader() > getWriteBufMaxWriteSize())
96   - growWriteBuffer(packet.getSizeIncludingNonPresentHeader());
  84 + if (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace())
  85 + {
  86 + if (packet.packetType == PacketType::PUBLISH && writebuf.getSize() >= CLIENT_MAX_BUFFER_SIZE)
  87 + return;
  88 + writebuf.doubleSize();
  89 + }
97 90  
98 91 if (!packet.containsFixedHeader())
99 92 {
100   - writebuf[wwi++] = packet.getFirstByte();
  93 + writebuf.headPtr()[0] = packet.getFirstByte();
  94 + writebuf.advanceHead(1);
101 95 RemainingLength r = packet.getRemainingLength();
102   - std::memcpy(&writebuf[wwi], r.bytes, r.len);
103   - wwi += r.len;
104   - }
105 96  
106   - std::memcpy(&writebuf[wwi], &packet.getBites()[0], packet.getBites().size());
107   - wwi += packet.getBites().size();
  97 + ssize_t len_left = r.len;
  98 + int src_i = 0;
  99 + while (len_left > 0)
  100 + {
  101 + const size_t len = std::min<int>(len_left, writebuf.maxWriteSize());
  102 + std::memcpy(writebuf.headPtr(), &r.bytes[src_i], len);
  103 + writebuf.advanceHead(len);
  104 + src_i += len;
  105 + len_left -= len;
  106 + }
  107 + assert(len_left == 0);
  108 + assert(src_i == r.len);
  109 + }
108 110  
109   - assert(wwi >= static_cast<int>(packet.getSizeIncludingNonPresentHeader()));
110   - assert(wwi <= static_cast<int>(writeBufsize));
  111 + ssize_t len_left = packet.getBites().size();
  112 + int src_i = 0;
  113 + while (len_left > 0)
  114 + {
  115 + const size_t len = std::min<int>(len_left, writebuf.maxWriteSize());
  116 + std::memcpy(writebuf.headPtr(), &packet.getBites()[src_i], len);
  117 + writebuf.advanceHead(len);
  118 + src_i += len;
  119 + len_left -= len;
  120 + }
  121 + assert(len_left == 0);
111 122  
112 123 if (packet.packetType == PacketType::DISCONNECT)
113 124 setReadyForDisconnect();
... ... @@ -135,11 +146,13 @@ void Client::writePingResp()
135 146  
136 147 std::cout << "Sending ping response to " << repr() << std::endl;
137 148  
138   - if (2 > getWriteBufMaxWriteSize())
139   - growWriteBuffer(CLIENT_BUFFER_SIZE);
  149 + if (2 > writebuf.freeSpace())
  150 + writebuf.doubleSize();
140 151  
141   - writebuf[wwi++] = 0b11010000;
142   - writebuf[wwi++] = 0;
  152 + writebuf.headPtr()[0] = 0b11010000;
  153 + writebuf.advanceHead(1);
  154 + writebuf.headPtr()[0] = 0;
  155 + writebuf.advanceHead(1);
143 156  
144 157 setReadyForWriting(true);
145 158 }
... ... @@ -155,10 +168,10 @@ bool Client::writeBufIntoFd()
155 168 return false;
156 169  
157 170 int n;
158   - while ((n = write(fd, &writebuf[wri], getWriteBufBytesUsed())) != 0)
  171 + while ((n = write(fd, writebuf.tailPtr(), writebuf.maxReadSize())) != 0)
159 172 {
160 173 if (n > 0)
161   - wri += n;
  174 + writebuf.advanceTail(n);
162 175 if (n < 0)
163 176 {
164 177 if (errno == EINTR)
... ... @@ -170,13 +183,7 @@ bool Client::writeBufIntoFd()
170 183 }
171 184 }
172 185  
173   - if (wri == wwi)
174   - {
175   - wri = 0;
176   - wwi = 0;
177   -
178   - setReadyForWriting(false);
179   - }
  186 + setReadyForWriting(writebuf.usedBytes() > 0);
180 187  
181 188 return true;
182 189 }
... ... @@ -189,24 +196,6 @@ std::string Client::repr()
189 196 return a.str();
190 197 }
191 198  
192   -void Client::growWriteBuffer(size_t add_size)
193   -{
194   - if (add_size == 0)
195   - return;
196   -
197   - const size_t grow_by = std::max<size_t>(add_size, writeBufsize*2);
198   - const size_t newBufSize = writeBufsize + grow_by;
199   - char *writebuf = (char*)realloc(this->writebuf, newBufSize);
200   -
201   - if (writebuf == NULL)
202   - throw std::runtime_error("Memory allocation failure in growWriteBuffer()");
203   -
204   - this->writebuf = writebuf;
205   - writeBufsize = newBufSize;
206   -
207   - std::cout << "New write buf size: " << writeBufsize << std::endl;
208   -}
209   -
210 199 void Client::setReadyForWriting(bool val)
211 200 {
212 201 if (disconnecting)
... ...
client.h
... ... @@ -25,10 +25,7 @@ class Client
25 25  
26 26 CirBuf readbuf;
27 27  
28   - char *writebuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around.
29   - size_t writeBufsize = CLIENT_BUFFER_SIZE;
30   - int wwi = 0;
31   - int wri = 0;
  28 + CirBuf writebuf;
32 29  
33 30 bool authenticated = false;
34 31 bool connectPacketSeen = false;
... ... @@ -50,21 +47,6 @@ class Client
50 47 std::mutex writeBufMutex;
51 48  
52 49  
53   - size_t getWriteBufMaxWriteSize()
54   - {
55   - size_t available = writeBufsize - wwi;
56   - return available;
57   - }
58   -
59   - // Note: this is not the inverse of free space, because there can be non-used lead-in in the buffer!
60   - size_t getWriteBufBytesUsed()
61   - {
62   - return wwi - wri;
63   - };
64   -
65   - void growWriteBuffer(size_t add_size);
66   -
67   -
68 50 void setReadyForWriting(bool val);
69 51 void setReadyForReading(bool val);
70 52  
... ... @@ -90,7 +72,7 @@ public:
90 72 void writeMqttPacket(const MqttPacket &packet);
91 73 void writeMqttPacketAndBlameThisClient(const MqttPacket &packet);
92 74 bool writeBufIntoFd();
93   - bool readyForDisconnecting() const { return disconnectWhenBytesWritten && wwi == wri && wwi == 0; }
  75 + bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; }
94 76  
95 77 // Do this before calling an action that makes this client ready for writing, so that the EPOLLOUT will handle it.
96 78 void setReadyForDisconnect() { disconnectWhenBytesWritten = true; }
... ...