Commit f262b796403efae22fb59218a6af98c75c1670c1

Authored by Wiebe Cazemier
1 parent 7775fe41

Separate CONNECT packet parsing and handling

This is necessary for the test client I have in mind, so I can re-use
this code in that new test client which has no MQTT behavior, but just
returns packets (meaning I have to be able to parse them without
initiating handling).
CMakeLists.txt
@@ -63,6 +63,7 @@ add_executable(FlashMQ @@ -63,6 +63,7 @@ add_executable(FlashMQ
63 mqtt5properties.h 63 mqtt5properties.h
64 globalstats.h 64 globalstats.h
65 derivablecounter.h 65 derivablecounter.h
  66 + packetdatatypes.h
66 67
67 mainapp.cpp 68 mainapp.cpp
68 main.cpp 69 main.cpp
@@ -105,6 +106,7 @@ add_executable(FlashMQ @@ -105,6 +106,7 @@ add_executable(FlashMQ
105 mqtt5properties.cpp 106 mqtt5properties.cpp
106 globalstats.cpp 107 globalstats.cpp
107 derivablecounter.cpp 108 derivablecounter.cpp
  109 + packetdatatypes.cpp
108 110
109 ) 111 )
110 112
FlashMQTests/FlashMQTests.pro
@@ -54,6 +54,7 @@ SOURCES += tst_maintests.cpp \ @@ -54,6 +54,7 @@ SOURCES += tst_maintests.cpp \
54 ../mqtt5properties.cpp \ 54 ../mqtt5properties.cpp \
55 ../globalstats.cpp \ 55 ../globalstats.cpp \
56 ../derivablecounter.cpp \ 56 ../derivablecounter.cpp \
  57 + ../packetdatatypes.cpp \
57 mainappthread.cpp \ 58 mainappthread.cpp \
58 twoclienttestcontext.cpp 59 twoclienttestcontext.cpp
59 60
@@ -100,6 +101,7 @@ HEADERS += \ @@ -100,6 +101,7 @@ HEADERS += \
100 ../mqtt5properties.h \ 101 ../mqtt5properties.h \
101 ../globalstats.h \ 102 ../globalstats.h \
102 ../derivablecounter.h \ 103 ../derivablecounter.h \
  104 + ../packetdatatypes.h \
103 mainappthread.h \ 105 mainappthread.h \
104 twoclienttestcontext.h 106 twoclienttestcontext.h
105 107
mqttpacket.cpp
@@ -321,52 +321,40 @@ void MqttPacket::handle() @@ -321,52 +321,40 @@ void MqttPacket::handle()
321 handleExtendedAuth(); 321 handleExtendedAuth();
322 } 322 }
323 323
324 -void MqttPacket::handleConnect() 324 +ConnectData MqttPacket::parseConnectData()
325 { 325 {
326 - if (sender->hasConnectPacketSeen())  
327 - throw ProtocolError("Client already sent a CONNECT.", ReasonCodes::ProtocolError); 326 + if (this->packetType != PacketType::CONNECT)
  327 + throw std::runtime_error("Packet must be connect packet.");
328 328
329 - std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); 329 + setPosToDataStart();
330 330
331 - ThreadGlobals::getThreadData()->mqttConnectCounter.inc(); 331 + ConnectData result;
332 332
333 uint16_t variable_header_length = readTwoBytesToUInt16(); 333 uint16_t variable_header_length = readTwoBytesToUInt16();
334 334
335 - const Settings &settings = *ThreadGlobals::getSettings();  
336 -  
337 if (!(variable_header_length == 4 || variable_header_length == 6)) 335 if (!(variable_header_length == 4 || variable_header_length == 6))
338 { 336 {
339 throw ProtocolError("Invalid variable header length. Garbage?", ReasonCodes::MalformedPacket); 337 throw ProtocolError("Invalid variable header length. Garbage?", ReasonCodes::MalformedPacket);
340 } 338 }
341 339
  340 + const Settings &settings = *ThreadGlobals::getSettings();
  341 +
342 char *c = readBytes(variable_header_length); 342 char *c = readBytes(variable_header_length);
343 std::string magic_marker(c, variable_header_length); 343 std::string magic_marker(c, variable_header_length);
344 344
345 - char protocol_level = readByte(); 345 + result.protocol_level_byte = readByte();
346 346
347 if (magic_marker == "MQTT") 347 if (magic_marker == "MQTT")
348 { 348 {
349 - if (protocol_level == 0x04) 349 + if (result.protocol_level_byte == 0x04)
350 protocolVersion = ProtocolVersion::Mqtt311; 350 protocolVersion = ProtocolVersion::Mqtt311;
351 - if (protocol_level == 0x05) 351 + if (result.protocol_level_byte == 0x05)
352 protocolVersion = ProtocolVersion::Mqtt5; 352 protocolVersion = ProtocolVersion::Mqtt5;
353 } 353 }
354 - else if (magic_marker == "MQIsdp" && protocol_level == 0x03) 354 + else if (magic_marker == "MQIsdp" && result.protocol_level_byte == 0x03)
355 { 355 {
356 protocolVersion = ProtocolVersion::Mqtt31; 356 protocolVersion = ProtocolVersion::Mqtt31;
357 } 357 }
358 - else  
359 - {  
360 - // The specs are unclear when to use the version 3 codes or version 5 codes.  
361 - ProtocolVersion fuzzyProtocolVersion = protocol_level < 0x05 ? ProtocolVersion::Mqtt31 : ProtocolVersion::Mqtt5;  
362 -  
363 - ConnAck connAck(fuzzyProtocolVersion, ReasonCodes::UnsupportedProtocolVersion);  
364 - MqttPacket response(connAck);  
365 - sender->setReadyForDisconnect();  
366 - sender->writeMqttPacket(response);  
367 - logger->logf(LOG_ERR, "Rejecting because of invalid protocol version: %s", sender->repr().c_str());  
368 - return;  
369 - }  
370 358
371 char flagByte = readByte(); 359 char flagByte = readByte();
372 bool reserved = !!(flagByte & 0b00000001); 360 bool reserved = !!(flagByte & 0b00000001);
@@ -374,32 +362,21 @@ void MqttPacket::handleConnect() @@ -374,32 +362,21 @@ void MqttPacket::handleConnect()
374 if (reserved) 362 if (reserved)
375 throw ProtocolError("Protocol demands reserved flag in CONNECT is 0", ReasonCodes::MalformedPacket); 363 throw ProtocolError("Protocol demands reserved flag in CONNECT is 0", ReasonCodes::MalformedPacket);
376 364
  365 + result.user_name_flag = !!(flagByte & 0b10000000);
  366 + result.password_flag = !!(flagByte & 0b01000000);
  367 + result.will_retain = !!(flagByte & 0b00100000);
  368 + result.will_qos = (flagByte & 0b00011000) >> 3;
  369 + result.will_flag = !!(flagByte & 0b00000100);
  370 + result.clean_start = !!(flagByte & 0b00000010);
377 371
378 - bool user_name_flag = !!(flagByte & 0b10000000);  
379 - bool password_flag = !!(flagByte & 0b01000000);  
380 - bool will_retain = !!(flagByte & 0b00100000);  
381 - char will_qos = (flagByte & 0b00011000) >> 3;  
382 - bool will_flag = !!(flagByte & 0b00000100);  
383 - bool clean_start = !!(flagByte & 0b00000010);  
384 -  
385 - if (will_qos > 2) 372 + if (result.will_qos > 2)
386 throw ProtocolError("Invalid QoS for will.", ReasonCodes::MalformedPacket); 373 throw ProtocolError("Invalid QoS for will.", ReasonCodes::MalformedPacket);
387 374
388 - uint16_t keep_alive = readTwoBytesToUInt16();  
389 -  
390 - uint16_t client_receive_max = settings.maxQosMsgPendingPerClient;  
391 - uint32_t session_expire = settings.getExpireSessionAfterSeconds();  
392 - uint32_t max_outgoing_packet_size = settings.maxPacketSize;  
393 - uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases  
394 - bool request_response_information = false;  
395 - bool request_problem_information = false;  
396 -  
397 - std::string authenticationMethod;  
398 - std::string authenticationData; 375 + result.keep_alive = readTwoBytesToUInt16();
399 376
400 if (protocolVersion == ProtocolVersion::Mqtt5) 377 if (protocolVersion == ProtocolVersion::Mqtt5)
401 { 378 {
402 - keep_alive = std::max<uint16_t>(keep_alive, 5); 379 + result.keep_alive = std::max<uint16_t>(result.keep_alive, 5);
403 380
404 const size_t proplen = decodeVariableByteIntAtPos(); 381 const size_t proplen = decodeVariableByteIntAtPos();
405 const size_t prop_end_at = pos + proplen; 382 const size_t prop_end_at = pos + proplen;
@@ -411,36 +388,34 @@ void MqttPacket::handleConnect() @@ -411,36 +388,34 @@ void MqttPacket::handleConnect()
411 switch (prop) 388 switch (prop)
412 { 389 {
413 case Mqtt5Properties::SessionExpiryInterval: 390 case Mqtt5Properties::SessionExpiryInterval:
414 - session_expire = std::min<uint32_t>(readFourBytesToUint32(), session_expire); 391 + result.session_expire = std::min<uint32_t>(readFourBytesToUint32(), result.session_expire);
415 break; 392 break;
416 case Mqtt5Properties::ReceiveMaximum: 393 case Mqtt5Properties::ReceiveMaximum:
417 - client_receive_max = std::min<int16_t>(readTwoBytesToUInt16(), client_receive_max); 394 + result.client_receive_max = std::min<int16_t>(readTwoBytesToUInt16(), result.client_receive_max);
418 break; 395 break;
419 case Mqtt5Properties::MaximumPacketSize: 396 case Mqtt5Properties::MaximumPacketSize:
420 - max_outgoing_packet_size = std::min<uint32_t>(readFourBytesToUint32(), max_outgoing_packet_size); 397 + result.max_outgoing_packet_size = std::min<uint32_t>(readFourBytesToUint32(), result.max_outgoing_packet_size);
421 break; 398 break;
422 case Mqtt5Properties::TopicAliasMaximum: 399 case Mqtt5Properties::TopicAliasMaximum:
423 - max_outgoing_topic_aliases = std::min<uint16_t>(readTwoBytesToUInt16(), settings.maxOutgoingTopicAliasValue); 400 + result.max_outgoing_topic_aliases = std::min<uint16_t>(readTwoBytesToUInt16(), settings.maxOutgoingTopicAliasValue);
424 break; 401 break;
425 case Mqtt5Properties::RequestResponseInformation: 402 case Mqtt5Properties::RequestResponseInformation:
426 - request_response_information = !!readByte();  
427 - UNUSED(request_response_information); 403 + result.request_response_information = !!readByte();
428 break; 404 break;
429 case Mqtt5Properties::RequestProblemInformation: 405 case Mqtt5Properties::RequestProblemInformation:
430 - request_problem_information = !!readByte();  
431 - UNUSED(request_problem_information); 406 + result.request_problem_information = !!readByte();
432 break; 407 break;
433 case Mqtt5Properties::UserProperty: 408 case Mqtt5Properties::UserProperty:
434 readUserProperty(); 409 readUserProperty();
435 break; 410 break;
436 case Mqtt5Properties::AuthenticationMethod: 411 case Mqtt5Properties::AuthenticationMethod:
437 { 412 {
438 - authenticationMethod = readBytesToString(); 413 + result.authenticationMethod = readBytesToString();
439 break; 414 break;
440 } 415 }
441 case Mqtt5Properties::AuthenticationData: 416 case Mqtt5Properties::AuthenticationData:
442 { 417 {
443 - authenticationData = readBytesToString(false); 418 + result.authenticationData = readBytesToString(false);
444 break; 419 break;
445 } 420 }
446 default: 421 default:
@@ -449,25 +424,21 @@ void MqttPacket::handleConnect() @@ -449,25 +424,21 @@ void MqttPacket::handleConnect()
449 } 424 }
450 } 425 }
451 426
452 - if (client_receive_max == 0 || max_outgoing_packet_size == 0) 427 + if (result.client_receive_max == 0 || result.max_outgoing_packet_size == 0)
453 { 428 {
454 throw ProtocolError("Receive max or max outgoing packet size can't be 0.", ReasonCodes::ProtocolError); 429 throw ProtocolError("Receive max or max outgoing packet size can't be 0.", ReasonCodes::ProtocolError);
455 } 430 }
456 431
457 - std::string client_id = readBytesToString();  
458 -  
459 - std::string username;  
460 - std::string password; 432 + result.client_id = readBytesToString();
461 433
462 - WillPublish willpublish;  
463 - willpublish.qos = will_qos;  
464 - willpublish.retain = will_retain; 434 + result.willpublish.qos = result.will_qos;
  435 + result.willpublish.retain = result.will_retain;
465 436
466 - if (will_flag) 437 + if (result.will_flag)
467 { 438 {
468 if (protocolVersion == ProtocolVersion::Mqtt5) 439 if (protocolVersion == ProtocolVersion::Mqtt5)
469 { 440 {
470 - willpublish.constructPropertyBuilder(); 441 + result.willpublish.constructPropertyBuilder();
471 442
472 const size_t proplen = decodeVariableByteIntAtPos(); 443 const size_t proplen = decodeVariableByteIntAtPos();
473 const size_t prop_end_at = pos + proplen; 444 const size_t prop_end_at = pos + proplen;
@@ -479,38 +450,43 @@ void MqttPacket::handleConnect() @@ -479,38 +450,43 @@ void MqttPacket::handleConnect()
479 switch (prop) 450 switch (prop)
480 { 451 {
481 case Mqtt5Properties::WillDelayInterval: 452 case Mqtt5Properties::WillDelayInterval:
482 - willpublish.will_delay = readFourBytesToUint32(); 453 + result.willpublish.will_delay = readFourBytesToUint32();
483 break; 454 break;
484 case Mqtt5Properties::PayloadFormatIndicator: 455 case Mqtt5Properties::PayloadFormatIndicator:
485 - willpublish.propertyBuilder->writePayloadFormatIndicator(readByte()); 456 + result.willpublish.propertyBuilder->writePayloadFormatIndicator(readByte());
486 break; 457 break;
487 case Mqtt5Properties::ContentType: 458 case Mqtt5Properties::ContentType:
488 { 459 {
489 const std::string contentType = readBytesToString(); 460 const std::string contentType = readBytesToString();
490 - willpublish.propertyBuilder->writeContentType(contentType); 461 + result.willpublish.propertyBuilder->writeContentType(contentType);
491 break; 462 break;
492 } 463 }
493 case Mqtt5Properties::ResponseTopic: 464 case Mqtt5Properties::ResponseTopic:
494 { 465 {
495 const std::string responseTopic = readBytesToString(true, true); 466 const std::string responseTopic = readBytesToString(true, true);
496 - willpublish.propertyBuilder->writeResponseTopic(responseTopic); 467 + result.willpublish.propertyBuilder->writeResponseTopic(responseTopic);
497 break; 468 break;
498 } 469 }
499 case Mqtt5Properties::MessageExpiryInterval: 470 case Mqtt5Properties::MessageExpiryInterval:
500 { 471 {
501 const uint32_t expiresAfter = readFourBytesToUint32(); 472 const uint32_t expiresAfter = readFourBytesToUint32();
502 - willpublish.setExpireAfter(expiresAfter); 473 + result.willpublish.setExpireAfter(expiresAfter);
503 break; 474 break;
504 } 475 }
505 case Mqtt5Properties::CorrelationData: 476 case Mqtt5Properties::CorrelationData:
506 { 477 {
507 const std::string correlationData = readBytesToString(false); 478 const std::string correlationData = readBytesToString(false);
508 - willpublish.propertyBuilder->writeCorrelationData(correlationData); 479 + result.willpublish.propertyBuilder->writeCorrelationData(correlationData);
509 break; 480 break;
510 } 481 }
511 case Mqtt5Properties::UserProperty: 482 case Mqtt5Properties::UserProperty:
512 { 483 {
513 - readUserProperty(); 484 + result.willpublish.constructPropertyBuilder();
  485 +
  486 + std::string key = readBytesToString();
  487 + std::string value = readBytesToString();
  488 +
  489 + result.willpublish.propertyBuilder->writeUserProperty(std::move(key), std::move(value));
514 break; 490 break;
515 } 491 }
516 default: 492 default:
@@ -519,61 +495,92 @@ void MqttPacket::handleConnect() @@ -519,61 +495,92 @@ void MqttPacket::handleConnect()
519 } 495 }
520 } 496 }
521 497
522 - willpublish.topic = readBytesToString(true, true); 498 + result.willpublish.topic = readBytesToString(true, true);
523 499
524 uint16_t will_payload_length = readTwoBytesToUInt16(); 500 uint16_t will_payload_length = readTwoBytesToUInt16();
525 - willpublish.payload = std::string(readBytes(will_payload_length), will_payload_length); 501 + result.willpublish.payload = std::string(readBytes(will_payload_length), will_payload_length);
526 } 502 }
527 - if (user_name_flag) 503 +
  504 + if (result.user_name_flag)
528 { 505 {
529 - username = readBytesToString(false); 506 + result.username = readBytesToString(false);
530 507
531 - if (username.empty()) 508 + if (result.username.empty())
532 throw ProtocolError("Username flagged as present, but it's 0 bytes.", ReasonCodes::MalformedPacket); 509 throw ProtocolError("Username flagged as present, but it's 0 bytes.", ReasonCodes::MalformedPacket);
533 510
534 - if (!settings.allowUnsafeUsernameChars && containsDangerousCharacters(username))  
535 - throw ProtocolError(formatString("Username '%s' contains unsafe characters and 'allow_unsafe_username_chars' is false.", username.c_str()), 511 + if (!settings.allowUnsafeUsernameChars && containsDangerousCharacters(result.username))
  512 + throw ProtocolError(formatString("Username '%s' contains unsafe characters and 'allow_unsafe_username_chars' is false.", result.username.c_str()),
536 ReasonCodes::BadUserNameOrPassword); 513 ReasonCodes::BadUserNameOrPassword);
537 } 514 }
538 - if (password_flag) 515 +
  516 + if (result.password_flag)
539 { 517 {
540 - if (this->protocolVersion <= ProtocolVersion::Mqtt311 && !user_name_flag) 518 + if (this->protocolVersion <= ProtocolVersion::Mqtt311 && !result.user_name_flag)
541 { 519 {
542 throw ProtocolError("MQTT 3.1.1: If the User Name Flag is set to 0, the Password Flag MUST be set to 0."); 520 throw ProtocolError("MQTT 3.1.1: If the User Name Flag is set to 0, the Password Flag MUST be set to 0.");
543 } 521 }
544 522
545 - password = readBytesToString(false); 523 + result.password = readBytesToString(false);
546 524
547 - if (password.empty()) 525 + if (result.password.empty())
548 throw ProtocolError("Password flagged as present, but it's 0 bytes.", ReasonCodes::MalformedPacket); 526 throw ProtocolError("Password flagged as present, but it's 0 bytes.", ReasonCodes::MalformedPacket);
549 } 527 }
550 528
  529 + return result;
  530 +}
  531 +
  532 +void MqttPacket::handleConnect()
  533 +{
  534 + if (sender->hasConnectPacketSeen())
  535 + throw ProtocolError("Client already sent a CONNECT.", ReasonCodes::ProtocolError);
  536 +
  537 + std::shared_ptr<SubscriptionStore> subscriptionStore = MainApp::getMainApp()->getSubscriptionStore();
  538 +
  539 + ThreadGlobals::getThreadData()->mqttConnectCounter.inc();
  540 +
  541 + ConnectData connectData = parseConnectData();
  542 +
  543 + if (this->protocolVersion == ProtocolVersion::None)
  544 + {
  545 + // The specs are unclear when to use the version 3 codes or version 5 codes.
  546 + ProtocolVersion fuzzyProtocolVersion = connectData.protocol_level_byte < 0x05 ? ProtocolVersion::Mqtt31 : ProtocolVersion::Mqtt5;
  547 +
  548 + ConnAck connAck(fuzzyProtocolVersion, ReasonCodes::UnsupportedProtocolVersion);
  549 + MqttPacket response(connAck);
  550 + sender->setReadyForDisconnect();
  551 + sender->writeMqttPacket(response);
  552 + logger->logf(LOG_ERR, "Rejecting because of invalid protocol version: %s", sender->repr().c_str());
  553 + return;
  554 + }
  555 +
  556 + const Settings &settings = *ThreadGlobals::getSettings();
  557 +
551 // I deferred the initial UTF8 check on username to be able to give an appropriate connack here, but to me, the specs 558 // I deferred the initial UTF8 check on username to be able to give an appropriate connack here, but to me, the specs
552 // are actually vague whether 'BadUserNameOrPassword' should be given on invalid UTF8. 559 // are actually vague whether 'BadUserNameOrPassword' should be given on invalid UTF8.
553 - if (!isValidUtf8(username)) 560 + if (!isValidUtf8(connectData.username))
554 { 561 {
555 ConnAck connAck(protocolVersion, ReasonCodes::BadUserNameOrPassword); 562 ConnAck connAck(protocolVersion, ReasonCodes::BadUserNameOrPassword);
556 MqttPacket response(connAck); 563 MqttPacket response(connAck);
557 sender->setReadyForDisconnect(); 564 sender->setReadyForDisconnect();
558 sender->writeMqttPacket(response); 565 sender->writeMqttPacket(response);
559 - logger->logf(LOG_ERR, "Username has invalid UTF8: %s", username.c_str()); 566 + logger->logf(LOG_ERR, "Username has invalid UTF8: %s", connectData.username.c_str());
560 return; 567 return;
561 } 568 }
562 569
563 bool validClientId = true; 570 bool validClientId = true;
564 571
565 // Check for wildcard chars in case the client_id ever appears in topics. 572 // Check for wildcard chars in case the client_id ever appears in topics.
566 - if (!settings.allowUnsafeClientidChars && containsDangerousCharacters(client_id)) 573 + if (!settings.allowUnsafeClientidChars && containsDangerousCharacters(connectData.client_id))
567 { 574 {
568 - logger->logf(LOG_ERR, "ClientID '%s' has + or # in the id and 'allow_unsafe_clientid_chars' is false.", client_id.c_str()); 575 + logger->logf(LOG_ERR, "ClientID '%s' has + or # in the id and 'allow_unsafe_clientid_chars' is false.", connectData.client_id.c_str());
569 validClientId = false; 576 validClientId = false;
570 } 577 }
571 - else if (!clean_start && client_id.empty()) 578 + else if (!connectData.clean_start && connectData.client_id.empty())
572 { 579 {
573 logger->logf(LOG_ERR, "ClientID empty and clean start 0, which is incompatible"); 580 logger->logf(LOG_ERR, "ClientID empty and clean start 0, which is incompatible");
574 validClientId = false; 581 validClientId = false;
575 } 582 }
576 - else if (protocolVersion < ProtocolVersion::Mqtt311 && client_id.empty()) 583 + else if (protocolVersion < ProtocolVersion::Mqtt311 && connectData.client_id.empty())
577 { 584 {
578 logger->logf(LOG_ERR, "Empty clientID. Connect with protocol 3.1.1 or higher to have one generated securely."); 585 logger->logf(LOG_ERR, "Empty clientID. Connect with protocol 3.1.1 or higher to have one generated securely.");
579 validClientId = false; 586 validClientId = false;
@@ -590,25 +597,26 @@ void MqttPacket::handleConnect() @@ -590,25 +597,26 @@ void MqttPacket::handleConnect()
590 } 597 }
591 598
592 bool clientIdGenerated = false; 599 bool clientIdGenerated = false;
593 - if (client_id.empty()) 600 + if (connectData.client_id.empty())
594 { 601 {
595 - client_id = getSecureRandomString(23); 602 + connectData.client_id = getSecureRandomString(23);
596 clientIdGenerated = true; 603 clientIdGenerated = true;
597 } 604 }
598 605
599 - sender->setClientProperties(protocolVersion, client_id, username, true, keep_alive, max_outgoing_packet_size, max_outgoing_topic_aliases); 606 + sender->setClientProperties(protocolVersion, connectData.client_id, connectData.username, true, connectData.keep_alive,
  607 + connectData.max_outgoing_packet_size, connectData.max_outgoing_topic_aliases);
600 608
601 - if (will_flag)  
602 - sender->setWill(std::move(willpublish)); 609 + if (connectData.will_flag)
  610 + sender->setWill(std::move(connectData.willpublish));
603 611
604 // Stage connack, for immediate or delayed use when auth succeeds. 612 // Stage connack, for immediate or delayed use when auth succeeds.
605 { 613 {
606 bool sessionPresent = false; 614 bool sessionPresent = false;
607 std::shared_ptr<Session> existingSession; 615 std::shared_ptr<Session> existingSession;
608 616
609 - if (protocolVersion >= ProtocolVersion::Mqtt311 && !clean_start) 617 + if (protocolVersion >= ProtocolVersion::Mqtt311 && !connectData.clean_start)
610 { 618 {
611 - existingSession = subscriptionStore->lockSession(client_id); 619 + existingSession = subscriptionStore->lockSession(connectData.client_id);
612 if (existingSession) 620 if (existingSession)
613 sessionPresent = true; 621 sessionPresent = true;
614 } 622 }
@@ -618,47 +626,47 @@ void MqttPacket::handleConnect() @@ -618,47 +626,47 @@ void MqttPacket::handleConnect()
618 if (protocolVersion >= ProtocolVersion::Mqtt5) 626 if (protocolVersion >= ProtocolVersion::Mqtt5)
619 { 627 {
620 connAck->propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>(); 628 connAck->propertyBuilder = std::make_shared<Mqtt5PropertyBuilder>();
621 - connAck->propertyBuilder->writeSessionExpiry(session_expire); 629 + connAck->propertyBuilder->writeSessionExpiry(connectData.session_expire);
622 connAck->propertyBuilder->writeReceiveMax(settings.maxQosMsgPendingPerClient); 630 connAck->propertyBuilder->writeReceiveMax(settings.maxQosMsgPendingPerClient);
623 connAck->propertyBuilder->writeRetainAvailable(1); 631 connAck->propertyBuilder->writeRetainAvailable(1);
624 connAck->propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize()); 632 connAck->propertyBuilder->writeMaxPacketSize(sender->getMaxIncomingPacketSize());
625 if (clientIdGenerated) 633 if (clientIdGenerated)
626 - connAck->propertyBuilder->writeAssignedClientId(client_id); 634 + connAck->propertyBuilder->writeAssignedClientId(connectData.client_id);
627 connAck->propertyBuilder->writeMaxTopicAliases(sender->getMaxIncomingTopicAliasValue()); 635 connAck->propertyBuilder->writeMaxTopicAliases(sender->getMaxIncomingTopicAliasValue());
628 connAck->propertyBuilder->writeWildcardSubscriptionAvailable(1); 636 connAck->propertyBuilder->writeWildcardSubscriptionAvailable(1);
629 connAck->propertyBuilder->writeSubscriptionIdentifiersAvailable(0); 637 connAck->propertyBuilder->writeSubscriptionIdentifiersAvailable(0);
630 connAck->propertyBuilder->writeSharedSubscriptionAvailable(0); 638 connAck->propertyBuilder->writeSharedSubscriptionAvailable(0);
631 - connAck->propertyBuilder->writeServerKeepAlive(keep_alive); 639 + connAck->propertyBuilder->writeServerKeepAlive(connectData.keep_alive);
632 640
633 - if (!authenticationMethod.empty()) 641 + if (!connectData.authenticationMethod.empty())
634 { 642 {
635 - connAck->propertyBuilder->writeAuthenticationMethod(authenticationMethod); 643 + connAck->propertyBuilder->writeAuthenticationMethod(connectData.authenticationMethod);
636 } 644 }
637 } 645 }
638 646
639 sender->stageConnack(std::move(connAck)); 647 sender->stageConnack(std::move(connAck));
640 } 648 }
641 649
642 - sender->setRegistrationData(clean_start, client_receive_max, session_expire); 650 + sender->setRegistrationData(connectData.clean_start, connectData.client_receive_max, connectData.session_expire);
643 651
644 Authentication &authentication = *ThreadGlobals::getAuth(); 652 Authentication &authentication = *ThreadGlobals::getAuth();
645 AuthResult authResult = AuthResult::login_denied; 653 AuthResult authResult = AuthResult::login_denied;
646 654
647 - if (!user_name_flag && authenticationMethod.empty() && settings.allowAnonymous) 655 + if (!connectData.user_name_flag && connectData.authenticationMethod.empty() && settings.allowAnonymous)
648 { 656 {
649 authResult = AuthResult::success; 657 authResult = AuthResult::success;
650 } 658 }
651 - else if (!authenticationMethod.empty()) 659 + else if (!connectData.authenticationMethod.empty())
652 { 660 {
653 - sender->setExtendedAuthenticationMethod(authenticationMethod); 661 + sender->setExtendedAuthenticationMethod(connectData.authenticationMethod);
654 662
655 std::string returnData; 663 std::string returnData;
656 - authResult = authentication.extendedAuth(client_id, ExtendedAuthStage::Auth, authenticationMethod, authenticationData, 664 + authResult = authentication.extendedAuth(connectData.client_id, ExtendedAuthStage::Auth, connectData.authenticationMethod, connectData.authenticationData,
657 getUserProperties(), returnData, sender->getMutableUsername()); 665 getUserProperties(), returnData, sender->getMutableUsername());
658 666
659 if (authResult == AuthResult::auth_continue) 667 if (authResult == AuthResult::auth_continue)
660 { 668 {
661 - Auth auth(ReasonCodes::ContinueAuthentication, authenticationMethod, returnData); 669 + Auth auth(ReasonCodes::ContinueAuthentication, connectData.authenticationMethod, returnData);
662 MqttPacket pack(auth); 670 MqttPacket pack(auth);
663 sender->writeMqttPacket(pack); 671 sender->writeMqttPacket(pack);
664 return; 672 return;
@@ -670,7 +678,7 @@ void MqttPacket::handleConnect() @@ -670,7 +678,7 @@ void MqttPacket::handleConnect()
670 } 678 }
671 else 679 else
672 { 680 {
673 - authResult = authentication.unPwdCheck(username, password, getUserProperties()); 681 + authResult = authentication.unPwdCheck(connectData.username, connectData.password, getUserProperties());
674 } 682 }
675 683
676 if (authResult == AuthResult::success) 684 if (authResult == AuthResult::success)
mqttpacket.h
@@ -35,6 +35,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;. @@ -35,6 +35,7 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
35 35
36 #include "variablebyteint.h" 36 #include "variablebyteint.h"
37 #include "mqtt5properties.h" 37 #include "mqtt5properties.h"
  38 +#include "packetdatatypes.h"
38 39
39 /** 40 /**
40 * @brief The MqttPacket class represents incoming and outgonig packets. 41 * @brief The MqttPacket class represents incoming and outgonig packets.
@@ -109,6 +110,7 @@ public: @@ -109,6 +110,7 @@ public:
109 static void bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender); 110 static void bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender);
110 111
111 void handle(); 112 void handle();
  113 + ConnectData parseConnectData();
112 void handleConnect(); 114 void handleConnect();
113 void handleExtendedAuth(); 115 void handleExtendedAuth();
114 void handleDisconnect(); 116 void handleDisconnect();
packetdatatypes.cpp 0 → 100644
  1 +#include "packetdatatypes.h"
  2 +
  3 +#include "threadglobals.h"
  4 +#include "settings.h"
  5 +
  6 +ConnectData::ConnectData()
  7 +{
  8 + const Settings *settings = ThreadGlobals::getSettings();
  9 +
  10 + client_receive_max = settings->maxQosMsgPendingPerClient;
  11 + session_expire = settings->getExpireSessionAfterSeconds();
  12 + max_outgoing_packet_size = settings->maxPacketSize;
  13 +}
packetdatatypes.h 0 → 100644
  1 +#ifndef PACKETDATATYPES_H
  2 +#define PACKETDATATYPES_H
  3 +
  4 +#include "mqtt5properties.h"
  5 +
  6 +struct ConnectData
  7 +{
  8 + char protocol_level_byte = 0;
  9 +
  10 + // Flags
  11 + bool user_name_flag = false;
  12 + bool password_flag = false;
  13 + bool will_retain = false;
  14 + char will_qos = false;
  15 + bool will_flag = false;
  16 + bool clean_start = false;
  17 +
  18 + uint16_t keep_alive = 0;
  19 +
  20 + // Content from properties
  21 + uint16_t client_receive_max;
  22 + uint32_t session_expire;
  23 + uint32_t max_outgoing_packet_size;
  24 + uint16_t max_outgoing_topic_aliases = 0; // Default MUST BE 0, meaning server won't initiate aliases;
  25 + bool request_response_information = false;
  26 + bool request_problem_information = false;
  27 + std::string authenticationMethod;
  28 + std::string authenticationData;
  29 +
  30 + // Content from Payload
  31 + std::string client_id;
  32 + WillPublish willpublish;
  33 + std::string username;
  34 + std::string password;
  35 +
  36 + Mqtt5PropertyBuilder builder;
  37 +
  38 + ConnectData();
  39 +};
  40 +
  41 +
  42 +#endif // PACKETDATATYPES_H