Commit f8e062bf9b371cc1cfd1e8af59b2aaa46df006e6

Authored by Wiebe Cazemier
1 parent fce0beff

Simple keep-alive mechanism

client.cpp
@@ -17,6 +17,7 @@ Client::Client(int fd, ThreadData_p threadData) : @@ -17,6 +17,7 @@ Client::Client(int fd, ThreadData_p threadData) :
17 17
18 Client::~Client() 18 Client::~Client()
19 { 19 {
  20 + std::cout << "Removing client: " << repr() << std::endl;
20 close(fd); 21 close(fd);
21 } 22 }
22 23
@@ -74,6 +75,8 @@ bool Client::readFdIntoBuffer() @@ -74,6 +75,8 @@ bool Client::readFdIntoBuffer()
74 return false; 75 return false;
75 } 76 }
76 77
  78 + lastActivity = time(NULL);
  79 +
77 return true; 80 return true;
78 } 81 }
79 82
@@ -218,6 +221,15 @@ std::string Client::repr() @@ -218,6 +221,15 @@ std::string Client::repr()
218 return a.str(); 221 return a.str();
219 } 222 }
220 223
  224 +bool Client::keepAliveExpired()
  225 +{
  226 + if (!authenticated)
  227 + return lastActivity + 20 < time(NULL);
  228 +
  229 + bool result = (lastActivity + (keepalive*10/5)) < time(NULL);
  230 + return result;
  231 +}
  232 +
221 void Client::setReadyForWriting(bool val) 233 void Client::setReadyForWriting(bool val)
222 { 234 {
223 if (disconnecting) 235 if (disconnecting)
client.h
@@ -6,6 +6,7 @@ @@ -6,6 +6,7 @@
6 #include <vector> 6 #include <vector>
7 #include <mutex> 7 #include <mutex>
8 #include <iostream> 8 #include <iostream>
  9 +#include <time.h>
9 10
10 #include "forward_declarations.h" 11 #include "forward_declarations.h"
11 12
@@ -35,6 +36,7 @@ class Client @@ -35,6 +36,7 @@ class Client
35 bool readyForReading = true; 36 bool readyForReading = true;
36 bool disconnectWhenBytesWritten = false; 37 bool disconnectWhenBytesWritten = false;
37 bool disconnecting = false; 38 bool disconnecting = false;
  39 + time_t lastActivity = time(NULL);
38 40
39 std::string clientid; 41 std::string clientid;
40 std::string username; 42 std::string username;
@@ -80,6 +82,7 @@ public: @@ -80,6 +82,7 @@ public:
80 void setReadyForDisconnect() { disconnectWhenBytesWritten = true; } 82 void setReadyForDisconnect() { disconnectWhenBytesWritten = true; }
81 83
82 std::string repr(); 84 std::string repr();
  85 + bool keepAliveExpired();
83 86
84 }; 87 };
85 88
mainapp.cpp
@@ -15,6 +15,7 @@ void do_thread_work(ThreadData *threadData) @@ -15,6 +15,7 @@ void do_thread_work(ThreadData *threadData)
15 memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS); 15 memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
16 16
17 std::vector<MqttPacket> packetQueueIn; 17 std::vector<MqttPacket> packetQueueIn;
  18 + time_t lastKeepAliveCheck = time(NULL);
18 19
19 while (threadData->running) 20 while (threadData->running)
20 { 21 {
@@ -46,7 +47,6 @@ void do_thread_work(ThreadData *threadData) @@ -46,7 +47,6 @@ void do_thread_work(ThreadData *threadData)
46 47
47 if (!readSuccess) 48 if (!readSuccess)
48 { 49 {
49 - std::cout << "Disconnect: " << client->repr() << std::endl;  
50 threadData->removeClient(client); 50 threadData->removeClient(client);
51 continue; 51 continue;
52 } 52 }
@@ -92,6 +92,19 @@ void do_thread_work(ThreadData *threadData) @@ -92,6 +92,19 @@ void do_thread_work(ThreadData *threadData)
92 } 92 }
93 } 93 }
94 packetQueueIn.clear(); 94 packetQueueIn.clear();
  95 +
  96 + try
  97 + {
  98 + if (lastKeepAliveCheck + 30 < time(NULL))
  99 + {
  100 + if (threadData->doKeepAliveCheck())
  101 + lastKeepAliveCheck = time(NULL);
  102 + }
  103 + }
  104 + catch (std::exception &ex)
  105 + {
  106 + std::cerr << "Error handling keep-alives: " << ex.what() << std::endl;
  107 + }
95 } 108 }
96 } 109 }
97 110
threaddata.cpp
@@ -72,5 +72,28 @@ std::shared_ptr&lt;SubscriptionStore&gt; &amp;ThreadData::getSubscriptionStore() @@ -72,5 +72,28 @@ std::shared_ptr&lt;SubscriptionStore&gt; &amp;ThreadData::getSubscriptionStore()
72 return subscriptionStore; 72 return subscriptionStore;
73 } 73 }
74 74
  75 +// TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial?
  76 +bool ThreadData::doKeepAliveCheck()
  77 +{
  78 + std::unique_lock<std::mutex> lock(clients_by_fd_mutex, std::try_to_lock);
  79 + if (!lock.owns_lock())
  80 + return false;
  81 +
  82 + auto it = clients_by_fd.begin();
  83 + while (it != clients_by_fd.end())
  84 + {
  85 + Client_p &client = it->second;
  86 + if (client->keepAliveExpired())
  87 + {
  88 + subscriptionStore->removeClient(client);
  89 + it = clients_by_fd.erase(it);
  90 + }
  91 + else
  92 + it++;
  93 + }
  94 +
  95 + return true;
  96 +}
  97 +
75 98
76 99
threaddata.h
@@ -40,6 +40,8 @@ public: @@ -40,6 +40,8 @@ public:
40 void removeClient(Client_p client); 40 void removeClient(Client_p client);
41 void removeClient(int fd); 41 void removeClient(int fd);
42 std::shared_ptr<SubscriptionStore> &getSubscriptionStore(); 42 std::shared_ptr<SubscriptionStore> &getSubscriptionStore();
  43 +
  44 + bool doKeepAliveCheck();
43 }; 45 };
44 46
45 #endif // THREADDATA_H 47 #endif // THREADDATA_H