Commit 5c74f9737a8d35b437ea1b7df888341c59c9f505

Authored by Wiebe Cazemier
1 parent c9f3af29

Topic splitting with SSE instructions

CMakeLists.txt
@@ -7,7 +7,7 @@ add_definitions(-DOPENSSL_API_COMPAT=0x10100000L) @@ -7,7 +7,7 @@ add_definitions(-DOPENSSL_API_COMPAT=0x10100000L)
7 set(CMAKE_CXX_STANDARD 11) 7 set(CMAKE_CXX_STANDARD 11)
8 set(CMAKE_CXX_STANDARD_REQUIRED ON) 8 set(CMAKE_CXX_STANDARD_REQUIRED ON)
9 9
10 -SET(CMAKE_CXX_FLAGS "-rdynamic") 10 +SET(CMAKE_CXX_FLAGS "-rdynamic -msse4.2")
11 11
12 add_compile_options(-Wall) 12 add_compile_options(-Wall)
13 13
FlashMQTests/FlashMQTests.pro
@@ -79,3 +79,4 @@ HEADERS += \ @@ -79,3 +79,4 @@ HEADERS += \
79 LIBS += -ldl -lssl -lcrypto 79 LIBS += -ldl -lssl -lcrypto
80 80
81 QMAKE_LFLAGS += -rdynamic 81 QMAKE_LFLAGS += -rdynamic
  82 +QMAKE_CXXFLAGS += -msse4.2
FlashMQTests/tst_maintests.cpp
@@ -26,6 +26,7 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>. @@ -26,6 +26,7 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
26 #include "mainapp.h" 26 #include "mainapp.h"
27 #include "mainappthread.h" 27 #include "mainappthread.h"
28 #include "twoclienttestcontext.h" 28 #include "twoclienttestcontext.h"
  29 +#include "threaddata.h"
29 30
30 // Dumb Qt version gives warnings when comparing uint with number literal. 31 // Dumb Qt version gives warnings when comparing uint with number literal.
31 template <typename T1, typename T2> 32 template <typename T1, typename T2>
@@ -75,6 +76,8 @@ private slots: @@ -75,6 +76,8 @@ private slots:
75 void test_acl_patterns_username(); 76 void test_acl_patterns_username();
76 void test_acl_patterns_clientid(); 77 void test_acl_patterns_clientid();
77 78
  79 + void test_sse_split();
  80 +
78 }; 81 };
79 82
80 MainTests::MainTests() 83 MainTests::MainTests()
@@ -587,6 +590,31 @@ void MainTests::test_acl_patterns_clientid() @@ -587,6 +590,31 @@ void MainTests::test_acl_patterns_clientid()
587 QCOMPARE(aclTree.findPermission(splitToVector("d/clientid_one/f/A/B", '/'), AclGrant::Read, "foo", "clientid_one"), AuthResult::success); 590 QCOMPARE(aclTree.findPermission(splitToVector("d/clientid_one/f/A/B", '/'), AclGrant::Read, "foo", "clientid_one"), AuthResult::success);
588 } 591 }
589 592
  593 +void MainTests::test_sse_split()
  594 +{
  595 + std::shared_ptr<SubscriptionStore> store(new SubscriptionStore);
  596 + std::shared_ptr<Settings> settings(new Settings);
  597 + ThreadData data(0, store, settings);
  598 +
  599 + std::list<std::string> topics;
  600 + topics.push_back("one/two/threeabcasdfasdf/koe");
  601 + topics.push_back("/two/threeabcasdfasdf/koe"); // Test empty component.
  602 + topics.push_back("//two/threeabcasdfasdf/koe"); // Test two empty components.
  603 + topics.push_back("//1234567890abcde/bla/koe"); // Test two empty components, 15 char topic (one byte short of 16 alignment).
  604 + topics.push_back("//1234567890abcdef/bla/koe"); // Test two empty components, 16 char topic
  605 + topics.push_back("//1234567890abcdefg/bla/koe"); // Test two empty components, 17 char topic
  606 + topics.push_back("//1234567890abcdefg/1234567890abcdefg/koe"); // Test two empty components, two 17 char topics
  607 + topics.push_back("//1234567890abcdef/1234567890abcdefg/koe"); // Test two empty components, 16 and 17 char
  608 + topics.push_back("//1234567890abcdef/1234567890abcdefg/koe/");
  609 + topics.push_back("//1234567890abcdef/1234567890abcdefg/koe//");
  610 + topics.push_back("//1234567890abcdef/1234567890abcdef/");
  611 +
  612 + for (const std::string &t : topics)
  613 + {
  614 + QCOMPARE(*data.splitTopic(t), splitToVector(t, '/'));
  615 + }
  616 +}
  617 +
590 QTEST_GUILESS_MAIN(MainTests) 618 QTEST_GUILESS_MAIN(MainTests)
591 619
592 #include "tst_maintests.moc" 620 #include "tst_maintests.moc"
mqttpacket.cpp
@@ -102,7 +102,7 @@ MqttPacket::MqttPacket(const Publish &amp;publish) : @@ -102,7 +102,7 @@ MqttPacket::MqttPacket(const Publish &amp;publish) :
102 } 102 }
103 103
104 this->topic = publish.topic; 104 this->topic = publish.topic;
105 - this->subtopics = splitToVector(publish.topic, '/'); 105 + this->subtopics = sender->getThreadData()->splitTopic(this->topic);
106 106
107 packetType = PacketType::PUBLISH; 107 packetType = PacketType::PUBLISH;
108 this->qos = publish.qos; 108 this->qos = publish.qos;
@@ -439,7 +439,7 @@ void MqttPacket::handlePublish() @@ -439,7 +439,7 @@ void MqttPacket::handlePublish()
439 throw ProtocolError("Duplicate flag is set for QoS 0 packet. This is illegal."); 439 throw ProtocolError("Duplicate flag is set for QoS 0 packet. This is illegal.");
440 440
441 topic = std::string(readBytes(variable_header_length), variable_header_length); 441 topic = std::string(readBytes(variable_header_length), variable_header_length);
442 - subtopics = splitToVector(topic, '/'); 442 + subtopics = sender->getThreadData()->splitTopic(topic);
443 443
444 if (!isValidUtf8(topic, true)) 444 if (!isValidUtf8(topic, true))
445 { 445 {
@@ -464,7 +464,7 @@ void MqttPacket::handlePublish() @@ -464,7 +464,7 @@ void MqttPacket::handlePublish()
464 sender->writeMqttPacket(response); 464 sender->writeMqttPacket(response);
465 } 465 }
466 466
467 - if (sender->getThreadData()->authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, subtopics, AclAccess::write) == AuthResult::success) 467 + if (sender->getThreadData()->authentication.aclCheck(sender->getClientId(), sender->getUsername(), topic, *subtopics, AclAccess::write) == AuthResult::success)
468 { 468 {
469 if (retain) 469 if (retain)
470 { 470 {
@@ -479,7 +479,7 @@ void MqttPacket::handlePublish() @@ -479,7 +479,7 @@ void MqttPacket::handlePublish()
479 bites[0] &= 0b11110110; 479 bites[0] &= 0b11110110;
480 480
481 // For the existing clients, we can just write the same packet back out, with our small alterations. 481 // For the existing clients, we can just write the same packet back out, with our small alterations.
482 - sender->getThreadData()->getSubscriptionStore()->queuePacketAtSubscribers(subtopics, *this); 482 + sender->getThreadData()->getSubscriptionStore()->queuePacketAtSubscribers(*subtopics, *this);
483 } 483 }
484 } 484 }
485 485
@@ -569,7 +569,7 @@ const std::string &amp;MqttPacket::getTopic() const @@ -569,7 +569,7 @@ const std::string &amp;MqttPacket::getTopic() const
569 return this->topic; 569 return this->topic;
570 } 570 }
571 571
572 -const std::vector<std::string> &MqttPacket::getSubtopics() const 572 +const std::vector<std::string> *MqttPacket::getSubtopics() const
573 { 573 {
574 return this->subtopics; 574 return this->subtopics;
575 } 575 }
mqttpacket.h
@@ -44,7 +44,7 @@ public: @@ -44,7 +44,7 @@ public:
44 class MqttPacket 44 class MqttPacket
45 { 45 {
46 std::string topic; 46 std::string topic;
47 - std::vector<std::string> subtopics; 47 + std::vector<std::string> *subtopics; // comes from local thread storage. See std::vector<std::string> *ThreadData::splitTopic(std::string &topic)
48 std::vector<char> bites; 48 std::vector<char> bites;
49 size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header. 49 size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header.
50 RemainingLength remainingLength; 50 RemainingLength remainingLength;
@@ -94,7 +94,7 @@ public: @@ -94,7 +94,7 @@ public:
94 const std::vector<char> &getBites() const { return bites; } 94 const std::vector<char> &getBites() const { return bites; }
95 char getQos() const { return qos; } 95 char getQos() const { return qos; }
96 const std::string &getTopic() const; 96 const std::string &getTopic() const;
97 - const std::vector<std::string> &getSubtopics() const; 97 + const std::vector<std::string> *getSubtopics() const;
98 std::shared_ptr<Client> getSender() const; 98 std::shared_ptr<Client> getSender() const;
99 void setSender(const std::shared_ptr<Client> &value); 99 void setSender(const std::shared_ptr<Client> &value);
100 bool containsFixedHeader() const; 100 bool containsFixedHeader() const;
session.cpp
@@ -52,7 +52,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos) @@ -52,7 +52,7 @@ void Session::writePacket(const MqttPacket &amp;packet, char max_qos)
52 { 52 {
53 assert(max_qos <= 2); 53 assert(max_qos <= 2);
54 54
55 - if (thread->authentication.aclCheck(client_id, username, packet.getTopic(), packet.getSubtopics(), AclAccess::read) == AuthResult::success) 55 + if (thread->authentication.aclCheck(client_id, username, packet.getTopic(), *packet.getSubtopics(), AclAccess::read) == AuthResult::success)
56 { 56 {
57 const char qos = std::min<char>(packet.getQos(), max_qos); 57 const char qos = std::min<char>(packet.getQos(), max_qos);
58 58
threaddata.cpp
@@ -19,8 +19,12 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -19,8 +19,12 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
19 #include <string> 19 #include <string>
20 #include <sstream> 20 #include <sstream>
21 21
  22 +#define TOPIC_MEMORY_LENGTH 65560
  23 +
22 ThreadData::ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore, std::shared_ptr<Settings> settings) : 24 ThreadData::ThreadData(int threadnr, std::shared_ptr<SubscriptionStore> &subscriptionStore, std::shared_ptr<Settings> settings) :
23 subscriptionStore(subscriptionStore), 25 subscriptionStore(subscriptionStore),
  26 + subtopicParseMem(TOPIC_MEMORY_LENGTH),
  27 + topicCopy(TOPIC_MEMORY_LENGTH),
24 settingsLocalCopy(*settings.get()), 28 settingsLocalCopy(*settings.get()),
25 authentication(settingsLocalCopy), 29 authentication(settingsLocalCopy),
26 threadnr(threadnr) 30 threadnr(threadnr)
@@ -155,6 +159,45 @@ void ThreadData::queuePasswdFileReload() @@ -155,6 +159,45 @@ void ThreadData::queuePasswdFileReload()
155 wakeUpThread(); 159 wakeUpThread();
156 } 160 }
157 161
  162 +/**
  163 + * @brief ThreadData::splitTopic uses SSE4.2 to detect the '/' chars, 16 chars at a time, and returns a pointer to thread-local memory.
  164 + * @param topic string is altered: some extra space is reserved.
  165 + * @return Pointer to thread-owned vector of subtopics.
  166 + *
  167 + * Because it returns a pointer to the thread-local vector, only the current thread should touch it.
  168 + */
  169 +std::vector<std::string> *ThreadData::splitTopic(const std::string &topic)
  170 +{
  171 + subtopics.clear();
  172 +
  173 + const int s = topic.size();
  174 + std::memcpy(topicCopy.data(), topic.c_str(), s+1);
  175 + std::memset(&topicCopy.data()[s], 0, 16);
  176 + int n = 0;
  177 + int carryi = 0;
  178 + while (n <= s)
  179 + {
  180 + const char *i = &topicCopy.data()[n];
  181 + __m128i loaded = _mm_loadu_si128((__m128i*)i);
  182 +
  183 + int len_left = s - n;
  184 + int index = _mm_cmpestri(slashes, 1, loaded, len_left, 0);
  185 + std::memcpy(&subtopicParseMem[carryi], i, index);
  186 + carryi += std::min<int>(index, len_left);
  187 +
  188 + n += index;
  189 +
  190 + if (index < 16 || n >= s)
  191 + {
  192 + subtopics.emplace_back(subtopicParseMem.data(), carryi);
  193 + carryi = 0;
  194 + n++;
  195 + }
  196 + }
  197 +
  198 + return &subtopics;
  199 +}
  200 +
158 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial? 201 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial?
159 void ThreadData::doKeepAliveCheck() 202 void ThreadData::doKeepAliveCheck()
160 { 203 {
threaddata.h
@@ -18,6 +18,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -18,6 +18,8 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
18 #ifndef THREADDATA_H 18 #ifndef THREADDATA_H
19 #define THREADDATA_H 19 #define THREADDATA_H
20 20
  21 +#include <immintrin.h>
  22 +
21 #include <thread> 23 #include <thread>
22 24
23 #include <sys/epoll.h> 25 #include <sys/epoll.h>
@@ -47,6 +49,12 @@ class ThreadData @@ -47,6 +49,12 @@ class ThreadData
47 std::shared_ptr<SubscriptionStore> subscriptionStore; 49 std::shared_ptr<SubscriptionStore> subscriptionStore;
48 Logger *logger; 50 Logger *logger;
49 51
  52 + // Topic parsing working memory
  53 + std::vector<std::string> subtopics;
  54 + std::vector<char> subtopicParseMem;
  55 + std::vector<char> topicCopy;
  56 + __m128i slashes = _mm_set1_epi8('/');
  57 +
50 void reload(std::shared_ptr<Settings> settings); 58 void reload(std::shared_ptr<Settings> settings);
51 void wakeUpThread(); 59 void wakeUpThread();
52 void doKeepAliveCheck(); 60 void doKeepAliveCheck();
@@ -82,6 +90,8 @@ public: @@ -82,6 +90,8 @@ public:
82 void waitForQuit(); 90 void waitForQuit();
83 void queuePasswdFileReload(); 91 void queuePasswdFileReload();
84 92
  93 + std::vector<std::string> *splitTopic(const std::string &topic);
  94 +
85 }; 95 };
86 96
87 #endif // THREADDATA_H 97 #endif // THREADDATA_H