Commit 8ce485ee555b65917efafc5d633d3315f5d84550

Authored by Wiebe Cazemier
1 parent b0def7dc

Mostly done with user properties in connect, publish and subscribe.

And one line about correlation data that was a bug.
authplugin.cpp
... ... @@ -278,7 +278,7 @@ void Authentication::securityCleanup(bool reloading)
278 278 }
279 279  
280 280 AuthResult Authentication::aclCheck(const std::string &clientid, const std::string &username, const std::string &topic, const std::vector<std::string> &subtopics,
281   - AclAccess access, char qos, bool retain)
  281 + AclAccess access, char qos, bool retain, const std::vector<std::pair<std::string, std::string>> *userProperties)
282 282 {
283 283 assert(subtopics.size() > 0);
284 284  
... ... @@ -322,7 +322,7 @@ AuthResult Authentication::aclCheck(const std::string &amp;clientid, const std::stri
322 322 // gets disconnected.
323 323 try
324 324 {
325   - FlashMQMessage msg(topic, subtopics, qos, retain);
  325 + FlashMQMessage msg(topic, subtopics, qos, retain, userProperties);
326 326 return flashmq_auth_plugin_acl_check_v1(pluginData, access, clientid, username, msg);
327 327 }
328 328 catch (std::exception &ex)
... ... @@ -335,7 +335,8 @@ AuthResult Authentication::aclCheck(const std::string &amp;clientid, const std::stri
335 335 return AuthResult::error;
336 336 }
337 337  
338   -AuthResult Authentication::unPwdCheck(const std::string &username, const std::string &password)
  338 +AuthResult Authentication::unPwdCheck(const std::string &username, const std::string &password,
  339 + const std::vector<std::pair<std::string, std::string>> *userProperties)
339 340 {
340 341 AuthResult firstResult = unPwdCheckFromMosquittoPasswordFile(username, password);
341 342  
... ... @@ -373,7 +374,7 @@ AuthResult Authentication::unPwdCheck(const std::string &amp;username, const std::st
373 374 // gets disconnected.
374 375 try
375 376 {
376   - return flashmq_auth_plugin_login_check_v1(pluginData, username, password);
  377 + return flashmq_auth_plugin_login_check_v1(pluginData, username, password, userProperties);
377 378 }
378 379 catch (std::exception &ex)
379 380 {
... ...
authplugin.h
... ... @@ -65,7 +65,8 @@ typedef void(*F_flashmq_auth_plugin_deallocate_thread_memory_v1)(void *thread_da
65 65 typedef void(*F_flashmq_auth_plugin_init_v1)(void *thread_data, std::unordered_map<std::string, std::string> &auth_opts, bool reloading);
66 66 typedef void(*F_flashmq_auth_plugin_deinit_v1)(void *thread_data, std::unordered_map<std::string, std::string> &auth_opts, bool reloading);
67 67 typedef AuthResult(*F_flashmq_auth_plugin_acl_check_v1)(void *thread_data, AclAccess access, const std::string &clientid, const std::string &username, const FlashMQMessage &msg);
68   -typedef AuthResult(*F_flashmq_auth_plugin_login_check_v1)(void *thread_data, const std::string &username, const std::string &password);
  68 +typedef AuthResult(*F_flashmq_auth_plugin_login_check_v1)(void *thread_data, const std::string &username, const std::string &password,
  69 + const std::vector<std::pair<std::string, std::string>> *userProperties);
69 70 typedef void (*F_flashmq_auth_plugin_periodic_event)(void *thread_data);
70 71  
71 72 extern "C"
... ... @@ -152,8 +153,9 @@ public:
152 153 void securityInit(bool reloading);
153 154 void securityCleanup(bool reloading);
154 155 AuthResult aclCheck(const std::string &clientid, const std::string &username, const std::string &topic, const std::vector<std::string> &subtopics,
155   - AclAccess access, char qos, bool retain);
156   - AuthResult unPwdCheck(const std::string &username, const std::string &password);
  156 + AclAccess access, char qos, bool retain, const std::vector<std::pair<std::string, std::string>> *userProperties);
  157 + AuthResult unPwdCheck(const std::string &username, const std::string &password,
  158 + const std::vector<std::pair<std::string, std::string>> *userProperties);
157 159  
158 160 void setQuitting();
159 161 void loadMosquittoPasswordFile();
... ...
flashmq_plugin.cpp
... ... @@ -12,9 +12,11 @@ void flashmq_logf(int level, const char *str, ...)
12 12 va_end(valist);
13 13 }
14 14  
15   -FlashMQMessage::FlashMQMessage(const std::string &topic, const std::vector<std::string> &subtopics, const char qos, const bool retain) :
  15 +FlashMQMessage::FlashMQMessage(const std::string &topic, const std::vector<std::string> &subtopics, const char qos, const bool retain,
  16 + const std::vector<std::pair<std::string, std::string>> *userProperties) :
16 17 topic(topic),
17 18 subtopics(subtopics),
  19 + userProperties(userProperties),
18 20 qos(qos),
19 21 retain(retain)
20 22 {
... ...
flashmq_plugin.h
... ... @@ -16,6 +16,7 @@
16 16 #include <string>
17 17 #include <vector>
18 18 #include <unordered_map>
  19 +#include <memory>
19 20  
20 21 #define FLASHMQ_PLUGIN_VERSION 1
21 22  
... ... @@ -72,10 +73,12 @@ struct FlashMQMessage
72 73 {
73 74 const std::string &topic;
74 75 const std::vector<std::string> &subtopics;
  76 + const std::vector<std::pair<std::string, std::string>> *userProperties;
75 77 const char qos;
76 78 const bool retain;
77 79  
78   - FlashMQMessage(const std::string &topic, const std::vector<std::string> &subtopics, const char qos, const bool retain);
  80 + FlashMQMessage(const std::string &topic, const std::vector<std::string> &subtopics, const char qos, const bool retain,
  81 + const std::vector<std::pair<std::string, std::string>> *userProperties);
79 82 };
80 83  
81 84 /**
... ... @@ -177,7 +180,8 @@ void flashmq_auth_plugin_periodic_event(void *thread_data);
177 180 * Note that there is a setting 'auth_plugin_serialize_auth_checks'. Use only as a last resort if your plugin is not
178 181 * thread-safe. It will negate much of FlashMQ's multi-core model.
179 182 */
180   -AuthResult flashmq_auth_plugin_login_check(void *thread_data, const std::string &username, const std::string &password);
  183 +AuthResult flashmq_auth_plugin_login_check(void *thread_data, const std::string &username, const std::string &password,
  184 + const std::vector<std::pair<std::string, std::string>> *userProperties);
181 185  
182 186 /**
183 187 * @brief flashmq_auth_plugin_acl_check is called on publish, deliver and subscribe.
... ...
mqtt5properties.cpp
... ... @@ -2,6 +2,7 @@
2 2  
3 3 #include "cstring"
4 4 #include "vector"
  5 +#include "cassert"
5 6  
6 7 #include "exceptions.h"
7 8  
... ... @@ -38,6 +39,11 @@ void Mqtt5PropertyBuilder::clearClientSpecificBytes()
38 39 clientSpecificBytes.clear();
39 40 }
40 41  
  42 +std::shared_ptr<std::vector<std::pair<std::string, std::string>>> Mqtt5PropertyBuilder::getUserProperties() const
  43 +{
  44 + return this->userProperties;
  45 +}
  46 +
41 47 void Mqtt5PropertyBuilder::writeSessionExpiry(uint32_t val)
42 48 {
43 49 writeUint32(Mqtt5Properties::SessionExpiryInterval, val, genericBytes);
... ... @@ -103,9 +109,15 @@ void Mqtt5PropertyBuilder::writeResponseTopic(const std::string &amp;str)
103 109 writeStr(Mqtt5Properties::ResponseTopic, str);
104 110 }
105 111  
106   -void Mqtt5PropertyBuilder::writeUserProperty(const std::string &key, const std::string &value)
  112 +void Mqtt5PropertyBuilder::writeUserProperty(std::string &&key, std::string &&value)
107 113 {
108 114 write2Str(Mqtt5Properties::UserProperty, key, value);
  115 +
  116 + if (!this->userProperties)
  117 + this->userProperties = std::make_shared<std::vector<std::pair<std::string, std::string>>>();
  118 +
  119 + std::pair<std::string, std::string> pair(std::move(key), std::move(value));
  120 + this->userProperties->push_back(std::move(pair));
109 121 }
110 122  
111 123 void Mqtt5PropertyBuilder::writeCorrelationData(const std::string &correlationData)
... ... @@ -113,6 +125,15 @@ void Mqtt5PropertyBuilder::writeCorrelationData(const std::string &amp;correlationDa
113 125 writeStr(Mqtt5Properties::CorrelationData, correlationData);
114 126 }
115 127  
  128 +void Mqtt5PropertyBuilder::setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties)
  129 +{
  130 + assert(!this->userProperties);
  131 + assert(this->genericBytes.empty());
  132 + assert(this->clientSpecificBytes.empty());
  133 +
  134 + this->userProperties = userProperties;
  135 +}
  136 +
116 137 void Mqtt5PropertyBuilder::writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector<char> &target)
117 138 {
118 139 size_t pos = target.size();
... ...
mqtt5properties.h
... ... @@ -10,6 +10,7 @@ class Mqtt5PropertyBuilder
10 10 {
11 11 std::vector<char> genericBytes;
12 12 std::vector<char> clientSpecificBytes; // only relevant for publishes
  13 + std::shared_ptr<std::vector<std::pair<std::string, std::string>>> userProperties;
13 14 VariableByteInt length;
14 15  
15 16 void writeUint32(Mqtt5Properties prop, const uint32_t x, std::vector<char> &target);
... ... @@ -25,6 +26,7 @@ public:
25 26 const std::vector<char> &getGenericBytes() const;
26 27 const std::vector<char> &getclientSpecificBytes() const;
27 28 void clearClientSpecificBytes();
  29 + std::shared_ptr<std::vector<std::pair<std::string, std::string>>> getUserProperties() const;
28 30  
29 31 void writeSessionExpiry(uint32_t val);
30 32 void writeReceiveMax(uint16_t val);
... ... @@ -39,8 +41,9 @@ public:
39 41 void writePayloadFormatIndicator(uint8_t val);
40 42 void writeMessageExpiryInterval(uint32_t val);
41 43 void writeResponseTopic(const std::string &str);
42   - void writeUserProperty(const std::string &key, const std::string &value);
  44 + void writeUserProperty(std::string &&key, std::string &&value);
43 45 void writeCorrelationData(const std::string &correlationData);
  46 + void setNewUserProperties(const std::shared_ptr<std::vector<std::pair<std::string, std::string>>> &userProperties);
44 47 };
45 48  
46 49 #endif // MQTT5PROPERTIES_H
... ...
mqttpacket.cpp
... ... @@ -132,6 +132,15 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &amp;_pu
132 132  
133 133 if (protocolVersion >= ProtocolVersion::Mqtt5)
134 134 {
  135 + // Step 1: make certain properties available as objects, because FlashMQ needs access to them for internal logic.
  136 + if (_publish.propertyBuilder)
  137 + {
  138 + this->publishData.constructPropertyBuilder();
  139 + this->publishData.propertyBuilder->setNewUserProperties(_publish.propertyBuilder->getUserProperties());
  140 + }
  141 +
  142 + // Step 2: this line will make sure the whole byte array containing all properties as flat bytes is present in the 'bites' vector,
  143 + // which is sent to the subscribers.
135 144 writeProperties(_publish.propertyBuilder);
136 145 }
137 146  
... ... @@ -368,13 +377,8 @@ void MqttPacket::handleConnect()
368 377 request_problem_information = !!readByte();
369 378 break;
370 379 case Mqtt5Properties::UserProperty:
371   - {
372   - const uint16_t len = readTwoBytesToUInt16();
373   - readBytes(len);
374   - const uint16_t len2 = readTwoBytesToUInt16();
375   - readBytes(len2);
  380 + readUserProperty();
376 381 break;
377   - }
378 382 case Mqtt5Properties::AuthenticationMethod:
379 383 {
380 384 const uint16_t len = readTwoBytesToUInt16();
... ... @@ -407,7 +411,7 @@ void MqttPacket::handleConnect()
407 411 {
408 412 if (protocolVersion == ProtocolVersion::Mqtt5)
409 413 {
410   - willpublish.propertyBuilder = std::make_unique<Mqtt5PropertyBuilder>();
  414 + willpublish.constructPropertyBuilder();
411 415  
412 416 const size_t proplen = decodeVariableByteIntAtPos();
413 417 const size_t prop_end_at = pos + proplen;
... ... @@ -450,15 +454,16 @@ void MqttPacket::handleConnect()
450 454 {
451 455 const uint16_t len = readTwoBytesToUInt16();
452 456 const std::string correlationData(readBytes(len), len);
453   - publishData.propertyBuilder->writeCorrelationData(correlationData);
  457 + willpublish.propertyBuilder->writeCorrelationData(correlationData);
454 458 break;
455 459 }
456 460 case Mqtt5Properties::UserProperty:
457 461 {
458   - const uint16_t len = readTwoBytesToUInt16();
459   - readBytes(len);
460   - const uint16_t len2 = readTwoBytesToUInt16();
461   - readBytes(len2);
  462 + const uint16_t lenKey = readTwoBytesToUInt16();
  463 + std::string userPropKey(readBytes(lenKey), lenKey);
  464 + const uint16_t lenVal = readTwoBytesToUInt16();
  465 + std::string userPropVal(readBytes(lenVal), lenVal);
  466 + willpublish.propertyBuilder->writeUserProperty(std::move(userPropKey), std::move(userPropVal));
462 467 break;
463 468 }
464 469 default:
... ... @@ -554,7 +559,7 @@ void MqttPacket::handleConnect()
554 559 sender->setDisconnectReason("Invalid username character");
555 560 accessGranted = false;
556 561 }
557   - else if (authentication.unPwdCheck(username, password) == AuthResult::success)
  562 + else if (authentication.unPwdCheck(username, password, getUserProperties()) == AuthResult::success)
558 563 {
559 564 accessGranted = true;
560 565 }
... ... @@ -644,13 +649,8 @@ void MqttPacket::handleSubscribe()
644 649 decodeVariableByteIntAtPos();
645 650 break;
646 651 case Mqtt5Properties::UserProperty:
647   - {
648   - const uint16_t len = readTwoBytesToUInt16();
649   - readBytes(len);
650   - const uint16_t len2 = readTwoBytesToUInt16();
651   - readBytes(len2);
  652 + readUserProperty();
652 653 break;
653   - }
654 654 default:
655 655 throw ProtocolError("Invalid subscribe property.");
656 656 }
... ... @@ -678,7 +678,7 @@ void MqttPacket::handleSubscribe()
678 678  
679 679 std::vector<std::string> subtopics;
680 680 splitTopic(topic, subtopics);
681   - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false) == AuthResult::success)
  681 + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::subscribe, qos, false, getUserProperties()) == AuthResult::success)
682 682 {
683 683 logger->logf(LOG_SUBSCRIBE, "Client '%s' subscribed to '%s' QoS %d", sender->repr().c_str(), topic.c_str(), qos);
684 684 sender->getThreadData()->getSubscriptionStore()->addSubscription(sender, topic, subtopics, qos);
... ... @@ -821,7 +821,7 @@ void MqttPacket::handlePublish()
821 821 const size_t prop_end_at = pos + proplen;
822 822  
823 823 if (proplen > 0)
824   - publishData.propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
  824 + publishData.constructPropertyBuilder();
825 825  
826 826 while (pos < prop_end_at)
827 827 {
... ... @@ -854,10 +854,10 @@ void MqttPacket::handlePublish()
854 854 case Mqtt5Properties::UserProperty:
855 855 {
856 856 const uint16_t lenKey = readTwoBytesToUInt16();
857   - const std::string userPropKey(readBytes(lenKey), lenKey);
  857 + std::string userPropKey(readBytes(lenKey), lenKey);
858 858 const uint16_t lenVal = readTwoBytesToUInt16();
859   - const std::string userPropVal(readBytes(lenVal), lenVal);
860   - publishData.propertyBuilder->writeUserProperty(userPropKey, userPropVal);
  859 + std::string userPropVal(readBytes(lenVal), lenVal);
  860 + publishData.propertyBuilder->writeUserProperty(std::move(userPropKey), std::move(userPropVal));
861 861 break;
862 862 }
863 863 case Mqtt5Properties::SubscriptionIdentifier:
... ... @@ -882,7 +882,7 @@ void MqttPacket::handlePublish()
882 882 payloadStart = pos;
883 883  
884 884 Authentication &authentication = *ThreadGlobals::getAuth();
885   - if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.subtopics, AclAccess::write, qos, retain) == AuthResult::success)
  885 + if (authentication.aclCheck(sender->getClientId(), sender->getUsername(), publishData.topic, publishData.subtopics, AclAccess::write, qos, retain, getUserProperties()) == AuthResult::success)
886 886 {
887 887 if (retain)
888 888 {
... ... @@ -1176,6 +1176,26 @@ size_t MqttPacket::decodeVariableByteIntAtPos()
1176 1176 return value;
1177 1177 }
1178 1178  
  1179 +void MqttPacket::readUserProperty()
  1180 +{
  1181 + this->publishData.constructPropertyBuilder();
  1182 +
  1183 + const uint16_t len = readTwoBytesToUInt16();
  1184 + std::string key(readBytes(len), len);
  1185 + const uint16_t len2 = readTwoBytesToUInt16();
  1186 + std::string value(readBytes(len2), len2);
  1187 +
  1188 + this->publishData.propertyBuilder->writeUserProperty(std::move(key), std::move(value));
  1189 +}
  1190 +
  1191 +const std::vector<std::pair<std::string, std::string>> *MqttPacket::getUserProperties() const
  1192 +{
  1193 + if (this->publishData.propertyBuilder)
  1194 + return this->publishData.propertyBuilder->getUserProperties().get();
  1195 +
  1196 + return nullptr;
  1197 +}
  1198 +
1179 1199 bool MqttPacket::getRetain() const
1180 1200 {
1181 1201 return (first_byte & 0b00000001);
... ...
mqttpacket.h
... ... @@ -67,11 +67,12 @@ class MqttPacket
67 67 uint32_t readFourBytesToUint32();
68 68 size_t remainingAfterPos();
69 69 size_t decodeVariableByteIntAtPos();
  70 + void readUserProperty();
70 71  
71 72 void calculateRemainingLength();
72 73 void pubCommonConstruct(const uint16_t packet_id, PacketType packetType, uint8_t firstByteDefaultBits = 0);
73 74  
74   - MqttPacket(const MqttPacket &other) = default;
  75 + MqttPacket(const MqttPacket &other) = delete;
75 76 public:
76 77 PacketType packetType = PacketType::Reserved;
77 78 MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_length, std::shared_ptr<Client> &sender); // Constructor for parsing incoming packets.
... ... @@ -123,6 +124,7 @@ public:
123 124 void setRetain();
124 125 const Publish &getPublishData();
125 126 bool containsClientSpecificProperties() const;
  127 + const std::vector<std::pair<std::string, std::string>> *getUserProperties() const;
126 128 };
127 129  
128 130 #endif // MQTTPACKET_H
... ...
publishcopyfactory.cpp
... ... @@ -124,3 +124,20 @@ std::shared_ptr&lt;Client&gt; PublishCopyFactory::getSender()
124 124 return packet->getSender();
125 125 return std::shared_ptr<Client>(0);
126 126 }
  127 +
  128 +const std::vector<std::pair<std::string, std::string> > *PublishCopyFactory::getUserProperties() const
  129 +{
  130 + if (packet)
  131 + {
  132 + return packet->getUserProperties();
  133 + }
  134 +
  135 + assert(publish);
  136 +
  137 + if (publish->propertyBuilder)
  138 + {
  139 + return publish->propertyBuilder->getUserProperties().get();
  140 + }
  141 +
  142 + return nullptr;
  143 +}
... ...
publishcopyfactory.h
... ... @@ -36,6 +36,8 @@ public:
36 36 bool getRetain() const;
37 37 Publish getNewPublish() const;
38 38 std::shared_ptr<Client> getSender();
  39 + const std::vector<std::pair<std::string, std::string>> *getUserProperties() const;
  40 +
39 41 };
40 42  
41 43 #endif // PUBLISHCOPYFACTORY_H
... ...
session.cpp
... ... @@ -160,7 +160,7 @@ void Session::writePacket(PublishCopyFactory &amp;copyFactory, const char max_qos, u
160 160 Authentication *_auth = ThreadGlobals::getAuth();
161 161 assert(_auth);
162 162 Authentication &auth = *_auth;
163   - if (auth.aclCheck(client_id, username, copyFactory.getTopic(), copyFactory.getSubtopics(), AclAccess::read, effectiveQos, copyFactory.getRetain()) == AuthResult::success)
  163 + if (auth.aclCheck(client_id, username, copyFactory.getTopic(), copyFactory.getSubtopics(), AclAccess::read, effectiveQos, copyFactory.getRetain(), copyFactory.getUserProperties()) == AuthResult::success)
164 164 {
165 165 std::shared_ptr<Client> c = makeSharedClient();
166 166 if (effectiveQos == 0)
... ...
types.cpp
... ... @@ -141,6 +141,14 @@ void PublishBase::setClientSpecificProperties()
141 141 propertyBuilder->writeMessageExpiryInterval(newExpiresAfter.count());
142 142 }
143 143  
  144 +void PublishBase::constructPropertyBuilder()
  145 +{
  146 + if (this->propertyBuilder)
  147 + return;
  148 +
  149 + this->propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
  150 +}
  151 +
144 152 Publish::Publish(const Publish &other) :
145 153 PublishBase(other)
146 154 {
... ...
... ... @@ -212,6 +212,7 @@ public:
212 212 PublishBase(const std::string &topic, const std::string &payload, char qos);
213 213 size_t getLengthWithoutFixedHeader() const;
214 214 void setClientSpecificProperties();
  215 + void constructPropertyBuilder();
215 216 };
216 217  
217 218 class Publish : public PublishBase
... ...