Commit e29e0f6d5363f21f190c73eacfa8972d2b97a6e6

Authored by Wiebe Cazemier
1 parent ffdf1c91

Drop or throttle when buffers are full

Showing 2 changed files with 38 additions and 6 deletions
client.cpp
@@ -33,6 +33,12 @@ void Client::closeConnection() @@ -33,6 +33,12 @@ void Client::closeConnection()
33 // false means any kind of error we want to get rid of the client for. 33 // false means any kind of error we want to get rid of the client for.
34 bool Client::readFdIntoBuffer() 34 bool Client::readFdIntoBuffer()
35 { 35 {
  36 + if (wi > CLIENT_MAX_BUFFER_SIZE)
  37 + {
  38 + setReadyForReading(false);
  39 + return true;
  40 + }
  41 +
36 int n; 42 int n;
37 while ((n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0) 43 while ((n = read(fd, &readbuf[wi], getReadBufMaxWriteSize())) != 0)
38 { 44 {
@@ -71,7 +77,7 @@ bool Client::readFdIntoBuffer() @@ -71,7 +77,7 @@ bool Client::readFdIntoBuffer()
71 77
72 void Client::writeMqttPacket(const MqttPacket &packet) 78 void Client::writeMqttPacket(const MqttPacket &packet)
73 { 79 {
74 - if (packet.packetType == PacketType::PUBLISH && getWriteBufBytesUsed() > CLIENT_MAX_BUFFER_SIZE) 80 + if (packet.packetType == PacketType::PUBLISH && wwi > CLIENT_MAX_BUFFER_SIZE)
75 return; 81 return;
76 82
77 if (packet.getSize() > getWriteBufMaxWriteSize()) 83 if (packet.getSize() > getWriteBufMaxWriteSize())
@@ -163,8 +169,25 @@ void Client::setReadyForWriting(bool val) @@ -163,8 +169,25 @@ void Client::setReadyForWriting(bool val)
163 struct epoll_event ev; 169 struct epoll_event ev;
164 memset(&ev, 0, sizeof (struct epoll_event)); 170 memset(&ev, 0, sizeof (struct epoll_event));
165 ev.data.fd = fd; 171 ev.data.fd = fd;
166 - ev.events = EPOLLIN;  
167 - if (val) 172 + if (readyForReading)
  173 + ev.events |= EPOLLIN;
  174 + if (readyForWriting)
  175 + ev.events |= EPOLLOUT;
  176 + check<std::runtime_error>(epoll_ctl(threadData->epollfd, EPOLL_CTL_MOD, fd, &ev));
  177 +}
  178 +
  179 +void Client::setReadyForReading(bool val)
  180 +{
  181 + if (val == this->readyForReading)
  182 + return;
  183 +
  184 + readyForReading = val;
  185 + struct epoll_event ev;
  186 + memset(&ev, 0, sizeof (struct epoll_event));
  187 + ev.data.fd = fd;
  188 + if (readyForReading)
  189 + ev.events |= EPOLLIN;
  190 + if (readyForWriting)
168 ev.events |= EPOLLOUT; 191 ev.events |= EPOLLOUT;
169 check<std::runtime_error>(epoll_ctl(threadData->epollfd, EPOLL_CTL_MOD, fd, &ev)); 192 check<std::runtime_error>(epoll_ctl(threadData->epollfd, EPOLL_CTL_MOD, fd, &ev));
170 } 193 }
@@ -215,6 +238,7 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_ @@ -215,6 +238,7 @@ bool Client::bufferToMqttPackets(std::vector&lt;MqttPacket&gt; &amp;packetQueueIn, Client_
215 { 238 {
216 ri = 0; 239 ri = 0;
217 wi = 0; 240 wi = 0;
  241 + setReadyForReading(true);
218 } 242 }
219 243
220 return true; 244 return true;
client.h
@@ -5,6 +5,7 @@ @@ -5,6 +5,7 @@
5 #include <unistd.h> 5 #include <unistd.h>
6 #include <vector> 6 #include <vector>
7 #include <mutex> 7 #include <mutex>
  8 +#include <iostream>
8 9
9 #include "forward_declarations.h" 10 #include "forward_declarations.h"
10 11
@@ -34,6 +35,7 @@ class Client @@ -34,6 +35,7 @@ class Client
34 bool authenticated = false; 35 bool authenticated = false;
35 bool connectPacketSeen = false; 36 bool connectPacketSeen = false;
36 bool readyForWriting = false; 37 bool readyForWriting = false;
  38 + bool readyForReading = true;
37 39
38 std::string clientid; 40 std::string clientid;
39 std::string username; 41 std::string username;
@@ -61,6 +63,8 @@ class Client @@ -61,6 +63,8 @@ class Client
61 if (readbuf == NULL) 63 if (readbuf == NULL)
62 throw std::runtime_error("Memory allocation failure in growReadBuffer()"); 64 throw std::runtime_error("Memory allocation failure in growReadBuffer()");
63 readBufsize = newBufSize; 65 readBufsize = newBufSize;
  66 +
  67 + std::cout << "New read buf size: " << readBufsize << std::endl;
64 } 68 }
65 69
66 size_t getWriteBufMaxWriteSize() 70 size_t getWriteBufMaxWriteSize()
@@ -80,16 +84,20 @@ class Client @@ -80,16 +84,20 @@ class Client
80 if (add_size == 0) 84 if (add_size == 0)
81 return; 85 return;
82 86
83 - const size_t newBufSize = writeBufsize + add_size; 87 + const size_t grow_by = std::max<size_t>(add_size, writeBufsize*2);
  88 + const size_t newBufSize = writeBufsize + grow_by;
84 writebuf = (char*)realloc(writebuf, newBufSize); 89 writebuf = (char*)realloc(writebuf, newBufSize);
85 90
86 if (writebuf == NULL) 91 if (writebuf == NULL)
87 throw std::runtime_error("Memory allocation failure in growWriteBuffer()"); 92 throw std::runtime_error("Memory allocation failure in growWriteBuffer()");
88 93
89 writeBufsize = newBufSize; 94 writeBufsize = newBufSize;
90 - }  
91 95
  96 + std::cout << "New write buf size: " << writeBufsize << std::endl;
  97 + }
92 98
  99 + void setReadyForWriting(bool val);
  100 + void setReadyForReading(bool val);
93 101
94 public: 102 public:
95 Client(int fd, ThreadData_p threadData); 103 Client(int fd, ThreadData_p threadData);
@@ -116,7 +124,7 @@ public: @@ -116,7 +124,7 @@ public:
116 void queueMessage(const MqttPacket &packet); 124 void queueMessage(const MqttPacket &packet);
117 void queuedMessagesToBuffer(); 125 void queuedMessagesToBuffer();
118 126
119 - void setReadyForWriting(bool val); 127 +
120 }; 128 };
121 129
122 #endif // CLIENT_H 130 #endif // CLIENT_H