add SSL support to client

This commit is contained in:
Michael Zanetti 2018-11-28 20:24:14 +01:00
parent 7e35b3e79e
commit 4e4ed7bad5
5 changed files with 84 additions and 40 deletions

View File

@ -31,14 +31,17 @@ MqttClientPrivate::MqttClientPrivate(MqttClient *q):
qRegisterMetaType<Mqtt::SubscribeReturnCodes>();
qRegisterMetaType<Mqtt::ConnackFlags>();
reconnectTimer.setSingleShot(true);
connect(&keepAliveTimer, &QTimer::timeout, this, &MqttClientPrivate::sendPingreq);
connect(&reconnectTimer, &QTimer::timeout, this, &MqttClientPrivate::reconnectTimerTimeout);
}
void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession)
void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration)
{
if (serverHostname != hostName || serverPort != port) {
if (serverHostname != hostName || serverPort != port || this->useSsl != useSsl || sslConfiguration != this->sslConfiguration) {
serverHostname = hostName;
serverPort = port;
this->useSsl = useSsl;
this->sslConfiguration = sslConfiguration;
reconnectAttempt = 1;
reconnectTimer.stop();
}
@ -50,13 +53,21 @@ void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, boo
socket->abort();
socket->deleteLater();
}
socket = new QTcpSocket(this);
socket = new QSslSocket(this);
socket->setSslConfiguration(sslConfiguration);
connect(socket, &QTcpSocket::connected, this, &MqttClientPrivate::onConnected);
connect(socket, &QTcpSocket::disconnected, this, &MqttClientPrivate::onDisconnected);
connect(socket, &QTcpSocket::readyRead, this, &MqttClientPrivate::onReadyRead);
connect(socket, &QTcpSocket::stateChanged, this, &MqttClientPrivate::onSocketStateChanged);
// connect(d_ptr->socket, &QTcpSocket::error, this, &MqttClient::error);
socket->connectToHost(hostName, port);
typedef void (QSslSocket:: *sslErrorsSignal)(const QList<QSslError> &);
connect(socket, static_cast<sslErrorsSignal>(&QSslSocket::sslErrors), this, &MqttClientPrivate::onSslErrors);
typedef void (QSslSocket:: *errorSignal)(QAbstractSocket::SocketError);
connect(socket, static_cast<errorSignal>(&QSslSocket::error), this, &MqttClientPrivate::onSocketError);
if (useSsl) {
socket->connectToHostEncrypted(hostName, port);
} else {
socket->connectToHost(hostName, port);
}
}
void MqttClientPrivate::disconnectFromHost()
@ -90,10 +101,6 @@ MqttClient::MqttClient(const QString &clientId, quint16 keepAlive, const QString
d_ptr->willMessage = willMessage;
d_ptr->willQoS = willQoS;
d_ptr->willRetain = willRetain;
if (keepAlive > 0) {
connect(&d_ptr->keepAliveTimer, &QTimer::timeout, d_ptr, &MqttClientPrivate::sendPingreq);
}
}
bool MqttClient::autoReconnect() const
@ -181,9 +188,9 @@ void MqttClient::setPassword(const QString &password)
d_ptr->password = password;
}
void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession)
void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration)
{
d_ptr->connectToHost(hostName, port, cleanSession);
d_ptr->connectToHost(hostName, port, cleanSession, useSsl, sslConfiguration);
}
void MqttClient::disconnectFromHost()
@ -247,8 +254,8 @@ quint16 MqttClient::publish(const QString &topic, const QByteArray &payload, Mqt
packet.setPayload(payload);
d_ptr->socket->write(packet.serialize());
if (qos == Mqtt::QoS0) {
QTimer::singleShot(0, this, [this, packetId](){
emit published(packetId);
QTimer::singleShot(0, this, [this, packet](){
emit published(packet.packetId(), packet.topic());
});
} else {
d_ptr->unackedPackets.insert(packet.packetId(), packet);
@ -279,7 +286,7 @@ void MqttClientPrivate::onDisconnected()
emit q_ptr->disconnected();
if (sessionActive && autoReconnect) {
reconnectAttempt = qMin(maxReconnectTimeout / 60 / 60, reconnectAttempt * 2);
qCDebug(dbgClient) << "Reconnecint in" << reconnectAttempt << "seconds";
qCDebug(dbgClient) << "Reconnecing in" << reconnectAttempt << "seconds";
reconnectTimer.setInterval(reconnectAttempt * 1000);
reconnectTimer.start();
}
@ -289,7 +296,7 @@ void MqttClientPrivate::onReadyRead()
{
static QByteArray data;
data.append(socket->readAll());
// qCDebug(dbgClient) << "Received data from server:" << data.toHex();
// qCDebug(dbgClient) << "Received data from server:" << data.toHex() << "\n" << data;
MqttPacket packet;
int ret = packet.parse(data);
if (ret == -1) {
@ -356,16 +363,19 @@ void MqttClientPrivate::onReadyRead()
}
}
break;
case MqttPacket::TypePuback:
unackedPackets.remove(packet.packetId());
case MqttPacket::TypePuback: {
MqttPacket publishPacket = unackedPackets.take(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->published(packet.packetId());
emit q_ptr->published(packet.packetId(), publishPacket.topic());
restartKeepAliveTimer();
break;
}
case MqttPacket::TypePubrec: {
MqttPacket publishPacket = unackedPackets.value(packet.packetId());
MqttPacket response(MqttPacket::TypePubrel, packet.packetId());
unackedPackets[packet.packetId()] = response;
socket->write(response.serialize());
emit q_ptr->published(packet.packetId(), publishPacket.topic());
restartKeepAliveTimer();
break;
}
@ -379,15 +389,28 @@ void MqttClientPrivate::onReadyRead()
case MqttPacket::TypePubcomp:
unackedPackets.remove(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->published(packet.packetId());
restartKeepAliveTimer();
break;
case MqttPacket::TypeSuback:
unackedPackets.remove(packet.packetId());
case MqttPacket::TypeSuback: {
MqttPacket subscribePacket = unackedPackets.take(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->subscribed(packet.packetId(), packet.subscribeReturnCodes());
if (subscribePacket.subscriptions().count() != packet.subscribeReturnCodes().count()) {
qCWarning(dbgClient) << "Subscription return code count not matching subscribe packet!";
socket->abort();
return;
}
// Ack the subscription packet
emit q_ptr->subscribeResult(packet.packetId(), packet.subscribeReturnCodes());
// emit subscribed for each topic
for (int i = 0; i < packet.subscribeReturnCodes().count(); i++) {
emit q_ptr->subscribed(subscribePacket.subscriptions().at(i).topicFilter(), packet.subscribeReturnCodes().at(i));
}
restartKeepAliveTimer();
break;
}
case MqttPacket::TypeUnsuback:
if (!unackedPackets.contains(packet.packetId())) {
qCWarning(dbgClient) << "UNSUBACK received but not waiting for it. Dropping connection. Packet ID:" << packet.packetId();
@ -416,6 +439,17 @@ void MqttClientPrivate::onSocketStateChanged(QAbstractSocket::SocketState socket
emit q_ptr->stateChanged(socketState);
}
void MqttClientPrivate::onSocketError(QAbstractSocket::SocketError error)
{
qCWarning(dbgClient) << "MQTT socket error:" << error;
emit q_ptr->error(error);
}
void MqttClientPrivate::onSslErrors(const QList<QSslError> &errors)
{
qCWarning(dbgClient) << "SSL error in MQTT connection:" << errors;
}
quint16 MqttClientPrivate::newPacketId()
{
static quint16 packetId = 1;
@ -443,5 +477,5 @@ void MqttClientPrivate::reconnectTimerTimeout()
if (!autoReconnect) {
return;
}
connectToHost(serverHostname, serverPort, false);
connectToHost(serverHostname, serverPort, false, useSsl, sslConfiguration);
}

View File

@ -23,6 +23,7 @@
#include <QObject>
#include <QAbstractSocket>
#include <QSslConfiguration>
#include "mqttpacket.h"
#include "mqttsubscription.h"
@ -63,7 +64,7 @@ public:
QString password() const;
void setPassword(const QString &password);
void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true);
void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true, bool useSsl = false, const QSslConfiguration &sslConfiguration = QSslConfiguration());
void disconnectFromHost();
bool isConnected() const;
@ -85,9 +86,10 @@ signals:
void stateChanged(QAbstractSocket::SocketState state);
void error(QAbstractSocket::SocketError socketError);
void subscribed(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes);
void subscribeResult(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes);
void subscribed(const QString &topic, Mqtt::SubscribeReturnCode subscribeReturnCode);
void unsubscribed(quint16 packetId);
void published(quint16 packetId);
void published(quint16 packetId, const QString &topic);
void publishReceived(const QString &topic, const QByteArray &payload, bool retained);
private:

View File

@ -39,7 +39,7 @@ public:
MqttClientPrivate(MqttClient *q);
MqttClient *q_ptr;
void connectToHost(const QString &hostName, quint16 port, bool cleanSession);
void connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration);
void disconnectFromHost();
public slots:
@ -47,6 +47,8 @@ public slots:
void onDisconnected();
void onReadyRead();
void onSocketStateChanged(QAbstractSocket::SocketState socketState);
void onSocketError(QAbstractSocket::SocketError error);
void onSslErrors(const QList<QSslError> &errors);
quint16 newPacketId();
void sendPingreq();
@ -57,10 +59,12 @@ public slots:
public:
QString serverHostname;
quint16 serverPort = 0;
bool useSsl = false;
QSslConfiguration sslConfiguration;
bool autoReconnect = true;
bool sessionActive = false;
bool cleanSession = true;
QTcpSocket *socket = nullptr;
QSslSocket *socket = nullptr;
QTimer reconnectTimer;
int reconnectAttempt = 0;
quint16 maxReconnectTimeout = 36000;

View File

@ -355,6 +355,10 @@ int MqttPacket::parse(const QByteArray &buffer)
remainingLength += (lengthBit & 0x7F) * multiplier;
multiplier *= 128;
lenFields++;
if (multiplier > 128*128*128) {
qCWarning(dbgProto) << "Remaining Length field invalid";
return -1;
}
} while((lengthBit & 0x80) != 0);
if (remainingLength > buffer.length() - 1 - lenFields) {
@ -370,7 +374,7 @@ int MqttPacket::parse(const QByteArray &buffer)
const quint16 fullRemainingLength = remainingLength;
quint16 strLen;
const quint16 MAX_STRLEN = 256;
const quint16 MAX_STRLEN = remainingLength;
char str[MAX_STRLEN];
switch (type()) {

View File

@ -134,7 +134,7 @@ void OperationTests::disconnectAndWait(MqttClient* client)
bool OperationTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos)
{
QSignalSpy subscribedSpy(client, &MqttClient::subscribed);
QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult);
quint16 packetId = client->subscribe(topic, qos);
if (subscribedSpy.count() == 0) {
subscribedSpy.wait();
@ -242,7 +242,7 @@ void OperationTests::subscribeAndPublish()
MqttClient *client2 = connectAndWait(clientId2);
QSignalSpy serverSubscribeSpy(m_server, &MqttServer::clientSubscribed);
QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribed);
QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribeResult);
quint16 packetId = client1->subscribe("#", qosClient1);
@ -301,7 +301,7 @@ void OperationTests::willIsNotSentOnClientDisconnecting()
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye");
QSignalSpy subscribeSpy(client1, &MqttClient::subscribed);
QSignalSpy subscribeSpy(client1, &MqttClient::subscribeResult);
QSignalSpy publishSpy(client1, &MqttClient::publishReceived);
client1->subscribe("#");
@ -318,7 +318,7 @@ void OperationTests::testWillRetain()
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye", Mqtt::QoS1, true);
QSignalSpy subscribeSpy(client1, &MqttClient::subscribed);
QSignalSpy subscribeSpy(client1, &MqttClient::subscribeResult);
QSignalSpy publishSpy(client1, &MqttClient::publishReceived);
client1->subscribe("#");
@ -392,7 +392,7 @@ void OperationTests::testQoS1Retransmissions()
void OperationTests::testMultiSubscription()
{
MqttClient *client = connectAndWait("subscription-topics");
QSignalSpy subscribedSpy(client, &MqttClient::subscribed);
QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult);
MqttSubscriptions subscriptions = { MqttSubscription("topic1"), MqttSubscription("topic2") , MqttSubscription("#invalid") };
Mqtt::SubscribeReturnCodes subscriptionReturnCodes = { Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeFailure};
@ -433,7 +433,7 @@ void OperationTests::testSubscriptionTopicFilters()
QFETCH(Mqtt::SubscribeReturnCode, subscriptionReturnCode);
MqttClient *client = connectAndWait("subscription-topics");
QSignalSpy subscribedSpy(client, &MqttClient::subscribed);
QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult);
client->subscribe(topicFilter);
QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received");
@ -508,7 +508,7 @@ void OperationTests::testSubscriptionTopicMatching()
MqttClient *publisher = connectAndWait("publisher");
MqttClient *subscriber = connectAndWait("subscriber");
QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribed);
QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribeResult);
QSignalSpy publishReceivedSpy(subscriber, &MqttClient::publishReceived);
QSignalSpy publishedSpy(publisher, &MqttClient::published);
@ -532,7 +532,7 @@ void OperationTests::testSessionManagementDropOldSession()
MqttClient *client1Session1 = connectAndWait("client1");
client1Session1->setAutoReconnect(false);
QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed);
QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribeResult);
client1Session1->subscribe("/testtopic");
QTRY_VERIFY(subscribeSpy.count() == 1);
@ -564,7 +564,7 @@ void OperationTests::testSessionManagementResumeOldSession()
MqttClient *client1Session1 = connectAndWait("client1");
client1Session1->setAutoReconnect(false);
QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed);
QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribeResult);
client1Session1->subscribe("/testtopic");
QTRY_VERIFY(subscribeSpy.count() == 1);
@ -623,7 +623,7 @@ void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume()
void OperationTests::testQoS1PublishToClientIsDeliveredOnSessionResume()
{
MqttClient *oldClient1 = connectAndWait("client1", true);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult);
oldClient1->subscribe("/testtopic", Mqtt::QoS1);
QTRY_VERIFY(subscribedSpy.count() == 1);
@ -668,7 +668,7 @@ void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume()
void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume()
{
MqttClient *oldClient1 = connectAndWait("client1", true);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult);
oldClient1->subscribe("/testtopic", Mqtt::QoS2);
QTRY_VERIFY(subscribedSpy.count() == 1);