commit 7be28b8e444b6761ad7c037f9711fa5f842a2a68 Author: Michael Zanetti Date: Tue Nov 13 00:54:32 2018 +0100 nymea-mqtt implementation diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0161bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.user + diff --git a/libnymea-mqtt/libnymea-mqtt.pri b/libnymea-mqtt/libnymea-mqtt.pri new file mode 100644 index 0000000..3ab25bf --- /dev/null +++ b/libnymea-mqtt/libnymea-mqtt.pri @@ -0,0 +1,22 @@ +QT -= gui +QT += network + +CONFIG += c++11 console static +CONFIG -= app_bundle + +SOURCES += \ + mqttserver.cpp \ + mqttpacket.cpp \ + mqttsubscription.cpp \ + $$PWD/mqttclient.cpp + +HEADERS += \ + mqttserver.h \ + mqttpacket.h \ + mqtt.h \ + mqttsubscription.h \ + $$PWD/mqttclient.h \ + $$PWD/mqttpacket_p.h \ + $$PWD/mqttclient_p.h \ + $$PWD/mqttserver_p.h + diff --git a/libnymea-mqtt/libnymea-mqtt.pro b/libnymea-mqtt/libnymea-mqtt.pro new file mode 100644 index 0000000..1e3dd28 --- /dev/null +++ b/libnymea-mqtt/libnymea-mqtt.pro @@ -0,0 +1,4 @@ +TEMPLATE = lib +TARGET = nymea-mqtt + +include(libnymea-mqtt.pri) diff --git a/libnymea-mqtt/mqtt.h b/libnymea-mqtt/mqtt.h new file mode 100644 index 0000000..4a1a54c --- /dev/null +++ b/libnymea-mqtt/mqtt.h @@ -0,0 +1,62 @@ +#ifndef MQTT_H +#define MQTT_H + +#include + +namespace Mqtt { + +enum Protocol { + ProtocolUnknown = 0x00, + Protocol311 = 0x04 +}; + +enum QoS { + QoS0 = 0x00, + QoS1 = 0x01, + QoS2 = 0x02 +}; + +enum ConnectFlag { + ConnectFlagNone = 0x00, + ConnectFlagCleanSession = 0x02, + ConnectFlagWill = 0x04, + ConnectFlagWillQoS1 = 0x08, + ConnectFlagWillQoS2 = 0x10, + ConnectFlagWillRetain = 0x20, + ConnectFlagPassword = 0x40, + ConnectFlagUsername = 0x80 +}; +Q_DECLARE_FLAGS(ConnectFlags, ConnectFlag) + +enum ConnackFlag { + ConnackFlagNone = 0x0, + ConnackFlagSessionPresent = 0x1 +}; +Q_DECLARE_FLAGS(ConnackFlags, ConnackFlag) + +enum ConnectReturnCode { + ConnectReturnCodeAccepted = 0x00, + ConnectReturnCodeUnacceptableProtocolVersion = 0x01, + ConnectReturnCodeIdentifierRejected = 0x02, + ConnectReturnCodeServerUnavailable = 0x03, + ConnectReturnCodeBadUsernameOrPassword = 0x04, + ConnectReturnCodeNotAuthorized = 0x05 +}; +enum SubscribeReturnCode { + SubscribeReturnCodeSuccessQoS0 = 0x00, + SubscribeReturnCodeSuccessQoS1 = 0x01, + SubscribeReturnCodeSuccessQoS2 = 0x02, + SubscribeReturnCodeFailure = 0x80 +}; +typedef QList SubscribeReturnCodes; + +}; + +Q_DECLARE_METATYPE(Mqtt::QoS) +Q_DECLARE_METATYPE(Mqtt::ConnectFlags) +Q_DECLARE_METATYPE(Mqtt::ConnackFlags) +Q_DECLARE_METATYPE(Mqtt::ConnectReturnCode) +Q_DECLARE_METATYPE(Mqtt::SubscribeReturnCode) +Q_DECLARE_METATYPE(Mqtt::SubscribeReturnCodes) + +#endif // MQTT_H diff --git a/libnymea-mqtt/mqttclient.cpp b/libnymea-mqtt/mqttclient.cpp new file mode 100644 index 0000000..66057d3 --- /dev/null +++ b/libnymea-mqtt/mqttclient.cpp @@ -0,0 +1,395 @@ +#include "mqttclient.h" +#include "mqttclient_p.h" +#include "mqttpacket.h" + +Q_LOGGING_CATEGORY(dbgClient, "nymea.mqtt.client") + +MqttClientPrivate::MqttClientPrivate(MqttClient *q): + QObject(q), + q_ptr(q) +{ + qRegisterMetaType(); + qRegisterMetaType(); +} + +void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession) +{ + serverHostname = hostName; + serverPort = port; + this->cleanSession = cleanSession; + + sessionActive = true; + + if (socket) { + socket->abort(); + socket->deleteLater(); + } + socket = new QTcpSocket(this); + connect(socket, &QTcpSocket::connected, this, &MqttClientPrivate::onConnected); + connect(socket, &QTcpSocket::disconnected, this, &MqttClientPrivate::onDisconnected); + connect(socket, &QTcpSocket::readyRead, this, &MqttClientPrivate::onReadyRead); + connect(socket, &QTcpSocket::stateChanged, this, &MqttClientPrivate::onSocketStateChanged); +// connect(d_ptr->socket, &QTcpSocket::error, this, &MqttClient::error); + socket->connectToHost(hostName, port); +} + +void MqttClientPrivate::disconnectFromHost() +{ + sessionActive = false; + if (!socket || !socket->isOpen()) { + return; + } + MqttPacket packet(MqttPacket::TypeDisconnect); + socket->write(packet.serialize()); + socket->flush(); + socket->disconnectFromHost(); +} + +MqttClient::MqttClient(const QString &clientId, QObject *parent): + QObject(parent), + d_ptr(new MqttClientPrivate(this)) +{ + d_ptr->clientId = clientId; + +} + +MqttClient::MqttClient(const QString &clientId, quint16 keepAlive, const QString &willTopic, const QByteArray &willMessage, Mqtt::QoS willQoS, bool willRetain, QObject *parent): + QObject(parent), + d_ptr(new MqttClientPrivate(this)) +{ + + d_ptr->clientId = clientId; + d_ptr->keepAlive = keepAlive; + d_ptr->willTopic = willTopic; + d_ptr->willMessage = willMessage; + d_ptr->willQoS = willQoS; + d_ptr->willRetain = willRetain; + + if (keepAlive > 0) { + connect(&d_ptr->keepAliveTimer, &QTimer::timeout, d_ptr, &MqttClientPrivate::sendPingreq); + } +} + +bool MqttClient::autoReconnect() const +{ + return d_ptr->autoReconnect; +} + +void MqttClient::setAutoReconnect(bool autoReconnect) +{ + d_ptr->autoReconnect = autoReconnect; +} + +void MqttClient::setKeepAlive(quint16 keepAlive) +{ + d_ptr->keepAlive = keepAlive; +} + +QString MqttClient::willTopic() const +{ + return d_ptr->willTopic; +} + +void MqttClient::setWillTopic(const QString &willTopic) +{ + d_ptr->willTopic = willTopic; +} + +QByteArray MqttClient::willMessage() const +{ + return d_ptr->willMessage; +} + +void MqttClient::setWillMessage(const QByteArray &willMessage) +{ + d_ptr->willMessage = willMessage; +} + +Mqtt::QoS MqttClient::willQoS() const +{ + return d_ptr->willQoS; +} + +void MqttClient::setWillQoS(Mqtt::QoS willQoS) +{ + d_ptr->willQoS = willQoS; +} + +bool MqttClient::willRetain() const +{ + return d_ptr->willRetain; +} + +void MqttClient::setWillRetain(bool willRetain) +{ + d_ptr->willRetain = willRetain; +} + +QString MqttClient::username() const +{ + return d_ptr->username; +} + +void MqttClient::setUsername(const QString &username) +{ + d_ptr->username = username; +} + +QString MqttClient::password() const +{ + return d_ptr->password; +} + +void MqttClient::setPassword(const QString &password) +{ + d_ptr->password = password; +} + +void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession) +{ + d_ptr->connectToHost(hostName, port, cleanSession); +} + +void MqttClient::disconnectFromHost() +{ + d_ptr->disconnectFromHost(); +} + +bool MqttClient::isConnected() const +{ + return d_ptr->socket && d_ptr->socket->state() == QAbstractSocket::ConnectedState && d_ptr->keepAliveTimer.isActive(); +} + +quint16 MqttClient::subscribe(const MqttSubscription &subscription) +{ + MqttSubscriptions subscriptions = {subscription}; + return subscribe(subscriptions); +} + +quint16 MqttClient::subscribe(const QString &topicFilter, Mqtt::QoS qos) +{ + MqttSubscription subscription(topicFilter.toUtf8(), qos); + return subscribe(subscription); +} + +quint16 MqttClient::subscribe(const MqttSubscriptions &subscriptions) +{ + MqttPacket packet(MqttPacket::TypeSubscribe, d_ptr->newPacketId()); + packet.setSubscriptions(subscriptions); + d_ptr->unackedPackets.insert(packet.packetId(), packet); + d_ptr->unackedPacketList.append(packet.packetId()); + d_ptr->socket->write(packet.serialize()); + return packet.packetId(); +} + +quint16 MqttClient::unsubscribe(const MqttSubscription &subscription) +{ + MqttSubscriptions subscriptions = {subscription}; + return unsubscribe(subscriptions); +} + +quint16 MqttClient::unsubscribe(const QString &topicFilter) +{ + return unsubscribe(MqttSubscription(topicFilter.toUtf8(), Mqtt::QoS0)); +} + +quint16 MqttClient::unsubscribe(const MqttSubscriptions &subscriptions) +{ + MqttPacket packet(MqttPacket::TypeUnsubscribe, d_ptr->newPacketId()); + packet.setSubscriptions(subscriptions); + d_ptr->unackedPackets.insert(packet.packetId(), packet); + d_ptr->unackedPacketList.append(packet.packetId()); + d_ptr->socket->write(packet.serialize()); + return packet.packetId(); +} + +quint16 MqttClient::publish(const QString &topic, const QByteArray &payload, Mqtt::QoS qos, bool retain) +{ + quint16 packetId = qos >= Mqtt::QoS1 ? d_ptr->newPacketId() : 0; + MqttPacket packet(MqttPacket::TypePublish, packetId, qos, retain, false); + packet.setTopic(topic.toUtf8()); + packet.setPayload(payload); + d_ptr->socket->write(packet.serialize()); + if (qos == Mqtt::QoS0) { + QTimer::singleShot(0, this, [this, packetId](){ + emit published(packetId); + }); + } else { + d_ptr->unackedPackets.insert(packet.packetId(), packet); + d_ptr->unackedPacketList.append(packetId); + } + return packetId; +} + +void MqttClientPrivate::onConnected() +{ + MqttPacket packet(MqttPacket::TypeConnect); + packet.setProtocolLevel(Mqtt::Protocol311); + packet.setCleanSession(cleanSession); + packet.setKeepAlive(keepAlive); + packet.setClientId(clientId.toUtf8()); + packet.setWillTopic(willTopic.toUtf8()); + packet.setWillMessage(willMessage); + packet.setWillQoS(willQoS); + packet.setWillRetain(willRetain); + packet.setUsername(username.toUtf8()); + packet.setPassword(password.toUtf8()); + socket->write(packet.serialize()); +} + +void MqttClientPrivate::onDisconnected() +{ + qCDebug(dbgClient) << "Disconnected from server"; + emit q_ptr->disconnected(); + if (sessionActive && autoReconnect) { + connectToHost(serverHostname, serverPort, cleanSession); + } +} + +void MqttClientPrivate::onReadyRead() +{ + static QByteArray data; + data.append(socket->readAll()); +// qCDebug(dbgClient) << "Received data from server:" << data.toHex(); + MqttPacket packet; + int ret = packet.parse(data); + if (ret == -1) { + qCDebug(dbgClient) << "Bad data from server. Dropping connection."; + data.clear(); + socket->abort(); + return; + } + if (ret == 0) { + qCDebug(dbgClient) << "Not enough data from server..."; + return; + } + data.remove(0, ret); + + switch (packet.type()) { + case MqttPacket::TypeConnack: + emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags()); + if (packet.connectReturnCode() != Mqtt::ConnectReturnCodeAccepted) { + qCWarning(dbgClient) << "MQTT connection refused:" << packet.connectReturnCode(); + socket->abort(); + emit q_ptr->disconnected(); + return; + } + foreach (quint16 retryPacketId, unackedPacketList) { + MqttPacket retryPacket = unackedPackets.value(retryPacketId); + retryPacket.setDup(true); + socket->write(retryPacket.serialize()); + } + restartKeepAliveTimer(); + break; + case MqttPacket::TypePublish: + qCDebug(dbgClient) << "Publish received from server. Topic:" << packet.topic() << "Payload:" << packet.payload() << "QoS:" << packet.qos(); + switch (packet.qos()) { + case Mqtt::QoS0: + emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain()); + break; + case Mqtt::QoS1: { + emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain()); + MqttPacket response(MqttPacket::TypePuback, packet.packetId()); + socket->write(response.serialize()); + break; + } + case Mqtt::QoS2: { + if (!packet.dup() && unackedPacketList.contains(packet.packetId())) { + // Hmm... Server says it's not a duplicate, but packet id is not released yet... Drop connection. + socket->disconnectFromHost(); + return; + } + + MqttPacket response(MqttPacket::TypePubrec, packet.packetId()); + + if (!unackedPacketList.contains(packet.packetId())) { + unackedPackets.insert(packet.packetId(), response); + unackedPacketList.append(packet.packetId()); + emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain()); + } + socket->write(response.serialize()); + break; + } + } + break; + case MqttPacket::TypePuback: + unackedPackets.remove(packet.packetId()); + unackedPacketList.removeAll(packet.packetId()); + emit q_ptr->published(packet.packetId()); + restartKeepAliveTimer(); + break; + case MqttPacket::TypePubrec: { + MqttPacket response(MqttPacket::TypePubrel, packet.packetId()); + unackedPackets[packet.packetId()] = response; + socket->write(response.serialize()); + restartKeepAliveTimer(); + break; + } + case MqttPacket::TypePubrel: { + MqttPacket response(MqttPacket::TypePubcomp, packet.packetId()); + unackedPackets[packet.packetId()] = response; + socket->write(response.serialize()); + restartKeepAliveTimer(); + break; + } + case MqttPacket::TypePubcomp: + unackedPackets.remove(packet.packetId()); + unackedPacketList.removeAll(packet.packetId()); + emit q_ptr->published(packet.packetId()); + restartKeepAliveTimer(); + break; + case MqttPacket::TypeSuback: + unackedPackets.remove(packet.packetId()); + unackedPacketList.removeAll(packet.packetId()); + emit q_ptr->subscribed(packet.packetId(), packet.subscribeReturnCodes()); + restartKeepAliveTimer(); + break; + case MqttPacket::TypeUnsuback: + if (!unackedPackets.contains(packet.packetId())) { + qCWarning(dbgClient) << "UNSUBACK received but not waiting for it. Dropping connection. Packet ID:" << packet.packetId(); + socket->abort(); + return; + } + unackedPackets.remove(packet.packetId()); + unackedPacketList.removeAll(packet.packetId()); + emit q_ptr->unsubscribed(packet.packetId()); + restartKeepAliveTimer(); + break; + case MqttPacket::TypePingresp: + break; + default: + qCDebug(dbgClient).noquote().nospace() << "Unhandled packet type: 0x" << QString::number(packet.type(), 16); + Q_ASSERT(false); + } + + if (!data.isEmpty()) { + onReadyRead(); + } +} + +void MqttClientPrivate::onSocketStateChanged(QAbstractSocket::SocketState socketState) +{ + emit q_ptr->stateChanged(socketState); +} + +quint16 MqttClientPrivate::newPacketId() +{ + static quint16 packetId = 1; + do { + packetId++; + } while (unackedPacketList.contains(packetId)); + return packetId; +} + +void MqttClientPrivate::sendPingreq() +{ + MqttPacket packet(MqttPacket::TypePingreq); + socket->write(packet.serialize()); +} + +void MqttClientPrivate::restartKeepAliveTimer() +{ + if (keepAlive > 0) { + keepAliveTimer.start(keepAlive * 1000); + } +} diff --git a/libnymea-mqtt/mqttclient.h b/libnymea-mqtt/mqttclient.h new file mode 100644 index 0000000..536a60b --- /dev/null +++ b/libnymea-mqtt/mqttclient.h @@ -0,0 +1,75 @@ +#ifndef MQTTCLIENT_H +#define MQTTCLIENT_H + +#include +#include + +#include "mqttpacket.h" +#include "mqttsubscription.h" + +class MqttClientPrivate; + +class MqttClient : public QObject +{ + Q_OBJECT +public: + explicit MqttClient(const QString &clientId, QObject *parent = nullptr); + explicit MqttClient(const QString &clientId, quint16 keepAlive = 300, const QString &willTopic = QString(), const QByteArray &willMessage = QByteArray(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false, QObject *parent = nullptr); + + bool autoReconnect() const; + void setAutoReconnect(bool autoReconnect); + + quint16 keepAlive() const; + void setKeepAlive(quint16 keepAlive); + + QString willTopic() const; + void setWillTopic(const QString &willTopic); + + QByteArray willMessage() const; + void setWillMessage(const QByteArray &willMessage); + + Mqtt::QoS willQoS() const; + void setWillQoS(Mqtt::QoS willQoS); + + bool willRetain() const; + void setWillRetain(bool willRetain); + + QString username() const; + void setUsername(const QString &username); + + QString password() const; + void setPassword(const QString &password); + + void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true); + void disconnectFromHost(); + + bool isConnected() const; + +public slots: + quint16 subscribe(const MqttSubscription &subscription); + quint16 subscribe(const QString &topciFilter, Mqtt::QoS qos = Mqtt::QoS0); + quint16 subscribe(const MqttSubscriptions &subscriptions); + + quint16 unsubscribe(const MqttSubscription &subscription); + quint16 unsubscribe(const QString &topicFilter); + quint16 unsubscribe(const MqttSubscriptions &subscriptions); + + quint16 publish(const QString &topic, const QByteArray &payload, Mqtt::QoS qos = Mqtt::QoS0, bool retain = false); + +signals: + void connected(Mqtt::ConnectReturnCode connectReturnCode, Mqtt::ConnackFlags connackFlags); + void disconnected(); + void stateChanged(QAbstractSocket::SocketState state); + void error(QAbstractSocket::SocketError socketError); + + void subscribed(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes); + void unsubscribed(quint16 packetId); + void published(quint16 packetId); + void publishReceived(const QString &topic, const QByteArray &payload, bool retained); + +private: + MqttClientPrivate *d_ptr; + friend class OperationTests; +}; + +#endif // MQTTCLIENT_H diff --git a/libnymea-mqtt/mqttclient_p.h b/libnymea-mqtt/mqttclient_p.h new file mode 100644 index 0000000..5bbc65a --- /dev/null +++ b/libnymea-mqtt/mqttclient_p.h @@ -0,0 +1,58 @@ +#ifndef MQTTCLIENT_P_H +#define MQTTCLIENT_P_H + +#include +#include +#include +#include + +#include "mqttpacket.h" +#include "mqttclient.h" +#include "mqttsubscription.h" + +Q_DECLARE_LOGGING_CATEGORY(dbgClient) + +class MqttClientPrivate: public QObject +{ + Q_OBJECT +public: + MqttClientPrivate(MqttClient *q); + MqttClient *q_ptr; + ~MqttClientPrivate() { qDebug() << "destroying client private" << this; } + + void connectToHost(const QString &hostName, quint16 port, bool cleanSession); + void disconnectFromHost(); + +public slots: + void onConnected(); + void onDisconnected(); + void onReadyRead(); + void onSocketStateChanged(QAbstractSocket::SocketState socketState); + + quint16 newPacketId(); + void sendPingreq(); + void restartKeepAliveTimer(); + +public: + QString serverHostname; + quint16 serverPort = 0; + bool autoReconnect = true; + bool sessionActive = false; + bool cleanSession = true; + QTcpSocket *socket = nullptr; + + QString clientId; + quint16 keepAlive; + QTimer keepAliveTimer; + QString willTopic; + QByteArray willMessage; + Mqtt::QoS willQoS = Mqtt::QoS0; + bool willRetain = false; + QString username; + QString password; + + QVector unackedPacketList; + QHash unackedPackets; +}; + +#endif // MQTTCLIENT_P_H diff --git a/libnymea-mqtt/mqttpacket.cpp b/libnymea-mqtt/mqttpacket.cpp new file mode 100644 index 0000000..3822d92 --- /dev/null +++ b/libnymea-mqtt/mqttpacket.cpp @@ -0,0 +1,741 @@ +#include "mqttpacket.h" +#include "mqttpacket_p.h" + +#include +#include + +Q_LOGGING_CATEGORY(dbgProto, "nymea.mqtt.protocol") + +#define ASSERT_LEN(a, name) if (remainingLength < a) { qCWarning(dbgProto) << "Bad" << name << "packet. Data too short."; return -1; } +#define VERIFY_LEN(a, name) if (remainingLength != a) { qCWarning(dbgProto) << "Bad" << name << "packet. Data length unexpected."; return -1; } + +MqttPacket::MqttPacket(): + d_ptr(new MqttPacketPrivate(this)) +{ +} + +MqttPacket::MqttPacket(MqttPacket::Type type, quint16 packetId, Mqtt::QoS qos, bool retain, bool dup): + d_ptr(new MqttPacketPrivate(this)) +{ + d_ptr->packetId = packetId; + d_ptr->header = type; + + switch (type) { + case TypeConnect: + case TypeConnack: + case TypePuback: + case TypePubrec: + case TypePubcomp: + case TypeSuback: + case TypeUnsuback: + case TypePingreq: + case TypePingresp: + case TypeDisconnect: + break; + case TypePublish: + setDup(dup); + setQoS(qos); + setRetain(retain); + break; + case TypeSubscribe: + case TypePubrel: + case TypeUnsubscribe: + setQoS(Mqtt::QoS1); + break; + } +} + +MqttPacket::Type MqttPacket::type() const +{ + return static_cast(d_ptr->header & 0xF0); +} + +bool MqttPacket::dup() const +{ + return d_ptr->header & 0x08; +} + +void MqttPacket::setDup(bool dup) +{ + if (dup) { + d_ptr->header |= 0x08; + } else { + d_ptr->header &= 0xf7; + } +} + +Mqtt::QoS MqttPacket::qos() const +{ + return static_cast((d_ptr->header & 0x06) >> 1); +} + +void MqttPacket::setQoS(Mqtt::QoS qoS) +{ + d_ptr->header &= 0xf9; + d_ptr->header |= (qoS << 1); +} + +bool MqttPacket::retain() const +{ + return d_ptr->header & 0x01; +} + +void MqttPacket::setRetain(bool retain) +{ + if (retain) { + d_ptr->header = d_ptr->header | 0x01; + } else { + d_ptr->header = d_ptr->header & 0xfe; + } +} + +void MqttPacket::setCleanSession(bool cleanSession) +{ + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagCleanSession, cleanSession); +} + +bool MqttPacket::cleanSession() const +{ + return d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagCleanSession); +} + +Mqtt::ConnectFlags MqttPacket::connectFlags() const +{ + return d_ptr->connectFlags; +} + +QByteArray MqttPacket::protocolName() const +{ + return d_ptr->protocolName; +} + +Mqtt::Protocol MqttPacket::protocolLevel() const +{ + return d_ptr->protocolLevel; +} + +void MqttPacket::setProtocolLevel(Mqtt::Protocol protocolLevel) +{ + d_ptr->protocolLevel = protocolLevel; +} + +QByteArray MqttPacket::clientId() const +{ + return d_ptr->clientId; +} + +void MqttPacket::setClientId(const QByteArray &clientId) +{ + d_ptr->clientId = clientId; +} + +QByteArray MqttPacket::willTopic() const +{ + return d_ptr->willTopic; +} + +void MqttPacket::setWillTopic(const QByteArray &willTopic) +{ + d_ptr->willTopic = willTopic; + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWill, !willTopic.isEmpty()); +} + +QByteArray MqttPacket::willMessage() const +{ + return d_ptr->willMessage; +} + +void MqttPacket::setWillMessage(const QByteArray &willMessage) +{ + d_ptr->willMessage = willMessage; +} + +Mqtt::QoS MqttPacket::willQoS() const +{ + if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWillQoS2)) { + return Mqtt::QoS2; + } + if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWillQoS1)) { + return Mqtt::QoS1; + } + return Mqtt::QoS0; +} + +void MqttPacket::setWillQoS(Mqtt::QoS willQoS) +{ + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWillQoS1, willQoS == Mqtt::QoS1); + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWillQoS2, willQoS == Mqtt::QoS2); +} + +bool MqttPacket::willRetain() const +{ + return d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWillRetain); +} + +void MqttPacket::setWillRetain(bool willRetain) +{ + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWillRetain, willRetain); +} + +QByteArray MqttPacket::username() const +{ + return d_ptr->username; +} + +void MqttPacket::setUsername(const QByteArray &username) +{ + d_ptr->username = username; + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagUsername, !username.isEmpty()); +} + +QByteArray MqttPacket::password() const +{ + return d_ptr->password; +} + +void MqttPacket::setPassword(const QByteArray &password) +{ + d_ptr->password = password; + d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagPassword, !password.isEmpty()); +} + +quint16 MqttPacket::keepAlive() const +{ + return d_ptr->keepAlive; +} + +void MqttPacket::setKeepAlive(quint16 keepAlive) +{ + d_ptr->keepAlive = keepAlive; +} + +Mqtt::ConnectReturnCode MqttPacket::connectReturnCode() const +{ + return d_ptr->connectReturnCode; +} + +void MqttPacket::setConnectReturnCode(Mqtt::ConnectReturnCode connectReturnCode) +{ + d_ptr->connectReturnCode = connectReturnCode; +} + +Mqtt::ConnackFlags MqttPacket::connackFlags() const +{ + return d_ptr->connackFlags; +} + +void MqttPacket::setConnackFlags(Mqtt::ConnackFlags connackFlags) +{ + d_ptr->connackFlags = connackFlags; +} + +quint16 MqttPacket::packetId() const +{ + return d_ptr->packetId; +} + +void MqttPacket::setPacketId(quint16 packetId) +{ + d_ptr->packetId = packetId; +} + +QByteArray MqttPacket::topic() const +{ + return d_ptr->topic; +} + +void MqttPacket::setTopic(const QByteArray &topic) +{ + d_ptr->topic = topic; +} + +QByteArray MqttPacket::payload() const +{ + return d_ptr->payload; +} + +void MqttPacket::setPayload(const QByteArray &payload) +{ + d_ptr->payload = payload; +} + +MqttSubscriptions MqttPacket::subscriptions() const +{ + return d_ptr->subscriptions; +} + +void MqttPacket::setSubscriptions(const MqttSubscriptions &subscriptions) +{ + d_ptr->subscriptions = subscriptions; +} + +void MqttPacket::addSubscription(const MqttSubscription &subscription) +{ + d_ptr->subscriptions.append(subscription); +} + +Mqtt::SubscribeReturnCodes MqttPacket::subscribeReturnCodes() const +{ + return d_ptr->subscribeReturnCodes; +} + +void MqttPacket::setSubscribeReturnCodes(const Mqtt::SubscribeReturnCodes subscribeReturnCodes) +{ + d_ptr->subscribeReturnCodes = subscribeReturnCodes; +} + +void MqttPacket::addSubscribeReturnCode(Mqtt::SubscribeReturnCode subscribeReturnCode) +{ + d_ptr->subscribeReturnCodes.append(subscribeReturnCode); +} + +int MqttPacket::parse(const QByteArray &buffer) +{ + if (buffer.length() < 2) { + return 0; + } + QDataStream inputStream(buffer); + qCDebug(dbgProto()) << "MQTT input data:\n" << buffer.toHex(); + + inputStream >> d_ptr->header; + + quint16 remainingLength = 0; + int multiplier = 1; + quint8 lengthBit; + quint8 lenFields = 0; + do { + inputStream >> lengthBit; + remainingLength += (lengthBit & 0x7F) * multiplier; + multiplier *= 128; + lenFields++; + } while((lengthBit & 0x80) != 0); + + if (remainingLength > buffer.length() - 1 - lenFields) { + qCDebug(dbgProto) << "Cannot process MQTT packet. Remaining Length field larger than input data size:" << remainingLength << ">" << (buffer.length() - 1 - lenFields); + return 0; + } + + if (!d_ptr->verifyHeaderFlags()) { + qCDebug(dbgProto) << "Bad MQTT packet. Fixed header flags invalid."; + return -1; + } + + const quint16 fullRemainingLength = remainingLength; + + quint16 strLen; + const quint16 MAX_STRLEN = 256; + char str[MAX_STRLEN]; + + switch (type()) { + case TypeConnect: { + ASSERT_LEN(2, "CONNECT"); + inputStream >> strLen; + remainingLength -= 2; + + ASSERT_LEN(strLen, "CONNECT"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->protocolName = QByteArray(str); + + ASSERT_LEN(6, "CONNECT"); + quint8 pl; + inputStream >> pl; + d_ptr->protocolLevel = static_cast(pl); + remainingLength -= 1; + quint8 cF; + inputStream >> cF; + remainingLength -= 1; + d_ptr->connectFlags = static_cast(cF); + inputStream >> d_ptr->keepAlive; + remainingLength -= 2; + + inputStream >> strLen; + remainingLength -= 2; + + ASSERT_LEN(strLen, "CONNECT"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->clientId = QByteArray(str); + + if (connectFlags().testFlag(Mqtt::ConnectFlagWill)) { + ASSERT_LEN(2, "CONNECT"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "CONNECT"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->willTopic = QByteArray(str); + + ASSERT_LEN(2, "CONNECT"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "CONNECT"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->willMessage = QByteArray(str); + } else { + if (willRetain() || willQoS() != Mqtt::QoS0) { + qCWarning(dbgProto) << "Bad CONNECT packet. Will flag not set but WillQoS or WillRetain set."; + return -1; + } + } + + if (connectFlags().testFlag(Mqtt::ConnectFlagUsername)) { + ASSERT_LEN(2, "CONNECT"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "CONNECT"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->username = QByteArray(str); + } else { + if (connectFlags().testFlag(Mqtt::ConnectFlagPassword)) { + qCWarning(dbgProto) << "Bad CONNECT packet. Username flag not set but password is set."; + return -1; + } + } + + if (connectFlags().testFlag(Mqtt::ConnectFlagPassword)) { + ASSERT_LEN(2, "CONNECT"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "CONNECT"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->password = QByteArray(str); + } + VERIFY_LEN(0, "CONNECT"); + break; + } + case TypeConnack: { + VERIFY_LEN(2, "CONNACK"); + quint8 connackFlags; + inputStream >> connackFlags; + remainingLength -= 1; + d_ptr->connackFlags = static_cast(connackFlags); + quint8 connectReturnCode; + inputStream >> connectReturnCode; + d_ptr->connectReturnCode = static_cast(connectReturnCode); + remainingLength -= 1; + VERIFY_LEN(0, "CONNACK"); + break; + } + case TypePublish: { + ASSERT_LEN(2, "PUBLISH"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "PUBLISH"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + d_ptr->topic = QByteArray(str); + + if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) { + ASSERT_LEN(2, "PUBLISH"); + inputStream >> d_ptr->packetId; + remainingLength -= 2; + } + + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, remainingLength)); + d_ptr->payload = QByteArray(str); + break; + } + case TypePuback: + VERIFY_LEN(2, "PUBACK"); + inputStream >> d_ptr->packetId; + break; + case TypePubrec: + VERIFY_LEN(2, "PUBREC"); + inputStream >> d_ptr->packetId; + break; + case TypePubrel: + VERIFY_LEN(2, "PUBREL"); + inputStream >> d_ptr->packetId; + break; + case TypePubcomp: + VERIFY_LEN(2, "PUBCOMP"); + inputStream >> d_ptr->packetId; + break; + case TypeSubscribe: { + ASSERT_LEN(2, "SUBSCRIBE"); + inputStream >> d_ptr->packetId; + remainingLength -= 2; + + if (remainingLength == 0) { + qCWarning(dbgProto) << "Bad SUBSCRIBE packet. Subscription filter in payload missing."; + return -1; + } + while (remainingLength > 0) { + ASSERT_LEN(2, "SUBSCRIBE"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "SUBSCRIBE"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + MqttSubscription subscription; + subscription.setTopicFilter(QByteArray(str)); + + ASSERT_LEN(1, "SUBSCRIBE"); + quint8 requestedQoS; + inputStream >> requestedQoS; + remainingLength -= 1; + if ((requestedQoS & 0xFC) != 0x00) { + qCWarning(dbgProto) << "Bad SUBSCRIBE packet. Reserved bits set in requested QoS field."; + return -1; + } + if ((requestedQoS & 0x03) == 0x03) { + qCWarning(dbgProto) << "Bad SUBSCRIBE packet. QoS cannot be QoS1 and QoS2 at the same time."; + return -1; + } + subscription.setQoS(static_cast(requestedQoS)); + d_ptr->subscriptions.append(subscription); + } + break; + } + case TypeSuback: + ASSERT_LEN(3, "SUBACK"); + inputStream >> d_ptr->packetId; + remainingLength -= 2; + while (remainingLength > 0) { + quint8 subscribeReturnCode; + inputStream >> subscribeReturnCode; + remainingLength -= 1; + d_ptr->subscribeReturnCodes.append(static_cast(subscribeReturnCode)); + } + break; + case TypeUnsubscribe: { + ASSERT_LEN(5, "UNSUBSCRIBE"); + inputStream >> d_ptr->packetId; + remainingLength -= 2; + while (remainingLength > 0) { + ASSERT_LEN(2, "UNSUBSCRIBE"); + inputStream >> strLen; + remainingLength -= 2; + ASSERT_LEN(strLen, "UNSUBSCRIBE"); + memset(str, 0, MAX_STRLEN); + inputStream.readRawData(str, qMin(MAX_STRLEN, strLen)); + remainingLength -= strLen; + MqttSubscription subscription; + subscription.setTopicFilter(QByteArray(str)); + d_ptr->subscriptions.append(subscription); + } + } + break; + case TypeUnsuback: + VERIFY_LEN(2, "UNSUBACK"); + inputStream >> d_ptr->packetId; + break; + case TypePingreq: + VERIFY_LEN(0, "PINGREC"); + break; + case TypePingresp: + VERIFY_LEN(0, "PINGRESP"); + break; + case TypeDisconnect: + VERIFY_LEN(0, "DISCONNECT"); + break; + } + return fullRemainingLength + 1 + lenFields; +} + +QByteArray MqttPacket::serialize() const +{ + QByteArray ret; + QDataStream stream(&ret, QIODevice::WriteOnly); + stream << d_ptr->header; + + quint16 remainingLength = 0; + switch (type()) { + case TypeConnect: + remainingLength = static_cast( + 2 // protocol name length + + d_ptr->protocolName.length() + + 1 // protocol level + + 1 // connect flags + + 2 // keep alive + + 2 // client id length + + d_ptr->clientId.length() + + (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWill) ? (2 + d_ptr->willTopic.length()) : 0) + + (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWill) ? (2 + d_ptr->willMessage.length()) : 0) + + (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagUsername) ? (2 + d_ptr->username.length()) : 0) + + (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagPassword) ? (2 + d_ptr->password.length()) : 0) + ); + break; + case TypeConnack: + remainingLength = 2; + break; + case TypePublish: + remainingLength += 2; // len for topic + remainingLength += d_ptr->topic.length(); + if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) { + remainingLength += 2; // packetId + } + remainingLength += d_ptr->payload.length(); + break; + case TypePuback: + case TypePubrec: + case TypePubrel: + case TypePubcomp: + remainingLength = 2; + break; + case TypeSubscribe: + remainingLength = 2; // packet id + foreach (const MqttSubscription &subscription, d_ptr->subscriptions) { + remainingLength += 2; // topic filter length + remainingLength += static_cast(subscription.topicFilter().length()); + remainingLength += 1; // requested QoS + } + break; + case TypeSuback: + remainingLength = 2 + static_cast(d_ptr->subscribeReturnCodes.length()); + break; + case TypeUnsubscribe: + remainingLength = 2; // packet id + foreach (const MqttSubscription &subscription, d_ptr->subscriptions) { + remainingLength += 2; + remainingLength += static_cast(subscription.topicFilter().length()); + } + break; + case TypeUnsuback: + remainingLength = 2; // packet id + break; + case TypePingreq: + break; + case TypePingresp: + break; + case TypeDisconnect: + break; + } + + quint8 encodedByte; + do { + encodedByte = remainingLength % 128; + remainingLength /= 128; + if ( remainingLength > 0 ) { + encodedByte = encodedByte | 128; + } + stream << encodedByte; + } while ( remainingLength > 0 ); + + switch (type()) { + case TypeConnect: + stream << static_cast(d_ptr->protocolName.length()); + stream.writeRawData(d_ptr->protocolName.data(), d_ptr->protocolName.length()); + stream << static_cast(d_ptr->protocolLevel); + stream << static_cast(d_ptr->connectFlags); + stream << static_cast(d_ptr->keepAlive); + stream << static_cast(d_ptr->clientId.length()); + stream.writeRawData(d_ptr->clientId.data(), d_ptr->clientId.length()); + if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWill)) { + stream << static_cast(d_ptr->willTopic.length()); + stream.writeRawData(d_ptr->willTopic.data(), d_ptr->willTopic.length()); + stream << static_cast(d_ptr->willMessage.length()); + stream.writeRawData(d_ptr->willMessage.data(), d_ptr->willMessage.length()); + } + if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagUsername)) { + stream << static_cast(d_ptr->username.length()); + stream.writeRawData(d_ptr->username.data(), d_ptr->username.length()); + } + if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagPassword)) { + stream << static_cast(d_ptr->password.length()); + stream.writeRawData(d_ptr->password.data(), d_ptr->password.length()); + } + break; + case TypeConnack: + stream << static_cast(d_ptr->connackFlags); + stream << static_cast(d_ptr->connectReturnCode); + break; + case TypePublish: + stream << static_cast(d_ptr->topic.length()); + stream.writeRawData(d_ptr->topic.data(), d_ptr->topic.length()); + if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) { + stream << d_ptr->packetId; + } + stream.writeRawData(d_ptr->payload.data(), d_ptr->payload.length()); + break; + case TypePuback: + case TypePubrec: + case TypePubrel: + case TypePubcomp: + stream << d_ptr->packetId; + break; + case TypeSubscribe: + stream << static_cast(d_ptr->packetId); + foreach (const MqttSubscription &subscription, d_ptr->subscriptions) { + stream << static_cast(subscription.topicFilter().length()); + stream.writeRawData(subscription.topicFilter().data(), subscription.topicFilter().length()); + stream << static_cast(subscription.qoS()); + } + break; + case TypeSuback: + stream << d_ptr->packetId; + foreach (Mqtt::SubscribeReturnCode subscribeReturnCode, d_ptr->subscribeReturnCodes) { + stream << static_cast(subscribeReturnCode); + } + break; + case TypeUnsubscribe: + stream << d_ptr->packetId; + foreach (const MqttSubscription &subscription, d_ptr->subscriptions) { + stream << static_cast(subscription.topicFilter().length()); + stream.writeRawData(subscription.topicFilter().data(), subscription.topicFilter().length()); + } + break; + case TypeUnsuback: + stream << d_ptr->packetId; + break; + case TypePingreq: + break; + case TypePingresp: + break; + case TypeDisconnect: + break; + } +// qCDebug(dbgProto()) << "Serialized MQTT packet:" << ret.toHex(); + return ret; +} + +bool MqttPacket::operator==(const MqttPacket &other) const +{ + return serialize() == other.serialize(); +} + +bool MqttPacketPrivate::verifyHeaderFlags() +{ + bool fail = false; + + switch (q_ptr->type()) { + case MqttPacket::MqttPacket::TypeConnect: + case MqttPacket::TypeConnack: + case MqttPacket::TypePuback: + case MqttPacket::TypePubrec: + case MqttPacket::TypePubcomp: + case MqttPacket::TypeSuback: + case MqttPacket::TypeUnsuback: + case MqttPacket::TypePingreq: + case MqttPacket::TypePingresp: + case MqttPacket::TypeDisconnect: + fail |= q_ptr->dup(); + fail |= (q_ptr->qos() != Mqtt::QoS0); + fail |= q_ptr->retain(); + break; + case MqttPacket::TypePublish: + fail |= (q_ptr->qos() == 0x03); + break; + case MqttPacket::TypeSubscribe: + case MqttPacket::TypePubrel: + case MqttPacket::TypeUnsubscribe: + fail |= q_ptr->dup(); + fail |= (q_ptr->qos() != Mqtt::QoS1); + fail |= q_ptr->retain(); + break; + } + return !fail; +} diff --git a/libnymea-mqtt/mqttpacket.h b/libnymea-mqtt/mqttpacket.h new file mode 100644 index 0000000..440fbe1 --- /dev/null +++ b/libnymea-mqtt/mqttpacket.h @@ -0,0 +1,112 @@ +#ifndef MQTTPACKET_H +#define MQTTPACKET_H + +#include +#include +#include +#include + +#include "mqtt.h" +#include "mqttsubscription.h" + +class MqttPacketPrivate; + +class MqttPacket +{ +public: + + enum Type { + TypeConnect = 0x10, + TypeConnack = 0x20, + TypePublish = 0x30, + TypePuback = 0x40, + TypePubrec = 0x50, + TypePubrel = 0x60, + TypePubcomp = 0x70, + TypeSubscribe = 0x80, + TypeSuback = 0x90, + TypeUnsubscribe = 0xa0, + TypeUnsuback = 0xb0, + TypePingreq = 0xc0, + TypePingresp = 0xd0, + TypeDisconnect = 0xe0 + }; + + MqttPacket(); + MqttPacket(Type type, quint16 packetId = 0, Mqtt::QoS qos = Mqtt::QoS0, bool retain = false, bool dup = false); + +public: + Type type() const; + bool dup() const; + void setDup(bool dup); + Mqtt::QoS qos() const; + void setQoS(Mqtt::QoS qoS); + + bool retain() const; + void setRetain(bool retain); + + + // CONNECT + void setCleanSession(bool cleanSession); + bool cleanSession() const; + Mqtt::ConnectFlags connectFlags() const; + QByteArray protocolName() const; + Mqtt::Protocol protocolLevel() const; + void setProtocolLevel(Mqtt::Protocol protocolLevel); + quint16 keepAlive() const; + void setKeepAlive(quint16 keepAlive); + QByteArray clientId() const; + void setClientId(const QByteArray &clientId); + QByteArray willTopic() const; + void setWillTopic(const QByteArray &willTopic); + QByteArray willMessage() const; + void setWillMessage(const QByteArray &willMessage); + Mqtt::QoS willQoS() const; + void setWillQoS(Mqtt::QoS willQoS); + bool willRetain() const; + void setWillRetain(bool willRetain); + QByteArray username() const; + void setUsername(const QByteArray &username); + QByteArray password() const; + void setPassword(const QByteArray &password); + + // CONNACK + Mqtt::ConnectReturnCode connectReturnCode() const; + void setConnectReturnCode(Mqtt::ConnectReturnCode connectReturnCode); + + Mqtt::ConnackFlags connackFlags() const; + void setConnackFlags(Mqtt::ConnackFlags connackFlags); + + // PUBLISH/SUBSCRIBE + quint16 packetId() const; + void setPacketId(quint16 packetId); + // PUBLISH + QByteArray topic() const; + void setTopic(const QByteArray &topic); + QByteArray payload() const; + void setPayload(const QByteArray &payload); + // SUBSCRIBE + MqttSubscriptions subscriptions() const; + void setSubscriptions(const MqttSubscriptions &subscriptions); + void addSubscription(const MqttSubscription &subscription); + // SUBACK + Mqtt::SubscribeReturnCodes subscribeReturnCodes() const; + void setSubscribeReturnCodes(const Mqtt::SubscribeReturnCodes subscribeReturnCodes); + void addSubscribeReturnCode(Mqtt::SubscribeReturnCode subscribeReturnCode); + + // Takes a buffer and fills the packet accordingly. + // Returns the length of data used from the buffer on success, bad() will return false + // Returns -1 on bad data input, bad() will return true + // Returns 0 if input data is ok, but not long enough, bad() will return true + int parse(const QByteArray &buffer); + QByteArray serialize() const; + + bool operator==(const MqttPacket &other) const; + +private: + MqttPacketPrivate *d_ptr = nullptr; +}; + +typedef QList MqttPackets; + +#endif // MQTTPACKET_H diff --git a/libnymea-mqtt/mqttpacket_p.h b/libnymea-mqtt/mqttpacket_p.h new file mode 100644 index 0000000..782830f --- /dev/null +++ b/libnymea-mqtt/mqttpacket_p.h @@ -0,0 +1,45 @@ +#ifndef MQTTPACKET_P_H +#define MQTTPACKET_P_H + +#include +#include +#include +#include + +#include "mqtt.h" +#include "mqttpacket.h" +#include "mqttsubscription.h" + +Q_DECLARE_LOGGING_CATEGORY(dbgProto) + +class MqttPacketPrivate +{ +public: + MqttPacketPrivate(MqttPacket *q) : q_ptr(q) { } + MqttPacket *q_ptr; + + bool verifyHeaderFlags(); + + quint8 header = 0; + QByteArray protocolName = "MQTT"; + Mqtt::Protocol protocolLevel = Mqtt::ProtocolUnknown; + Mqtt::ConnectFlags connectFlags = Mqtt::ConnectFlagNone; + Mqtt::ConnackFlags connackFlags = Mqtt::ConnackFlagNone; + quint16 keepAlive = 0; + QByteArray clientId; + QByteArray willTopic; + QByteArray willMessage; + QByteArray username; + QByteArray password; + + quint16 packetId = 0; + QByteArray topic; + QByteArray payload; + + Mqtt::ConnectReturnCode connectReturnCode = Mqtt::ConnectReturnCodeAccepted; + + MqttSubscriptions subscriptions; + Mqtt::SubscribeReturnCodes subscribeReturnCodes; +}; + +#endif // MQTTPACKET_P_H diff --git a/libnymea-mqtt/mqttserver.cpp b/libnymea-mqtt/mqttserver.cpp new file mode 100644 index 0000000..5a878d6 --- /dev/null +++ b/libnymea-mqtt/mqttserver.cpp @@ -0,0 +1,569 @@ +#include "mqttserver.h" +#include "mqttserver_p.h" +#include "mqttpacket.h" + +#include +#include +#include +#include +#include + +Q_LOGGING_CATEGORY(dbgServer, "nymea.mqtt.server") + +MqttServerPrivate::MqttServerPrivate(MqttServer *q): + QObject(q), + q_ptr(q) +{ + qRegisterMetaType(); + + server = new QTcpServer(this); + connect(server, &QTcpServer::newConnection, this, &MqttServerPrivate::onNewConnection); +} + +QHash MqttServerPrivate::publish(const QString &topic, const QByteArray &payload) +{ + QHash receivers; + foreach (QTcpSocket *c, clientList.keys()) { + foreach (const MqttSubscription &subscription, clientList.value(c)->subscriptions) { + if (matchTopic(subscription.topicFilter(), topic)) { + if (!receivers.contains(c) || receivers.value(c) < subscription.qoS()) { + receivers[c] = subscription.qoS(); + } + } + } + } + + QHash packets; + foreach (QTcpSocket *receiver, receivers.keys()) { + ClientContext *ctx = clientList.value(receiver); + qCDebug(dbgServer) << "Relaying packet to subscribed client:" << ctx->clientId; + Mqtt::QoS qos = receivers.value(receiver); + MqttPacket packet(MqttPacket::TypePublish, qos >= Mqtt::QoS0 ? newPacketId(ctx) : 0, qos); + packet.setTopic(topic.toUtf8()); + packet.setPayload(payload); + receiver->write(packet.serialize()); + packets.insert(ctx->clientId, packet.packetId()); + if (packet.qos() == Mqtt::QoS0) { + QString clientId = ctx->clientId; + QTimer::singleShot(0, this, [this, clientId, packet](){ + emit q_ptr->published(clientId, packet.packetId(), packet.topic(), packet.payload()); + }); + } else { + ClientContext *ctx = clientList.value(receiver); + ctx->unackedPackets.insert(packet.packetId(), packet); + ctx->unackedPacketList.append(packet.packetId()); + } + } + return packets; +} + +MqttServer::MqttServer(QObject *parent): + QObject(parent), + d_ptr(new MqttServerPrivate(this)) +{ + +} + +Mqtt::QoS MqttServer::maximumSubscriptionsQoS() const +{ + return d_ptr->maximumSubscriptionQoS; +} + +void MqttServer::setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS) +{ + d_ptr->maximumSubscriptionQoS = maximumSubscriptionQoS; +} + +bool MqttServer::listen(const QHostAddress &address, quint16 port, MqttUserValidator *userValidator) +{ + d_ptr->userValidator = userValidator; + + if (!d_ptr->server->listen(address, port)) { + qCWarning(dbgServer()) << "Error listening on port" << port; + return false; + } + qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port; + return true; +} + +QStringList MqttServer::clients() const +{ + QStringList clientIds; + foreach (ClientContext *ctx, d_ptr->clientList) { + clientIds << ctx->clientId; + } + return clientIds; +} + +QHash MqttServer::publish(const QString &topic, const QByteArray &payload) +{ + return d_ptr->publish(topic, payload); +} + +void MqttServerPrivate::onNewConnection() +{ + QTcpSocket *client = server->nextPendingConnection(); + + // Start a 10 second timer to clean up the connection if we don't get data until then. + QTimer *timeoutTimer = new QTimer(this); + connect(timeoutTimer, &QTimer::timeout, this, [this, client]() { + qCWarning(dbgServer) << "A client connected but did not send data in 10 seconds. Dropping connection."; + client->abort(); + pendingConnections.take(client)->deleteLater(); + client->deleteLater(); + }); + timeoutTimer->start(10000); + pendingConnections.insert(client, timeoutTimer); + + connect(client, &QTcpSocket::readyRead, this, &MqttServerPrivate::onClientReadyRead); + connect(client, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(onClientError(QAbstractSocket::SocketError))); + connect(client, &QTcpSocket::disconnected, this, &MqttServerPrivate::onClientDisconnected); +} + +void MqttServerPrivate::onClientDisconnected() +{ + QTcpSocket *client = static_cast(sender()); + cleanupClient(client); +} + +void MqttServerPrivate::cleanupClient(QTcpSocket *client) +{ + if (clientBuffers.contains(client)) { + clientBuffers.remove(client); + } + if (clientList.contains(client)) { + ClientContext *ctx = clientList.value(client); + qCDebug(dbgServer) << "Client" << ctx->clientId << "disconnected."; + ctx->keepAliveTimer.stop(); + + if (!ctx->willTopic.isEmpty()) { + qCDebug(dbgServer) << "Publishing will message for client" << ctx->clientId << "on topic" << ctx->willTopic << "( Retain:" << ctx->willRetain << ")"; + MqttPacket willPacket(MqttPacket::TypePublish, ctx->willQoS >= Mqtt::QoS1 ? newPacketId(ctx) : 0, ctx->willQoS, ctx->willRetain); + willPacket.setTopic(ctx->willTopic); + willPacket.setPayload(ctx->willMessage); + processPacket(willPacket, client); + } + + emit q_ptr->clientDisconnected(ctx->clientId); + + clientList.remove(client); + delete ctx; + } + client->flush(); + client->deleteLater(); +} + +void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *client) +{ + if (packet.type() == MqttPacket::TypeConnect) { + if (clientList.contains(client)) { + ClientContext *ctx = clientList.value(client); + qCWarning(dbgServer) << "Client" << ctx->clientId << "sends duplicate CONNECT packets. Dropping connection."; + cleanupClient(client); + return; + } + + MqttPacket response(MqttPacket::TypeConnack, packet.packetId()); + + if (packet.protocolLevel() != Mqtt::Protocol311) { + qCWarning(dbgServer) << "This MQTT broker only supports Protocol version 3.1.1"; + response.setConnectReturnCode(Mqtt::ConnectReturnCodeUnacceptableProtocolVersion); + client->write(response.serialize()); + cleanupClient(client); + return; + } + + QString clientId = packet.clientId(); + if (clientId.isEmpty()) { + if (!packet.cleanSession()) { + qCWarning(dbgServer) << "Empty client id provided but clean session flag not set. Rejecting connection."; + response.setConnectReturnCode(Mqtt::ConnectReturnCodeIdentifierRejected); + client->write(response.serialize()); + cleanupClient(client); + return; + } + clientId = QUuid::createUuid().toString().remove(QRegExp("[{}-]*")); + } + + if (userValidator) { + QString username; + if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) { + username = packet.username(); + } + QString password; + if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) { + password = packet.password(); + } + Mqtt::ConnectReturnCode userValidationReturnCode = userValidator->validateConnect(clientId, username, password, client->peerAddress()); + if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) { + qCWarning(dbgServer) << "Rejecting connection due to user validation."; + response.setConnectReturnCode(userValidationReturnCode); + client->write(response.serialize()); + cleanupClient(client); + return; + } + } + + ClientContext *ctx = nullptr; + + QList existingSockets = clientList.keys(); + for (int i = 0; i < existingSockets.count(); i++) { + QTcpSocket *existingClient = existingSockets.at(i); + if (clientId == clientList.value(existingClient)->clientId) { + if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) { + qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Taking over existing session."; + + response.setConnackFlags(Mqtt::ConnackFlagSessionPresent); + ctx = clientList.value(existingClient); + + // remove old client manually, we don't want to clean up the context, nor send any will message or emit disconnected signals + clientList.remove(existingClient); + clientBuffers.remove(existingClient); + existingClient->flush(); + existingClient->deleteLater(); + } else { + qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Dropping old session."; + cleanupClient(existingClient); + } + break; + } + } + + if (!ctx) { + if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) { + qCWarning(dbgServer).nospace() << clientId << ": Request to take over existing session but we don't have an existing session."; + } + + ctx = new ClientContext(); + ctx->clientId = clientId; + + connect(&ctx->keepAliveTimer, &QTimer::timeout, this, [this, client](){ + qCWarning(dbgServer) << "Keep alive timeout reached for client:" << clientList.value(client)->clientId; + cleanupClient(client); + }); + } + + ctx->keepAlive = packet.keepAlive(); + ctx->version = packet.protocolLevel(); + + + if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWill)) { + ctx->willTopic = packet.willTopic(); + ctx->willMessage = packet.willMessage(); + ctx->willRetain = packet.willRetain(); + if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS2)) { + ctx->willQoS = Mqtt::QoS2; + } else if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS1)) { + ctx->willQoS = Mqtt::QoS1; + } + } + if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) { + ctx->username = packet.username(); + } + if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) { + } + + qCDebug(dbgServer).nospace().noquote() + << "New MQTT client: \"" << clientId << '\"' + << ", Protocol: " << packet.protocolName() << " (" << packet.protocolLevel() << ')' + << ", Flags: " << packet.connectFlags() + << ", KeepAlive: " << packet.keepAlive() + << ", Will Topic: \"" << packet.willTopic() << '\"' + << ", Will Message: \"" << packet.willMessage() << '\"' + << ", Will Retain: " << packet.willRetain() + << ", Username: " << packet.username() + << ", Password: " << QString(packet.password()).replace(QRegExp("."), "*"); + + if (ctx->keepAlive > 0) { + ctx->keepAliveTimer.start(ctx->keepAlive * 1500); + } + + clientList.insert(client, ctx); + response.setConnectReturnCode(Mqtt::ConnectReturnCodeAccepted); + client->write(response.serialize()); + emit q_ptr->clientConnected(ctx->clientId, ctx->username, client->peerAddress()); + + foreach (quint16 retryPacketId, ctx->unackedPacketList) { + qCDebug(dbgServer) << "Resending unacked packet" << retryPacketId << "to" << ctx->clientId;; + MqttPacket retryPacket = ctx->unackedPackets.value(retryPacketId); + retryPacket.setDup(true); + client->write(retryPacket.serialize()); + } + return; + } + + if (!clientList.contains(client)) { + qCWarning(dbgServer) << "Protocol error: Client connection did not send CONNECT yet. Dropping connection."; + client->close(); + return; + } + + ClientContext *ctx = clientList.value(client); + if (ctx->keepAlive > 0) { + ctx->keepAliveTimer.start(); + } + emit q_ptr->clientAlive(ctx->clientId); + + if (packet.type() == MqttPacket::TypePublish) { + qCDebug(dbgServer).nospace() << "Publish received from client " << ctx->clientId << ": Topic: " << packet.topic() << ", Payload: " << packet.payload() << " (Packet ID: " << packet.packetId() << ", DUP: " << packet.dup() << ", QoS: " << packet.qos() << ", Retain: " << packet.retain() << ')'; + emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload(), packet.dup()); + + switch (packet.qos()) { + case Mqtt::QoS0: + break; + case Mqtt::QoS1: { + MqttPacket response(MqttPacket::TypePuback, packet.packetId()); + client->write(response.serialize()); + break; + } + case Mqtt::QoS2: { + if (packet.dup() && ctx->unackedPacketList.contains(packet.packetId())) { + // We received this message before but the client keeps on trying... Just send a PUBREC and stop processing + client->write(ctx->unackedPackets.value(packet.packetId()).serialize()); + return; + } else if (ctx->unackedPacketList.contains(packet.packetId())) { + // Hmm... Client says this is a new packet, but the ID is not released yet! Drop client connection. + qCWarning(dbgServer()).nospace() << "Received a bad packet from \"" << ctx->clientId << "\". DUP is not set but packet ID is already used and not released. Dropping client connection."; + cleanupClient(client); + return; + } + // Ok, a new packet, ack it with a PUBREC and store the number + MqttPacket response(MqttPacket::TypePubrec, packet.packetId()); + ctx->unackedPackets.insert(response.packetId(), response); + ctx->unackedPacketList.append(packet.packetId()); + client->write(response.serialize()); + break; + } + } + if (packet.retain()) { + if (packet.payload().isEmpty()) { + qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic(); + retainedMessages.remove(packet.topic()); + } else { + if (packet.qos() == Mqtt::QoS0) { + qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic(); + retainedMessages.remove(packet.topic()); + } + qCDebug(dbgServer) << "Adding retained message for topic" << packet.topic(); + retainedMessages[packet.topic()].append(packet); + } + } + + publish(packet.topic(), packet.payload()); + + return; + } + if (packet.type() == MqttPacket::TypePuback) { + ctx->unackedPacketList.removeAll(packet.packetId()); + MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId()); + emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload()); + return; + } + if (packet.type() == MqttPacket::TypePubrec) { + MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId()); + emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload()); + MqttPacket pubrel(MqttPacket::TypePubrel, packet.packetId()); + ctx->unackedPackets.insert(packet.packetId(), pubrel); + client->write(pubrel.serialize()); + return; + } + if (packet.type() == MqttPacket::TypePubrel) { + ctx->unackedPackets.remove(packet.packetId()); + ctx->unackedPacketList.removeAll(packet.packetId()); + MqttPacket response(MqttPacket::TypePubcomp, packet.packetId()); + client->write(response.serialize()); + return; + } + if (packet.type() == MqttPacket::TypePubcomp) { + ctx->unackedPackets.remove(packet.packetId()); + ctx->unackedPacketList.removeAll(packet.packetId()); + return; + } + if (packet.type() == MqttPacket::TypeSubscribe) { +// qCDebug(dbgServer).nospace() << ctx->clientId ": Subscribe packet received."; + MqttPacket response(MqttPacket::TypeSuback, packet.packetId()); + QByteArray payload; + foreach (MqttSubscription subscription, packet.subscriptions()) { + if (userValidator && !userValidator->validateSubscribe(subscription.topicFilter(), ctx->clientId, ctx->username)) { + qCWarning(dbgServer).nospace().noquote() << "Subscription topic filter not allowed for client \"" << ctx->clientId << "\": \"" << subscription.topicFilter() << '\"'; + response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure); + continue; + } + if (!validateTopicFilter(subscription.topicFilter())) { + qCWarning(dbgServer).nospace() << "Subscription topic filter not valid for client \"" << ctx->clientId << "\": " << subscription.topicFilter(); + response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure); + continue; + } + subscription.setQoS(qMin(subscription.qoS(), maximumSubscriptionQoS)); + bool updated = false; + for (int i = 0; i < ctx->subscriptions.count(); i++) { + if (ctx->subscriptions.at(i).topicFilter() == subscription.topicFilter()) { + qCDebug(dbgServer).noquote().nospace() << "Client \"" << ctx->clientId << "\" subscribed with a duplicate topic filter. Replacing subcription with new QoS" << subscription.qoS(); + ctx->subscriptions.replace(i, subscription); + updated = true; + } + } + if (!updated) { + ctx->subscriptions.append(subscription); + } + qCDebug(dbgServer).noquote().nospace() << "Subscribed client \"" << ctx->clientId << "\" to topic filter: \"" << subscription.topicFilter() << "\" with QoS " << subscription.qoS(); + emit q_ptr->clientSubscribed(ctx->clientId, subscription.topicFilter(), subscription.qoS()); + switch (subscription.qoS()) { + case Mqtt::QoS0: + response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS0); + break; + case Mqtt::QoS1: + response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS1); + break; + case Mqtt::QoS2: + response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS2); + break; + } + } + client->write(response.serialize()); + + // Deliver any retained messages for this topic + foreach (MqttSubscription subscription, packet.subscriptions()) { + foreach (const QString &topic, retainedMessages.keys()) { + if (matchTopic(subscription.topicFilter(), topic)) { + foreach (MqttPacket packet, retainedMessages.value(topic)) { + packet.setRetain(true); + client->write(packet.serialize()); + } + } + } + } + return; + } + if (packet.type() == MqttPacket::TypeUnsubscribe) { + MqttSubscriptions newSubscriptions; + foreach (const MqttSubscription &existingSubscription, ctx->subscriptions) { + bool matching = false; + foreach (const MqttSubscription &unsub, packet.subscriptions()) { + if (existingSubscription.topicFilter() == unsub.topicFilter()) { + qCDebug(dbgServer) << "Unsubscribing client" << ctx->clientId << "from" << unsub.topicFilter(); + emit q_ptr->clientUnsubscribed(ctx->clientId, unsub.topicFilter()); + matching = true; + break; + } + } + if (!matching) { + newSubscriptions.append(existingSubscription); + } + } + ctx->subscriptions = newSubscriptions; + MqttPacket response(MqttPacket::TypeUnsuback, packet.packetId()); + client->write(response.serialize()); + return; + } + if (packet.type() == MqttPacket::TypePingreq) { +// qCDebug(dbgServer).nospace() << ctx->clientId << ": Pingreq received"; + MqttPacket response(MqttPacket::TypePingresp, packet.packetId()); + client->write(response.serialize()); + return; + } + if (packet.type() == MqttPacket::TypeDisconnect) { + ctx->willMessage.clear(); + ctx->willTopic.clear(); + return; + } + qCWarning(dbgServer).nospace().noquote() << "Unknown packet received from client \"" << ctx->clientId << "\": " << QString::number(packet.type(), 16); + Q_ASSERT(false); + cleanupClient(client); + +} + +bool MqttServerPrivate::validateTopicFilter(const QString &topicFilter) +{ + if (topicFilter.length() < 1) { + return false; + } + QStringList parts = topicFilter.split('/'); + for (int i = 0; i < parts.count(); i++) { + const QString &part = parts.at(i); + if (part.contains('#') && (part != '#' || i != parts.count() - 1)) { + return false; + } + if (part.contains('+') && part != '+') { + return false; + } + } + + return true; +} + +bool MqttServerPrivate::matchTopic(const QString &topicFilter, const QString &topic) +{ + if (topic.startsWith('$')) { + return false; + } + + QStringList filterParts = topicFilter.split('/'); + QStringList topicParts = topic.split('/'); + + if (topicParts.count() < filterParts.count() - 1) { + return false; + } + + for (int i = 0; i < filterParts.count(); i++) { + if (filterParts.at(i) == '+') { + continue; + } + if (filterParts.at(i) == '#') { + continue; + } + if (topicParts.at(i) == filterParts.at(i)) { + continue; + } + return false; + } + + return filterParts.count() == topicParts.count() || topicFilter.endsWith('#'); +} + +quint16 MqttServerPrivate::newPacketId(ClientContext *ctx) +{ + static quint16 packetId = 0; + do { + packetId++; + } while(ctx->unackedPacketList.contains(packetId)); + return packetId; +} + +void MqttServerPrivate::onClientReadyRead() +{ + QTcpSocket* client = static_cast(sender()); + + clientBuffers[client].append(client->readAll()); + + do { + MqttPacket packet; + int ret = packet.parse(clientBuffers[client]); + if (ret == 0) { + qCDebug(dbgServer) << "Packet too short... Waiting for more..."; + return; + } + + // Ok, we've got a full packet (or garbage data). If this client is still pending + // we can stop the timer, the protocol will take it from here. + if (pendingConnections.contains(client)) { + pendingConnections.take(client)->deleteLater(); + } + + if (ret == -1) { + qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection"; + cleanupClient(client); + return; + } + + clientBuffers[client].remove(0, ret); + + processPacket(packet, client); + + } while (!clientBuffers.value(client).isEmpty()); +} + +void MqttServerPrivate::onClientError(QAbstractSocket::SocketError error) +{ +// qCWarning(dbgServer) << "Client error:" << error; +} diff --git a/libnymea-mqtt/mqttserver.h b/libnymea-mqtt/mqttserver.h new file mode 100644 index 0000000..1120e36 --- /dev/null +++ b/libnymea-mqtt/mqttserver.h @@ -0,0 +1,58 @@ +#ifndef MQTTSERVER_H +#define MQTTSERVER_H + +#include +#include +#include +#include +#include + +#include "mqttpacket.h" + +class MqttServerPrivate; +class Subscription; + +class MqttUserValidator { +public: + virtual ~MqttUserValidator() = default; + virtual Mqtt::ConnectReturnCode validateConnect(const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) = 0; + virtual bool validateSubscribe(const QString &topicFilter, const QString &clientId, const QString &username) = 0; +}; + +class MqttServer : public QObject +{ + Q_OBJECT +public: + explicit MqttServer(QObject *parent = nullptr); + + Mqtt::QoS maximumSubscriptionsQoS() const; + void setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS); + + bool listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, MqttUserValidator *userValidator = nullptr); + + QStringList clients() const; + + // allows publishing from the server, including topcis starting with $ + QHash publish(const QString &topic, const QByteArray &payload = QByteArray()); + +signals: + // emitted whenever a client connects, after the mqtt connect handshake has been done. + void clientConnected(const QString &clientId, const QString &username, const QHostAddress &clientAddress); + // emitted whenever a client disconnects, that is, when a DISCONNECT message has been received or the keep alive timeout has been reached. + void clientDisconnected(const QString &clientId); + // emitted whenever a client has been seen, that is, a control message or a keep alive message has been received. + void clientAlive(const QString &clientId); + // emitted whenever a client subscribes, a client can also subscribe to topics starting with $ but those won't be relayed from other clients. Only internal server publishes to $ topcis will be sent to subscribed clients. + void clientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS); + // emitted whenever a client unsubscribes from a topic + void clientUnsubscribed(const QString &clientId, const QString &topicFiltr); + // emitted whenever a publish message is received from a client before the message is relayed to other clients. Topics starting with $ will be received here, but not relayed to other clients. + void publishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload, bool dup); + // emitted whenever a publish message is sent to a client. Note: this might be fired often if many clients are connected and subsribed to matching topic filters. + void published(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload); + +private: + MqttServerPrivate *d_ptr; +}; + +#endif // MQTTSERVER_H diff --git a/libnymea-mqtt/mqttserver_p.h b/libnymea-mqtt/mqttserver_p.h new file mode 100644 index 0000000..4b9464b --- /dev/null +++ b/libnymea-mqtt/mqttserver_p.h @@ -0,0 +1,73 @@ +#ifndef MQTTSERVER_P_H +#define MQTTSERVER_P_H + +#include +#include +#include +#include +#include + +#include "mqttpacket.h" +#include "mqttserver.h" + +Q_DECLARE_LOGGING_CATEGORY(dbgServer) + +class ClientContext; +class Subscription; + +class MqttServerPrivate: public QObject +{ + Q_OBJECT +public: + explicit MqttServerPrivate(MqttServer *q); + + QHash publish(const QString &topic, const QByteArray &payload = QByteArray()); + +public: + void cleanupClient(QTcpSocket *client); + + void processPacket(const MqttPacket &packet, QTcpSocket *client); + bool validateTopicFilter(const QString &topicFilter); + bool matchTopic(const QString &topicFilter, const QString &topic); + quint16 newPacketId(ClientContext *ctx); + +public slots: + void onNewConnection(); + void onClientReadyRead(); + void onClientError(QAbstractSocket::SocketError); + void onClientDisconnected(); + +public: + MqttServer *q_ptr; + + QTcpServer *server = nullptr; + MqttUserValidator *userValidator = nullptr; + + Mqtt::QoS maximumSubscriptionQoS = Mqtt::QoS2; + + QHash pendingConnections; + QHash clientList; + QHash clientBuffers; + QHash retainedMessages; +}; + +class ClientContext { +public: + Mqtt::Protocol version = Mqtt::ProtocolUnknown; + quint16 keepAlive = 0; + QTimer keepAliveTimer; + QString clientId; + QString username; + QByteArray willTopic; + QByteArray willMessage; + Mqtt::QoS willQoS = Mqtt::QoS0; + bool willRetain = false; + + QByteArray inputBuffer; + MqttSubscriptions subscriptions; + + QVector unackedPacketList; + QHash unackedPackets; +}; + +#endif // MQTTSERVER_P_H diff --git a/libnymea-mqtt/mqttsubscription.cpp b/libnymea-mqtt/mqttsubscription.cpp new file mode 100644 index 0000000..e88d0b5 --- /dev/null +++ b/libnymea-mqtt/mqttsubscription.cpp @@ -0,0 +1,38 @@ +#include "mqttsubscription.h" + +MqttSubscription::MqttSubscription() +{ + +} + +MqttSubscription::MqttSubscription(const QByteArray &topicFilter, Mqtt::QoS qoS): + m_topicFilter(topicFilter), + m_qoS(qoS) +{ + +} + +QByteArray MqttSubscription::topicFilter() const +{ + return m_topicFilter; +} + +void MqttSubscription::setTopicFilter(const QByteArray &topicFilter) +{ + m_topicFilter = topicFilter; +} + +Mqtt::QoS MqttSubscription::qoS() const +{ + return m_qoS; +} + +void MqttSubscription::setQoS(Mqtt::QoS qoS) +{ + m_qoS = qoS; +} + +bool MqttSubscription::operator==(const MqttSubscription &other) const +{ + return m_qoS == other.qoS() && m_topicFilter == other.topicFilter(); +} diff --git a/libnymea-mqtt/mqttsubscription.h b/libnymea-mqtt/mqttsubscription.h new file mode 100644 index 0000000..42e122d --- /dev/null +++ b/libnymea-mqtt/mqttsubscription.h @@ -0,0 +1,36 @@ +#ifndef MQTTSUBSCRIPTION_H +#define MQTTSUBSCRIPTION_H + +#include "mqtt.h" +#include +#include + +class MqttSubscription +{ +public: + MqttSubscription(); + MqttSubscription(const QByteArray &topicFilter, Mqtt::QoS qoS = Mqtt::QoS0); + + QByteArray topicFilter() const; + void setTopicFilter(const QByteArray &topicFilter); + + Mqtt::QoS qoS() const; + void setQoS(Mqtt::QoS qoS); + + bool operator==(const MqttSubscription &other) const; +private: + QByteArray m_topicFilter; + Mqtt::QoS m_qoS = Mqtt::QoS0; + +}; +Q_DECLARE_METATYPE(MqttSubscription) + +typedef QVector MqttSubscriptions; +Q_DECLARE_METATYPE(MqttSubscriptions) + +inline QDebug operator<<(QDebug debug, const MqttSubscription &subscription) { + debug.nospace().noquote() << "\"" << subscription.topicFilter() << "\" QoS: " << subscription.qoS(); + return debug; +} + +#endif // MQTTSUBSCRIPTION_H diff --git a/nymea-mqtt.pri b/nymea-mqtt.pri new file mode 100644 index 0000000..e83601a --- /dev/null +++ b/nymea-mqtt.pri @@ -0,0 +1,6 @@ +QMAKE_CXXFLAGS *= -Werror -std=c++11 -g +QMAKE_LFLAGS *= -std=c++11 + +top_srcdir=$$PWD +top_builddir=$$shadowed($$PWD) + diff --git a/nymea-mqtt.pro b/nymea-mqtt.pro new file mode 100644 index 0000000..3d38718 --- /dev/null +++ b/nymea-mqtt.pro @@ -0,0 +1,6 @@ +TEMPLATE = subdirs +SUBDIRS += libnymea-mqtt server tests + +server.depends = libnymea-mqtt +tests.depends = libnymea-mqtt + diff --git a/server/main.cpp b/server/main.cpp new file mode 100644 index 0000000..fc7d506 --- /dev/null +++ b/server/main.cpp @@ -0,0 +1,13 @@ +#include + +#include "mqttserver.h" + +int main(int argc, char *argv[]) +{ + QCoreApplication a(argc, argv); + + MqttServer server; + server.listen(QHostAddress::AnyIPv4, 1883); + + return a.exec(); +} diff --git a/server/server.pro b/server/server.pro new file mode 100644 index 0000000..1c5be81 --- /dev/null +++ b/server/server.pro @@ -0,0 +1,12 @@ +TEMPLATE = app +TARGET = nymea-mqttserver + +include(../nymea-mqtt.pri) + +QT += network + +INCLUDEPATH += $$top_srcdir/libnymea-mqtt/ + +SOURCES += main.cpp + +LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt diff --git a/tests/operation/operation.pro b/tests/operation/operation.pro new file mode 100644 index 0000000..ad286d3 --- /dev/null +++ b/tests/operation/operation.pro @@ -0,0 +1,16 @@ +QT += testlib network +QT -= gui + +CONFIG += qt console warn_on depend_includepath testcase +CONFIG -= app_bundle + +TEMPLATE = app + +include(../../nymea-mqtt.pri) + + +INCLUDEPATH += $$top_srcdir/libnymea-mqtt/ + +SOURCES += test_operation.cpp + +LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt diff --git a/tests/operation/test_operation.cpp b/tests/operation/test_operation.cpp new file mode 100644 index 0000000..0edf6c3 --- /dev/null +++ b/tests/operation/test_operation.cpp @@ -0,0 +1,794 @@ +#include "mqttserver.h" +#include "mqttclient.h" +#include "mqttclient_p.h" + +#include +#include + + +class OperationTests: public QObject +{ + Q_OBJECT + +private slots: + void initTestCase(); + void cleanup(); + + void connectAndDisconnect(); + void keepAliveTimesOut(); + + void subscribeAndPublish_data(); + void subscribeAndPublish(); + + void willIsSentOnClientDisappearing(); + void willIsNotSentOnClientDisconnecting(); + + void testWillRetain(); + + void testAutoReconnect(); + + void testQoS1Retransmissions(); + + void testMultiSubscription(); + + void testSubscriptionTopicFilters_data(); + void testSubscriptionTopicFilters(); + + void testSubscriptionTopicMatching_data(); + void testSubscriptionTopicMatching(); + + void testSessionManagementDropOldSession(); + void testSessionManagementResumeOldSession(); + void testSessionManagementFailResumeOldSession(); + + void testQoS1PublishToServerIsAckedOnSessionResume(); + void testQoS1PublishToClientIsDeliveredOnSessionResume(); + + void testQoS2PublishToServerIsCompletedOnSessionResume(); + + void testQoS2PublishToClientIsCompletedOnSessionResume(); + + void testRetain(); + + void testUnsubscribe(); + + void testEmptyClientId(); + +private: + // Connects and waits for the MQTT CONNECT to be finished + MqttClient *connectAndWait(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false); + + // Just connects, returns the client and signalspy which has been created before calling connect. You must delete the spy yourself! + QPair connectToServer(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false); + + void disconnectAndWait(MqttClient* client); + + bool subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos = Mqtt::QoS1); + +private: + QString m_serverHost = "127.0.0.1"; + quint16 m_serverPort = 5555; + MqttServer *m_server = nullptr; + + QList m_clients; +}; + +MqttClient *OperationTests::connectAndWait(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain) +{ + QPair result = connectToServer(clientId, cleanSession, keepAlive, willTopic, willMessage, willQoS, willRetain); + if (result.second->count() == 0) { + result.second->wait(); + } + + if (result.second->count() == 0) { + qWarning() << "WARNING: Client didn't emit connected"; + } + delete result.second; + return result.first; +} + +QPair OperationTests::connectToServer(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain) +{ + MqttClient* client = new MqttClient(clientId, keepAlive, willTopic, willMessage.toUtf8(), willQoS, willRetain, this); + client->setAutoReconnect(false); + + m_clients.append(client); + + QSignalSpy *spy = new QSignalSpy(client, &MqttClient::connected); + client->connectToHost(m_serverHost, m_serverPort, cleanSession); + return qMakePair(client, spy); +} + +void OperationTests::disconnectAndWait(MqttClient* client) +{ + QSignalSpy disconnectedSpy(client, &MqttClient::disconnected); + client->disconnectFromHost(); + if (disconnectedSpy.count() == 0) { + disconnectedSpy.wait(); + } +} + +bool OperationTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos) +{ + QSignalSpy subscribedSpy(client, &MqttClient::subscribed); + quint16 packetId = client->subscribe(topic, qos); + if (subscribedSpy.count() == 0) { + subscribedSpy.wait(); + } + Mqtt::SubscribeReturnCode expectedSubscribeReturnCode = qos == Mqtt::QoS0 ? Mqtt::SubscribeReturnCodeSuccessQoS0 : qos == Mqtt::QoS1 ? Mqtt::SubscribeReturnCodeSuccessQoS1 : Mqtt::SubscribeReturnCodeSuccessQoS2; + return subscribedSpy.count() == 1 && subscribedSpy.first().at(0).toInt() == packetId && subscribedSpy.first().at(1).value().first() == expectedSubscribeReturnCode; +} + +void OperationTests::initTestCase() +{ +// QLoggingCategory::setFilterRules("nymea.mqtt.protocol.debug=false"); + + m_server = new MqttServer(this); + + bool registered = false; + quint16 attempts = 0; + do { + registered = m_server->listen(QHostAddress(m_serverHost), m_serverPort + attempts); + } while(!registered && attempts++ < 20); + + QVERIFY2(registered, QString("Failed to register server on %1 from port %2 to %3. Tests won't work.").arg(m_serverHost).arg(m_serverPort).arg(m_serverPort+attempts).toUtf8().data()); + + m_serverPort += attempts; +} + +void OperationTests::cleanup() +{ + while (!m_clients.isEmpty()) { + MqttClient *client = m_clients.takeFirst(); + client->disconnectFromHost(); + client->deleteLater(); + } + QTRY_COMPARE(m_server->clients().count(), 0); +} + +void OperationTests::connectAndDisconnect() +{ + QSignalSpy serverSpy(m_server, &MqttServer::clientConnected); + + QString clientId = "connectAndDisconnect-client"; + MqttClient* client = connectAndWait(clientId); + + QVERIFY2(serverSpy.count() == 1, "Server didn't emit clientConnected"); + QVERIFY2(serverSpy.at(0).first() == clientId, "ClientId not matching on server side."); + + QSignalSpy serverSpyDisconnect(m_server, &MqttServer::clientDisconnected); + QSignalSpy clientSpy(client, &MqttClient::disconnected); + client->disconnectFromHost(); + QTRY_VERIFY2(clientSpy.count() == 1, "client didn't emit disconnected"); + QTRY_VERIFY2(serverSpyDisconnect.count() == 1, "Server didn't emit clientDisconnected"); + QVERIFY2(serverSpyDisconnect.at(0).first() == clientId, "ClientId not matching on server side."); +} + +void OperationTests::keepAliveTimesOut() +{ + QSignalSpy keepAliveSpy(m_server, &MqttServer::clientAlive); + MqttClient *client = connectAndWait("keepAlive1sec-client", true, 1); + client->setAutoReconnect(false); + QTest::qWait(2000); + qDebug() << "Received" << keepAliveSpy.count() << "keep alive messages"; + QVERIFY2(client->isConnected(), "Client connection dropped"); + QVERIFY2(keepAliveSpy.count() > 0, "Keep alive not received"); + + client->disconnectFromHost(); + + keepAliveSpy.clear(); + + client = connectAndWait("timeout1sec-client", true, 1); + client->setAutoReconnect(false); + client->d_ptr->keepAliveTimer.stop(); // disable the keepalive timer + QTest::qWait(2000); + qDebug() << "Received" << keepAliveSpy.count() << "keep alive messages"; + QVERIFY2(!client->isConnected(), "Client connection still alive but it should have been dropped"); +} + +void OperationTests::subscribeAndPublish_data() +{ + QTest::addColumn("qosClient1"); + QTest::addColumn("qosClient2"); + + QList > rows; + rows.append({Mqtt::QoS0, Mqtt::QoS0}); + rows.append({Mqtt::QoS0, Mqtt::QoS1}); + rows.append({Mqtt::QoS0, Mqtt::QoS2}); + rows.append({Mqtt::QoS1, Mqtt::QoS0}); + rows.append({Mqtt::QoS1, Mqtt::QoS1}); + rows.append({Mqtt::QoS1, Mqtt::QoS2}); + rows.append({Mqtt::QoS2, Mqtt::QoS0}); + rows.append({Mqtt::QoS2, Mqtt::QoS1}); + rows.append({Mqtt::QoS2, Mqtt::QoS2}); + + foreach (const QList &row, rows) { + QTest::newRow(QString("Subscribe QoS%1 -> Publish QoS%2").arg(row.at(0)).arg(row.at(1)).toUtf8().data()) << row.at(0) << row.at(1); + } +} + +void OperationTests::subscribeAndPublish() +{ + QFETCH(Mqtt::QoS, qosClient1); + QFETCH(Mqtt::QoS, qosClient2); + + QString clientId1 = QString("subQoS%1-client").arg(qosClient1); + MqttClient *client1 = connectAndWait(clientId1); + QString clientId2 = QString("pubQoS%1-client").arg(qosClient2); + MqttClient *client2 = connectAndWait(clientId2); + + QSignalSpy serverSubscribeSpy(m_server, &MqttServer::clientSubscribed); + QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribed); + + quint16 packetId = client1->subscribe("#", qosClient1); + + QTRY_VERIFY2(serverSubscribeSpy.count() == 1, "Server did not emit clientSubscribed"); + QVERIFY2(serverSubscribeSpy.first().first() == clientId1, "Client Id not matching"); + QVERIFY2(serverSubscribeSpy.first().at(1) == "#", "Topic not matching"); + QVERIFY2(serverSubscribeSpy.first().at(2) == qosClient1, "QoS not matching"); + + QTRY_VERIFY2(clientSubscribeSpy.count() == 1, "Client did not emit subscribed"); + QVERIFY2(clientSubscribeSpy.first().first() == packetId, "Packet ID not matching"); + QVERIFY2(clientSubscribeSpy.first().at(1).value().count() == 1, "Subscribe return code count not matching"); + + QSignalSpy serverPublishReceivedSpy(m_server, &MqttServer::publishReceived); + QSignalSpy serverPublishedSpy(m_server, &MqttServer::published); + QSignalSpy client1PublishReceivedSpy(client1, &MqttClient::publishReceived); + QSignalSpy client2PublishedSpy(client2, &MqttClient::published); + + packetId = client2->publish("/testtopic/", "Hello world", qosClient2); + + QTRY_VERIFY2(serverPublishReceivedSpy.count() == 1, "Server did not emit publishReceived"); + QVERIFY2(serverPublishReceivedSpy.first().at(0) == clientId2, "Server did emit publishReceived signal but client ID is not matching"); + QVERIFY2(serverPublishReceivedSpy.first().at(1) == packetId, QString("Server did emit publishReceived signal but Packet ID is not matching:\nActual: %1\nExpected: %2").arg(serverPublishReceivedSpy.first().at(1).toInt()).arg(packetId).toUtf8().data()); + QVERIFY2(serverPublishReceivedSpy.first().at(2) == "/testtopic/", "Server did emit publishReceived signal but topic is not matching"); + QVERIFY2(serverPublishReceivedSpy.first().at(3) == "Hello world", "Server did emit publishReceived signal but payload is not matching"); + + QTRY_VERIFY2(serverPublishedSpy.count() == 1, "Server did not emit published"); + QVERIFY2(serverPublishedSpy.first().at(0) == clientId1, "Server did emit published signal but client ID is not matching"); + + QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Subscribing client did not emit publishReceived signal"); + QVERIFY2(client1PublishReceivedSpy.first().at(0) == "/testtopic/", "Subscribing client did emit publishReceived signal but topic is not matching"); + QVERIFY2(client1PublishReceivedSpy.first().at(1) == "Hello world", "Subscribing client did emit publishReceived signal but payload is not matching"); + + QTRY_VERIFY2(client2PublishedSpy.count() == 1, "Publishing client did not emit published signal"); + QVERIFY2(client2PublishedSpy.first().first() == packetId, "Publishing client did emit published signal but packet ID not matching"); + +} + +void OperationTests::willIsSentOnClientDisappearing() +{ + MqttClient *client1 = connectAndWait("subWill-client"); + MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye"); + + QSignalSpy publishSpy(client1, &MqttClient::publishReceived); + + QVERIFY(subscribeAndWait(client1, "#")); + + client2->d_ptr->socket->abort(); + + QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent"); + QVERIFY2(publishSpy.first().at(0) == "/testtopic", "Will topic not matching"); + QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching"); +} + +void OperationTests::willIsNotSentOnClientDisconnecting() +{ + MqttClient *client1 = connectAndWait("subWill-client"); + MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye"); + + QSignalSpy subscribeSpy(client1, &MqttClient::subscribed); + QSignalSpy publishSpy(client1, &MqttClient::publishReceived); + + client1->subscribe("#"); + subscribeSpy.wait(); + + client2->disconnectFromHost(); + + publishSpy.wait(200); + QVERIFY2(publishSpy.count() == 0, "Will has been sent but it should not have been"); +} + +void OperationTests::testWillRetain() +{ + MqttClient *client1 = connectAndWait("subWill-client"); + MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye", Mqtt::QoS1, true); + + QSignalSpy subscribeSpy(client1, &MqttClient::subscribed); + QSignalSpy publishSpy(client1, &MqttClient::publishReceived); + + client1->subscribe("#"); + subscribeSpy.wait(); + + client2->setAutoReconnect(false); + client2->d_ptr->socket->abort(); + + QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent"); + QVERIFY2(publishSpy.first().at(0) == "/testtopic", QString("Will topic not matching: %1").arg(publishSpy.first().at(0).toString()).toUtf8().data()); + QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching"); + QVERIFY2(publishSpy.first().at(2) == false, "Retain flag not matching"); + + MqttClient *client3 = connectAndWait("subWill-client2"); + QSignalSpy retainedWillSpy(client3, &MqttClient::publishReceived); + + client3->subscribe("#"); + QTRY_VERIFY2(retainedWillSpy.count() == 1, "Retained Will has not been sent"); + QVERIFY2(retainedWillSpy.first().at(0) == "/testtopic", "Will topic not matching"); + QVERIFY2(retainedWillSpy.first().at(1) == "Bye bye", "Will message not matching"); + QVERIFY2(retainedWillSpy.first().at(2) == true, "Retain flag not matching"); + + // Clear retain on /testtopic + QSignalSpy clearRetainSpy(client3, &MqttClient::published); + client3->publish("/testtopic", QByteArray(), Mqtt::QoS1, true); + QTRY_VERIFY2(clearRetainSpy.count() == 1, "Clearing retain message did not succeed"); +} + +void OperationTests::testAutoReconnect() +{ + MqttClient *client1 = connectAndWait("client1"); + client1->setAutoReconnect(true); + + QSignalSpy disconnectedSpy(client1, &MqttClient::disconnected); + QSignalSpy connectedSpy(client1, &MqttClient::connected); + + client1->d_ptr->socket->abort(); + + QTRY_VERIFY2(disconnectedSpy.count() == 1, "client did not emit disconnected"); + QTRY_VERIFY2(connectedSpy.count() == 1, "client did not emit connected"); +} + +void OperationTests::testQoS1Retransmissions() +{ + QSignalSpy serverSpy(m_server, &MqttServer::publishReceived); + + MqttClient *client = connectAndWait("client1"); + client->setAutoReconnect(true); + + // publish a packet, flush the pipe and immediately drop the connection before we have a chance to receive the PUBACK + int packetId = client->publish("/testtopic", "Hello world", Mqtt::QoS1); + client->d_ptr->socket->flush(); + QSignalSpy connectedSpy(client, &MqttClient::connected); + client->d_ptr->socket->abort(); + + // Wait for it to reconnect, it should then republish the packet + connectedSpy.wait(); + + QTRY_VERIFY2(serverSpy.count() == 2, "Server didn't receive the publication twice but it should have"); + QCOMPARE(serverSpy.at(0).at(0).toString(), QString("client1")); + QCOMPARE(serverSpy.at(0).at(1).toInt(), packetId); + QCOMPARE(serverSpy.at(0).at(2).toString(), QString("/testtopic")); + QCOMPARE(serverSpy.at(0).at(3).toString(), QString("Hello world")); + QCOMPARE(serverSpy.at(0).at(4).toBool(), false); + + QCOMPARE(serverSpy.at(1).at(0).toString(), QString("client1")); + QCOMPARE(serverSpy.at(1).at(1).toInt(), packetId); + QCOMPARE(serverSpy.at(1).at(2).toString(), QString("/testtopic")); + QCOMPARE(serverSpy.at(1).at(3).toString(), QString("Hello world")); + QCOMPARE(serverSpy.at(1).at(4).toBool(), true); +} + +void OperationTests::testMultiSubscription() +{ + MqttClient *client = connectAndWait("subscription-topics"); + QSignalSpy subscribedSpy(client, &MqttClient::subscribed); + + MqttSubscriptions subscriptions = { MqttSubscription("topic1"), MqttSubscription("topic2") , MqttSubscription("#invalid") }; + Mqtt::SubscribeReturnCodes subscriptionReturnCodes = { Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeFailure}; + + client->subscribe(subscriptions); + QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); + + Mqtt::SubscribeReturnCodes retCodes = subscribedSpy.first().at(1).value(); + QCOMPARE(retCodes, subscriptionReturnCodes); +} + +void OperationTests::testSubscriptionTopicFilters_data() +{ + QTest::addColumn("topicFilter"); + QTest::addColumn("subscriptionReturnCode"); + + QTest::newRow("a") << "a" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("/") << "/" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("/a") << "/a" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("//") << "//" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("/a/") << "/a/" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("/a/b") << "/a/b" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("//b") << "//b" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("#") << "#" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("a/#") << "a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("a/b#") << "a/b#" << Mqtt::SubscribeReturnCodeFailure; + QTest::newRow("a/b/#/c") << "a/b/#/c" << Mqtt::SubscribeReturnCodeFailure; + QTest::newRow("+") << "+" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("a+") << "a+" << Mqtt::SubscribeReturnCodeFailure; + QTest::newRow("a/+/b") << "a/+/b" << Mqtt::SubscribeReturnCodeSuccessQoS0; + QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0; +} + +void OperationTests::testSubscriptionTopicFilters() +{ + QFETCH(QString, topicFilter); + QFETCH(Mqtt::SubscribeReturnCode, subscriptionReturnCode); + + MqttClient *client = connectAndWait("subscription-topics"); + QSignalSpy subscribedSpy(client, &MqttClient::subscribed); + client->subscribe(topicFilter); + QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); + + Mqtt::SubscribeReturnCodes retCodes = subscribedSpy.first().at(1).value(); + QCOMPARE(retCodes.first(), subscriptionReturnCode); +} + +void OperationTests::testSubscriptionTopicMatching_data() +{ + QTest::addColumn("topicFilter"); + QTest::addColumn("topic"); + QTest::addColumn("receivedPublishMessageCount"); + + QList rows; + rows.append({ "a", "a", "1" }); + rows.append({ "a", "b", "0" }); + rows.append({ "/", "/" , "1" }); + rows.append({ "/", "/a" , "0" }); + rows.append({ "#", "a", "1" }); + rows.append({ "#", "a/b", "1" }); + rows.append({ "+", "a", "1" }); + rows.append({ "+", "a/", "0" }); + rows.append({ "+", "/a" , "0" }); + rows.append({ "+", "a/b", "0" }); + + rows.append({ "/#", "/" , "1" }); + rows.append({ "/+", "/a" , "1" }); + rows.append({ "/a", "/a" , "1" }); + rows.append({ "/a", "/a" , "1" }); + rows.append({ "a/+", "a", "0" }); + rows.append({ "a/+", "a/", "1" }); + rows.append({ "a/+", "a/b", "1" }); + rows.append({ "a/+", "a/b/c", "0" }); + rows.append({ "+/+", "/a" , "1" }); + rows.append({ "+/+", "/a" , "1" }); + rows.append({ "+/+", "a/" , "1" }); + rows.append({ "a/#", "a/b", "1" }); + rows.append({ "a/#", "a", "1" }); + rows.append({ "a/#", "/", "0" }); + rows.append({ "a/#", "a/b/c", "1" }); + rows.append({ "a/#", "b/c", "0" }); + rows.append({ "a//", "a//", "1" }); + rows.append({ "a//#", "a//b", "1" }); + rows.append({ "a/b/+", "a/b/c", "1" }); + rows.append({ "a/b/+", "a/b/d", "1" }); + rows.append({ "a/b/+", "a/b/c/d", "0" }); + rows.append({ "+/a/#", "a/a/b", "1" }); + rows.append({ "+/a/#", "a/a/b/c", "1" }); + rows.append({ "+/a/#", "d/a/b/c", "1" }); + rows.append({ "+/a/#", "a/b/c/d", "0" }); + rows.append({ "a/b/#", "a/b/c", "1" }); + rows.append({ "a//+/", "a//b/", "1" }); + rows.append({ "a//+/", "a///", "1" }); + rows.append({ "a//+/#", "a//b/c", "1" }); + rows.append({ "a//+/#", "a/b/c/d", "0" }); + + rows.append({ "$SYS/", "$SYS/", "0" }); + rows.append({ "#", "$SYS/", "0" }); + rows.append({ "+/", "$SYS/", "0" }); + + foreach (const QStringList &row, rows) { + QTest::newRow(QString("%1, %2").arg(row.at(0), row.at(1)).toUtf8().data()) << row.at(0) << row.at(1) << row.at(2).toInt(); + } +} + +void OperationTests::testSubscriptionTopicMatching() +{ + QFETCH(QString, topicFilter); + QFETCH(QString, topic); + QFETCH(int, receivedPublishMessageCount); + + MqttClient *publisher = connectAndWait("publisher"); + MqttClient *subscriber = connectAndWait("subscriber"); + + QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribed); + QSignalSpy publishReceivedSpy(subscriber, &MqttClient::publishReceived); + QSignalSpy publishedSpy(publisher, &MqttClient::published); + + subscriber->subscribe(topicFilter); + QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received"); + + publisher->publish(topic, "testpayload"); + QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not received"); + + if (receivedPublishMessageCount == 0) { + // Give it some time to wait for a publishReceived (It should not show up) + QTest::qWait(500); + } else if (publishReceivedSpy.count() == 0) { + publishReceivedSpy.wait(); + } + QVERIFY2(publishReceivedSpy.count() == receivedPublishMessageCount, QString("PublishReceived signal not received the expected amount of time.\nActual: %1\nExpected: %2").arg(publishReceivedSpy.count()).arg(receivedPublishMessageCount).toUtf8().data()); +} + +void OperationTests::testSessionManagementDropOldSession() +{ + MqttClient *client1Session1 = connectAndWait("client1"); + client1Session1->setAutoReconnect(false); + + QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed); + client1Session1->subscribe("/testtopic"); + QTRY_VERIFY(subscribeSpy.count() == 1); + + QSignalSpy disconnectedSpy(client1Session1, &MqttClient::disconnected); + + QPair client1Session2 = connectToServer("client1"); + if (client1Session2.second->count() == 0) { + client1Session2.second->wait(); + } + QVERIFY2(!client1Session2.second->first().at(0).value().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be."); + + QTRY_VERIFY2(disconnectedSpy.count() == 1, "First instance didn't get disconnected when new instance connected."); + + + // Now connect with another client and post to testtopic. Client 1 should not get it because he didn't resume the session and didn't resubscribe + QSignalSpy client1PublishReceivedSpy(client1Session2.first, &MqttClient::publishReceived); + + MqttClient *client2 = connectAndWait("client2"); + + client2->publish("/testtopic", "Hello world"); + + QTest::qWait(500); + + QVERIFY2(client1PublishReceivedSpy.count() == 0, "Client 1 did receive the publish but it should not have."); +} + +void OperationTests::testSessionManagementResumeOldSession() +{ + MqttClient *client1Session1 = connectAndWait("client1"); + client1Session1->setAutoReconnect(false); + + QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed); + client1Session1->subscribe("/testtopic"); + QTRY_VERIFY(subscribeSpy.count() == 1); + + QSignalSpy disconnectedSpy(client1Session1, &MqttClient::disconnected); + + QPair client1Session2 = connectToServer("client1", false); + if (client1Session2.second->count() == 0) { + client1Session2.second->wait(); + } + QVERIFY2(client1Session2.second->first().at(0).value() == Mqtt::ConnectReturnCodeAccepted, "Session hasn't been accepted."); + QVERIFY2(client1Session2.second->first().at(1).value().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is not set while it should be."); + + QTRY_VERIFY2(disconnectedSpy.count() == 1, "First instance didn't get disconnected when new instance connected."); + + + // Now connect with another client and post to testtopic. Client 1 should not get it because he didn't resume the session and didn't resubscribe + QSignalSpy client1PublishReceivedSpy(client1Session2.first, &MqttClient::publishReceived); + + MqttClient *client2 = connectAndWait("client2"); + + client2->publish("/testtopic", "Hello world"); + + QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Client 1 did not receive the publish but it should have."); +} + +void OperationTests::testSessionManagementFailResumeOldSession() +{ + // try to resume non existing session + QPair client = connectToServer("client1", false); + if (client.second->count() == 0) { + client.second->wait(); + } + QVERIFY2(!client.second->first().at(0).value().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be."); +} + +void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume() +{ + MqttClient *client = connectAndWait("client1", true); + client->setAutoReconnect(true); + + QSignalSpy reconnectedSpy(client, &MqttClient::connected); + + QSignalSpy publishedSpy(client, &MqttClient::published); + client->publish("/testtopic", "Hello world", Mqtt::QoS1); + client->d_ptr->socket->flush(); + client->d_ptr->socket->abort(); + + QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad."); + + QTRY_VERIFY2(reconnectedSpy.count() == 1, "client didn't reconnect"); + + QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect"); + +} + +void OperationTests::testQoS1PublishToClientIsDeliveredOnSessionResume() +{ + MqttClient *oldClient1 = connectAndWait("client1", true); + QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed); + oldClient1->subscribe("/testtopic", Mqtt::QoS1); + QTRY_VERIFY(subscribedSpy.count() == 1); + + // prevent the client from receiving anything + oldClient1->d_ptr->socket->blockSignals(true); + + // pbulish something with a second client + MqttClient *client2 = connectAndWait("client2"); + QSignalSpy publishedSpy(client2, &MqttClient::published); + client2->publish("/testtopic", "Hello world", Mqtt::QoS1); + QTRY_VERIFY(publishedSpy.count() == 1); + + // Resume (take over) old session and make sure we got the publish + MqttClient *newClient1 = new MqttClient("client1", this); + m_clients.append(newClient1); // let cleanupTestcase() clean it up + QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived); + + newClient1->connectToHost(m_serverHost, m_serverPort, false); + + QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume"); +} + +void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume() +{ + MqttClient *client = connectAndWait("client1", true); + client->setAutoReconnect(true); + + QSignalSpy reconnectedSpy(client, &MqttClient::connected); + + QSignalSpy publishedSpy(client, &MqttClient::published); + client->publish("/testtopic", "Hello world", Mqtt::QoS2); + client->d_ptr->socket->flush(); + client->d_ptr->socket->abort(); + + QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad."); + + QTRY_VERIFY2(reconnectedSpy.count() == 1, "client didn't reconnect"); + + QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect"); +} + +void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume() +{ + MqttClient *oldClient1 = connectAndWait("client1", true); + QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed); + oldClient1->subscribe("/testtopic", Mqtt::QoS2); + QTRY_VERIFY(subscribedSpy.count() == 1); + + // prevent the client from receiving anything + oldClient1->d_ptr->socket->blockSignals(true); + + // pbulish something with a second client + MqttClient *client2 = connectAndWait("client2"); + QSignalSpy publishedSpy(client2, &MqttClient::published); + client2->publish("/testtopic", "Hello world", Mqtt::QoS2); + QTRY_VERIFY(publishedSpy.count() == 1); + + // Resume (take over) old session and make sure we got the publish + MqttClient *newClient1 = new MqttClient("client1", this); + m_clients.append(newClient1); // let cleanupTestcase() clean it up + QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived); + + newClient1->connectToHost(m_serverHost, m_serverPort, false); + + QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume"); +} + +void OperationTests::testRetain() +{ + MqttClient *client1 = connectAndWait("client1", true); + + // post a retained message + QSignalSpy publishedSpy(client1, &MqttClient::published); + client1->publish("/retaintopic", "Message 1", Mqtt::QoS1, true); + QTRY_VERIFY(publishedSpy.count() == 1); + + // Connect a second client + MqttClient *client2 = connectAndWait("client2"); + + // subscribe to topic and verify we received the retained message + QSignalSpy publishReceivedSpy(client2, &MqttClient::publishReceived); + client2->subscribe("/retaintopic", Mqtt::QoS1); + QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive retained topic on subscribe."); + QVERIFY2(publishReceivedSpy.first().at(2).toBool() == true, "Retain flag not set"); + + publishReceivedSpy.clear(); + + // Post another retained message from client1 and make sure we receive it + client1->publish("/retaintopic", "Message 2", Mqtt::QoS1, true); + QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive published meessage."); + QVERIFY2(publishReceivedSpy.first().at(2).toBool() == false, "Retain flag is set"); + + // Disconnect client, and connect again, verify we get 2 retained messages now + disconnectAndWait(client2); + client2 = connectAndWait("client2"); + QSignalSpy publishReceivedSpy2(client2, &MqttClient::publishReceived); + client2->subscribe("/retaintopic", Mqtt::QoS1); + QTRY_VERIFY2(publishReceivedSpy2.count() == 2, "Did not receive retained topic on subscribe."); + QVERIFY2(publishReceivedSpy2.at(0).at(2).toBool() == true, "Retain flag not set"); + QVERIFY2(publishReceivedSpy2.at(1).at(2).toBool() == true, "Retain flag not set"); + + publishReceivedSpy2.clear(); + + // Post a message with 0 paylod, it should be delivered as normal but discard any retained messages + client1->publish("/retaintopic", QByteArray(), Mqtt::QoS1, true); + QTRY_VERIFY2(publishReceivedSpy2.count() == 1, "Did not receive published message."); + QVERIFY2(publishReceivedSpy.first().at(2).toBool() == false, "Retain flag is set"); + + disconnectAndWait(client2); + client2 = connectAndWait("client2"); + QSignalSpy publishReceivedSpy3(client2, &MqttClient::publishReceived); + client2->subscribe("/retaintopic", Mqtt::QoS1); + QTest::qWait(500); + QVERIFY2(publishReceivedSpy3.count() == 0, "Did receive retained messages on subscribe but should not have."); + + // post another 2 retained messages (and some others), reconnect and verify they're there again + client1->publish("/retaintopic", "Message 3", Mqtt::QoS1, true); + client1->publish("/retaintopic", "Message 4", Mqtt::QoS1, false); + client1->publish("/retaintopic", "Message 5", Mqtt::QoS1, false); + client1->publish("/retaintopic", "Message 6", Mqtt::QoS1, true); + client1->publish("/retaintopic", "Message 7", Mqtt::QoS1, false); + QTRY_VERIFY(publishReceivedSpy3.count() == 5); + + disconnectAndWait(client2); + client2 = connectAndWait("client2"); + QSignalSpy publishReceivedSpy4(client2, &MqttClient::publishReceived); + client2->subscribe("/retaintopic", Mqtt::QoS1); + QTRY_VERIFY2(publishReceivedSpy4.count() == 2, "Did not receive retained messages."); + + publishReceivedSpy4.clear(); + + // post a QoS0 message to this topic. it should discard previously retained messages but stay retained + client1->publish("/retaintopic", "Message 8", Mqtt::QoS0, true); + QTRY_VERIFY2(publishReceivedSpy4.count() == 1, "Did not receive retained messages."); + + disconnectAndWait(client2); + client2 = connectAndWait("client2"); + QSignalSpy publishReceivedSpy5(client2, &MqttClient::publishReceived); + client2->subscribe("/retaintopic", Mqtt::QoS1); + QTRY_VERIFY2(publishReceivedSpy5.count() == 1, "Did not receive exactly 1 retained message."); +} + +void OperationTests::testUnsubscribe() +{ + MqttClient *client1 = connectAndWait("client1"); + QVERIFY(subscribeAndWait(client1, "testtopic")); + + QSignalSpy publishReceivedSpy(client1, &MqttClient::publishReceived); + + MqttClient *client2 = connectAndWait("client2"); + client2->publish("testtopic", "Hello world"); + + QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive publish message"); + + QSignalSpy unsubscribedSpy(client1, &MqttClient::unsubscribed); + QSignalSpy serverSideUnsubscribedSpy(m_server, &MqttServer::clientUnsubscribed); + + quint16 packetId = client1->unsubscribe("testtopic"); + + QTRY_VERIFY2(serverSideUnsubscribedSpy.count() == 1, "Server side unsubscribed signal not received"); + QVERIFY2(serverSideUnsubscribedSpy.first().at(0).toString() == "client1", "ClientId not matching"); + QVERIFY2(serverSideUnsubscribedSpy.first().at(1).toString() == "testtopic", "topicFilter not matching"); + + QTRY_VERIFY2(unsubscribedSpy.count() == 1, "Unsubscibed signal not emitted"); + QVERIFY2(unsubscribedSpy.first().at(0).toInt() == packetId, "packet id not matching"); + + publishReceivedSpy.clear(); + + client2->publish("testtopic", "Hello world 2"); + + QTest::qWait(500); + QVERIFY2(publishReceivedSpy.count() == 0, "Received publish packet even though we should not have"); +} + +void OperationTests::testEmptyClientId() +{ + MqttClient *client1 = connectAndWait(""); + QVERIFY2(client1->isConnected(), "Client did not connect"); + + MqttClient *client2 = connectAndWait(""); + QVERIFY2(client2->isConnected(), "Client did not connect"); + + QPair client3 = connectToServer("", false); + QTRY_VERIFY2(client3.second->count() == 1, "Client did not emit connected signal"); + QTRY_COMPARE(client3.second->first().at(0).value(), Mqtt::ConnectReturnCodeIdentifierRejected); +} + + +QTEST_MAIN(OperationTests) +#include "test_operation.moc" diff --git a/tests/tests.pro b/tests/tests.pro new file mode 100644 index 0000000..70eb65a --- /dev/null +++ b/tests/tests.pro @@ -0,0 +1,3 @@ +TEMPLATE = subdirs +SUBDIRS += operation +