session.cpp
3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "cassert"
#include "session.h"
#include "client.h"
// Default construction: there is nothing to set up here; all members keep whatever
// initial state their own declarations give them.
Session::Session() = default;
// Logs a debug line naming the client id of the session that is going away.
Session::~Session()
{
    const auto &id = getClientId();
    logger->logf(LOG_DEBUG, "Session %s is being destroyed.", id.c_str());
}
// Returns true when the client this session refers to no longer exists.
//
// The answer is only a snapshot: the client can go away right after this returns false.
// Code that needs to actually use the client should call makeSharedClient() and test the result.
bool Session::clientDisconnected() const
{
    const bool gone = client.expired();
    return gone;
}
// Tries to obtain an owning pointer to the session's client.
// Returns an empty pointer when the client has already been destroyed.
std::shared_ptr<Client> Session::makeSharedClient() const
{
    std::shared_ptr<Client> strong = client.lock();
    return strong;
}
void Session::assignActiveConnection(std::shared_ptr<Client> &client)
{
this->client = client;
this->client_id = client->getClientId();
}
void Session::writePacket(const MqttPacket &packet, char max_qos)
{
const char qos = std::min<char>(packet.getQos(), max_qos);
if (qos == 0)
{
if (!clientDisconnected())
{
Client_p c = makeSharedClient();
c->writeMqttPacketAndBlameThisClient(packet);
}
}
else if (qos == 1)
{
std::shared_ptr<MqttPacket> copyPacket = packet.getCopy();
std::unique_lock<std::mutex> locker(qosQueueMutex);
if (qosPacketQueue.size() >= MAX_QOS_MSG_PENDING_PER_CLIENT || (qosQueueBytes >= MAX_QOS_BYTES_PENDING_PER_CLIENT && qosPacketQueue.size() > 0))
{
logger->logf(LOG_WARNING, "Dropping QoS message for client '%s', because its QoS buffers were full.", client_id.c_str());
return;
}
const uint16_t pid = nextPacketId++;
copyPacket->setPacketId(pid);
QueuedQosPacket p;
p.packet = copyPacket;
p.id = pid;
qosPacketQueue.push_back(p);
qosQueueBytes += copyPacket->getTotalMemoryFootprint();
locker.unlock();
if (!clientDisconnected())
{
Client_p c = makeSharedClient();
c->writeMqttPacketAndBlameThisClient(*copyPacket.get());
copyPacket->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
}
}
}
// Removes the queued QoS packet with the given packet id (if any) and subtracts its memory
// footprint from the running byte total of the queue. At most one entry is removed.
//
// Normatively, the search ends at the first element, because all messages are sent out in order and
// should be acked in order.
void Session::clearQosMessage(uint16_t packet_id)
{
    std::lock_guard<std::mutex> locker(qosQueueMutex);

    for (auto pos = qosPacketQueue.begin(); pos != qosPacketQueue.end(); ++pos)
    {
        if (pos->id != packet_id)
            continue;

        size_t freedBytes = pos->packet->getTotalMemoryFootprint();
        qosQueueBytes -= freedBytes;

        // NOTE(review): this check and the clamp below are only meaningful if qosQueueBytes is a
        // signed type - confirm against its declaration.
        assert(qosQueueBytes >= 0);

        // Going negative should not happen, but correcting a hypothetical bookkeeping bug is fine
        // for this purpose.
        if (qosQueueBytes < 0)
            qosQueueBytes = 0;

        qosPacketQueue.erase(pos);
        return;
    }
}
// [MQTT-4.4.0-1]: "When a Client reconnects with CleanSession set to 0, both the Client and Server MUST re-send any
// unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers. This
// is the only circumstance where a Client or Server is REQUIRED to redeliver messages."
//
// There is a bit of a hole there, I think. When we write out a packet to a receiver, it may decide to drop it, if its buffers
// are full, for instance. We are not required to (periodically) retry. TODO Perhaps I will implement that retry anyway.
void Session::sendPendingQosMessages()
{
if (!clientDisconnected())
{
Client_p c = makeSharedClient();
std::lock_guard<std::mutex> locker(qosQueueMutex);
for (QueuedQosPacket &qosMessage : qosPacketQueue)
{
c->writeMqttPacketAndBlameThisClient(*qosMessage.packet.get());
qosMessage.packet->setDuplicate(); // Any dealings with this packet from here will be a duplicate.
}
}
}
// Records when this session was last active. A positive val is stored as given; any other value
// means "now", taken from the system clock.
void Session::touch(time_t val)
{
    if (val > 0)
    {
        lastTouched = val;
        return;
    }

    lastTouched = time(nullptr);
}
// A session has expired when its client is gone and more than EXPIRE_SESSION_AFTER has passed since
// the session was last touched. A session with a live client never expires.
bool Session::hasExpired()
{
    if (!clientDisconnected())
        return false;

    const auto deadline = lastTouched + EXPIRE_SESSION_AFTER;
    return deadline < time(nullptr);
}