/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Copyright 2013 - 2022, nymea GmbH * Contact: contact@nymea.io * * This file is part of nymea. * This project including source code and documentation is protected by copyright law, and * remains the property of nymea GmbH. All rights, including reproduction, publication, * editing and translation, are reserved. The use of this project is subject to the terms of a * license agreement to be concluded with nymea GmbH in accordance with the terms * of use of nymea GmbH, available under https://nymea.io/license * * GNU Lesser General Public License Usage * Alternatively, this project may be redistributed and/or modified under the terms of the GNU * Lesser General Public License as published by the Free Software Foundation; version 3. * this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License along with this project. * If not, see <https://www.gnu.org/licenses/>. * * For any further details and any questions please contact us under contact@nymea.io * or see our FAQ/Licensing Information on https://nymea.io/license/faq * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ /*! \class MqttClient \brief An MQTT client \inmodule nymea-mqtt \ingroup mqtt MqttClient is used to connect to MQTT servers/brokers. The currently supported MQTT protocol version is 3.1.1. The currently supported transports are TCP socket and WebSocket with SSL encryption. 
*/ #include "mqttclient.h" #include "mqttclient_p.h" #include "mqttpacket.h" #include "transports/mqtttcpclienttransport.h" #include "transports/mqttwebsocketclienttransport.h" Q_LOGGING_CATEGORY(dbgClient, "nymea.mqtt.client") MqttClientPrivate::MqttClientPrivate(MqttClient *q): QObject(q), q_ptr(q) { qRegisterMetaType(); qRegisterMetaType(); qRegisterMetaType(); reconnectTimer.setSingleShot(true); connect(&keepAliveTimer, &QTimer::timeout, this, &MqttClientPrivate::sendPingreq); connect(&reconnectTimer, &QTimer::timeout, this, &MqttClientPrivate::reconnectTimerTimeout); } void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration) { MqttTcpClientTransport *tcpTransport = new MqttTcpClientTransport(hostName, port, useSsl, sslConfiguration, this); connectToHost(tcpTransport, cleanSession); } void MqttClientPrivate::connectToHost(const QNetworkRequest &request, bool cleanSession) { MqttWebSocketClientTransport *webSocketTransport = new MqttWebSocketClientTransport(request, this); connectToHost(webSocketTransport, cleanSession); } void MqttClientPrivate::connectToHost(MqttClientTransport *transport, bool cleanSession) { if (this->transport != transport) { reconnectAttempt = 1; reconnectTimer.stop(); if (this->transport) { this->transport->abort(); this->transport->deleteLater(); } this->transport = transport; connect(transport, &MqttClientTransport::connected, this, &MqttClientPrivate::onConnected); connect(transport, &MqttClientTransport::disconnected, this, &MqttClientPrivate::onDisconnected); connect(transport, &MqttClientTransport::dataReceived, this, &MqttClientPrivate::onDataReceived); connect(transport, &MqttClientTransport::stateChanged, this, &MqttClientPrivate::onSocketStateChanged); connect(transport, &MqttClientTransport::errorSignal, this, &MqttClientPrivate::onSocketError); connect(transport, &MqttClientTransport::sslErrors, this, 
&MqttClientPrivate::onSslErrors); } this->cleanSession = cleanSession; sessionActive = true; transport->connectToHost(); } void MqttClientPrivate::disconnectFromHost() { sessionActive = false; if (!transport || !transport->isOpen()) { return; } MqttPacket packet(MqttPacket::TypeDisconnect); transport->write(packet.serialize()); transport->flush(); transport->disconnectFromHost(); } /*! * \brief Constructs a new MQTT client object. * \param clientId The client ID. * \param parent A QObject parent for this MqttClient. * * The clientId is usually obtained with the credentials for a server. */ MqttClient::MqttClient(const QString &clientId, QObject *parent): QObject(parent), d_ptr(new MqttClientPrivate(this)) { d_ptr->clientId = clientId; } /*! * \brief Constructs a new MQTT client object. * \param clientId The client ID. * \param keepAlive The keep alive timeout in seconds * \param willTopic The will topic for this connection * \param willMessage The will message payload for this connection * \param willQoS The QoS used to send the will for this message * \param willRetain Determines whether the will message should be retained on the server * \param parent A QObject parent for this MqttClient. * * The clientId is usually obtained with the credentials for a server. Please refer to the MQTT documentation * for information about how the will message in MQTT works. 
*/ MqttClient::MqttClient(const QString &clientId, quint16 keepAlive, const QString &willTopic, const QByteArray &willMessage, Mqtt::QoS willQoS, bool willRetain, QObject *parent): QObject(parent), d_ptr(new MqttClientPrivate(this)) { d_ptr->clientId = clientId; d_ptr->keepAlive = keepAlive; d_ptr->willTopic = willTopic; d_ptr->willMessage = willMessage; d_ptr->willQoS = willQoS; d_ptr->willRetain = willRetain; } bool MqttClient::autoReconnect() const { return d_ptr->autoReconnect; } void MqttClient::setAutoReconnect(bool autoReconnect) { d_ptr->autoReconnect = autoReconnect; } quint16 MqttClient::maxAutoReconnectTimeout() const { return d_ptr->maxReconnectTimeout; } void MqttClient::setMaxAutoReconnectTimeout(quint16 maxAutoReconnectTimeout) { d_ptr->maxReconnectTimeout = maxAutoReconnectTimeout; } void MqttClient::setKeepAlive(quint16 keepAlive) { d_ptr->keepAlive = keepAlive; } QString MqttClient::willTopic() const { return d_ptr->willTopic; } void MqttClient::setWillTopic(const QString &willTopic) { d_ptr->willTopic = willTopic; } QByteArray MqttClient::willMessage() const { return d_ptr->willMessage; } void MqttClient::setWillMessage(const QByteArray &willMessage) { d_ptr->willMessage = willMessage; } Mqtt::QoS MqttClient::willQoS() const { return d_ptr->willQoS; } void MqttClient::setWillQoS(Mqtt::QoS willQoS) { d_ptr->willQoS = willQoS; } bool MqttClient::willRetain() const { return d_ptr->willRetain; } void MqttClient::setWillRetain(bool willRetain) { d_ptr->willRetain = willRetain; } QString MqttClient::username() const { return d_ptr->username; } void MqttClient::setUsername(const QString &username) { d_ptr->username = username; } QString MqttClient::password() const { return d_ptr->password; } void MqttClient::setPassword(const QString &password) { d_ptr->password = password; } void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration) { d_ptr->connectToHost(hostName, 
port, cleanSession, useSsl, sslConfiguration); } void MqttClient::connectToHost(const QNetworkRequest &request, bool cleanSession) { d_ptr->connectToHost(request, cleanSession); } void MqttClient::disconnectFromHost() { d_ptr->disconnectFromHost(); } bool MqttClient::isConnected() const { return d_ptr->transport && d_ptr->transport->state() == QAbstractSocket::ConnectedState && d_ptr->keepAliveTimer.isActive(); } void MqttClient::ignoreSslErrors() { d_ptr->transport->ignoreSslErrors(); } quint16 MqttClient::subscribe(const MqttSubscription &subscription) { MqttSubscriptions subscriptions = {subscription}; return subscribe(subscriptions); } quint16 MqttClient::subscribe(const QString &topicFilter, Mqtt::QoS qos) { MqttSubscription subscription(topicFilter.toUtf8(), qos); return subscribe(subscription); } quint16 MqttClient::subscribe(const MqttSubscriptions &subscriptions) { MqttPacket packet(MqttPacket::TypeSubscribe, d_ptr->newPacketId()); packet.setSubscriptions(subscriptions); d_ptr->unackedPackets.insert(packet.packetId(), packet); d_ptr->unackedPacketList.append(packet.packetId()); d_ptr->transport->write(packet.serialize()); return packet.packetId(); } quint16 MqttClient::unsubscribe(const MqttSubscription &subscription) { MqttSubscriptions subscriptions = {subscription}; return unsubscribe(subscriptions); } quint16 MqttClient::unsubscribe(const QString &topicFilter) { return unsubscribe(MqttSubscription(topicFilter.toUtf8(), Mqtt::QoS0)); } quint16 MqttClient::unsubscribe(const MqttSubscriptions &subscriptions) { MqttPacket packet(MqttPacket::TypeUnsubscribe, d_ptr->newPacketId()); packet.setSubscriptions(subscriptions); d_ptr->unackedPackets.insert(packet.packetId(), packet); d_ptr->unackedPacketList.append(packet.packetId()); d_ptr->transport->write(packet.serialize()); return packet.packetId(); } quint16 MqttClient::publish(const QString &topic, const QByteArray &payload, Mqtt::QoS qos, bool retain) { quint16 packetId = qos >= Mqtt::QoS1 ? 
d_ptr->newPacketId() : 0; MqttPacket packet(MqttPacket::TypePublish, packetId, qos, retain, false); packet.setTopic(topic.toUtf8()); packet.setPayload(payload); d_ptr->transport->write(packet.serialize()); if (qos == Mqtt::QoS0) { QTimer::singleShot(0, this, [this, packet](){ emit published(packet.packetId(), packet.topic()); }); } else { d_ptr->unackedPackets.insert(packet.packetId(), packet); d_ptr->unackedPacketList.append(packetId); } return packetId; } void MqttClientPrivate::onConnected() { MqttPacket packet(MqttPacket::TypeConnect); packet.setProtocolLevel(Mqtt::Protocol311); packet.setCleanSession(cleanSession); packet.setKeepAlive(keepAlive); packet.setClientId(clientId.toUtf8()); packet.setWillTopic(willTopic.toUtf8()); packet.setWillMessage(willMessage); packet.setWillQoS(willQoS); packet.setWillRetain(willRetain); packet.setUsername(username.toUtf8()); packet.setPassword(password.toUtf8()); transport->write(packet.serialize()); } void MqttClientPrivate::onDisconnected() { qCDebug(dbgClient) << "Disconnected from server"; emit q_ptr->disconnected(); if (sessionActive && autoReconnect) { reconnectAttempt = qMin(maxReconnectTimeout / 60 / 60, reconnectAttempt * 2); qCDebug(dbgClient) << "Reconnecting in" << reconnectAttempt << "seconds"; reconnectTimer.setInterval(reconnectAttempt * 1000); reconnectTimer.start(); } } void MqttClientPrivate::onDataReceived(const QByteArray &data) { inputBuffer.append(data); // qCDebug(dbgClient) << "Received data from server:" << data.toHex() << "\n" << data; MqttPacket packet; int ret = packet.parse(inputBuffer); if (ret == -1) { qCDebug(dbgClient) << "Bad data from server. 
Dropping connection."; inputBuffer.clear(); transport->abort(); return; } if (ret == 0) { qCDebug(dbgClient) << "Not enough data from server..."; return; } inputBuffer.remove(0, ret); switch (packet.type()) { case MqttPacket::TypeConnack: if (packet.connectReturnCode() != Mqtt::ConnectReturnCodeAccepted) { qCWarning(dbgClient) << "MQTT connection refused:" << packet.connectReturnCode(); // Always emit connected, even if just to indicate a "ClientRefusedError" emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags()); transport->abort(); emit q_ptr->error(QAbstractSocket::ConnectionRefusedError); return; } foreach (quint16 retryPacketId, unackedPacketList) { MqttPacket retryPacket = unackedPackets.value(retryPacketId); if (retryPacket.type() == MqttPacket::TypePublish) { retryPacket.setDup(true); } transport->write(retryPacket.serialize()); } restartKeepAliveTimer(); // Make sure we emit connected after having handled all the retransmission queue emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags()); break; case MqttPacket::TypePublish: qCDebug(dbgClient) << "Publish received from server. Topic:" << packet.topic() << "Payload:" << packet.payload() << "QoS:" << packet.qos(); switch (packet.qos()) { case Mqtt::QoS0: emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain()); break; case Mqtt::QoS1: { emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain()); MqttPacket response(MqttPacket::TypePuback, packet.packetId()); transport->write(response.serialize()); break; } case Mqtt::QoS2: { if (!packet.dup() && unackedPacketList.contains(packet.packetId())) { // Hmm... Server says it's not a duplicate, but packet id is not released yet... Drop connection. 
transport->disconnectFromHost(); return; } MqttPacket response(MqttPacket::TypePubrec, packet.packetId()); if (!unackedPacketList.contains(packet.packetId())) { unackedPackets.insert(packet.packetId(), response); unackedPacketList.append(packet.packetId()); emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain()); } transport->write(response.serialize()); break; } } break; case MqttPacket::TypePuback: { MqttPacket publishPacket = unackedPackets.take(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); emit q_ptr->published(packet.packetId(), publishPacket.topic()); restartKeepAliveTimer(); break; } case MqttPacket::TypePubrec: { MqttPacket publishPacket = unackedPackets.value(packet.packetId()); MqttPacket response(MqttPacket::TypePubrel, packet.packetId()); unackedPackets[packet.packetId()] = response; transport->write(response.serialize()); emit q_ptr->published(packet.packetId(), publishPacket.topic()); restartKeepAliveTimer(); break; } case MqttPacket::TypePubrel: { MqttPacket response(MqttPacket::TypePubcomp, packet.packetId()); unackedPackets[packet.packetId()] = response; transport->write(response.serialize()); restartKeepAliveTimer(); break; } case MqttPacket::TypePubcomp: unackedPackets.remove(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); restartKeepAliveTimer(); break; case MqttPacket::TypeSuback: { MqttPacket subscribePacket = unackedPackets.take(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); if (subscribePacket.subscriptions().count() != packet.subscribeReturnCodes().count()) { qCWarning(dbgClient) << "Subscription return code count not matching subscribe packet!"; transport->abort(); return; } // Ack the subscription packet emit q_ptr->subscribeResult(packet.packetId(), packet.subscribeReturnCodes()); // emit subscribed for each topic for (int i = 0; i < packet.subscribeReturnCodes().count(); i++) { emit q_ptr->subscribed(subscribePacket.subscriptions().at(i).topicFilter(), 
packet.subscribeReturnCodes().at(i)); } restartKeepAliveTimer(); break; } case MqttPacket::TypeUnsuback: if (!unackedPackets.contains(packet.packetId())) { qCWarning(dbgClient) << "UNSUBACK received but not waiting for it. Dropping connection. Packet ID:" << packet.packetId(); transport->abort(); return; } unackedPackets.remove(packet.packetId()); unackedPacketList.removeAll(packet.packetId()); emit q_ptr->unsubscribed(packet.packetId()); restartKeepAliveTimer(); break; case MqttPacket::TypePingresp: break; default: qCDebug(dbgClient).noquote().nospace() << "Unhandled packet type: 0x" << QString::number(packet.type(), 16); Q_ASSERT(false); } if (!inputBuffer.isEmpty()) { onDataReceived(QByteArray()); } } void MqttClientPrivate::onSocketStateChanged(QAbstractSocket::SocketState socketState) { emit q_ptr->stateChanged(socketState); } void MqttClientPrivate::onSocketError(QAbstractSocket::SocketError error) { qCWarning(dbgClient) << "MQTT socket error:" << error; emit q_ptr->error(error); } void MqttClientPrivate::onSslErrors(const QList &errors) { qCWarning(dbgClient) << "SSL error in MQTT connection:" << errors; emit q_ptr->sslErrors(errors); } quint16 MqttClientPrivate::newPacketId() { static quint16 packetId = 1; do { packetId++; } while (unackedPacketList.contains(packetId)); return packetId; } void MqttClientPrivate::sendPingreq() { MqttPacket packet(MqttPacket::TypePingreq); transport->write(packet.serialize()); } void MqttClientPrivate::restartKeepAliveTimer() { if (keepAlive > 0) { keepAliveTimer.start(keepAlive * 1000); } } void MqttClientPrivate::reconnectTimerTimeout() { qCDebug(dbgClient()) << "Reconnecting now..."; if (!autoReconnect) { return; } connectToHost(transport, false); }