Commit 468ca4771ca34193971536deeff816d47a2d42a7
1 parent
b5ba41f5
Refactor code to use new interface on CirBuf
And fixed an assert in CirBuf.read().
Showing
4 changed files
with
13 additions
and
51 deletions
cirbuf.cpp
| ... | ... | @@ -90,20 +90,20 @@ char CirBuf::peakAhead(uint32_t offset) const |
| 90 | 90 | return b; |
| 91 | 91 | } |
| 92 | 92 | |
| 93 | -void CirBuf::ensureFreeSpace(size_t n) | |
| 93 | +void CirBuf::ensureFreeSpace(size_t n, const size_t max) | |
| 94 | 94 | { |
| 95 | + if (n <= freeSpace()) | |
| 96 | + return; | |
| 97 | + | |
| 95 | 98 | const size_t _usedBytes = usedBytes(); |
| 96 | 99 | |
| 97 | 100 | int mul = 1; |
| 98 | 101 | |
| 99 | - while((mul * size - _usedBytes - 1) < n) | |
| 102 | + while((mul * size - _usedBytes - 1) < n && (mul*size) < max) | |
| 100 | 103 | { |
| 101 | 104 | mul = mul << 1; |
| 102 | 105 | } |
| 103 | 106 | |
| 104 | - if (mul == 1) | |
| 105 | - return; | |
| 106 | - | |
| 107 | 107 | doubleSize(mul); |
| 108 | 108 | } |
| 109 | 109 | |
| ... | ... | @@ -223,5 +223,5 @@ void CirBuf::read(void *buf, const size_t count) |
| 223 | 223 | _packet_len -= readlen; |
| 224 | 224 | } |
| 225 | 225 | assert(_packet_len == 0); |
| 226 | - assert(i == _packet_len); | |
| 226 | + assert(i == static_cast<int>(count)); | |
| 227 | 227 | } | ... | ... |
cirbuf.h
| ... | ... | @@ -4,6 +4,7 @@ |
| 4 | 4 | #include <stddef.h> |
| 5 | 5 | #include <stdlib.h> |
| 6 | 6 | #include <stdint.h> |
| 7 | +#include <limits.h> | |
| 7 | 8 | |
| 8 | 9 | // Optimized circular buffer, works only with sizes power of two. |
| 9 | 10 | class CirBuf |
| ... | ... | @@ -32,7 +33,7 @@ public: |
| 32 | 33 | void advanceHead(uint32_t n); |
| 33 | 34 | void advanceTail(uint32_t n); |
| 34 | 35 | char peakAhead(uint32_t offset) const; |
| 35 | - void ensureFreeSpace(size_t n); | |
| 36 | + void ensureFreeSpace(size_t n, const size_t max = UINT_MAX); | |
| 36 | 37 | void doubleSize(uint factor = 2); |
| 37 | 38 | uint32_t getSize() const; |
| 38 | 39 | ... | ... |
client.cpp
| ... | ... | @@ -147,10 +147,7 @@ void Client::writeMqttPacket(const MqttPacket &packet) |
| 147 | 147 | const uint32_t growBufMaxTo = std::min<int>(packet.getSizeIncludingNonPresentHeader() * 1000, maxPacketSize); |
| 148 | 148 | |
| 149 | 149 | // Grow as far as we can. We have to make room for one MQTT packet. |
| 150 | - while (packet.getSizeIncludingNonPresentHeader() > writebuf.freeSpace() && writebuf.getSize() < growBufMaxTo) | |
| 151 | - { | |
| 152 | - writebuf.doubleSize(); | |
| 153 | - } | |
| 150 | + writebuf.ensureFreeSpace(packet.getSizeIncludingNonPresentHeader(), growBufMaxTo); | |
| 154 | 151 | |
| 155 | 152 | // And drop a publish when it doesn't fit, even after resizing. This means we do allow pings. And |
| 156 | 153 | // QoS packet are queued and limited elsewhere. |
| ... | ... | @@ -164,34 +161,10 @@ void Client::writeMqttPacket(const MqttPacket &packet) |
| 164 | 161 | writebuf.headPtr()[0] = packet.getFirstByte(); |
| 165 | 162 | writebuf.advanceHead(1); |
| 166 | 163 | RemainingLength r = packet.getRemainingLength(); |
| 167 | - | |
| 168 | - ssize_t len_left = r.len; | |
| 169 | - int src_i = 0; | |
| 170 | - while (len_left > 0) | |
| 171 | - { | |
| 172 | - const size_t len = std::min<int>(len_left, writebuf.maxWriteSize()); | |
| 173 | - assert(len > 0); | |
| 174 | - std::memcpy(writebuf.headPtr(), &r.bytes[src_i], len); | |
| 175 | - writebuf.advanceHead(len); | |
| 176 | - src_i += len; | |
| 177 | - len_left -= len; | |
| 178 | - } | |
| 179 | - assert(len_left == 0); | |
| 180 | - assert(src_i == r.len); | |
| 164 | + writebuf.write(r.bytes, r.len); | |
| 181 | 165 | } |
| 182 | 166 | |
| 183 | - ssize_t len_left = packet.getBites().size(); | |
| 184 | - int src_i = 0; | |
| 185 | - while (len_left > 0) | |
| 186 | - { | |
| 187 | - const size_t len = std::min<int>(len_left, writebuf.maxWriteSize()); | |
| 188 | - assert(len > 0); | |
| 189 | - std::memcpy(writebuf.headPtr(), &packet.getBites()[src_i], len); | |
| 190 | - writebuf.advanceHead(len); | |
| 191 | - src_i += len; | |
| 192 | - len_left -= len; | |
| 193 | - } | |
| 194 | - assert(len_left == 0); | |
| 167 | + writebuf.write(packet.getBites().data(), packet.getBites().size()); | |
| 195 | 168 | |
| 196 | 169 | if (packet.packetType == PacketType::DISCONNECT) |
| 197 | 170 | setReadyForDisconnect(); |
| ... | ... | @@ -217,8 +190,7 @@ void Client::writePingResp() |
| 217 | 190 | { |
| 218 | 191 | std::lock_guard<std::mutex> locker(writeBufMutex); |
| 219 | 192 | |
| 220 | - if (2 > writebuf.freeSpace()) | |
| 221 | - writebuf.doubleSize(); | |
| 193 | + writebuf.ensureFreeSpace(2); | |
| 222 | 194 | |
| 223 | 195 | writebuf.headPtr()[0] = 0b11010000; |
| 224 | 196 | writebuf.advanceHead(1); | ... | ... |
mqttpacket.cpp
| ... | ... | @@ -17,18 +17,7 @@ MqttPacket::MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_lengt |
| 17 | 17 | fixed_header_length(fixed_header_length), |
| 18 | 18 | sender(sender) |
| 19 | 19 | { |
| 20 | - int i = 0; | |
| 21 | - ssize_t _packet_len = packet_len; | |
| 22 | - while (_packet_len > 0) | |
| 23 | - { | |
| 24 | - int readlen = std::min<int>(buf.maxReadSize(), _packet_len); | |
| 25 | - assert(readlen > 0); | |
| 26 | - std::memcpy(&bites[i], buf.tailPtr(), readlen); | |
| 27 | - buf.advanceTail(readlen); | |
| 28 | - i += readlen; | |
| 29 | - _packet_len -= readlen; | |
| 30 | - } | |
| 31 | - assert(_packet_len == 0); | |
| 20 | + buf.read(bites.data(), packet_len); | |
| 32 | 21 | |
| 33 | 22 | first_byte = bites[0]; |
| 34 | 23 | unsigned char _packetType = (first_byte & 0xF0) >> 4; | ... | ... |