Commit cd447414bb397e61e0ea293fba84a2691c8e4605

Authored by Wiebe Cazemier
1 parent 55108074

Tests for saving clientid and username for publishes

FlashMQTests/tst_maintests.cpp
@@ -934,6 +934,14 @@ void MainTests::testRetainedMessageDB() @@ -934,6 +934,14 @@ void MainTests::testRetainedMessageDB()
934 messages.emplace_back(Publish("/boe", longpayload, 1)); 934 messages.emplace_back(Publish("/boe", longpayload, 1));
935 messages.emplace_back(Publish("one", "µsdf", 1)); 935 messages.emplace_back(Publish("one", "µsdf", 1));
936 936
  937 + int clientidCount = 1;
  938 + int usernameCount = 1;
  939 + for (RetainedMessage &rm : messages)
  940 + {
  941 + rm.publish.client_id = formatString("Clientid__%d", clientidCount++);
  942 + rm.publish.username = formatString("Username__%d", usernameCount++);
  943 + }
  944 +
937 RetainedMessagesDB db("/tmp/flashmqtests_retained.db"); 945 RetainedMessagesDB db("/tmp/flashmqtests_retained.db");
938 db.openWrite(); 946 db.openWrite();
939 db.saveData(messages); 947 db.saveData(messages);
@@ -958,6 +966,11 @@ void MainTests::testRetainedMessageDB() @@ -958,6 +966,11 @@ void MainTests::testRetainedMessageDB()
958 QCOMPARE(one.publish.payload, two.publish.payload); 966 QCOMPARE(one.publish.payload, two.publish.payload);
959 QCOMPARE(one.publish.qos, two.publish.qos); 967 QCOMPARE(one.publish.qos, two.publish.qos);
960 968
  969 + QVERIFY(!two.publish.client_id.empty());
  970 + QVERIFY(!two.publish.username.empty());
  971 + QCOMPARE(two.publish.client_id, one.publish.client_id);
  972 + QCOMPARE(two.publish.username, one.publish.username);
  973 +
961 itOrg++; 974 itOrg++;
962 itLoaded++; 975 itLoaded++;
963 } 976 }
@@ -1060,6 +1073,8 @@ void MainTests::testSavingSessions() @@ -1060,6 +1073,8 @@ void MainTests::testSavingSessions()
1060 store->addSubscription(c2, topic4, subtopics, 0); 1073 store->addSubscription(c2, topic4, subtopics, 0);
1061 1074
1062 Publish publish("a/b/c", "Hello Barry", 1); 1075 Publish publish("a/b/c", "Hello Barry", 1);
  1076 + publish.client_id = "ClientIdFromFakePublisher";
  1077 + publish.username = "UsernameFromFakePublisher";
1063 1078
1064 std::shared_ptr<Session> c1ses = c1->getSession(); 1079 std::shared_ptr<Session> c1ses = c1->getSession();
1065 c1.reset(); 1080 c1.reset();
@@ -1088,7 +1103,6 @@ void MainTests::testSavingSessions() @@ -1088,7 +1103,6 @@ void MainTests::testSavingSessions()
1088 QCOMPARE(ses->nextPacketId, ses2->nextPacketId); 1103 QCOMPARE(ses->nextPacketId, ses2->nextPacketId);
1089 } 1104 }
1090 1105
1091 -  
1092 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> store1Subscriptions; 1106 std::unordered_map<std::string, std::list<SubscriptionForSerializing>> store1Subscriptions;
1093 store->getSubscriptions(&store->root, "", true, store1Subscriptions); 1107 store->getSubscriptions(&store->root, "", true, store1Subscriptions);
1094 1108
@@ -1121,7 +1135,14 @@ void MainTests::testSavingSessions() @@ -1121,7 +1135,14 @@ void MainTests::testSavingSessions()
1121 1135
1122 } 1136 }
1123 1137
  1138 + std::shared_ptr<Session> loadedSes = store2->sessionsById["c1"];
  1139 + QueuedPublish queuedPublishLoaded = *loadedSes->qosPacketQueue.begin();
1124 1140
  1141 + QCOMPARE(queuedPublishLoaded.getPublish().topic, "a/b/c");
  1142 + QCOMPARE(queuedPublishLoaded.getPublish().payload, "Hello Barry");
  1143 + QCOMPARE(queuedPublishLoaded.getPublish().qos, 1);
  1144 + QCOMPARE(queuedPublishLoaded.getPublish().client_id, "ClientIdFromFakePublisher");
  1145 + QCOMPARE(queuedPublishLoaded.getPublish().username, "UsernameFromFakePublisher");
1125 } 1146 }
1126 catch (std::exception &ex) 1147 catch (std::exception &ex)
1127 { 1148 {
persistencefile.cpp
@@ -219,6 +219,12 @@ void PersistenceFile::writeUint16(const uint16_t val) @@ -219,6 +219,12 @@ void PersistenceFile::writeUint16(const uint16_t val)
219 writeCheck(buf, 1, 2, f); 219 writeCheck(buf, 1, 2, f);
220 } 220 }
221 221
  222 +void PersistenceFile::writeString(const std::string &s)
  223 +{
  224 + writeUint32(s.size());
  225 + writeCheck(s.c_str(), 1, s.size(), f);
  226 +}
  227 +
222 int64_t PersistenceFile::readInt64(bool &eofFound) 228 int64_t PersistenceFile::readInt64(bool &eofFound)
223 { 229 {
224 if (readCheck(buf.data(), 1, 8, f) < 0) 230 if (readCheck(buf.data(), 1, 8, f) < 0)
@@ -253,6 +259,19 @@ uint16_t PersistenceFile::readUint16(bool &amp;eofFound) @@ -253,6 +259,19 @@ uint16_t PersistenceFile::readUint16(bool &amp;eofFound)
253 return val; 259 return val;
254 } 260 }
255 261
  262 +std::string PersistenceFile::readString(bool &eofFound)
  263 +{
  264 + const uint32_t size = readUint32(eofFound);
  265 +
  266 + if (size > 0xFFFF)
  267 + throw std::runtime_error("In MQTT world, strings are never longer than 65535 bytes.");
  268 +
  269 + makeSureBufSize(size);
  270 + readCheck(buf.data(), 1, size, f);
  271 + std::string result(buf.data(), size);
  272 + return result;
  273 +}
  274 +
256 /** 275 /**
257 * @brief RetainedMessagesDB::openWrite doesn't explicitely name a file version (v1, etc), because we always write the current definition. 276 * @brief RetainedMessagesDB::openWrite doesn't explicitely name a file version (v1, etc), because we always write the current definition.
258 */ 277 */
persistencefile.h
@@ -76,9 +76,11 @@ protected: @@ -76,9 +76,11 @@ protected:
76 void writeInt64(const int64_t val); 76 void writeInt64(const int64_t val);
77 void writeUint32(const uint32_t val); 77 void writeUint32(const uint32_t val);
78 void writeUint16(const uint16_t val); 78 void writeUint16(const uint16_t val);
  79 + void writeString(const std::string &s);
79 int64_t readInt64(bool &eofFound); 80 int64_t readInt64(bool &eofFound);
80 uint32_t readUint32(bool &eofFound); 81 uint32_t readUint32(bool &eofFound);
81 uint16_t readUint16(bool &eofFound); 82 uint16_t readUint16(bool &eofFound);
  83 + std::string readString(bool &eofFound);
82 84
83 public: 85 public:
84 PersistenceFile(const std::string &filePath); 86 PersistenceFile(const std::string &filePath);
retainedmessagesdb.cpp
@@ -88,6 +88,8 @@ void RetainedMessagesDB::saveData(const std::vector&lt;RetainedMessage&gt; &amp;messages) @@ -88,6 +88,8 @@ void RetainedMessagesDB::saveData(const std::vector&lt;RetainedMessage&gt; &amp;messages)
88 88
89 writeUint16(pack.getFixedHeaderLength()); 89 writeUint16(pack.getFixedHeaderLength());
90 writeUint32(packSize); 90 writeUint32(packSize);
  91 + writeString(pcopy.client_id);
  92 + writeString(pcopy.username);
91 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); 93 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);
92 } 94 }
93 95
@@ -136,6 +138,9 @@ std::list&lt;RetainedMessage&gt; RetainedMessagesDB::readDataV2() @@ -136,6 +138,9 @@ std::list&lt;RetainedMessage&gt; RetainedMessagesDB::readDataV2()
136 const uint16_t fixed_header_length = readUint16(eofFound); 138 const uint16_t fixed_header_length = readUint16(eofFound);
137 const uint32_t packlen = readUint32(eofFound); 139 const uint32_t packlen = readUint32(eofFound);
138 140
  141 + const std::string client_id = readString(eofFound);
  142 + const std::string username = readString(eofFound);
  143 +
139 if (eofFound) 144 if (eofFound)
140 continue; 145 continue;
141 146
@@ -149,6 +154,9 @@ std::list&lt;RetainedMessage&gt; RetainedMessagesDB::readDataV2() @@ -149,6 +154,9 @@ std::list&lt;RetainedMessage&gt; RetainedMessagesDB::readDataV2()
149 pack.parsePublishData(); 154 pack.parsePublishData();
150 Publish pub(pack.getPublishData()); 155 Publish pub(pack.getPublishData());
151 156
  157 + pub.client_id = client_id;
  158 + pub.username = username;
  159 +
152 RetainedMessage msg(pub); 160 RetainedMessage msg(pub);
153 logger->logf(LOG_DEBUG, "Loading retained message for topic '%s' QoS %d.", msg.publish.topic.c_str(), msg.publish.qos); 161 logger->logf(LOG_DEBUG, "Loading retained message for topic '%s' QoS %d.", msg.publish.topic.c_str(), msg.publish.qos);
154 messages.push_back(std::move(msg)); 162 messages.push_back(std::move(msg));
sessionsandsubscriptionsdb.cpp
@@ -115,6 +115,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -115,6 +115,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
115 const uint16_t id = readUint16(eofFound); 115 const uint16_t id = readUint16(eofFound);
116 const uint32_t originalPubAge = readUint32(eofFound); 116 const uint32_t originalPubAge = readUint32(eofFound);
117 const uint32_t packlen = readUint32(eofFound); 117 const uint32_t packlen = readUint32(eofFound);
  118 + const std::string sender_clientid = readString(eofFound);
  119 + const std::string sender_username = readString(eofFound);
118 120
119 assert(id > 0); 121 assert(id > 0);
120 122
@@ -128,6 +130,9 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -128,6 +130,9 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
128 pack.parsePublishData(); 130 pack.parsePublishData();
129 Publish pub(pack.getPublishData()); 131 Publish pub(pack.getPublishData());
130 132
  133 + pub.client_id = sender_clientid;
  134 + pub.username = sender_username;
  135 +
131 const uint32_t newPubAge = persistence_state_age + originalPubAge; 136 const uint32_t newPubAge = persistence_state_age + originalPubAge;
132 pub.createdAt = timepointFromAge(newPubAge); 137 pub.createdAt = timepointFromAge(newPubAge);
133 138
@@ -175,6 +180,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -175,6 +180,8 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
175 const uint32_t originalWillQueueAge = readUint32(eofFound); 180 const uint32_t originalWillQueueAge = readUint32(eofFound);
176 const uint32_t newWillDelayAfterMaybeAlreadyBeingQueued = originalWillQueueAge < originalWillDelay ? originalWillDelay - originalWillQueueAge : 0; 181 const uint32_t newWillDelayAfterMaybeAlreadyBeingQueued = originalWillQueueAge < originalWillDelay ? originalWillDelay - originalWillQueueAge : 0;
177 const uint32_t packlen = readUint32(eofFound); 182 const uint32_t packlen = readUint32(eofFound);
  183 + const std::string sender_clientid = readString(eofFound);
  184 + const std::string sender_username = readString(eofFound);
178 185
179 const uint32_t stateAgecompensatedWillDelay = 186 const uint32_t stateAgecompensatedWillDelay =
180 persistence_state_age > newWillDelayAfterMaybeAlreadyBeingQueued ? 0 : newWillDelayAfterMaybeAlreadyBeingQueued - persistence_state_age; 187 persistence_state_age > newWillDelayAfterMaybeAlreadyBeingQueued ? 0 : newWillDelayAfterMaybeAlreadyBeingQueued - persistence_state_age;
@@ -189,6 +196,9 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2() @@ -189,6 +196,9 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV2()
189 WillPublish willPublish = publishpack.getPublishData(); 196 WillPublish willPublish = publishpack.getPublishData();
190 willPublish.will_delay = stateAgecompensatedWillDelay; 197 willPublish.will_delay = stateAgecompensatedWillDelay;
191 198
  199 + willPublish.client_id = sender_clientid;
  200 + willPublish.username = sender_username;
  201 +
192 ses->setWill(std::move(willPublish)); 202 ses->setWill(std::move(willPublish));
193 } 203 }
194 } 204 }
@@ -288,6 +298,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -288,6 +298,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
288 writeUint16(p.getPacketId()); 298 writeUint16(p.getPacketId());
289 writeUint32(pubAge); 299 writeUint32(pubAge);
290 writeUint32(packSize); 300 writeUint32(packSize);
  301 + writeString(pub.client_id);
  302 + writeString(pub.username);
291 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); 303 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);
292 } 304 }
293 305
@@ -333,6 +345,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess @@ -333,6 +345,8 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector&lt;std::unique_ptr&lt;Sess
333 writeUint32(will.will_delay); 345 writeUint32(will.will_delay);
334 writeUint32(will.getQueuedAtAge()); 346 writeUint32(will.getQueuedAtAge());
335 writeUint32(packSize); 347 writeUint32(packSize);
  348 + writeString(will.client_id);
  349 + writeString(will.username);
336 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); 350 writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f);
337 } 351 }
338 } 352 }