/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Copyright 2013 - 2020, nymea GmbH * Contact: contact@nymea.io * * This file is part of nymea. * This project including source code and documentation is protected by copyright law, and * remains the property of nymea GmbH. All rights, including reproduction, publication, * editing and translation, are reserved. The use of this project is subject to the terms of a * license agreement to be concluded with nymea GmbH in accordance with the terms * of use of nymea GmbH, available under https://nymea.io/license * * GNU Lesser General Public License Usage * Alternatively, this project may be redistributed and/or modified under the terms of the GNU * Lesser General Public License as published by the Free Software Foundation; version 3. * this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License along with this project. * If not, see . * * For any further details and any questions please contact us under contact@nymea.io * or see our FAQ/Licensing Information on https://nymea.io/license/faq * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ #include "mqttserver.h" #include "mqttclient.h" #include "mqttclient_p.h" #include #include #include "mqtttests.h" #if (QT_VERSION >= QT_VERSION_CHECK(5, 7, 0)) MqttClient *MqttTests::connectAndWait(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain) { QPair result = connectToServer(clientId, cleanSession, keepAlive, willTopic, willMessage, willQoS, willRetain); if (result.second->count() == 0) { result.second->wait(); } if (result.second->count() == 0) { qWarning() << "WARNING: Client didn't emit connected"; } delete result.second; return result.first; } QPair MqttTests::connectToServer(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain) { MqttClient* client = new MqttClient(clientId, keepAlive, willTopic, willMessage.toUtf8(), willQoS, willRetain, this); client->setAutoReconnect(false); m_clients.append(client); QSignalSpy *spy = new QSignalSpy(client, &MqttClient::connected); connectClientToServer(client, cleanSession); return qMakePair(client, spy); } void MqttTests::disconnectAndWait(MqttClient* client) { QSignalSpy disconnectedSpy(client, &MqttClient::disconnected); client->disconnectFromHost(); if (disconnectedSpy.count() == 0) { disconnectedSpy.wait(); } } bool MqttTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos) { QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult); quint16 packetId = client->subscribe(topic, qos); if (subscribedSpy.count() == 0) { subscribedSpy.wait(); } Mqtt::SubscribeReturnCode expectedSubscribeReturnCode = qos == Mqtt::QoS0 ? Mqtt::SubscribeReturnCodeSuccessQoS0 : qos == Mqtt::QoS1 ? Mqtt::SubscribeReturnCodeSuccessQoS1 : Mqtt::SubscribeReturnCodeSuccessQoS2; return subscribedSpy.count() == 1 && subscribedSpy.first().at(0).toInt() == packetId && subscribedSpy.first().at(1).value().first() == expectedSubscribeReturnCode; } void MqttTests::initTestCase() { // QLoggingCategory::setFilterRules("nymea.mqtt.protocol.debug=false"); m_server = new MqttServer(this); m_serverId = startServer(m_server); QVERIFY2(m_serverId >= 0, "Failed to register server. Tests won't work."); } void MqttTests::cleanup() { while (!m_clients.isEmpty()) { MqttClient *client = m_clients.takeFirst(); client->disconnectFromHost(); client->deleteLater(); } QTRY_COMPARE(m_server->clients().count(), 0); } void MqttTests::cleanupTestCase() { m_server->close(m_serverId); delete m_server; } void MqttTests::connectAndDisconnect() { QSignalSpy serverSpy(m_server, &MqttServer::clientConnected); QString clientId = "connectAndDisconnect-client"; QPair result = connectToServer(clientId); MqttClient* client = result.first; connect(client, &MqttClient::connected, this, [client](Mqtt::ConnectReturnCode connectReturnCode, Mqtt::ConnackFlags connackFlags){ QVERIFY2(client->isConnected(), "MqttClient::isConnected not returning true in connected signal()"); QCOMPARE(connectReturnCode, Mqtt::ConnectReturnCodeAccepted); QCOMPARE(connackFlags, Mqtt::ConnackFlagNone); }); if (result.second->count() == 0) { result.second->wait(); } QVERIFY2(serverSpy.count() == 1, "Server didn't emit clientConnected"); QVERIFY2(serverSpy.at(0).at(1) == clientId, "ClientId not matching on server side."); QSignalSpy serverSpyDisconnect(m_server, &MqttServer::clientDisconnected); QSignalSpy clientSpy(client, &MqttClient::disconnected); client->disconnectFromHost(); QTRY_VERIFY2(clientSpy.count() == 1, "client didn't emit disconnected"); QTRY_VERIFY2(serverSpyDisconnect.count() == 1, "Server didn't emit clientDisconnected"); QVERIFY2(serverSpyDisconnect.at(0).first() == clientId, "ClientId not matching on server side."); } void MqttTests::keepAliveTimesOut() { QSignalSpy keepAliveSpy(m_server, &MqttServer::clientAlive); MqttClient *client = connectAndWait("keepAlive1sec-client", true, 1); client->setAutoReconnect(false); QTest::qWait(2000); qDebug() << "Received" << keepAliveSpy.count() << "keep alive messages"; QVERIFY2(client->isConnected(), "Client connection dropped"); QVERIFY2(keepAliveSpy.count() > 0, "Keep alive not received"); client->disconnectFromHost(); keepAliveSpy.clear(); client = connectAndWait("timeout1sec-client", true, 1); client->setAutoReconnect(false); client->d_ptr->keepAliveTimer.stop(); // disable the keepalive timer QTest::qWait(2000); qDebug() << "Received" << keepAliveSpy.count() << "keep alive messages"; QVERIFY2(!client->isConnected(), "Client connection still alive but it should have been dropped"); } void MqttTests::subscribeAndPublish_data() { QTest::addColumn("qosClient1"); QTest::addColumn("qosClient2"); QList > rows; rows.append({Mqtt::QoS0, Mqtt::QoS0}); rows.append({Mqtt::QoS0, Mqtt::QoS1}); rows.append({Mqtt::QoS0, Mqtt::QoS2}); rows.append({Mqtt::QoS1, Mqtt::QoS0}); rows.append({Mqtt::QoS1, Mqtt::QoS1}); rows.append({Mqtt::QoS1, Mqtt::QoS2}); rows.append({Mqtt::QoS2, Mqtt::QoS0}); rows.append({Mqtt::QoS2, Mqtt::QoS1}); rows.append({Mqtt::QoS2, Mqtt::QoS2}); foreach (const QList &row, rows) { QTest::newRow(QString("Subscribe QoS%1 -> Publish QoS%2").arg(row.at(0)).arg(row.at(1)).toUtf8().data()) << row.at(0) << row.at(1); } } void MqttTests::subscribeAndPublish() { QFETCH(Mqtt::QoS, qosClient1); QFETCH(Mqtt::QoS, qosClient2); QString clientId1 = QString("subQoS%1-client").arg(qosClient1); MqttClient *client1 = connectAndWait(clientId1); QString clientId2 = QString("pubQoS%1-client").arg(qosClient2); MqttClient *client2 = connectAndWait(clientId2); QSignalSpy serverSubscribeSpy(m_server, &MqttServer::clientSubscribed); QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribeResult); quint16 packetId = client1->subscribe("#", qosClient1); QTRY_VERIFY2(serverSubscribeSpy.count() == 1, "Server did not emit clientSubscribed"); QVERIFY2(serverSubscribeSpy.first().first() == clientId1, "Client Id not matching"); QVERIFY2(serverSubscribeSpy.first().at(1) == "#", "Topic not matching"); QVERIFY2(serverSubscribeSpy.first().at(2) == qosClient1, "QoS not matching"); QTRY_VERIFY2(clientSubscribeSpy.count() == 1, "Client did not emit subscribed"); QVERIFY2(clientSubscribeSpy.first().first() == packetId, "Packet ID not matching"); QVERIFY2(clientSubscribeSpy.first().at(1).value().count() == 1, "Subscribe return code count not matching"); QSignalSpy serverPublishReceivedSpy(m_server, &MqttServer::publishReceived); QSignalSpy serverPublishedSpy(m_server, &MqttServer::published); QSignalSpy client1PublishReceivedSpy(client1, &MqttClient::publishReceived); QSignalSpy client2PublishedSpy(client2, &MqttClient::published); packetId = client2->publish("/testtopic/", "Hello world", qosClient2); QTRY_VERIFY2(serverPublishReceivedSpy.count() == 1, "Server did not emit publishReceived"); QVERIFY2(serverPublishReceivedSpy.first().at(0) == clientId2, "Server did emit publishReceived signal but client ID is not matching"); QVERIFY2(serverPublishReceivedSpy.first().at(1) == packetId, QString("Server did emit publishReceived signal but Packet ID is not matching:\nActual: %1\nExpected: %2").arg(serverPublishReceivedSpy.first().at(1).toInt()).arg(packetId).toUtf8().data()); QVERIFY2(serverPublishReceivedSpy.first().at(2) == "/testtopic/", "Server did emit publishReceived signal but topic is not matching"); QVERIFY2(serverPublishReceivedSpy.first().at(3) == "Hello world", "Server did emit publishReceived signal but payload is not matching"); QTRY_VERIFY2(serverPublishedSpy.count() == 1, "Server did not emit published"); QVERIFY2(serverPublishedSpy.first().at(0) == clientId1, "Server did emit published signal but client ID is not matching"); QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Subscribing client did not emit publishReceived signal"); QVERIFY2(client1PublishReceivedSpy.first().at(0) == "/testtopic/", "Subscribing client did emit publishReceived signal but topic is not matching"); QVERIFY2(client1PublishReceivedSpy.first().at(1) == "Hello world", "Subscribing client did emit publishReceived signal but payload is not matching"); QTRY_VERIFY2(client2PublishedSpy.count() == 1, "Publishing client did not emit published signal"); QVERIFY2(client2PublishedSpy.first().first() == packetId, "Publishing client did emit published signal but packet ID not matching"); } void MqttTests::willIsSentOnClientDisappearing() { MqttClient *client1 = connectAndWait("subWill-client"); MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye"); QSignalSpy publishSpy(client1, &MqttClient::publishReceived); QVERIFY(subscribeAndWait(client1, "#")); client2->d_ptr->transport->abort(); QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent"); QVERIFY2(publishSpy.first().at(0) == "/testtopic", "Will topic not matching"); QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching"); } void MqttTests::willIsNotSentOnClientDisconnecting() { MqttClient *client1 = connectAndWait("subWill-client"); MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye"); QSignalSpy subscribeSpy(client1, &MqttClient::subscribeResult); QSignalSpy publishSpy(client1, &MqttClient::publishReceived); client1->subscribe("#"); subscribeSpy.wait(); client2->disconnectFromHost(); publishSpy.wait(200); QVERIFY2(publishSpy.count() == 0, "Will has been sent but it should not have been"); } void MqttTests::testWillRetain() { MqttClient *client1 = connectAndWait("subWill-client"); MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye", Mqtt::QoS1, true); QSignalSpy subscribeSpy(client1, &MqttClient::subscribeResult); QSignalSpy publishSpy(client1, &MqttClient::publishReceived); client1->subscribe("#"); subscribeSpy.wait(); client2->setAutoReconnect(false); client2->d_ptr->transport->abort(); QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent"); QVERIFY2(publishSpy.first().at(0) == "/testtopic", QString("Will topic not matching: %1").arg(publishSpy.first().at(0).toString()).toUtf8().data()); QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching"); QVERIFY2(publishSpy.first().at(2) == false, "Retain flag not matching"); MqttClient *client3 = connectAndWait("subWill-client2"); QSignalSpy retainedWillSpy(client3, &MqttClient::publishReceived); client3->subscribe("#"); QTRY_VERIFY2(retainedWillSpy.count() == 1, "Retained Will has not been sent"); QVERIFY2(retainedWillSpy.first().at(0) == "/testtopic", "Will topic not matching"); QVERIFY2(retainedWillSpy.first().at(1) == "Bye bye", "Will message not matching"); QVERIFY2(retainedWillSpy.first().at(2) == true, "Retain flag not matching"); // Clear retain on /testtopic QSignalSpy clearRetainSpy(client3, &MqttClient::published); client3->publish("/testtopic", QByteArray(), Mqtt::QoS1, true); QTRY_VERIFY2(clearRetainSpy.count() == 1, "Clearing retain message did not succeed"); } void MqttTests::testAutoReconnect() { MqttClient *client1 = connectAndWait("client1"); client1->setAutoReconnect(true); QSignalSpy disconnectedSpy(client1, &MqttClient::disconnected); QSignalSpy connectedSpy(client1, &MqttClient::connected); client1->d_ptr->transport->abort(); QTRY_VERIFY2(disconnectedSpy.count() == 1, "client did not emit disconnected"); QTRY_VERIFY2(connectedSpy.count() == 1, "client did not emit connected"); } void MqttTests::testQoS1Retransmissions() { QSignalSpy serverSpy(m_server, &MqttServer::publishReceived); MqttClient *client = connectAndWait("client1"); client->setAutoReconnect(true); // publish a packet, flush the pipe and immediately drop the connection before we have a chance to receive the PUBACK int packetId = client->publish("/testtopic", "Hello world", Mqtt::QoS1); client->d_ptr->transport->flush(); QSignalSpy connectedSpy(client, &MqttClient::connected); client->d_ptr->transport->abort(); // Wait for it to reconnect, it should then republish the packet connectedSpy.wait(); QTRY_VERIFY2(serverSpy.count() == 2, "Server didn't receive the publication twice but it should have"); QCOMPARE(serverSpy.at(0).at(0).toString(), QString("client1")); QCOMPARE(serverSpy.at(0).at(1).toInt(), packetId); QCOMPARE(serverSpy.at(0).at(2).toString(), QString("/testtopic")); QCOMPARE(serverSpy.at(0).at(3).toString(), QString("Hello world")); QCOMPARE(serverSpy.at(1).at(0).toString(), QString("client1")); QCOMPARE(serverSpy.at(1).at(1).toInt(), packetId); QCOMPARE(serverSpy.at(1).at(2).toString(), QString("/testtopic")); QCOMPARE(serverSpy.at(1).at(3).toString(), QString("Hello world")); } void MqttTests::testMultiSubscription() { MqttClient *client = connectAndWait("subscription-topics"); QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult); MqttSubscriptions subscriptions = { MqttSubscription("topic1"), MqttSubscription("topic2") , MqttSubscription("#invalid") }; Mqtt::SubscribeReturnCodes subscriptionReturnCodes = { Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeFailure}; client->subscribe(subscriptions); QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); Mqtt::SubscribeReturnCodes retCodes = subscribedSpy.first().at(1).value(); QCOMPARE(retCodes, subscriptionReturnCodes); } void MqttTests::testSubscriptionTopicFilters_data() { QTest::addColumn("topicFilter"); QTest::addColumn("subscriptionReturnCode"); QTest::newRow("a") << "a" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("/") << "/" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("/a") << "/a" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("//") << "//" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("/a/") << "/a/" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("/a/b") << "/a/b" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("//b") << "//b" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("#") << "#" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("a/#") << "a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("a/b#") << "a/b#" << Mqtt::SubscribeReturnCodeFailure; QTest::newRow("a/b/#/c") << "a/b/#/c" << Mqtt::SubscribeReturnCodeFailure; QTest::newRow("+") << "+" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("a+") << "a+" << Mqtt::SubscribeReturnCodeFailure; QTest::newRow("a/+/b") << "a/+/b" << Mqtt::SubscribeReturnCodeSuccessQoS0; QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0; } void MqttTests::testSubscriptionTopicFilters() { QFETCH(QString, topicFilter); QFETCH(Mqtt::SubscribeReturnCode, subscriptionReturnCode); MqttClient *client = connectAndWait("subscription-topics"); QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult); client->subscribe(topicFilter); QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); Mqtt::SubscribeReturnCodes retCodes = subscribedSpy.first().at(1).value(); QCOMPARE(retCodes.first(), subscriptionReturnCode); } void MqttTests::testSubscriptionTopicMatching_data() { QTest::addColumn("topicFilter"); QTest::addColumn("topic"); QTest::addColumn("receivedPublishMessageCount"); QList rows; rows.append({ "a", "a", "1" }); rows.append({ "a", "b", "0" }); rows.append({ "/", "/" , "1" }); rows.append({ "/", "/a" , "0" }); rows.append({ "#", "a", "1" }); rows.append({ "#", "a/b", "1" }); rows.append({ "+", "a", "1" }); rows.append({ "+", "a/", "0" }); rows.append({ "+", "/a" , "0" }); rows.append({ "+", "a/b", "0" }); rows.append({ "/#", "/" , "1" }); rows.append({ "/+", "/a" , "1" }); rows.append({ "/a", "/a" , "1" }); rows.append({ "/a", "/a" , "1" }); rows.append({ "a/+", "a", "0" }); rows.append({ "a/+", "a/", "1" }); rows.append({ "a/+", "a/b", "1" }); rows.append({ "a/+", "a/b/c", "0" }); rows.append({ "+/+", "/a" , "1" }); rows.append({ "+/+", "/a" , "1" }); rows.append({ "+/+", "a/" , "1" }); rows.append({ "a/#", "a/b", "1" }); rows.append({ "a/#", "a", "1" }); rows.append({ "a/#", "/", "0" }); rows.append({ "a/#", "a/b/c", "1" }); rows.append({ "a/#", "b/c", "0" }); rows.append({ "a//", "a//", "1" }); rows.append({ "a//#", "a//b", "1" }); rows.append({ "a/b/+", "a/b/c", "1" }); rows.append({ "a/b/+", "a/b/d", "1" }); rows.append({ "a/b/+", "a/b/c/d", "0" }); rows.append({ "+/a/#", "a/a/b", "1" }); rows.append({ "+/a/#", "a/a/b/c", "1" }); rows.append({ "+/a/#", "d/a/b/c", "1" }); rows.append({ "+/a/#", "a/b/c/d", "0" }); rows.append({ "a/b/#", "a/b/c", "1" }); rows.append({ "a//+/", "a//b/", "1" }); rows.append({ "a//+/", "a///", "1" }); rows.append({ "a//+/#", "a//b/c", "1" }); rows.append({ "a//+/#", "a/b/c/d", "0" }); rows.append({ "a/b/c", "a/b", "0"}); rows.append({ "$SYS/", "$SYS/", "0" }); rows.append({ "#", "$SYS/", "0" }); rows.append({ "+/", "$SYS/", "0" }); foreach (const QStringList &row, rows) { QTest::newRow(QString("%1, %2").arg(row.at(0), row.at(1)).toUtf8().data()) << row.at(0) << row.at(1) << row.at(2).toInt(); } } void MqttTests::testSubscriptionTopicMatching() { QFETCH(QString, topicFilter); QFETCH(QString, topic); QFETCH(int, receivedPublishMessageCount); MqttClient *publisher = connectAndWait("publisher"); MqttClient *subscriber = connectAndWait("subscriber"); QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribeResult); QSignalSpy publishReceivedSpy(subscriber, &MqttClient::publishReceived); QSignalSpy publishedSpy(publisher, &MqttClient::published); subscriber->subscribe(topicFilter); QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); publisher->publish(topic, "testpayload"); QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not received"); if (receivedPublishMessageCount == 0) { // Give it some time to wait for a publishReceived (It should not show up) QTest::qWait(500); } else if (publishReceivedSpy.count() == 0) { publishReceivedSpy.wait(); } QVERIFY2(publishReceivedSpy.count() == receivedPublishMessageCount, QString("PublishReceived signal not received the expected amount of time.\nActual: %1\nExpected: %2").arg(publishReceivedSpy.count()).arg(receivedPublishMessageCount).toUtf8().data()); } void MqttTests::testSessionManagementDropOldSession() { MqttClient *client1Session1 = connectAndWait("client1"); client1Session1->setAutoReconnect(false); QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribeResult); client1Session1->subscribe("/testtopic"); QTRY_VERIFY(subscribeSpy.count() == 1); QSignalSpy disconnectedSpy(client1Session1, &MqttClient::disconnected); QPair client1Session2 = connectToServer("client1"); if (client1Session2.second->count() == 0) { client1Session2.second->wait(); } QVERIFY2(!client1Session2.second->first().at(0).value().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be."); QTRY_VERIFY2(disconnectedSpy.count() == 1, "First instance didn't get disconnected when new instance connected."); // Now connect with another client and post to testtopic. Client 1 should not get it because he didn't resume the session and didn't resubscribe QSignalSpy client1PublishReceivedSpy(client1Session2.first, &MqttClient::publishReceived); MqttClient *client2 = connectAndWait("client2"); client2->publish("/testtopic", "Hello world"); QTest::qWait(500); QVERIFY2(client1PublishReceivedSpy.count() == 0, "Client 1 did receive the publish but it should not have."); } void MqttTests::testSessionManagementResumeOldSession() { MqttClient *client1Session1 = connectAndWait("client1"); client1Session1->setAutoReconnect(false); QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribeResult); client1Session1->subscribe("/testtopic"); QTRY_VERIFY(subscribeSpy.count() == 1); QSignalSpy disconnectedSpy(client1Session1, &MqttClient::disconnected); QPair client1Session2 = connectToServer("client1", false); if (client1Session2.second->count() == 0) { client1Session2.second->wait(); } QVERIFY2(client1Session2.second->first().at(0).value() == Mqtt::ConnectReturnCodeAccepted, "Session hasn't been accepted."); QVERIFY2(client1Session2.second->first().at(1).value().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is not set while it should be."); QTRY_VERIFY2(disconnectedSpy.count() == 1, "First instance didn't get disconnected when new instance connected."); // Now connect with another client and post to testtopic. Client 1 should not get it because he didn't resume the session and didn't resubscribe QSignalSpy client1PublishReceivedSpy(client1Session2.first, &MqttClient::publishReceived); MqttClient *client2 = connectAndWait("client2"); client2->publish("/testtopic", "Hello world"); QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Client 1 did not receive the publish but it should have."); } void MqttTests::testSessionManagementFailResumeOldSession() { // try to resume non existing session QPair client = connectToServer("client1", false); if (client.second->count() == 0) { client.second->wait(); } QVERIFY2(!client.second->first().at(0).value().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be."); } void MqttTests::testQoS1PublishToServerIsAckedOnSessionResume() { MqttClient *client = connectAndWait("client1", true); client->setAutoReconnect(true); QSignalSpy reconnectedSpy(client, &MqttClient::connected); QSignalSpy publishedSpy(client, &MqttClient::published); client->publish("/testtopic", "Hello world", Mqtt::QoS1); client->d_ptr->transport->flush(); client->d_ptr->transport->abort(); QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad."); QTRY_VERIFY2(reconnectedSpy.count() == 1, "client didn't reconnect"); QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect"); } void MqttTests::testQoS1PublishToClientIsDeliveredOnSessionResume() { MqttClient *oldClient1 = connectAndWait("client1", true); QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult); oldClient1->subscribe("/testtopic", Mqtt::QoS1); QTRY_VERIFY(subscribedSpy.count() == 1); // prevent the client from receiving anything oldClient1->d_ptr->transport->blockSignals(true); // publish something with a second client MqttClient *client2 = connectAndWait("client2"); QSignalSpy publishedSpy(client2, &MqttClient::published); client2->publish("/testtopic", "Hello world", Mqtt::QoS1); QTRY_VERIFY(publishedSpy.count() == 1); // Resume (take over) old session and make sure we got the publish MqttClient *newClient1 = connectToServer("client1", false).first;; QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived); QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume"); } void MqttTests::testQoS2PublishToServerIsCompletedOnSessionResume() { MqttClient *client = connectAndWait("client1", true); client->setAutoReconnect(true); QSignalSpy reconnectedSpy(client, &MqttClient::connected); QSignalSpy publishedSpy(client, &MqttClient::published); client->publish("/testtopic", "Hello world", Mqtt::QoS2); client->d_ptr->transport->flush(); client->d_ptr->transport->abort(); QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad."); QTRY_VERIFY2(reconnectedSpy.count() == 1, "client didn't reconnect"); QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect"); } void MqttTests::testQoS2PublishToClientIsCompletedOnSessionResume() { MqttClient *oldClient1 = connectAndWait("client1", true); QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult); oldClient1->subscribe("/testtopic", Mqtt::QoS2); QTRY_VERIFY(subscribedSpy.count() == 1); // prevent the client from receiving anything oldClient1->d_ptr->transport->blockSignals(true); // pbulish something with a second client MqttClient *client2 = connectAndWait("client2"); QSignalSpy publishedSpy(client2, &MqttClient::published); client2->publish("/testtopic", "Hello world", Mqtt::QoS2); QTRY_VERIFY(publishedSpy.count() == 1); // Resume (take over) old session and make sure we got the publish MqttClient *newClient1 = connectToServer("client1", false).first; QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived); QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume"); } void MqttTests::testRetain() { MqttClient *client1 = connectAndWait("client1", true); // post a retained message QSignalSpy publishedSpy(client1, &MqttClient::published); client1->publish("/retaintopic", "Message 1", Mqtt::QoS1, true); QTRY_VERIFY(publishedSpy.count() == 1); // Connect a second client MqttClient *client2 = connectAndWait("client2"); // subscribe to topic and verify we received the retained message QSignalSpy publishReceivedSpy(client2, &MqttClient::publishReceived); client2->subscribe("/retaintopic", Mqtt::QoS1); QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive retained topic on subscribe."); QVERIFY2(publishReceivedSpy.first().at(2).toBool() == true, "Retain flag not set"); publishReceivedSpy.clear(); // Post another retained message from client1 and make sure we receive it client1->publish("/retaintopic", "Message 2", Mqtt::QoS1, true); QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive published meessage."); QVERIFY2(publishReceivedSpy.first().at(2).toBool() == false, "Retain flag is set"); // Disconnect client, and connect again, verify we get 2 retained messages now disconnectAndWait(client2); client2 = connectAndWait("client2"); QSignalSpy publishReceivedSpy2(client2, &MqttClient::publishReceived); client2->subscribe("/retaintopic", Mqtt::QoS1); QTRY_VERIFY2(publishReceivedSpy2.count() == 2, "Did not receive retained topic on subscribe."); QVERIFY2(publishReceivedSpy2.at(0).at(2).toBool() == true, "Retain flag not set"); QVERIFY2(publishReceivedSpy2.at(1).at(2).toBool() == true, "Retain flag not set"); publishReceivedSpy2.clear(); // Post a message with 0 paylod, it should be delivered as normal but discard any retained messages client1->publish("/retaintopic", QByteArray(), Mqtt::QoS1, true); QTRY_VERIFY2(publishReceivedSpy2.count() == 1, "Did not receive published message."); QVERIFY2(publishReceivedSpy.first().at(2).toBool() == false, "Retain flag is set"); disconnectAndWait(client2); client2 = connectAndWait("client2"); QSignalSpy publishReceivedSpy3(client2, &MqttClient::publishReceived); client2->subscribe("/retaintopic", Mqtt::QoS1); QTest::qWait(500); QVERIFY2(publishReceivedSpy3.count() == 0, "Did receive retained messages on subscribe but should not have."); // post another 2 retained messages (and some others), reconnect and verify they're there again client1->publish("/retaintopic", "Message 3", Mqtt::QoS1, true); client1->publish("/retaintopic", "Message 4", Mqtt::QoS1, false); client1->publish("/retaintopic", "Message 5", Mqtt::QoS1, false); client1->publish("/retaintopic", "Message 6", Mqtt::QoS1, true); client1->publish("/retaintopic", "Message 7", Mqtt::QoS1, false); QTRY_VERIFY(publishReceivedSpy3.count() == 5); disconnectAndWait(client2); client2 = connectAndWait("client2"); QSignalSpy publishReceivedSpy4(client2, &MqttClient::publishReceived); client2->subscribe("/retaintopic", Mqtt::QoS1); QTRY_VERIFY2(publishReceivedSpy4.count() == 2, "Did not receive retained messages."); publishReceivedSpy4.clear(); // post a QoS0 message to this topic. it should discard previously retained messages but stay retained client1->publish("/retaintopic", "Message 8", Mqtt::QoS0, true); QTRY_VERIFY2(publishReceivedSpy4.count() == 1, "Did not receive retained messages."); disconnectAndWait(client2); client2 = connectAndWait("client2"); QSignalSpy publishReceivedSpy5(client2, &MqttClient::publishReceived); client2->subscribe("/retaintopic", Mqtt::QoS1); QTRY_VERIFY2(publishReceivedSpy5.count() == 1, "Did not receive exactly 1 retained message."); publishReceivedSpy5.clear(); // post an empty payload to this topic. it should clear all retained messages client1->publish("/retaintopic", "", Mqtt::QoS1, true); QTRY_VERIFY2_WITH_TIMEOUT(publishReceivedSpy5.count() == 0, "Recaived a message but should not have.", 250); disconnectAndWait(client2); client2 = connectAndWait("client2"); QSignalSpy publishReceivedSpy6(client2, &MqttClient::publishReceived); client2->subscribe("/retaintopic", Mqtt::QoS1); QTRY_VERIFY2_WITH_TIMEOUT(publishReceivedSpy6.count() == 0, "Recaived a message but should not have.", 250); } void MqttTests::testUnsubscribe() { MqttClient *client1 = connectAndWait("client1"); QVERIFY(subscribeAndWait(client1, "testtopic")); QSignalSpy publishReceivedSpy(client1, &MqttClient::publishReceived); MqttClient *client2 = connectAndWait("client2"); client2->publish("testtopic", "Hello world"); QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive publish message"); QSignalSpy unsubscribedSpy(client1, &MqttClient::unsubscribed); QSignalSpy serverSideUnsubscribedSpy(m_server, &MqttServer::clientUnsubscribed); quint16 packetId = client1->unsubscribe("testtopic"); QTRY_VERIFY2(serverSideUnsubscribedSpy.count() == 1, "Server side unsubscribed signal not received"); QVERIFY2(serverSideUnsubscribedSpy.first().at(0).toString() == "client1", "ClientId not matching"); QVERIFY2(serverSideUnsubscribedSpy.first().at(1).toString() == "testtopic", "topicFilter not matching"); QTRY_VERIFY2(unsubscribedSpy.count() == 1, "Unsubscibed signal not emitted"); QVERIFY2(unsubscribedSpy.first().at(0).toInt() == packetId, "packet id not matching"); publishReceivedSpy.clear(); client2->publish("testtopic", "Hello world 2"); QTest::qWait(500); QVERIFY2(publishReceivedSpy.count() == 0, "Received publish packet even though we should not have"); } void MqttTests::testEmptyClientId() { MqttClient *client1 = connectAndWait(""); QVERIFY2(client1->isConnected(), "Client did not connect"); MqttClient *client2 = connectAndWait(""); QVERIFY2(client2->isConnected(), "Client did not connect"); QPair client3 = connectToServer("", false); QTRY_VERIFY2(client3.second->count() == 1, "Client did not emit connected signal"); QTRY_COMPARE(client3.second->first().at(0).value(), Mqtt::ConnectReturnCodeIdentifierRejected); QTRY_VERIFY2(client3.first->isConnected() == false, "Connection should have been dropped"); } void MqttTests::testBinaryPaylaod() { MqttClient *client = connectAndWait(""); QVERIFY2(client->isConnected(), "Client did not connect"); client->subscribe("#"); const char binData[] = {'\xA5', '\x20', '\x00', '\x04', '\x00', '\x52'}; QByteArray payload(QByteArray::fromRawData(binData, 6)); QSignalSpy publishReceivedSpy(client, &MqttClient::publishReceived); client->publish("testtopic", payload); QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive publish message"); QCOMPARE(publishReceivedSpy.first().at(1).toByteArray(), payload); } #endif