Commit 7e87fd91bb6ef68c808936fb178010c579510ae0

Authored by Wiebe Cazemier
1 parent 9b6e0d3b

Change from checking fd==-1 to properly through the destructor

client.cpp
@@ -33,23 +33,27 @@ Client::Client(int fd, ThreadData_p threadData) : @@ -33,23 +33,27 @@ Client::Client(int fd, ThreadData_p threadData) :
33 33
34 Client::~Client() 34 Client::~Client()
35 { 35 {
36 - closeConnection(); 36 + close(fd);
37 free(readbuf); 37 free(readbuf);
38 free(writebuf); 38 free(writebuf);
39 } 39 }
40 40
41 -void Client::closeConnection() 41 +// Do this from a place you'll know ownwership of the shared_ptr is being given up everywhere, so the close happens when the last owner gives it up.
  42 +void Client::markAsDisconnecting()
42 { 43 {
43 - if (fd < 0) 44 + if (disconnecting)
44 return; 45 return;
  46 +
  47 + disconnecting = true;
45 check<std::runtime_error>(epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL)); 48 check<std::runtime_error>(epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL));
46 - close(fd);  
47 - fd = -1;  
48 } 49 }
49 50
50 // false means any kind of error we want to get rid of the client for. 51 // false means any kind of error we want to get rid of the client for.
51 bool Client::readFdIntoBuffer() 52 bool Client::readFdIntoBuffer()
52 { 53 {
  54 + if (disconnecting)
  55 + return false;
  56 +
53 if (wi > CLIENT_MAX_BUFFER_SIZE) 57 if (wi > CLIENT_MAX_BUFFER_SIZE)
54 { 58 {
55 setReadyForReading(false); 59 setReadyForReading(false);
@@ -119,6 +123,19 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet) @@ -119,6 +123,19 @@ void Client::writeMqttPacket(const MqttPacket &amp;packet)
119 setReadyForWriting(true); 123 setReadyForWriting(true);
120 } 124 }
121 125
  126 +// Helper method to avoid the exception ending up at the sender of messages, which would then get disconnected.
  127 +void Client::writeMqttPacketAndBlameThisClient(const MqttPacket &packet)
  128 +{
  129 + try
  130 + {
  131 + this->writeMqttPacket(packet);
  132 + }
  133 + catch (std::exception &ex)
  134 + {
  135 + threadData->removeClient(fd);
  136 + }
  137 +}
  138 +
122 // Ping responses are always the same, so hardcoding it for optimization. 139 // Ping responses are always the same, so hardcoding it for optimization.
123 void Client::writePingResp() 140 void Client::writePingResp()
124 { 141 {
@@ -142,7 +159,7 @@ bool Client::writeBufIntoFd() @@ -142,7 +159,7 @@ bool Client::writeBufIntoFd()
142 return true; 159 return true;
143 160
144 // We can abort the write; the client is about to be removed anyway. 161 // We can abort the write; the client is about to be removed anyway.
145 - if (isDisconnected()) 162 + if (disconnecting)
146 return false; 163 return false;
147 164
148 int n; 165 int n;
@@ -182,6 +199,9 @@ std::string Client::repr() @@ -182,6 +199,9 @@ std::string Client::repr()
182 199
183 void Client::setReadyForWriting(bool val) 200 void Client::setReadyForWriting(bool val)
184 { 201 {
  202 + if (disconnecting)
  203 + return;
  204 +
185 if (val == this->readyForWriting) 205 if (val == this->readyForWriting)
186 return; 206 return;
187 207
@@ -198,6 +218,9 @@ void Client::setReadyForWriting(bool val) @@ -198,6 +218,9 @@ void Client::setReadyForWriting(bool val)
198 218
199 void Client::setReadyForReading(bool val) 219 void Client::setReadyForReading(bool val)
200 { 220 {
  221 + if (disconnecting)
  222 + return;
  223 +
201 if (val == this->readyForReading) 224 if (val == this->readyForReading)
202 return; 225 return;
203 226
client.h
@@ -37,6 +37,7 @@ class Client @@ -37,6 +37,7 @@ class Client
37 bool readyForWriting = false; 37 bool readyForWriting = false;
38 bool readyForReading = true; 38 bool readyForReading = true;
39 bool disconnectWhenBytesWritten = false; 39 bool disconnectWhenBytesWritten = false;
  40 + bool disconnecting = false;
40 41
41 std::string clientid; 42 std::string clientid;
42 std::string username; 43 std::string username;
@@ -109,10 +110,12 @@ class Client @@ -109,10 +110,12 @@ class Client
109 110
110 public: 111 public:
111 Client(int fd, ThreadData_p threadData); 112 Client(int fd, ThreadData_p threadData);
  113 + Client(const Client &other) = delete;
  114 + Client(Client &&other) = delete;
112 ~Client(); 115 ~Client();
113 116
114 int getFd() { return fd;} 117 int getFd() { return fd;}
115 - void closeConnection(); 118 + void markAsDisconnecting();
116 bool readFdIntoBuffer(); 119 bool readFdIntoBuffer();
117 bool bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, Client_p &sender); 120 bool bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, Client_p &sender);
118 void setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive); 121 void setClientProperties(const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive);
@@ -125,14 +128,13 @@ public: @@ -125,14 +128,13 @@ public:
125 128
126 void writePingResp(); 129 void writePingResp();
127 void writeMqttPacket(const MqttPacket &packet); 130 void writeMqttPacket(const MqttPacket &packet);
  131 + void writeMqttPacketAndBlameThisClient(const MqttPacket &packet);
128 bool writeBufIntoFd(); 132 bool writeBufIntoFd();
129 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && wwi == wri && wwi == 0; } 133 bool readyForDisconnecting() const { return disconnectWhenBytesWritten && wwi == wri && wwi == 0; }
130 134
131 // Do this before calling an action that makes this client ready for writing, so that the EPOLLOUT will handle it. 135 // Do this before calling an action that makes this client ready for writing, so that the EPOLLOUT will handle it.
132 void setReadyForDisconnect() { disconnectWhenBytesWritten = true; } 136 void setReadyForDisconnect() { disconnectWhenBytesWritten = true; }
133 137
134 - bool isDisconnected() const { return fd < 0; }  
135 -  
136 std::string repr(); 138 std::string repr();
137 139
138 }; 140 };
mainapp.cpp
@@ -55,7 +55,7 @@ void do_thread_work(ThreadData *threadData) @@ -55,7 +55,7 @@ void do_thread_work(ThreadData *threadData)
55 55
56 Client_p client = threadData->getClient(fd); 56 Client_p client = threadData->getClient(fd);
57 57
58 - if (client && !client->isDisconnected()) 58 + if (client)
59 { 59 {
60 try 60 try
61 { 61 {
subscriptionstore.cpp
@@ -66,7 +66,7 @@ bool SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st @@ -66,7 +66,7 @@ bool SubscriptionStore::publishNonRecursively(const MqttPacket &amp;packet, const st
66 auto client_it = clients_by_id_const.find(client_id); 66 auto client_it = clients_by_id_const.find(client_id);
67 if (client_it != clients_by_id_const.end()) 67 if (client_it != clients_by_id_const.end())
68 { 68 {
69 - client_it->second->writeMqttPacket(packet); 69 + client_it->second->writeMqttPacketAndBlameThisClient(packet);
70 result = true; 70 result = true;
71 } 71 }
72 } 72 }
threaddata.cpp
@@ -55,12 +55,25 @@ Client_p ThreadData::getClient(int fd) @@ -55,12 +55,25 @@ Client_p ThreadData::getClient(int fd)
55 55
56 void ThreadData::removeClient(Client_p client) 56 void ThreadData::removeClient(Client_p client)
57 { 57 {
  58 + client->markAsDisconnecting();
  59 +
58 std::lock_guard<std::mutex> lck(clients_by_fd_mutex); 60 std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
59 clients_by_fd.erase(client->getFd()); 61 clients_by_fd.erase(client->getFd());
60 - client->closeConnection();  
61 subscriptionStore->removeClient(client); 62 subscriptionStore->removeClient(client);
62 } 63 }
63 64
  65 +void ThreadData::removeClient(int fd)
  66 +{
  67 + std::lock_guard<std::mutex> lck(clients_by_fd_mutex);
  68 + auto client_it = this->clients_by_fd.find(fd);
  69 + if (client_it != this->clients_by_fd.end())
  70 + {
  71 + client_it->second->markAsDisconnecting();
  72 + subscriptionStore->removeClient(client_it->second);
  73 + this->clients_by_fd.erase(fd);
  74 + }
  75 +}
  76 +
64 std::shared_ptr<SubscriptionStore> &ThreadData::getSubscriptionStore() 77 std::shared_ptr<SubscriptionStore> &ThreadData::getSubscriptionStore()
65 { 78 {
66 return subscriptionStore; 79 return subscriptionStore;
threaddata.h
@@ -41,6 +41,7 @@ public: @@ -41,6 +41,7 @@ public:
41 void giveClient(Client_p client); 41 void giveClient(Client_p client);
42 Client_p getClient(int fd); 42 Client_p getClient(int fd);
43 void removeClient(Client_p client); 43 void removeClient(Client_p client);
  44 + void removeClient(int fd);
44 std::shared_ptr<SubscriptionStore> &getSubscriptionStore(); 45 std::shared_ptr<SubscriptionStore> &getSubscriptionStore();
45 void wakeUpThread(); 46 void wakeUpThread();
46 void addToReadyForDequeuing(Client_p &client); 47 void addToReadyForDequeuing(Client_p &client);