From 7e35b3e79ec0195b687f14a0960a75599b6cf683 Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Tue, 27 Nov 2018 23:31:23 +0100 Subject: [PATCH] add reconnect feature to client and improve server api a bit --- libnymea-mqtt/mqttclient.cpp | 44 ++++++- libnymea-mqtt/mqttclient.h | 3 + libnymea-mqtt/mqttclient_p.h | 5 + libnymea-mqtt/mqttpacket.cpp | 2 + libnymea-mqtt/mqttserver.cpp | 201 ++++++++++++++++++++--------- libnymea-mqtt/mqttserver.h | 25 ++-- libnymea-mqtt/mqttserver_p.h | 40 +++++- tests/operation/test_operation.cpp | 5 +- 8 files changed, 242 insertions(+), 83 deletions(-) diff --git a/libnymea-mqtt/mqttclient.cpp b/libnymea-mqtt/mqttclient.cpp index 9fcd70f..e38063c 100644 --- a/libnymea-mqtt/mqttclient.cpp +++ b/libnymea-mqtt/mqttclient.cpp @@ -30,12 +30,18 @@ MqttClientPrivate::MqttClientPrivate(MqttClient *q): { qRegisterMetaType(); qRegisterMetaType(); + reconnectTimer.setSingleShot(true); + connect(&reconnectTimer, &QTimer::timeout, this, &MqttClientPrivate::reconnectTimerTimeout); } void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession) { - serverHostname = hostName; - serverPort = port; + if (serverHostname != hostName || serverPort != port) { + serverHostname = hostName; + serverPort = port; + reconnectAttempt = 1; + reconnectTimer.stop(); + } this->cleanSession = cleanSession; sessionActive = true; @@ -100,6 +106,16 @@ void MqttClient::setAutoReconnect(bool autoReconnect) d_ptr->autoReconnect = autoReconnect; } +quint16 MqttClient::maxAutoReconnectTimeout() const +{ + return d_ptr->maxReconnectTimeout; +} + +void MqttClient::setMaxAutoReconnectTimeout(quint16 maxAutoReconnectTimeout) +{ + d_ptr->maxReconnectTimeout = maxAutoReconnectTimeout; +} + void MqttClient::setKeepAlive(quint16 keepAlive) { d_ptr->keepAlive = keepAlive; @@ -262,7 +278,10 @@ void MqttClientPrivate::onDisconnected() qCDebug(dbgClient) << "Disconnected from server"; emit q_ptr->disconnected(); if (sessionActive && autoReconnect) { - connectToHost(serverHostname, serverPort, cleanSession); + reconnectAttempt = qMin(maxReconnectTimeout / 60 / 60, reconnectAttempt * 2); + qCDebug(dbgClient) << "Reconnecint in" << reconnectAttempt << "seconds"; + reconnectTimer.setInterval(reconnectAttempt * 1000); + reconnectTimer.start(); } } @@ -287,18 +306,23 @@ void MqttClientPrivate::onReadyRead() switch (packet.type()) { case MqttPacket::TypeConnack: - emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags()); if (packet.connectReturnCode() != Mqtt::ConnectReturnCodeAccepted) { qCWarning(dbgClient) << "MQTT connection refused:" << packet.connectReturnCode(); + // Always emit connected, even if just to indicate a "ClientRefusedError" + emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags()); socket->abort(); - emit q_ptr->disconnected(); + emit q_ptr->error(QAbstractSocket::ConnectionRefusedError); return; } foreach (quint16 retryPacketId, unackedPacketList) { MqttPacket retryPacket = unackedPackets.value(retryPacketId); - retryPacket.setDup(true); + if (retryPacket.type() == MqttPacket::TypePublish) { + retryPacket.setDup(true); + } socket->write(retryPacket.serialize()); } + // Make sure we emit connected after having handled all the retransmission queue + emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags()); restartKeepAliveTimer(); break; case MqttPacket::TypePublish: @@ -413,3 +437,11 @@ void MqttClientPrivate::restartKeepAliveTimer() keepAliveTimer.start(keepAlive * 1000); } } + +void MqttClientPrivate::reconnectTimerTimeout() +{ + if (!autoReconnect) { + return; + } + connectToHost(serverHostname, serverPort, false); +} diff --git a/libnymea-mqtt/mqttclient.h b/libnymea-mqtt/mqttclient.h index 35214ca..1a75ce2 100644 --- a/libnymea-mqtt/mqttclient.h +++ b/libnymea-mqtt/mqttclient.h @@ -39,6 +39,9 @@ public: bool autoReconnect() const; void setAutoReconnect(bool autoReconnect); + quint16 maxAutoReconnectTimeout() const; + void setMaxAutoReconnectTimeout(quint16 maxAutoReconnectTimeout); + quint16 keepAlive() const; void setKeepAlive(quint16 keepAlive); diff --git a/libnymea-mqtt/mqttclient_p.h b/libnymea-mqtt/mqttclient_p.h index e0342a8..350c1d0 100644 --- a/libnymea-mqtt/mqttclient_p.h +++ b/libnymea-mqtt/mqttclient_p.h @@ -52,6 +52,8 @@ public slots: void sendPingreq(); void restartKeepAliveTimer(); + void reconnectTimerTimeout(); + public: QString serverHostname; quint16 serverPort = 0; @@ -59,6 +61,9 @@ public: bool sessionActive = false; bool cleanSession = true; QTcpSocket *socket = nullptr; + QTimer reconnectTimer; + int reconnectAttempt = 0; + quint16 maxReconnectTimeout = 36000; QString clientId; quint16 keepAlive; diff --git a/libnymea-mqtt/mqttpacket.cpp b/libnymea-mqtt/mqttpacket.cpp index 1710b60..2c68b1a 100644 --- a/libnymea-mqtt/mqttpacket.cpp +++ b/libnymea-mqtt/mqttpacket.cpp @@ -60,7 +60,9 @@ MqttPacket::MqttPacket(MqttPacket::Type type, quint16 packetId, Mqtt::QoS qos, b case TypeSubscribe: case TypePubrel: case TypeUnsubscribe: + setDup(false); setQoS(Mqtt::QoS1); + setRetain(false); break; } } diff --git a/libnymea-mqtt/mqttserver.cpp b/libnymea-mqtt/mqttserver.cpp index 4c1ace9..96b86c5 100644 --- a/libnymea-mqtt/mqttserver.cpp +++ b/libnymea-mqtt/mqttserver.cpp @@ -35,9 +35,6 @@ MqttServerPrivate::MqttServerPrivate(MqttServer *q): q_ptr(q) { qRegisterMetaType(); - - server = new QTcpServer(this); - connect(server, &QTcpServer::newConnection, this, &MqttServerPrivate::onNewConnection); } QHash MqttServerPrivate::publish(const QString &topic, const QByteArray &payload) @@ -94,26 +91,56 @@ void MqttServer::setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS) d_ptr->maximumSubscriptionQoS = maximumSubscriptionQoS; } -bool MqttServer::listen(const QHostAddress &address, quint16 port, MqttUserValidator *userValidator) +void MqttServer::setAuthorizer(MqttAuthorizer *authorizer) { - d_ptr->userValidator = userValidator; + d_ptr->authorizer = authorizer; +} - if (!d_ptr->server->listen(address, port)) { - qCWarning(dbgServer()) << "Error listening on port" << port; - return false; +int MqttServer::listen(const QHostAddress &address, quint16 port, const QSslConfiguration &sslConfiguration) +{ + SslServer *server = new SslServer(sslConfiguration); + connect(server, &SslServer::clientConnected, d_ptr, &MqttServerPrivate::onClientConnected); + connect(server, &SslServer::clientDisconnected, d_ptr, &MqttServerPrivate::onClientDisconnected); + connect(server, &SslServer::dataAvailable, d_ptr, &MqttServerPrivate::onDataAvailable); + + if (!server->listen(address, port)) { + qCWarning(dbgServer) << "Error listening on port" << port; + server->deleteLater(); + return -1; } - qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port; - return true; + static int addressId = -1; + d_ptr->servers.insert(++addressId, server); + qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port << "( Address ID" << addressId << ")"; + return addressId; } -bool MqttServer::isListening() const +bool MqttServer::isListening(const QHostAddress &address, quint16 port) const { - return d_ptr->server->isListening(); + foreach (SslServer *server, d_ptr->servers) { + if (server->serverAddress() == address && server->serverPort() == port && server->isListening()) { + return true; + } + } + return false; } -void MqttServer::close() +QList MqttServer::listeningAddressIds() const { - d_ptr->server->close(); + return d_ptr->servers.keys(); +} + +void MqttServer::close(int interfaceId) +{ + if (!d_ptr->servers.contains(interfaceId)) { + qCWarning(dbgServer) << "No such server address ID" << interfaceId; + return; + } + SslServer *server = d_ptr->servers.take(interfaceId); + while (!d_ptr->clientServerMap.keys(server).isEmpty()) { + d_ptr->cleanupClient(d_ptr->clientServerMap.keys(server).first()); + } + server->close(); + server->deleteLater(); } QStringList MqttServer::clients() const @@ -125,14 +152,24 @@ QStringList MqttServer::clients() const return clientIds; } +void MqttServer::disconnectClient(const QString &clientId) +{ + foreach (ClientContext *ctx, d_ptr->clientList) { + if (ctx->clientId == clientId) { + d_ptr->cleanupClient(d_ptr->clientList.key(ctx)); + return; + } + } +} + QHash MqttServer::publish(const QString &topic, const QByteArray &payload) { return d_ptr->publish(topic, payload); } -void MqttServerPrivate::onNewConnection() +void MqttServerPrivate::onClientConnected(QSslSocket *client) { - QTcpSocket *client = server->nextPendingConnection(); + SslServer *server = static_cast(sender()); // Start a 10 second timer to clean up the connection if we don't get data until then. QTimer *timeoutTimer = new QTimer(this); @@ -143,16 +180,43 @@ void MqttServerPrivate::onNewConnection() client->deleteLater(); }); timeoutTimer->start(10000); + clientServerMap.insert(client, server); pendingConnections.insert(client, timeoutTimer); - - connect(client, &QTcpSocket::readyRead, this, &MqttServerPrivate::onClientReadyRead); - connect(client, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(onClientError(QAbstractSocket::SocketError))); - connect(client, &QTcpSocket::disconnected, this, &MqttServerPrivate::onClientDisconnected); } -void MqttServerPrivate::onClientDisconnected() +void MqttServerPrivate::onDataAvailable(QSslSocket *client, const QByteArray &data) +{ + clientBuffers[client].append(data); + + do { + MqttPacket packet; + int ret = packet.parse(clientBuffers[client]); + if (ret == 0) { + qCDebug(dbgServer) << "Packet too short... Waiting for more..."; + return; + } + + // Ok, we've got a full packet (or garbage data). If this client is still pending + // we can stop the timer, the protocol will take it from here. + if (pendingConnections.contains(client)) { + pendingConnections.take(client)->deleteLater(); + } + + if (ret == -1) { + qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection" << packet.serialize().toHex(); + cleanupClient(client); + return; + } + + clientBuffers[client].remove(0, ret); + + processPacket(packet, client); + + } while (!clientBuffers.value(client).isEmpty()); +} + +void MqttServerPrivate::onClientDisconnected(QSslSocket *client) { - QTcpSocket *client = static_cast(sender()); cleanupClient(client); } @@ -161,6 +225,9 @@ void MqttServerPrivate::cleanupClient(QTcpSocket *client) if (clientBuffers.contains(client)) { clientBuffers.remove(client); } + if (clientServerMap.contains(client)) { + clientServerMap.remove(client); + } if (clientList.contains(client)) { ClientContext *ctx = clientList.value(client); qCDebug(dbgServer) << "Client" << ctx->clientId << "disconnected."; @@ -174,12 +241,20 @@ void MqttServerPrivate::cleanupClient(QTcpSocket *client) processPacket(willPacket, client); } + while (!ctx->subscriptions.isEmpty()) { + emit q_ptr->clientUnsubscribed(ctx->clientId, ctx->subscriptions.takeFirst().topicFilter()); + } + emit q_ptr->clientDisconnected(ctx->clientId); clientList.remove(client); delete ctx; } - client->flush(); + + if (client->isOpen()) { + client->flush(); + client->close(); + } client->deleteLater(); } @@ -215,7 +290,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie clientId = QUuid::createUuid().toString().remove(QRegExp("[{}-]*")); } - if (userValidator) { + if (authorizer) { QString username; if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) { username = packet.username(); @@ -224,7 +299,9 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) { password = packet.password(); } - Mqtt::ConnectReturnCode userValidationReturnCode = userValidator->validateConnect(clientId, username, password, client->peerAddress()); + SslServer *server = clientServerMap.value(client); + int serverAddressId = servers.key(server); + Mqtt::ConnectReturnCode userValidationReturnCode = authorizer->authorizeConnect(serverAddressId, clientId, username, password, client->peerAddress()); if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) { qCWarning(dbgServer) << "Rejecting connection due to user validation."; response.setConnectReturnCode(userValidationReturnCode); @@ -311,7 +388,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie clientList.insert(client, ctx); response.setConnectReturnCode(Mqtt::ConnectReturnCodeAccepted); client->write(response.serialize()); - emit q_ptr->clientConnected(ctx->clientId, ctx->username, client->peerAddress()); + emit q_ptr->clientConnected(servers.key(clientServerMap.value(client)), ctx->clientId, ctx->username, client->peerAddress()); foreach (quint16 retryPacketId, ctx->unackedPacketList) { qCDebug(dbgServer) << "Resending unacked packet" << retryPacketId << "to" << ctx->clientId;; @@ -336,8 +413,6 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie if (packet.type() == MqttPacket::TypePublish) { qCDebug(dbgServer).nospace() << "Publish received from client " << ctx->clientId << ": Topic: " << packet.topic() << ", Payload: " << packet.payload() << " (Packet ID: " << packet.packetId() << ", DUP: " << packet.dup() << ", QoS: " << packet.qos() << ", Retain: " << packet.retain() << ')'; - emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload(), packet.dup()); - switch (packet.qos()) { case Mqtt::QoS0: break; @@ -379,6 +454,12 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie } } + if (authorizer && !authorizer->authorizePublish(servers.key(clientServerMap.value(client)), ctx->clientId, packet.topic())) { + qCDebug(dbgServer) << "Client not authorized to publish to this topic. Discarding packet"; + return; + } + + emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload()); publish(packet.topic(), packet.payload()); return; @@ -413,8 +494,9 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie // qCDebug(dbgServer).nospace() << ctx->clientId ": Subscribe packet received."; MqttPacket response(MqttPacket::TypeSuback, packet.packetId()); QByteArray payload; + MqttSubscriptions effectiveSubscriptions; foreach (MqttSubscription subscription, packet.subscriptions()) { - if (userValidator && !userValidator->validateSubscribe(subscription.topicFilter(), ctx->clientId, ctx->username)) { + if (authorizer && !authorizer->authorizeSubscribe(servers.key(clientServerMap.value(client)), ctx->clientId, subscription.topicFilter())) { qCWarning(dbgServer).nospace().noquote() << "Subscription topic filter not allowed for client \"" << ctx->clientId << "\": \"" << subscription.topicFilter() << '\"'; response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure); continue; @@ -437,6 +519,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie ctx->subscriptions.append(subscription); } qCDebug(dbgServer).noquote().nospace() << "Subscribed client \"" << ctx->clientId << "\" to topic filter: \"" << subscription.topicFilter() << "\" with QoS " << subscription.qoS(); + effectiveSubscriptions << subscription; emit q_ptr->clientSubscribed(ctx->clientId, subscription.topicFilter(), subscription.qoS()); switch (subscription.qoS()) { case Mqtt::QoS0: @@ -453,7 +536,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie client->write(response.serialize()); // Deliver any retained messages for this topic - foreach (MqttSubscription subscription, packet.subscriptions()) { + foreach (MqttSubscription subscription, effectiveSubscriptions) { foreach (const QString &topic, retainedMessages.keys()) { if (matchTopic(subscription.topicFilter(), topic)) { foreach (MqttPacket packet, retainedMessages.value(topic)) { @@ -560,40 +643,40 @@ quint16 MqttServerPrivate::newPacketId(ClientContext *ctx) return packetId; } -void MqttServerPrivate::onClientReadyRead() +void SslServer::incomingConnection(qintptr socketDescriptor) { - QTcpSocket* client = static_cast(sender()); + QSslSocket *sslSocket = new QSslSocket(this); - clientBuffers[client].append(client->readAll()); + qCDebug(dbgServer) << "New client socket connection:" << sslSocket; - do { - MqttPacket packet; - int ret = packet.parse(clientBuffers[client]); - if (ret == 0) { - qCDebug(dbgServer) << "Packet too short... Waiting for more..."; - return; - } + connect(sslSocket, &QSslSocket::encrypted, [this, sslSocket](){ emit clientConnected(sslSocket); }); + connect(sslSocket, &QSslSocket::readyRead, this, &SslServer::onSocketReadyRead); + connect(sslSocket, &QSslSocket::disconnected, this, &SslServer::onClientDisconnected); - // Ok, we've got a full packet (or garbage data). If this client is still pending - // we can stop the timer, the protocol will take it from here. - if (pendingConnections.contains(client)) { - pendingConnections.take(client)->deleteLater(); - } - - if (ret == -1) { - qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection"; - cleanupClient(client); - return; - } - - clientBuffers[client].remove(0, ret); - - processPacket(packet, client); - - } while (!clientBuffers.value(client).isEmpty()); + if (!sslSocket->setSocketDescriptor(socketDescriptor)) { + qCWarning(dbgServer) << "Failed to set SSL socket descriptor."; + delete sslSocket; + return; + } + if (!m_config.isNull()) { + sslSocket->setSslConfiguration(m_config); + sslSocket->startServerEncryption(); + } else { + emit clientConnected(sslSocket); + } } -void MqttServerPrivate::onClientError(QAbstractSocket::SocketError error) +void SslServer::onClientDisconnected() { -// qCWarning(dbgServer) << "Client error:" << error; + QSslSocket *socket = static_cast(sender()); + qCDebug(dbgServer) << "Client socket disconnected:" << socket; + emit clientDisconnected(socket); + socket->deleteLater(); +} + +void SslServer::onSocketReadyRead() +{ + QSslSocket *socket = static_cast(sender()); + QByteArray data = socket->readAll(); + emit dataAvailable(socket, data); } diff --git a/libnymea-mqtt/mqttserver.h b/libnymea-mqtt/mqttserver.h index 34a3597..ed64811 100644 --- a/libnymea-mqtt/mqttserver.h +++ b/libnymea-mqtt/mqttserver.h @@ -26,17 +26,19 @@ #include #include #include +#include #include "mqttpacket.h" class MqttServerPrivate; class Subscription; -class MqttUserValidator { +class MqttAuthorizer { public: - virtual ~MqttUserValidator() = default; - virtual Mqtt::ConnectReturnCode validateConnect(const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) = 0; - virtual bool validateSubscribe(const QString &topicFilter, const QString &clientId, const QString &username) = 0; + virtual ~MqttAuthorizer() = default; + virtual Mqtt::ConnectReturnCode authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) = 0; + virtual bool authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter) = 0; + virtual bool authorizePublish(int serverAddressId, const QString &clientId, const QString &topic) = 0; }; class MqttServer : public QObject @@ -48,18 +50,23 @@ public: Mqtt::QoS maximumSubscriptionsQoS() const; void setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS); - bool listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, MqttUserValidator *userValidator = nullptr); - bool isListening() const; - void close(); + void setAuthorizer(MqttAuthorizer *authorizer); + + int listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, const QSslConfiguration &sslConfiguration = QSslConfiguration()); + QList listeningAddressIds() const; + QPair listeningAddress(int addressId); + void close(int addressId); + bool isListening(const QHostAddress &address, quint16 port) const; QStringList clients() const; + void disconnectClient(const QString &clientId); // allows publishing from the server, including topcis starting with $ QHash publish(const QString &topic, const QByteArray &payload = QByteArray()); signals: // emitted whenever a client connects, after the mqtt connect handshake has been done. - void clientConnected(const QString &clientId, const QString &username, const QHostAddress &clientAddress); + void clientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress); // emitted whenever a client disconnects, that is, when a DISCONNECT message has been received or the keep alive timeout has been reached. void clientDisconnected(const QString &clientId); // emitted whenever a client has been seen, that is, a control message or a keep alive message has been received. @@ -69,7 +76,7 @@ signals: // emitted whenever a client unsubscribes from a topic void clientUnsubscribed(const QString &clientId, const QString &topicFiltr); // emitted whenever a publish message is received from a client before the message is relayed to other clients. Topics starting with $ will be received here, but not relayed to other clients. - void publishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload, bool dup); + void publishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload); // emitted whenever a publish message is sent to a client. Note: this might be fired often if many clients are connected and subsribed to matching topic filters. void published(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload); diff --git a/libnymea-mqtt/mqttserver_p.h b/libnymea-mqtt/mqttserver_p.h index 18fdf9b..11c747f 100644 --- a/libnymea-mqtt/mqttserver_p.h +++ b/libnymea-mqtt/mqttserver_p.h @@ -34,6 +34,7 @@ Q_DECLARE_LOGGING_CATEGORY(dbgServer) class ClientContext; class Subscription; +class SslServer; class MqttServerPrivate: public QObject { @@ -52,16 +53,15 @@ public: quint16 newPacketId(ClientContext *ctx); public slots: - void onNewConnection(); - void onClientReadyRead(); - void onClientError(QAbstractSocket::SocketError); - void onClientDisconnected(); + void onClientConnected(QSslSocket *client); + void onDataAvailable(QSslSocket *client, const QByteArray &data); + void onClientDisconnected(QSslSocket *client); public: MqttServer *q_ptr; - QTcpServer *server = nullptr; - MqttUserValidator *userValidator = nullptr; + QHash servers; + MqttAuthorizer *authorizer = nullptr; Mqtt::QoS maximumSubscriptionQoS = Mqtt::QoS2; @@ -69,6 +69,7 @@ public: QHash clientList; QHash clientBuffers; QHash retainedMessages; + QHash clientServerMap; }; class ClientContext { @@ -90,4 +91,31 @@ public: QHash unackedPackets; }; +class SslServer: public QTcpServer +{ + Q_OBJECT +public: + SslServer(const QSslConfiguration &config, QObject *parent = nullptr): + QTcpServer(parent), + m_config(config) + { + + } + +signals: + void clientConnected(QSslSocket *socket); + void clientDisconnected(QSslSocket *socket); + void dataAvailable(QSslSocket *socket, const QByteArray &data); + +protected: + void incomingConnection(qintptr socketDescriptor) override; + +private slots: + void onClientDisconnected(); + void onSocketReadyRead(); + +private: + QSslConfiguration m_config; +}; + #endif // MQTTSERVER_P_H diff --git a/tests/operation/test_operation.cpp b/tests/operation/test_operation.cpp index 00ecb9a..69488f8 100644 --- a/tests/operation/test_operation.cpp +++ b/tests/operation/test_operation.cpp @@ -178,7 +178,7 @@ void OperationTests::connectAndDisconnect() MqttClient* client = connectAndWait(clientId); QVERIFY2(serverSpy.count() == 1, "Server didn't emit clientConnected"); - QVERIFY2(serverSpy.at(0).first() == clientId, "ClientId not matching on server side."); + QVERIFY2(serverSpy.at(0).at(1) == clientId, "ClientId not matching on server side."); QSignalSpy serverSpyDisconnect(m_server, &MqttServer::clientDisconnected); QSignalSpy clientSpy(client, &MqttClient::disconnected); @@ -382,13 +382,11 @@ void OperationTests::testQoS1Retransmissions() QCOMPARE(serverSpy.at(0).at(1).toInt(), packetId); QCOMPARE(serverSpy.at(0).at(2).toString(), QString("/testtopic")); QCOMPARE(serverSpy.at(0).at(3).toString(), QString("Hello world")); - QCOMPARE(serverSpy.at(0).at(4).toBool(), false); QCOMPARE(serverSpy.at(1).at(0).toString(), QString("client1")); QCOMPARE(serverSpy.at(1).at(1).toInt(), packetId); QCOMPARE(serverSpy.at(1).at(2).toString(), QString("/testtopic")); QCOMPARE(serverSpy.at(1).at(3).toString(), QString("Hello world")); - QCOMPARE(serverSpy.at(1).at(4).toBool(), true); } void OperationTests::testMultiSubscription() @@ -811,6 +809,7 @@ void OperationTests::testEmptyClientId() QPair client3 = connectToServer("", false); QTRY_VERIFY2(client3.second->count() == 1, "Client did not emit connected signal"); QTRY_COMPARE(client3.second->first().at(0).value(), Mqtt::ConnectReturnCodeIdentifierRejected); + QTRY_VERIFY2(client3.first->isConnected() == false, "Connection should have been dropped"); } #endif