From 4e4ed7bad50b3c4c3eb14180785c201fa059706d Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Wed, 28 Nov 2018 20:24:14 +0100 Subject: [PATCH] add SSL support to client --- libnymea-mqtt/mqttclient.cpp | 80 +++++++++++++++++++++--------- libnymea-mqtt/mqttclient.h | 8 +-- libnymea-mqtt/mqttclient_p.h | 8 ++- libnymea-mqtt/mqttpacket.cpp | 6 ++- tests/operation/test_operation.cpp | 22 ++++---- 5 files changed, 84 insertions(+), 40 deletions(-) diff --git a/libnymea-mqtt/mqttclient.cpp b/libnymea-mqtt/mqttclient.cpp index e38063c..efc9565 100644 --- a/libnymea-mqtt/mqttclient.cpp +++ b/libnymea-mqtt/mqttclient.cpp @@ -31,14 +31,17 @@ MqttClientPrivate::MqttClientPrivate(MqttClient *q): qRegisterMetaType(); qRegisterMetaType(); reconnectTimer.setSingleShot(true); + connect(&keepAliveTimer, &QTimer::timeout, this, &MqttClientPrivate::sendPingreq); connect(&reconnectTimer, &QTimer::timeout, this, &MqttClientPrivate::reconnectTimerTimeout); } -void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession) +void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration) { - if (serverHostname != hostName || serverPort != port) { + if (serverHostname != hostName || serverPort != port || this->useSsl != useSsl || sslConfiguration != this->sslConfiguration) { serverHostname = hostName; serverPort = port; + this->useSsl = useSsl; + this->sslConfiguration = sslConfiguration; reconnectAttempt = 1; reconnectTimer.stop(); } @@ -50,13 +53,21 @@ void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, boo socket->abort(); socket->deleteLater(); } - socket = new QTcpSocket(this); + socket = new QSslSocket(this); + socket->setSslConfiguration(sslConfiguration); connect(socket, &QTcpSocket::connected, this, &MqttClientPrivate::onConnected); connect(socket, &QTcpSocket::disconnected, this, &MqttClientPrivate::onDisconnected); connect(socket, &QTcpSocket::readyRead, this, &MqttClientPrivate::onReadyRead); connect(socket, &QTcpSocket::stateChanged, this, &MqttClientPrivate::onSocketStateChanged); -// connect(d_ptr->socket, &QTcpSocket::error, this, &MqttClient::error); - socket->connectToHost(hostName, port); + typedef void (QSslSocket:: *sslErrorsSignal)(const QList &); + connect(socket, static_cast(&QSslSocket::sslErrors), this, &MqttClientPrivate::onSslErrors); + typedef void (QSslSocket:: *errorSignal)(QAbstractSocket::SocketError); + connect(socket, static_cast(&QSslSocket::error), this, &MqttClientPrivate::onSocketError); + if (useSsl) { + socket->connectToHostEncrypted(hostName, port); + } else { + socket->connectToHost(hostName, port); + } } void MqttClientPrivate::disconnectFromHost() @@ -90,10 +101,6 @@ MqttClient::MqttClient(const QString &clientId, quint16 keepAlive, const QString d_ptr->willMessage = willMessage; d_ptr->willQoS = willQoS; d_ptr->willRetain = willRetain; - - if (keepAlive > 0) { - connect(&d_ptr->keepAliveTimer, &QTimer::timeout, d_ptr, &MqttClientPrivate::sendPingreq); - } } bool MqttClient::autoReconnect() const @@ -181,9 +188,9 @@ void MqttClient::setPassword(const QString &password) d_ptr->password = password; } -void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession) +void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration) { - d_ptr->connectToHost(hostName, port, cleanSession); + d_ptr->connectToHost(hostName, port, cleanSession, useSsl, sslConfiguration); } void MqttClient::disconnectFromHost() @@ -247,8 +254,8 @@ quint16 MqttClient::publish(const QString &topic, const QByteArray &payload, Mqt packet.setPayload(payload); d_ptr->socket->write(packet.serialize()); if (qos == Mqtt::QoS0) { - QTimer::singleShot(0, this, [this, packetId](){ - emit published(packetId); + QTimer::singleShot(0, this, [this, packet](){ + emit published(packet.packetId(), packet.topic()); }); } else { d_ptr->unackedPackets.insert(packet.packetId(), packet); @@ -279,7 +286,7 @@ void MqttClientPrivate::onDisconnected() emit q_ptr->disconnected(); if (sessionActive && autoReconnect) { reconnectAttempt = qMin(maxReconnectTimeout / 60 / 60, reconnectAttempt * 2); - qCDebug(dbgClient) << "Reconnecint in" << reconnectAttempt << "seconds"; + qCDebug(dbgClient) << "Reconnecing in" << reconnectAttempt << "seconds"; reconnectTimer.setInterval(reconnectAttempt * 1000); reconnectTimer.start(); } @@ -289,7 +296,7 @@ void MqttClientPrivate::onReadyRead() { static QByteArray data; data.append(socket->readAll()); -// qCDebug(dbgClient) << "Received data from server:" << data.toHex(); +// qCDebug(dbgClient) << "Received data from server:" << data.toHex() << "\n" << data; MqttPacket packet; int ret = packet.parse(data); if (ret == -1) { @@ -356,16 +363,19 @@ void MqttClientPrivate::onReadyRead() } } break; - case MqttPacket::TypePuback: - unackedPackets.remove(packet.packetId()); + case MqttPacket::TypePuback: { + MqttPacket publishPacket = unackedPackets.take(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); - emit q_ptr->published(packet.packetId()); + emit q_ptr->published(packet.packetId(), publishPacket.topic()); restartKeepAliveTimer(); break; + } case MqttPacket::TypePubrec: { + MqttPacket publishPacket = unackedPackets.value(packet.packetId()); MqttPacket response(MqttPacket::TypePubrel, packet.packetId()); unackedPackets[packet.packetId()] = response; socket->write(response.serialize()); + emit q_ptr->published(packet.packetId(), publishPacket.topic()); restartKeepAliveTimer(); break; } @@ -379,15 +389,28 @@ void MqttClientPrivate::onReadyRead() case MqttPacket::TypePubcomp: unackedPackets.remove(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); - emit q_ptr->published(packet.packetId()); restartKeepAliveTimer(); break; - case MqttPacket::TypeSuback: - unackedPackets.remove(packet.packetId()); + case MqttPacket::TypeSuback: { + MqttPacket subscribePacket = unackedPackets.take(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); - emit q_ptr->subscribed(packet.packetId(), packet.subscribeReturnCodes()); + + if (subscribePacket.subscriptions().count() != packet.subscribeReturnCodes().count()) { + qCWarning(dbgClient) << "Subscription return code count not matching subscribe packet!"; + socket->abort(); + return; + } + + // Ack the subscription packet + emit q_ptr->subscribeResult(packet.packetId(), packet.subscribeReturnCodes()); + + // emit subscribed for each topic + for (int i = 0; i < packet.subscribeReturnCodes().count(); i++) { + emit q_ptr->subscribed(subscribePacket.subscriptions().at(i).topicFilter(), packet.subscribeReturnCodes().at(i)); + } restartKeepAliveTimer(); break; + } case MqttPacket::TypeUnsuback: if (!unackedPackets.contains(packet.packetId())) { qCWarning(dbgClient) << "UNSUBACK received but not waiting for it. Dropping connection. Packet ID:" << packet.packetId(); @@ -416,6 +439,17 @@ void MqttClientPrivate::onSocketStateChanged(QAbstractSocket::SocketState socket emit q_ptr->stateChanged(socketState); } +void MqttClientPrivate::onSocketError(QAbstractSocket::SocketError error) +{ + qCWarning(dbgClient) << "MQTT socket error:" << error; + emit q_ptr->error(error); +} + +void MqttClientPrivate::onSslErrors(const QList &errors) +{ + qCWarning(dbgClient) << "SSL error in MQTT connection:" << errors; +} + quint16 MqttClientPrivate::newPacketId() { static quint16 packetId = 1; @@ -443,5 +477,5 @@ void MqttClientPrivate::reconnectTimerTimeout() if (!autoReconnect) { return; } - connectToHost(serverHostname, serverPort, false); + connectToHost(serverHostname, serverPort, false, useSsl, sslConfiguration); } diff --git a/libnymea-mqtt/mqttclient.h b/libnymea-mqtt/mqttclient.h index 1a75ce2..23a2430 100644 --- a/libnymea-mqtt/mqttclient.h +++ b/libnymea-mqtt/mqttclient.h @@ -23,6 +23,7 @@ #include #include +#include #include "mqttpacket.h" #include "mqttsubscription.h" @@ -63,7 +64,7 @@ public: QString password() const; void setPassword(const QString &password); - void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true); + void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true, bool useSsl = false, const QSslConfiguration &sslConfiguration = QSslConfiguration()); void disconnectFromHost(); bool isConnected() const; @@ -85,9 +86,10 @@ signals: void stateChanged(QAbstractSocket::SocketState state); void error(QAbstractSocket::SocketError socketError); - void subscribed(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes); + void subscribeResult(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes); + void subscribed(const QString &topic, Mqtt::SubscribeReturnCode subscribeReturnCode); void unsubscribed(quint16 packetId); - void published(quint16 packetId); + void published(quint16 packetId, const QString &topic); void publishReceived(const QString &topic, const QByteArray &payload, bool retained); private: diff --git a/libnymea-mqtt/mqttclient_p.h b/libnymea-mqtt/mqttclient_p.h index 350c1d0..0afb4df 100644 --- a/libnymea-mqtt/mqttclient_p.h +++ b/libnymea-mqtt/mqttclient_p.h @@ -39,7 +39,7 @@ public: MqttClientPrivate(MqttClient *q); MqttClient *q_ptr; - void connectToHost(const QString &hostName, quint16 port, bool cleanSession); + void connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration); void disconnectFromHost(); public slots: @@ -47,6 +47,8 @@ public slots: void onDisconnected(); void onReadyRead(); void onSocketStateChanged(QAbstractSocket::SocketState socketState); + void onSocketError(QAbstractSocket::SocketError error); + void onSslErrors(const QList &errors); quint16 newPacketId(); void sendPingreq(); @@ -57,10 +59,12 @@ public slots: public: QString serverHostname; quint16 serverPort = 0; + bool useSsl = false; + QSslConfiguration sslConfiguration; bool autoReconnect = true; bool sessionActive = false; bool cleanSession = true; - QTcpSocket *socket = nullptr; + QSslSocket *socket = nullptr; QTimer reconnectTimer; int reconnectAttempt = 0; quint16 maxReconnectTimeout = 36000; diff --git a/libnymea-mqtt/mqttpacket.cpp b/libnymea-mqtt/mqttpacket.cpp index 2c68b1a..4a55159 100644 --- a/libnymea-mqtt/mqttpacket.cpp +++ b/libnymea-mqtt/mqttpacket.cpp @@ -355,6 +355,10 @@ int MqttPacket::parse(const QByteArray &buffer) remainingLength += (lengthBit & 0x7F) * multiplier; multiplier *= 128; lenFields++; + if (multiplier > 128*128*128) { + qCWarning(dbgProto) << "Remaining Length field invalid"; + return -1; + } } while((lengthBit & 0x80) != 0); if (remainingLength > buffer.length() - 1 - lenFields) { @@ -370,7 +374,7 @@ int MqttPacket::parse(const QByteArray &buffer) const quint16 fullRemainingLength = remainingLength; quint16 strLen; - const quint16 MAX_STRLEN = 256; + const quint16 MAX_STRLEN = remainingLength; char str[MAX_STRLEN]; switch (type()) { diff --git a/tests/operation/test_operation.cpp b/tests/operation/test_operation.cpp index 69488f8..b5ee4a9 100644 --- a/tests/operation/test_operation.cpp +++ b/tests/operation/test_operation.cpp @@ -134,7 +134,7 @@ void OperationTests::disconnectAndWait(MqttClient* client) bool OperationTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos) { - QSignalSpy subscribedSpy(client, &MqttClient::subscribed); + QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult); quint16 packetId = client->subscribe(topic, qos); if (subscribedSpy.count() == 0) { subscribedSpy.wait(); @@ -242,7 +242,7 @@ void OperationTests::subscribeAndPublish() MqttClient *client2 = connectAndWait(clientId2); QSignalSpy serverSubscribeSpy(m_server, &MqttServer::clientSubscribed); - QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribed); + QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribeResult); quint16 packetId = client1->subscribe("#", qosClient1); @@ -301,7 +301,7 @@ void OperationTests::willIsNotSentOnClientDisconnecting() MqttClient *client1 = connectAndWait("subWill-client"); MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye"); - QSignalSpy subscribeSpy(client1, &MqttClient::subscribed); + QSignalSpy subscribeSpy(client1, &MqttClient::subscribeResult); QSignalSpy publishSpy(client1, &MqttClient::publishReceived); client1->subscribe("#"); @@ -318,7 +318,7 @@ void OperationTests::testWillRetain() MqttClient *client1 = connectAndWait("subWill-client"); MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye", Mqtt::QoS1, true); - QSignalSpy subscribeSpy(client1, &MqttClient::subscribed); + QSignalSpy subscribeSpy(client1, &MqttClient::subscribeResult); QSignalSpy publishSpy(client1, &MqttClient::publishReceived); client1->subscribe("#"); @@ -392,7 +392,7 @@ void OperationTests::testQoS1Retransmissions() void OperationTests::testMultiSubscription() { MqttClient *client = connectAndWait("subscription-topics"); - QSignalSpy subscribedSpy(client, &MqttClient::subscribed); + QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult); MqttSubscriptions subscriptions = { MqttSubscription("topic1"), MqttSubscription("topic2") , MqttSubscription("#invalid") }; Mqtt::SubscribeReturnCodes subscriptionReturnCodes = { Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeFailure}; @@ -433,7 +433,7 @@ void OperationTests::testSubscriptionTopicFilters() QFETCH(Mqtt::SubscribeReturnCode, subscriptionReturnCode); MqttClient *client = connectAndWait("subscription-topics"); - QSignalSpy subscribedSpy(client, &MqttClient::subscribed); + QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult); client->subscribe(topicFilter); QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); @@ -508,7 +508,7 @@ void OperationTests::testSubscriptionTopicMatching() MqttClient *publisher = connectAndWait("publisher"); MqttClient *subscriber = connectAndWait("subscriber"); - QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribed); + QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribeResult); QSignalSpy publishReceivedSpy(subscriber, &MqttClient::publishReceived); QSignalSpy publishedSpy(publisher, &MqttClient::published); @@ -532,7 +532,7 @@ void OperationTests::testSessionManagementDropOldSession() MqttClient *client1Session1 = connectAndWait("client1"); client1Session1->setAutoReconnect(false); - QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed); + QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribeResult); client1Session1->subscribe("/testtopic"); QTRY_VERIFY(subscribeSpy.count() == 1); @@ -564,7 +564,7 @@ void OperationTests::testSessionManagementResumeOldSession() MqttClient *client1Session1 = connectAndWait("client1"); client1Session1->setAutoReconnect(false); - QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed); + QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribeResult); client1Session1->subscribe("/testtopic"); QTRY_VERIFY(subscribeSpy.count() == 1); @@ -623,7 +623,7 @@ void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume() void OperationTests::testQoS1PublishToClientIsDeliveredOnSessionResume() { MqttClient *oldClient1 = connectAndWait("client1", true); - QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed); + QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult); oldClient1->subscribe("/testtopic", Mqtt::QoS1); QTRY_VERIFY(subscribedSpy.count() == 1); @@ -668,7 +668,7 @@ void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume() void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume() { MqttClient *oldClient1 = connectAndWait("client1", true); - QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed); + QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult); oldClient1->subscribe("/testtopic", Mqtt::QoS2); QTRY_VERIFY(subscribedSpy.count() == 1);