nymea-mqtt implementation

This commit is contained in:
Michael Zanetti 2018-11-13 00:54:32 +01:00
commit 7be28b8e44
22 changed files with 3140 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.user

View File

@ -0,0 +1,22 @@
QT -= gui
QT += network
CONFIG += c++11 console static
CONFIG -= app_bundle
SOURCES += \
mqttserver.cpp \
mqttpacket.cpp \
mqttsubscription.cpp \
$$PWD/mqttclient.cpp
HEADERS += \
mqttserver.h \
mqttpacket.h \
mqtt.h \
mqttsubscription.h \
$$PWD/mqttclient.h \
$$PWD/mqttpacket_p.h \
$$PWD/mqttclient_p.h \
$$PWD/mqttserver_p.h

View File

@ -0,0 +1,4 @@
TEMPLATE = lib
TARGET = nymea-mqtt
include(libnymea-mqtt.pri)

62
libnymea-mqtt/mqtt.h Normal file
View File

@ -0,0 +1,62 @@
#ifndef MQTT_H
#define MQTT_H
#include <QObject>
namespace Mqtt {
enum Protocol {
ProtocolUnknown = 0x00,
Protocol311 = 0x04
};
enum QoS {
QoS0 = 0x00,
QoS1 = 0x01,
QoS2 = 0x02
};
enum ConnectFlag {
ConnectFlagNone = 0x00,
ConnectFlagCleanSession = 0x02,
ConnectFlagWill = 0x04,
ConnectFlagWillQoS1 = 0x08,
ConnectFlagWillQoS2 = 0x10,
ConnectFlagWillRetain = 0x20,
ConnectFlagPassword = 0x40,
ConnectFlagUsername = 0x80
};
Q_DECLARE_FLAGS(ConnectFlags, ConnectFlag)
enum ConnackFlag {
ConnackFlagNone = 0x0,
ConnackFlagSessionPresent = 0x1
};
Q_DECLARE_FLAGS(ConnackFlags, ConnackFlag)
enum ConnectReturnCode {
ConnectReturnCodeAccepted = 0x00,
ConnectReturnCodeUnacceptableProtocolVersion = 0x01,
ConnectReturnCodeIdentifierRejected = 0x02,
ConnectReturnCodeServerUnavailable = 0x03,
ConnectReturnCodeBadUsernameOrPassword = 0x04,
ConnectReturnCodeNotAuthorized = 0x05
};
enum SubscribeReturnCode {
SubscribeReturnCodeSuccessQoS0 = 0x00,
SubscribeReturnCodeSuccessQoS1 = 0x01,
SubscribeReturnCodeSuccessQoS2 = 0x02,
SubscribeReturnCodeFailure = 0x80
};
typedef QList<SubscribeReturnCode> SubscribeReturnCodes;
};
Q_DECLARE_METATYPE(Mqtt::QoS)
Q_DECLARE_METATYPE(Mqtt::ConnectFlags)
Q_DECLARE_METATYPE(Mqtt::ConnackFlags)
Q_DECLARE_METATYPE(Mqtt::ConnectReturnCode)
Q_DECLARE_METATYPE(Mqtt::SubscribeReturnCode)
Q_DECLARE_METATYPE(Mqtt::SubscribeReturnCodes)
#endif // MQTT_H

View File

@ -0,0 +1,395 @@
#include "mqttclient.h"
#include "mqttclient_p.h"
#include "mqttpacket.h"
Q_LOGGING_CATEGORY(dbgClient, "nymea.mqtt.client")
MqttClientPrivate::MqttClientPrivate(MqttClient *q):
QObject(q),
q_ptr(q)
{
qRegisterMetaType<Mqtt::SubscribeReturnCodes>();
qRegisterMetaType<Mqtt::ConnackFlags>();
}
void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession)
{
serverHostname = hostName;
serverPort = port;
this->cleanSession = cleanSession;
sessionActive = true;
if (socket) {
socket->abort();
socket->deleteLater();
}
socket = new QTcpSocket(this);
connect(socket, &QTcpSocket::connected, this, &MqttClientPrivate::onConnected);
connect(socket, &QTcpSocket::disconnected, this, &MqttClientPrivate::onDisconnected);
connect(socket, &QTcpSocket::readyRead, this, &MqttClientPrivate::onReadyRead);
connect(socket, &QTcpSocket::stateChanged, this, &MqttClientPrivate::onSocketStateChanged);
// connect(d_ptr->socket, &QTcpSocket::error, this, &MqttClient::error);
socket->connectToHost(hostName, port);
}
void MqttClientPrivate::disconnectFromHost()
{
sessionActive = false;
if (!socket || !socket->isOpen()) {
return;
}
MqttPacket packet(MqttPacket::TypeDisconnect);
socket->write(packet.serialize());
socket->flush();
socket->disconnectFromHost();
}
MqttClient::MqttClient(const QString &clientId, QObject *parent):
QObject(parent),
d_ptr(new MqttClientPrivate(this))
{
d_ptr->clientId = clientId;
}
MqttClient::MqttClient(const QString &clientId, quint16 keepAlive, const QString &willTopic, const QByteArray &willMessage, Mqtt::QoS willQoS, bool willRetain, QObject *parent):
QObject(parent),
d_ptr(new MqttClientPrivate(this))
{
d_ptr->clientId = clientId;
d_ptr->keepAlive = keepAlive;
d_ptr->willTopic = willTopic;
d_ptr->willMessage = willMessage;
d_ptr->willQoS = willQoS;
d_ptr->willRetain = willRetain;
if (keepAlive > 0) {
connect(&d_ptr->keepAliveTimer, &QTimer::timeout, d_ptr, &MqttClientPrivate::sendPingreq);
}
}
bool MqttClient::autoReconnect() const
{
return d_ptr->autoReconnect;
}
void MqttClient::setAutoReconnect(bool autoReconnect)
{
d_ptr->autoReconnect = autoReconnect;
}
void MqttClient::setKeepAlive(quint16 keepAlive)
{
d_ptr->keepAlive = keepAlive;
}
QString MqttClient::willTopic() const
{
return d_ptr->willTopic;
}
void MqttClient::setWillTopic(const QString &willTopic)
{
d_ptr->willTopic = willTopic;
}
QByteArray MqttClient::willMessage() const
{
return d_ptr->willMessage;
}
void MqttClient::setWillMessage(const QByteArray &willMessage)
{
d_ptr->willMessage = willMessage;
}
Mqtt::QoS MqttClient::willQoS() const
{
return d_ptr->willQoS;
}
void MqttClient::setWillQoS(Mqtt::QoS willQoS)
{
d_ptr->willQoS = willQoS;
}
bool MqttClient::willRetain() const
{
return d_ptr->willRetain;
}
void MqttClient::setWillRetain(bool willRetain)
{
d_ptr->willRetain = willRetain;
}
QString MqttClient::username() const
{
return d_ptr->username;
}
void MqttClient::setUsername(const QString &username)
{
d_ptr->username = username;
}
QString MqttClient::password() const
{
return d_ptr->password;
}
void MqttClient::setPassword(const QString &password)
{
d_ptr->password = password;
}
void MqttClient::connectToHost(const QString &hostName, quint16 port, bool cleanSession)
{
d_ptr->connectToHost(hostName, port, cleanSession);
}
void MqttClient::disconnectFromHost()
{
d_ptr->disconnectFromHost();
}
bool MqttClient::isConnected() const
{
return d_ptr->socket && d_ptr->socket->state() == QAbstractSocket::ConnectedState && d_ptr->keepAliveTimer.isActive();
}
quint16 MqttClient::subscribe(const MqttSubscription &subscription)
{
MqttSubscriptions subscriptions = {subscription};
return subscribe(subscriptions);
}
quint16 MqttClient::subscribe(const QString &topicFilter, Mqtt::QoS qos)
{
MqttSubscription subscription(topicFilter.toUtf8(), qos);
return subscribe(subscription);
}
quint16 MqttClient::subscribe(const MqttSubscriptions &subscriptions)
{
MqttPacket packet(MqttPacket::TypeSubscribe, d_ptr->newPacketId());
packet.setSubscriptions(subscriptions);
d_ptr->unackedPackets.insert(packet.packetId(), packet);
d_ptr->unackedPacketList.append(packet.packetId());
d_ptr->socket->write(packet.serialize());
return packet.packetId();
}
quint16 MqttClient::unsubscribe(const MqttSubscription &subscription)
{
MqttSubscriptions subscriptions = {subscription};
return unsubscribe(subscriptions);
}
quint16 MqttClient::unsubscribe(const QString &topicFilter)
{
return unsubscribe(MqttSubscription(topicFilter.toUtf8(), Mqtt::QoS0));
}
quint16 MqttClient::unsubscribe(const MqttSubscriptions &subscriptions)
{
MqttPacket packet(MqttPacket::TypeUnsubscribe, d_ptr->newPacketId());
packet.setSubscriptions(subscriptions);
d_ptr->unackedPackets.insert(packet.packetId(), packet);
d_ptr->unackedPacketList.append(packet.packetId());
d_ptr->socket->write(packet.serialize());
return packet.packetId();
}
quint16 MqttClient::publish(const QString &topic, const QByteArray &payload, Mqtt::QoS qos, bool retain)
{
quint16 packetId = qos >= Mqtt::QoS1 ? d_ptr->newPacketId() : 0;
MqttPacket packet(MqttPacket::TypePublish, packetId, qos, retain, false);
packet.setTopic(topic.toUtf8());
packet.setPayload(payload);
d_ptr->socket->write(packet.serialize());
if (qos == Mqtt::QoS0) {
QTimer::singleShot(0, this, [this, packetId](){
emit published(packetId);
});
} else {
d_ptr->unackedPackets.insert(packet.packetId(), packet);
d_ptr->unackedPacketList.append(packetId);
}
return packetId;
}
void MqttClientPrivate::onConnected()
{
MqttPacket packet(MqttPacket::TypeConnect);
packet.setProtocolLevel(Mqtt::Protocol311);
packet.setCleanSession(cleanSession);
packet.setKeepAlive(keepAlive);
packet.setClientId(clientId.toUtf8());
packet.setWillTopic(willTopic.toUtf8());
packet.setWillMessage(willMessage);
packet.setWillQoS(willQoS);
packet.setWillRetain(willRetain);
packet.setUsername(username.toUtf8());
packet.setPassword(password.toUtf8());
socket->write(packet.serialize());
}
void MqttClientPrivate::onDisconnected()
{
qCDebug(dbgClient) << "Disconnected from server";
emit q_ptr->disconnected();
if (sessionActive && autoReconnect) {
connectToHost(serverHostname, serverPort, cleanSession);
}
}
void MqttClientPrivate::onReadyRead()
{
static QByteArray data;
data.append(socket->readAll());
// qCDebug(dbgClient) << "Received data from server:" << data.toHex();
MqttPacket packet;
int ret = packet.parse(data);
if (ret == -1) {
qCDebug(dbgClient) << "Bad data from server. Dropping connection.";
data.clear();
socket->abort();
return;
}
if (ret == 0) {
qCDebug(dbgClient) << "Not enough data from server...";
return;
}
data.remove(0, ret);
switch (packet.type()) {
case MqttPacket::TypeConnack:
emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags());
if (packet.connectReturnCode() != Mqtt::ConnectReturnCodeAccepted) {
qCWarning(dbgClient) << "MQTT connection refused:" << packet.connectReturnCode();
socket->abort();
emit q_ptr->disconnected();
return;
}
foreach (quint16 retryPacketId, unackedPacketList) {
MqttPacket retryPacket = unackedPackets.value(retryPacketId);
retryPacket.setDup(true);
socket->write(retryPacket.serialize());
}
restartKeepAliveTimer();
break;
case MqttPacket::TypePublish:
qCDebug(dbgClient) << "Publish received from server. Topic:" << packet.topic() << "Payload:" << packet.payload() << "QoS:" << packet.qos();
switch (packet.qos()) {
case Mqtt::QoS0:
emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain());
break;
case Mqtt::QoS1: {
emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain());
MqttPacket response(MqttPacket::TypePuback, packet.packetId());
socket->write(response.serialize());
break;
}
case Mqtt::QoS2: {
if (!packet.dup() && unackedPacketList.contains(packet.packetId())) {
// Hmm... Server says it's not a duplicate, but packet id is not released yet... Drop connection.
socket->disconnectFromHost();
return;
}
MqttPacket response(MqttPacket::TypePubrec, packet.packetId());
if (!unackedPacketList.contains(packet.packetId())) {
unackedPackets.insert(packet.packetId(), response);
unackedPacketList.append(packet.packetId());
emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain());
}
socket->write(response.serialize());
break;
}
}
break;
case MqttPacket::TypePuback:
unackedPackets.remove(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->published(packet.packetId());
restartKeepAliveTimer();
break;
case MqttPacket::TypePubrec: {
MqttPacket response(MqttPacket::TypePubrel, packet.packetId());
unackedPackets[packet.packetId()] = response;
socket->write(response.serialize());
restartKeepAliveTimer();
break;
}
case MqttPacket::TypePubrel: {
MqttPacket response(MqttPacket::TypePubcomp, packet.packetId());
unackedPackets[packet.packetId()] = response;
socket->write(response.serialize());
restartKeepAliveTimer();
break;
}
case MqttPacket::TypePubcomp:
unackedPackets.remove(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->published(packet.packetId());
restartKeepAliveTimer();
break;
case MqttPacket::TypeSuback:
unackedPackets.remove(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->subscribed(packet.packetId(), packet.subscribeReturnCodes());
restartKeepAliveTimer();
break;
case MqttPacket::TypeUnsuback:
if (!unackedPackets.contains(packet.packetId())) {
qCWarning(dbgClient) << "UNSUBACK received but not waiting for it. Dropping connection. Packet ID:" << packet.packetId();
socket->abort();
return;
}
unackedPackets.remove(packet.packetId());
unackedPacketList.removeAll(packet.packetId());
emit q_ptr->unsubscribed(packet.packetId());
restartKeepAliveTimer();
break;
case MqttPacket::TypePingresp:
break;
default:
qCDebug(dbgClient).noquote().nospace() << "Unhandled packet type: 0x" << QString::number(packet.type(), 16);
Q_ASSERT(false);
}
if (!data.isEmpty()) {
onReadyRead();
}
}
void MqttClientPrivate::onSocketStateChanged(QAbstractSocket::SocketState socketState)
{
emit q_ptr->stateChanged(socketState);
}
quint16 MqttClientPrivate::newPacketId()
{
static quint16 packetId = 1;
do {
packetId++;
} while (unackedPacketList.contains(packetId));
return packetId;
}
void MqttClientPrivate::sendPingreq()
{
MqttPacket packet(MqttPacket::TypePingreq);
socket->write(packet.serialize());
}
void MqttClientPrivate::restartKeepAliveTimer()
{
if (keepAlive > 0) {
keepAliveTimer.start(keepAlive * 1000);
}
}

View File

@ -0,0 +1,75 @@
#ifndef MQTTCLIENT_H
#define MQTTCLIENT_H
#include <QObject>
#include <QAbstractSocket>
#include "mqttpacket.h"
#include "mqttsubscription.h"
class MqttClientPrivate;
class MqttClient : public QObject
{
Q_OBJECT
public:
explicit MqttClient(const QString &clientId, QObject *parent = nullptr);
explicit MqttClient(const QString &clientId, quint16 keepAlive = 300, const QString &willTopic = QString(), const QByteArray &willMessage = QByteArray(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false, QObject *parent = nullptr);
bool autoReconnect() const;
void setAutoReconnect(bool autoReconnect);
quint16 keepAlive() const;
void setKeepAlive(quint16 keepAlive);
QString willTopic() const;
void setWillTopic(const QString &willTopic);
QByteArray willMessage() const;
void setWillMessage(const QByteArray &willMessage);
Mqtt::QoS willQoS() const;
void setWillQoS(Mqtt::QoS willQoS);
bool willRetain() const;
void setWillRetain(bool willRetain);
QString username() const;
void setUsername(const QString &username);
QString password() const;
void setPassword(const QString &password);
void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true);
void disconnectFromHost();
bool isConnected() const;
public slots:
quint16 subscribe(const MqttSubscription &subscription);
quint16 subscribe(const QString &topciFilter, Mqtt::QoS qos = Mqtt::QoS0);
quint16 subscribe(const MqttSubscriptions &subscriptions);
quint16 unsubscribe(const MqttSubscription &subscription);
quint16 unsubscribe(const QString &topicFilter);
quint16 unsubscribe(const MqttSubscriptions &subscriptions);
quint16 publish(const QString &topic, const QByteArray &payload, Mqtt::QoS qos = Mqtt::QoS0, bool retain = false);
signals:
void connected(Mqtt::ConnectReturnCode connectReturnCode, Mqtt::ConnackFlags connackFlags);
void disconnected();
void stateChanged(QAbstractSocket::SocketState state);
void error(QAbstractSocket::SocketError socketError);
void subscribed(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes);
void unsubscribed(quint16 packetId);
void published(quint16 packetId);
void publishReceived(const QString &topic, const QByteArray &payload, bool retained);
private:
MqttClientPrivate *d_ptr;
friend class OperationTests;
};
#endif // MQTTCLIENT_H

View File

@ -0,0 +1,58 @@
#ifndef MQTTCLIENT_P_H
#define MQTTCLIENT_P_H
#include <QObject>
#include <QTcpSocket>
#include <QTimer>
#include <QLoggingCategory>
#include "mqttpacket.h"
#include "mqttclient.h"
#include "mqttsubscription.h"
Q_DECLARE_LOGGING_CATEGORY(dbgClient)
class MqttClientPrivate: public QObject
{
Q_OBJECT
public:
MqttClientPrivate(MqttClient *q);
MqttClient *q_ptr;
~MqttClientPrivate() { qDebug() << "destroying client private" << this; }
void connectToHost(const QString &hostName, quint16 port, bool cleanSession);
void disconnectFromHost();
public slots:
void onConnected();
void onDisconnected();
void onReadyRead();
void onSocketStateChanged(QAbstractSocket::SocketState socketState);
quint16 newPacketId();
void sendPingreq();
void restartKeepAliveTimer();
public:
QString serverHostname;
quint16 serverPort = 0;
bool autoReconnect = true;
bool sessionActive = false;
bool cleanSession = true;
QTcpSocket *socket = nullptr;
QString clientId;
quint16 keepAlive;
QTimer keepAliveTimer;
QString willTopic;
QByteArray willMessage;
Mqtt::QoS willQoS = Mqtt::QoS0;
bool willRetain = false;
QString username;
QString password;
QVector<quint16> unackedPacketList;
QHash<quint16, MqttPacket> unackedPackets;
};
#endif // MQTTCLIENT_P_H

View File

@ -0,0 +1,741 @@
#include "mqttpacket.h"
#include "mqttpacket_p.h"
#include <QDebug>
#include <QDataStream>
Q_LOGGING_CATEGORY(dbgProto, "nymea.mqtt.protocol")
#define ASSERT_LEN(a, name) if (remainingLength < a) { qCWarning(dbgProto) << "Bad" << name << "packet. Data too short."; return -1; }
#define VERIFY_LEN(a, name) if (remainingLength != a) { qCWarning(dbgProto) << "Bad" << name << "packet. Data length unexpected."; return -1; }
MqttPacket::MqttPacket():
d_ptr(new MqttPacketPrivate(this))
{
}
MqttPacket::MqttPacket(MqttPacket::Type type, quint16 packetId, Mqtt::QoS qos, bool retain, bool dup):
d_ptr(new MqttPacketPrivate(this))
{
d_ptr->packetId = packetId;
d_ptr->header = type;
switch (type) {
case TypeConnect:
case TypeConnack:
case TypePuback:
case TypePubrec:
case TypePubcomp:
case TypeSuback:
case TypeUnsuback:
case TypePingreq:
case TypePingresp:
case TypeDisconnect:
break;
case TypePublish:
setDup(dup);
setQoS(qos);
setRetain(retain);
break;
case TypeSubscribe:
case TypePubrel:
case TypeUnsubscribe:
setQoS(Mqtt::QoS1);
break;
}
}
MqttPacket::Type MqttPacket::type() const
{
return static_cast<Type>(d_ptr->header & 0xF0);
}
bool MqttPacket::dup() const
{
return d_ptr->header & 0x08;
}
void MqttPacket::setDup(bool dup)
{
if (dup) {
d_ptr->header |= 0x08;
} else {
d_ptr->header &= 0xf7;
}
}
Mqtt::QoS MqttPacket::qos() const
{
return static_cast<Mqtt::QoS>((d_ptr->header & 0x06) >> 1);
}
void MqttPacket::setQoS(Mqtt::QoS qoS)
{
d_ptr->header &= 0xf9;
d_ptr->header |= (qoS << 1);
}
bool MqttPacket::retain() const
{
return d_ptr->header & 0x01;
}
void MqttPacket::setRetain(bool retain)
{
if (retain) {
d_ptr->header = d_ptr->header | 0x01;
} else {
d_ptr->header = d_ptr->header & 0xfe;
}
}
void MqttPacket::setCleanSession(bool cleanSession)
{
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagCleanSession, cleanSession);
}
bool MqttPacket::cleanSession() const
{
return d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagCleanSession);
}
Mqtt::ConnectFlags MqttPacket::connectFlags() const
{
return d_ptr->connectFlags;
}
QByteArray MqttPacket::protocolName() const
{
return d_ptr->protocolName;
}
Mqtt::Protocol MqttPacket::protocolLevel() const
{
return d_ptr->protocolLevel;
}
void MqttPacket::setProtocolLevel(Mqtt::Protocol protocolLevel)
{
d_ptr->protocolLevel = protocolLevel;
}
QByteArray MqttPacket::clientId() const
{
return d_ptr->clientId;
}
void MqttPacket::setClientId(const QByteArray &clientId)
{
d_ptr->clientId = clientId;
}
QByteArray MqttPacket::willTopic() const
{
return d_ptr->willTopic;
}
void MqttPacket::setWillTopic(const QByteArray &willTopic)
{
d_ptr->willTopic = willTopic;
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWill, !willTopic.isEmpty());
}
QByteArray MqttPacket::willMessage() const
{
return d_ptr->willMessage;
}
void MqttPacket::setWillMessage(const QByteArray &willMessage)
{
d_ptr->willMessage = willMessage;
}
Mqtt::QoS MqttPacket::willQoS() const
{
if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWillQoS2)) {
return Mqtt::QoS2;
}
if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWillQoS1)) {
return Mqtt::QoS1;
}
return Mqtt::QoS0;
}
void MqttPacket::setWillQoS(Mqtt::QoS willQoS)
{
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWillQoS1, willQoS == Mqtt::QoS1);
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWillQoS2, willQoS == Mqtt::QoS2);
}
bool MqttPacket::willRetain() const
{
return d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWillRetain);
}
void MqttPacket::setWillRetain(bool willRetain)
{
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagWillRetain, willRetain);
}
QByteArray MqttPacket::username() const
{
return d_ptr->username;
}
void MqttPacket::setUsername(const QByteArray &username)
{
d_ptr->username = username;
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagUsername, !username.isEmpty());
}
QByteArray MqttPacket::password() const
{
return d_ptr->password;
}
void MqttPacket::setPassword(const QByteArray &password)
{
d_ptr->password = password;
d_ptr->connectFlags.setFlag(Mqtt::ConnectFlagPassword, !password.isEmpty());
}
quint16 MqttPacket::keepAlive() const
{
return d_ptr->keepAlive;
}
void MqttPacket::setKeepAlive(quint16 keepAlive)
{
d_ptr->keepAlive = keepAlive;
}
Mqtt::ConnectReturnCode MqttPacket::connectReturnCode() const
{
return d_ptr->connectReturnCode;
}
void MqttPacket::setConnectReturnCode(Mqtt::ConnectReturnCode connectReturnCode)
{
d_ptr->connectReturnCode = connectReturnCode;
}
Mqtt::ConnackFlags MqttPacket::connackFlags() const
{
return d_ptr->connackFlags;
}
void MqttPacket::setConnackFlags(Mqtt::ConnackFlags connackFlags)
{
d_ptr->connackFlags = connackFlags;
}
quint16 MqttPacket::packetId() const
{
return d_ptr->packetId;
}
void MqttPacket::setPacketId(quint16 packetId)
{
d_ptr->packetId = packetId;
}
QByteArray MqttPacket::topic() const
{
return d_ptr->topic;
}
void MqttPacket::setTopic(const QByteArray &topic)
{
d_ptr->topic = topic;
}
QByteArray MqttPacket::payload() const
{
return d_ptr->payload;
}
void MqttPacket::setPayload(const QByteArray &payload)
{
d_ptr->payload = payload;
}
MqttSubscriptions MqttPacket::subscriptions() const
{
return d_ptr->subscriptions;
}
void MqttPacket::setSubscriptions(const MqttSubscriptions &subscriptions)
{
d_ptr->subscriptions = subscriptions;
}
void MqttPacket::addSubscription(const MqttSubscription &subscription)
{
d_ptr->subscriptions.append(subscription);
}
Mqtt::SubscribeReturnCodes MqttPacket::subscribeReturnCodes() const
{
return d_ptr->subscribeReturnCodes;
}
void MqttPacket::setSubscribeReturnCodes(const Mqtt::SubscribeReturnCodes subscribeReturnCodes)
{
d_ptr->subscribeReturnCodes = subscribeReturnCodes;
}
void MqttPacket::addSubscribeReturnCode(Mqtt::SubscribeReturnCode subscribeReturnCode)
{
d_ptr->subscribeReturnCodes.append(subscribeReturnCode);
}
int MqttPacket::parse(const QByteArray &buffer)
{
if (buffer.length() < 2) {
return 0;
}
QDataStream inputStream(buffer);
qCDebug(dbgProto()) << "MQTT input data:\n" << buffer.toHex();
inputStream >> d_ptr->header;
quint16 remainingLength = 0;
int multiplier = 1;
quint8 lengthBit;
quint8 lenFields = 0;
do {
inputStream >> lengthBit;
remainingLength += (lengthBit & 0x7F) * multiplier;
multiplier *= 128;
lenFields++;
} while((lengthBit & 0x80) != 0);
if (remainingLength > buffer.length() - 1 - lenFields) {
qCDebug(dbgProto) << "Cannot process MQTT packet. Remaining Length field larger than input data size:" << remainingLength << ">" << (buffer.length() - 1 - lenFields);
return 0;
}
if (!d_ptr->verifyHeaderFlags()) {
qCDebug(dbgProto) << "Bad MQTT packet. Fixed header flags invalid.";
return -1;
}
const quint16 fullRemainingLength = remainingLength;
quint16 strLen;
const quint16 MAX_STRLEN = 256;
char str[MAX_STRLEN];
switch (type()) {
case TypeConnect: {
ASSERT_LEN(2, "CONNECT");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "CONNECT");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->protocolName = QByteArray(str);
ASSERT_LEN(6, "CONNECT");
quint8 pl;
inputStream >> pl;
d_ptr->protocolLevel = static_cast<Mqtt::Protocol>(pl);
remainingLength -= 1;
quint8 cF;
inputStream >> cF;
remainingLength -= 1;
d_ptr->connectFlags = static_cast<Mqtt::ConnectFlags>(cF);
inputStream >> d_ptr->keepAlive;
remainingLength -= 2;
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "CONNECT");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->clientId = QByteArray(str);
if (connectFlags().testFlag(Mqtt::ConnectFlagWill)) {
ASSERT_LEN(2, "CONNECT");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "CONNECT");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->willTopic = QByteArray(str);
ASSERT_LEN(2, "CONNECT");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "CONNECT");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->willMessage = QByteArray(str);
} else {
if (willRetain() || willQoS() != Mqtt::QoS0) {
qCWarning(dbgProto) << "Bad CONNECT packet. Will flag not set but WillQoS or WillRetain set.";
return -1;
}
}
if (connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
ASSERT_LEN(2, "CONNECT");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "CONNECT");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->username = QByteArray(str);
} else {
if (connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
qCWarning(dbgProto) << "Bad CONNECT packet. Username flag not set but password is set.";
return -1;
}
}
if (connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
ASSERT_LEN(2, "CONNECT");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "CONNECT");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->password = QByteArray(str);
}
VERIFY_LEN(0, "CONNECT");
break;
}
case TypeConnack: {
VERIFY_LEN(2, "CONNACK");
quint8 connackFlags;
inputStream >> connackFlags;
remainingLength -= 1;
d_ptr->connackFlags = static_cast<Mqtt::ConnackFlags>(connackFlags);
quint8 connectReturnCode;
inputStream >> connectReturnCode;
d_ptr->connectReturnCode = static_cast<Mqtt::ConnectReturnCode>(connectReturnCode);
remainingLength -= 1;
VERIFY_LEN(0, "CONNACK");
break;
}
case TypePublish: {
ASSERT_LEN(2, "PUBLISH");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "PUBLISH");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
d_ptr->topic = QByteArray(str);
if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) {
ASSERT_LEN(2, "PUBLISH");
inputStream >> d_ptr->packetId;
remainingLength -= 2;
}
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, remainingLength));
d_ptr->payload = QByteArray(str);
break;
}
case TypePuback:
VERIFY_LEN(2, "PUBACK");
inputStream >> d_ptr->packetId;
break;
case TypePubrec:
VERIFY_LEN(2, "PUBREC");
inputStream >> d_ptr->packetId;
break;
case TypePubrel:
VERIFY_LEN(2, "PUBREL");
inputStream >> d_ptr->packetId;
break;
case TypePubcomp:
VERIFY_LEN(2, "PUBCOMP");
inputStream >> d_ptr->packetId;
break;
case TypeSubscribe: {
ASSERT_LEN(2, "SUBSCRIBE");
inputStream >> d_ptr->packetId;
remainingLength -= 2;
if (remainingLength == 0) {
qCWarning(dbgProto) << "Bad SUBSCRIBE packet. Subscription filter in payload missing.";
return -1;
}
while (remainingLength > 0) {
ASSERT_LEN(2, "SUBSCRIBE");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "SUBSCRIBE");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
MqttSubscription subscription;
subscription.setTopicFilter(QByteArray(str));
ASSERT_LEN(1, "SUBSCRIBE");
quint8 requestedQoS;
inputStream >> requestedQoS;
remainingLength -= 1;
if ((requestedQoS & 0xFC) != 0x00) {
qCWarning(dbgProto) << "Bad SUBSCRIBE packet. Reserved bits set in requested QoS field.";
return -1;
}
if ((requestedQoS & 0x03) == 0x03) {
qCWarning(dbgProto) << "Bad SUBSCRIBE packet. QoS cannot be QoS1 and QoS2 at the same time.";
return -1;
}
subscription.setQoS(static_cast<Mqtt::QoS>(requestedQoS));
d_ptr->subscriptions.append(subscription);
}
break;
}
case TypeSuback:
ASSERT_LEN(3, "SUBACK");
inputStream >> d_ptr->packetId;
remainingLength -= 2;
while (remainingLength > 0) {
quint8 subscribeReturnCode;
inputStream >> subscribeReturnCode;
remainingLength -= 1;
d_ptr->subscribeReturnCodes.append(static_cast<Mqtt::SubscribeReturnCode>(subscribeReturnCode));
}
break;
case TypeUnsubscribe: {
ASSERT_LEN(5, "UNSUBSCRIBE");
inputStream >> d_ptr->packetId;
remainingLength -= 2;
while (remainingLength > 0) {
ASSERT_LEN(2, "UNSUBSCRIBE");
inputStream >> strLen;
remainingLength -= 2;
ASSERT_LEN(strLen, "UNSUBSCRIBE");
memset(str, 0, MAX_STRLEN);
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
remainingLength -= strLen;
MqttSubscription subscription;
subscription.setTopicFilter(QByteArray(str));
d_ptr->subscriptions.append(subscription);
}
}
break;
case TypeUnsuback:
VERIFY_LEN(2, "UNSUBACK");
inputStream >> d_ptr->packetId;
break;
case TypePingreq:
VERIFY_LEN(0, "PINGREC");
break;
case TypePingresp:
VERIFY_LEN(0, "PINGRESP");
break;
case TypeDisconnect:
VERIFY_LEN(0, "DISCONNECT");
break;
}
return fullRemainingLength + 1 + lenFields;
}
QByteArray MqttPacket::serialize() const
{
QByteArray ret;
QDataStream stream(&ret, QIODevice::WriteOnly);
stream << d_ptr->header;
quint16 remainingLength = 0;
switch (type()) {
case TypeConnect:
remainingLength = static_cast<quint16>(
2 // protocol name length
+ d_ptr->protocolName.length()
+ 1 // protocol level
+ 1 // connect flags
+ 2 // keep alive
+ 2 // client id length
+ d_ptr->clientId.length()
+ (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWill) ? (2 + d_ptr->willTopic.length()) : 0)
+ (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWill) ? (2 + d_ptr->willMessage.length()) : 0)
+ (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagUsername) ? (2 + d_ptr->username.length()) : 0)
+ (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagPassword) ? (2 + d_ptr->password.length()) : 0)
);
break;
case TypeConnack:
remainingLength = 2;
break;
case TypePublish:
remainingLength += 2; // len for topic
remainingLength += d_ptr->topic.length();
if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) {
remainingLength += 2; // packetId
}
remainingLength += d_ptr->payload.length();
break;
case TypePuback:
case TypePubrec:
case TypePubrel:
case TypePubcomp:
remainingLength = 2;
break;
case TypeSubscribe:
remainingLength = 2; // packet id
foreach (const MqttSubscription &subscription, d_ptr->subscriptions) {
remainingLength += 2; // topic filter length
remainingLength += static_cast<quint16>(subscription.topicFilter().length());
remainingLength += 1; // requested QoS
}
break;
case TypeSuback:
remainingLength = 2 + static_cast<quint16>(d_ptr->subscribeReturnCodes.length());
break;
case TypeUnsubscribe:
remainingLength = 2; // packet id
foreach (const MqttSubscription &subscription, d_ptr->subscriptions) {
remainingLength += 2;
remainingLength += static_cast<quint16>(subscription.topicFilter().length());
}
break;
case TypeUnsuback:
remainingLength = 2; // packet id
break;
case TypePingreq:
break;
case TypePingresp:
break;
case TypeDisconnect:
break;
}
quint8 encodedByte;
do {
encodedByte = remainingLength % 128;
remainingLength /= 128;
if ( remainingLength > 0 ) {
encodedByte = encodedByte | 128;
}
stream << encodedByte;
} while ( remainingLength > 0 );
switch (type()) {
case TypeConnect:
stream << static_cast<quint16>(d_ptr->protocolName.length());
stream.writeRawData(d_ptr->protocolName.data(), d_ptr->protocolName.length());
stream << static_cast<quint8>(d_ptr->protocolLevel);
stream << static_cast<quint8>(d_ptr->connectFlags);
stream << static_cast<quint16>(d_ptr->keepAlive);
stream << static_cast<quint16>(d_ptr->clientId.length());
stream.writeRawData(d_ptr->clientId.data(), d_ptr->clientId.length());
if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagWill)) {
stream << static_cast<quint16>(d_ptr->willTopic.length());
stream.writeRawData(d_ptr->willTopic.data(), d_ptr->willTopic.length());
stream << static_cast<quint16>(d_ptr->willMessage.length());
stream.writeRawData(d_ptr->willMessage.data(), d_ptr->willMessage.length());
}
if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagUsername)) {
stream << static_cast<quint16>(d_ptr->username.length());
stream.writeRawData(d_ptr->username.data(), d_ptr->username.length());
}
if (d_ptr->connectFlags.testFlag(Mqtt::ConnectFlagPassword)) {
stream << static_cast<quint16>(d_ptr->password.length());
stream.writeRawData(d_ptr->password.data(), d_ptr->password.length());
}
break;
case TypeConnack:
stream << static_cast<quint8>(d_ptr->connackFlags);
stream << static_cast<quint8>(d_ptr->connectReturnCode);
break;
case TypePublish:
stream << static_cast<quint16>(d_ptr->topic.length());
stream.writeRawData(d_ptr->topic.data(), d_ptr->topic.length());
if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) {
stream << d_ptr->packetId;
}
stream.writeRawData(d_ptr->payload.data(), d_ptr->payload.length());
break;
case TypePuback:
case TypePubrec:
case TypePubrel:
case TypePubcomp:
stream << d_ptr->packetId;
break;
case TypeSubscribe:
stream << static_cast<quint16>(d_ptr->packetId);
foreach (const MqttSubscription &subscription, d_ptr->subscriptions) {
stream << static_cast<quint16>(subscription.topicFilter().length());
stream.writeRawData(subscription.topicFilter().data(), subscription.topicFilter().length());
stream << static_cast<quint8>(subscription.qoS());
}
break;
case TypeSuback:
stream << d_ptr->packetId;
foreach (Mqtt::SubscribeReturnCode subscribeReturnCode, d_ptr->subscribeReturnCodes) {
stream << static_cast<quint8>(subscribeReturnCode);
}
break;
case TypeUnsubscribe:
stream << d_ptr->packetId;
foreach (const MqttSubscription &subscription, d_ptr->subscriptions) {
stream << static_cast<quint16>(subscription.topicFilter().length());
stream.writeRawData(subscription.topicFilter().data(), subscription.topicFilter().length());
}
break;
case TypeUnsuback:
stream << d_ptr->packetId;
break;
case TypePingreq:
break;
case TypePingresp:
break;
case TypeDisconnect:
break;
}
// qCDebug(dbgProto()) << "Serialized MQTT packet:" << ret.toHex();
return ret;
}
bool MqttPacket::operator==(const MqttPacket &other) const
{
return serialize() == other.serialize();
}
bool MqttPacketPrivate::verifyHeaderFlags()
{
bool fail = false;
switch (q_ptr->type()) {
case MqttPacket::MqttPacket::TypeConnect:
case MqttPacket::TypeConnack:
case MqttPacket::TypePuback:
case MqttPacket::TypePubrec:
case MqttPacket::TypePubcomp:
case MqttPacket::TypeSuback:
case MqttPacket::TypeUnsuback:
case MqttPacket::TypePingreq:
case MqttPacket::TypePingresp:
case MqttPacket::TypeDisconnect:
fail |= q_ptr->dup();
fail |= (q_ptr->qos() != Mqtt::QoS0);
fail |= q_ptr->retain();
break;
case MqttPacket::TypePublish:
fail |= (q_ptr->qos() == 0x03);
break;
case MqttPacket::TypeSubscribe:
case MqttPacket::TypePubrel:
case MqttPacket::TypeUnsubscribe:
fail |= q_ptr->dup();
fail |= (q_ptr->qos() != Mqtt::QoS1);
fail |= q_ptr->retain();
break;
}
return !fail;
}

112
libnymea-mqtt/mqttpacket.h Normal file
View File

@ -0,0 +1,112 @@
#ifndef MQTTPACKET_H
#define MQTTPACKET_H
#include <QByteArray>
#include <QByteArray>
#include <QList>
#include <QLoggingCategory>
#include "mqtt.h"
#include "mqttsubscription.h"
class MqttPacketPrivate;
class MqttPacket
{
public:
enum Type {
TypeConnect = 0x10,
TypeConnack = 0x20,
TypePublish = 0x30,
TypePuback = 0x40,
TypePubrec = 0x50,
TypePubrel = 0x60,
TypePubcomp = 0x70,
TypeSubscribe = 0x80,
TypeSuback = 0x90,
TypeUnsubscribe = 0xa0,
TypeUnsuback = 0xb0,
TypePingreq = 0xc0,
TypePingresp = 0xd0,
TypeDisconnect = 0xe0
};
MqttPacket();
MqttPacket(Type type, quint16 packetId = 0, Mqtt::QoS qos = Mqtt::QoS0, bool retain = false, bool dup = false);
public:
Type type() const;
bool dup() const;
void setDup(bool dup);
Mqtt::QoS qos() const;
void setQoS(Mqtt::QoS qoS);
bool retain() const;
void setRetain(bool retain);
// CONNECT
void setCleanSession(bool cleanSession);
bool cleanSession() const;
Mqtt::ConnectFlags connectFlags() const;
QByteArray protocolName() const;
Mqtt::Protocol protocolLevel() const;
void setProtocolLevel(Mqtt::Protocol protocolLevel);
quint16 keepAlive() const;
void setKeepAlive(quint16 keepAlive);
QByteArray clientId() const;
void setClientId(const QByteArray &clientId);
QByteArray willTopic() const;
void setWillTopic(const QByteArray &willTopic);
QByteArray willMessage() const;
void setWillMessage(const QByteArray &willMessage);
Mqtt::QoS willQoS() const;
void setWillQoS(Mqtt::QoS willQoS);
bool willRetain() const;
void setWillRetain(bool willRetain);
QByteArray username() const;
void setUsername(const QByteArray &username);
QByteArray password() const;
void setPassword(const QByteArray &password);
// CONNACK
Mqtt::ConnectReturnCode connectReturnCode() const;
void setConnectReturnCode(Mqtt::ConnectReturnCode connectReturnCode);
Mqtt::ConnackFlags connackFlags() const;
void setConnackFlags(Mqtt::ConnackFlags connackFlags);
// PUBLISH/SUBSCRIBE
quint16 packetId() const;
void setPacketId(quint16 packetId);
// PUBLISH
QByteArray topic() const;
void setTopic(const QByteArray &topic);
QByteArray payload() const;
void setPayload(const QByteArray &payload);
// SUBSCRIBE
MqttSubscriptions subscriptions() const;
void setSubscriptions(const MqttSubscriptions &subscriptions);
void addSubscription(const MqttSubscription &subscription);
// SUBACK
Mqtt::SubscribeReturnCodes subscribeReturnCodes() const;
void setSubscribeReturnCodes(const Mqtt::SubscribeReturnCodes subscribeReturnCodes);
void addSubscribeReturnCode(Mqtt::SubscribeReturnCode subscribeReturnCode);
// Takes a buffer and fills the packet accordingly.
// Returns the length of data used from the buffer on success, bad() will return false
// Returns -1 on bad data input, bad() will return true
// Returns 0 if input data is ok, but not long enough, bad() will return true
int parse(const QByteArray &buffer);
QByteArray serialize() const;
bool operator==(const MqttPacket &other) const;
private:
MqttPacketPrivate *d_ptr = nullptr;
};
typedef QList<MqttPacket> MqttPackets;
#endif // MQTTPACKET_H

View File

@ -0,0 +1,45 @@
#ifndef MQTTPACKET_P_H
#define MQTTPACKET_P_H
#include <QByteArray>
#include <QByteArray>
#include <QList>
#include <QLoggingCategory>
#include "mqtt.h"
#include "mqttpacket.h"
#include "mqttsubscription.h"
Q_DECLARE_LOGGING_CATEGORY(dbgProto)
class MqttPacketPrivate
{
public:
MqttPacketPrivate(MqttPacket *q) : q_ptr(q) { }
MqttPacket *q_ptr;
bool verifyHeaderFlags();
quint8 header = 0;
QByteArray protocolName = "MQTT";
Mqtt::Protocol protocolLevel = Mqtt::ProtocolUnknown;
Mqtt::ConnectFlags connectFlags = Mqtt::ConnectFlagNone;
Mqtt::ConnackFlags connackFlags = Mqtt::ConnackFlagNone;
quint16 keepAlive = 0;
QByteArray clientId;
QByteArray willTopic;
QByteArray willMessage;
QByteArray username;
QByteArray password;
quint16 packetId = 0;
QByteArray topic;
QByteArray payload;
Mqtt::ConnectReturnCode connectReturnCode = Mqtt::ConnectReturnCodeAccepted;
MqttSubscriptions subscriptions;
Mqtt::SubscribeReturnCodes subscribeReturnCodes;
};
#endif // MQTTPACKET_P_H

View File

@ -0,0 +1,569 @@
#include "mqttserver.h"
#include "mqttserver_p.h"
#include "mqttpacket.h"
#include <QDebug>
#include <QDataStream>
#include <QUuid>
#include <QtGlobal>
#include <QRegExp>
Q_LOGGING_CATEGORY(dbgServer, "nymea.mqtt.server")
MqttServerPrivate::MqttServerPrivate(MqttServer *q):
QObject(q),
q_ptr(q)
{
qRegisterMetaType<Mqtt::QoS>();
server = new QTcpServer(this);
connect(server, &QTcpServer::newConnection, this, &MqttServerPrivate::onNewConnection);
}
QHash<QString, quint16> MqttServerPrivate::publish(const QString &topic, const QByteArray &payload)
{
QHash<QTcpSocket*, Mqtt::QoS> receivers;
foreach (QTcpSocket *c, clientList.keys()) {
foreach (const MqttSubscription &subscription, clientList.value(c)->subscriptions) {
if (matchTopic(subscription.topicFilter(), topic)) {
if (!receivers.contains(c) || receivers.value(c) < subscription.qoS()) {
receivers[c] = subscription.qoS();
}
}
}
}
QHash<QString, quint16> packets;
foreach (QTcpSocket *receiver, receivers.keys()) {
ClientContext *ctx = clientList.value(receiver);
qCDebug(dbgServer) << "Relaying packet to subscribed client:" << ctx->clientId;
Mqtt::QoS qos = receivers.value(receiver);
MqttPacket packet(MqttPacket::TypePublish, qos >= Mqtt::QoS0 ? newPacketId(ctx) : 0, qos);
packet.setTopic(topic.toUtf8());
packet.setPayload(payload);
receiver->write(packet.serialize());
packets.insert(ctx->clientId, packet.packetId());
if (packet.qos() == Mqtt::QoS0) {
QString clientId = ctx->clientId;
QTimer::singleShot(0, this, [this, clientId, packet](){
emit q_ptr->published(clientId, packet.packetId(), packet.topic(), packet.payload());
});
} else {
ClientContext *ctx = clientList.value(receiver);
ctx->unackedPackets.insert(packet.packetId(), packet);
ctx->unackedPacketList.append(packet.packetId());
}
}
return packets;
}
MqttServer::MqttServer(QObject *parent):
QObject(parent),
d_ptr(new MqttServerPrivate(this))
{
}
Mqtt::QoS MqttServer::maximumSubscriptionsQoS() const
{
return d_ptr->maximumSubscriptionQoS;
}
void MqttServer::setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS)
{
d_ptr->maximumSubscriptionQoS = maximumSubscriptionQoS;
}
bool MqttServer::listen(const QHostAddress &address, quint16 port, MqttUserValidator *userValidator)
{
d_ptr->userValidator = userValidator;
if (!d_ptr->server->listen(address, port)) {
qCWarning(dbgServer()) << "Error listening on port" << port;
return false;
}
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port;
return true;
}
QStringList MqttServer::clients() const
{
QStringList clientIds;
foreach (ClientContext *ctx, d_ptr->clientList) {
clientIds << ctx->clientId;
}
return clientIds;
}
QHash<QString, quint16> MqttServer::publish(const QString &topic, const QByteArray &payload)
{
return d_ptr->publish(topic, payload);
}
void MqttServerPrivate::onNewConnection()
{
QTcpSocket *client = server->nextPendingConnection();
// Start a 10 second timer to clean up the connection if we don't get data until then.
QTimer *timeoutTimer = new QTimer(this);
connect(timeoutTimer, &QTimer::timeout, this, [this, client]() {
qCWarning(dbgServer) << "A client connected but did not send data in 10 seconds. Dropping connection.";
client->abort();
pendingConnections.take(client)->deleteLater();
client->deleteLater();
});
timeoutTimer->start(10000);
pendingConnections.insert(client, timeoutTimer);
connect(client, &QTcpSocket::readyRead, this, &MqttServerPrivate::onClientReadyRead);
connect(client, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(onClientError(QAbstractSocket::SocketError)));
connect(client, &QTcpSocket::disconnected, this, &MqttServerPrivate::onClientDisconnected);
}
void MqttServerPrivate::onClientDisconnected()
{
QTcpSocket *client = static_cast<QTcpSocket*>(sender());
cleanupClient(client);
}
void MqttServerPrivate::cleanupClient(QTcpSocket *client)
{
if (clientBuffers.contains(client)) {
clientBuffers.remove(client);
}
if (clientList.contains(client)) {
ClientContext *ctx = clientList.value(client);
qCDebug(dbgServer) << "Client" << ctx->clientId << "disconnected.";
ctx->keepAliveTimer.stop();
if (!ctx->willTopic.isEmpty()) {
qCDebug(dbgServer) << "Publishing will message for client" << ctx->clientId << "on topic" << ctx->willTopic << "( Retain:" << ctx->willRetain << ")";
MqttPacket willPacket(MqttPacket::TypePublish, ctx->willQoS >= Mqtt::QoS1 ? newPacketId(ctx) : 0, ctx->willQoS, ctx->willRetain);
willPacket.setTopic(ctx->willTopic);
willPacket.setPayload(ctx->willMessage);
processPacket(willPacket, client);
}
emit q_ptr->clientDisconnected(ctx->clientId);
clientList.remove(client);
delete ctx;
}
client->flush();
client->deleteLater();
}
void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *client)
{
if (packet.type() == MqttPacket::TypeConnect) {
if (clientList.contains(client)) {
ClientContext *ctx = clientList.value(client);
qCWarning(dbgServer) << "Client" << ctx->clientId << "sends duplicate CONNECT packets. Dropping connection.";
cleanupClient(client);
return;
}
MqttPacket response(MqttPacket::TypeConnack, packet.packetId());
if (packet.protocolLevel() != Mqtt::Protocol311) {
qCWarning(dbgServer) << "This MQTT broker only supports Protocol version 3.1.1";
response.setConnectReturnCode(Mqtt::ConnectReturnCodeUnacceptableProtocolVersion);
client->write(response.serialize());
cleanupClient(client);
return;
}
QString clientId = packet.clientId();
if (clientId.isEmpty()) {
if (!packet.cleanSession()) {
qCWarning(dbgServer) << "Empty client id provided but clean session flag not set. Rejecting connection.";
response.setConnectReturnCode(Mqtt::ConnectReturnCodeIdentifierRejected);
client->write(response.serialize());
cleanupClient(client);
return;
}
clientId = QUuid::createUuid().toString().remove(QRegExp("[{}-]*"));
}
if (userValidator) {
QString username;
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
username = packet.username();
}
QString password;
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
password = packet.password();
}
Mqtt::ConnectReturnCode userValidationReturnCode = userValidator->validateConnect(clientId, username, password, client->peerAddress());
if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) {
qCWarning(dbgServer) << "Rejecting connection due to user validation.";
response.setConnectReturnCode(userValidationReturnCode);
client->write(response.serialize());
cleanupClient(client);
return;
}
}
ClientContext *ctx = nullptr;
QList<QTcpSocket*> existingSockets = clientList.keys();
for (int i = 0; i < existingSockets.count(); i++) {
QTcpSocket *existingClient = existingSockets.at(i);
if (clientId == clientList.value(existingClient)->clientId) {
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Taking over existing session.";
response.setConnackFlags(Mqtt::ConnackFlagSessionPresent);
ctx = clientList.value(existingClient);
// remove old client manually, we don't want to clean up the context, nor send any will message or emit disconnected signals
clientList.remove(existingClient);
clientBuffers.remove(existingClient);
existingClient->flush();
existingClient->deleteLater();
} else {
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Dropping old session.";
cleanupClient(existingClient);
}
break;
}
}
if (!ctx) {
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
qCWarning(dbgServer).nospace() << clientId << ": Request to take over existing session but we don't have an existing session.";
}
ctx = new ClientContext();
ctx->clientId = clientId;
connect(&ctx->keepAliveTimer, &QTimer::timeout, this, [this, client](){
qCWarning(dbgServer) << "Keep alive timeout reached for client:" << clientList.value(client)->clientId;
cleanupClient(client);
});
}
ctx->keepAlive = packet.keepAlive();
ctx->version = packet.protocolLevel();
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWill)) {
ctx->willTopic = packet.willTopic();
ctx->willMessage = packet.willMessage();
ctx->willRetain = packet.willRetain();
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS2)) {
ctx->willQoS = Mqtt::QoS2;
} else if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS1)) {
ctx->willQoS = Mqtt::QoS1;
}
}
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
ctx->username = packet.username();
}
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
}
qCDebug(dbgServer).nospace().noquote()
<< "New MQTT client: \"" << clientId << '\"'
<< ", Protocol: " << packet.protocolName() << " (" << packet.protocolLevel() << ')'
<< ", Flags: " << packet.connectFlags()
<< ", KeepAlive: " << packet.keepAlive()
<< ", Will Topic: \"" << packet.willTopic() << '\"'
<< ", Will Message: \"" << packet.willMessage() << '\"'
<< ", Will Retain: " << packet.willRetain()
<< ", Username: " << packet.username()
<< ", Password: " << QString(packet.password()).replace(QRegExp("."), "*");
if (ctx->keepAlive > 0) {
ctx->keepAliveTimer.start(ctx->keepAlive * 1500);
}
clientList.insert(client, ctx);
response.setConnectReturnCode(Mqtt::ConnectReturnCodeAccepted);
client->write(response.serialize());
emit q_ptr->clientConnected(ctx->clientId, ctx->username, client->peerAddress());
foreach (quint16 retryPacketId, ctx->unackedPacketList) {
qCDebug(dbgServer) << "Resending unacked packet" << retryPacketId << "to" << ctx->clientId;;
MqttPacket retryPacket = ctx->unackedPackets.value(retryPacketId);
retryPacket.setDup(true);
client->write(retryPacket.serialize());
}
return;
}
if (!clientList.contains(client)) {
qCWarning(dbgServer) << "Protocol error: Client connection did not send CONNECT yet. Dropping connection.";
client->close();
return;
}
ClientContext *ctx = clientList.value(client);
if (ctx->keepAlive > 0) {
ctx->keepAliveTimer.start();
}
emit q_ptr->clientAlive(ctx->clientId);
if (packet.type() == MqttPacket::TypePublish) {
qCDebug(dbgServer).nospace() << "Publish received from client " << ctx->clientId << ": Topic: " << packet.topic() << ", Payload: " << packet.payload() << " (Packet ID: " << packet.packetId() << ", DUP: " << packet.dup() << ", QoS: " << packet.qos() << ", Retain: " << packet.retain() << ')';
emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload(), packet.dup());
switch (packet.qos()) {
case Mqtt::QoS0:
break;
case Mqtt::QoS1: {
MqttPacket response(MqttPacket::TypePuback, packet.packetId());
client->write(response.serialize());
break;
}
case Mqtt::QoS2: {
if (packet.dup() && ctx->unackedPacketList.contains(packet.packetId())) {
// We received this message before but the client keeps on trying... Just send a PUBREC and stop processing
client->write(ctx->unackedPackets.value(packet.packetId()).serialize());
return;
} else if (ctx->unackedPacketList.contains(packet.packetId())) {
// Hmm... Client says this is a new packet, but the ID is not released yet! Drop client connection.
qCWarning(dbgServer()).nospace() << "Received a bad packet from \"" << ctx->clientId << "\". DUP is not set but packet ID is already used and not released. Dropping client connection.";
cleanupClient(client);
return;
}
// Ok, a new packet, ack it with a PUBREC and store the number
MqttPacket response(MqttPacket::TypePubrec, packet.packetId());
ctx->unackedPackets.insert(response.packetId(), response);
ctx->unackedPacketList.append(packet.packetId());
client->write(response.serialize());
break;
}
}
if (packet.retain()) {
if (packet.payload().isEmpty()) {
qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic();
retainedMessages.remove(packet.topic());
} else {
if (packet.qos() == Mqtt::QoS0) {
qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic();
retainedMessages.remove(packet.topic());
}
qCDebug(dbgServer) << "Adding retained message for topic" << packet.topic();
retainedMessages[packet.topic()].append(packet);
}
}
publish(packet.topic(), packet.payload());
return;
}
if (packet.type() == MqttPacket::TypePuback) {
ctx->unackedPacketList.removeAll(packet.packetId());
MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId());
emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload());
return;
}
if (packet.type() == MqttPacket::TypePubrec) {
MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId());
emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload());
MqttPacket pubrel(MqttPacket::TypePubrel, packet.packetId());
ctx->unackedPackets.insert(packet.packetId(), pubrel);
client->write(pubrel.serialize());
return;
}
if (packet.type() == MqttPacket::TypePubrel) {
ctx->unackedPackets.remove(packet.packetId());
ctx->unackedPacketList.removeAll(packet.packetId());
MqttPacket response(MqttPacket::TypePubcomp, packet.packetId());
client->write(response.serialize());
return;
}
if (packet.type() == MqttPacket::TypePubcomp) {
ctx->unackedPackets.remove(packet.packetId());
ctx->unackedPacketList.removeAll(packet.packetId());
return;
}
if (packet.type() == MqttPacket::TypeSubscribe) {
// qCDebug(dbgServer).nospace() << ctx->clientId ": Subscribe packet received.";
MqttPacket response(MqttPacket::TypeSuback, packet.packetId());
QByteArray payload;
foreach (MqttSubscription subscription, packet.subscriptions()) {
if (userValidator && !userValidator->validateSubscribe(subscription.topicFilter(), ctx->clientId, ctx->username)) {
qCWarning(dbgServer).nospace().noquote() << "Subscription topic filter not allowed for client \"" << ctx->clientId << "\": \"" << subscription.topicFilter() << '\"';
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
continue;
}
if (!validateTopicFilter(subscription.topicFilter())) {
qCWarning(dbgServer).nospace() << "Subscription topic filter not valid for client \"" << ctx->clientId << "\": " << subscription.topicFilter();
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
continue;
}
subscription.setQoS(qMin(subscription.qoS(), maximumSubscriptionQoS));
bool updated = false;
for (int i = 0; i < ctx->subscriptions.count(); i++) {
if (ctx->subscriptions.at(i).topicFilter() == subscription.topicFilter()) {
qCDebug(dbgServer).noquote().nospace() << "Client \"" << ctx->clientId << "\" subscribed with a duplicate topic filter. Replacing subcription with new QoS" << subscription.qoS();
ctx->subscriptions.replace(i, subscription);
updated = true;
}
}
if (!updated) {
ctx->subscriptions.append(subscription);
}
qCDebug(dbgServer).noquote().nospace() << "Subscribed client \"" << ctx->clientId << "\" to topic filter: \"" << subscription.topicFilter() << "\" with QoS " << subscription.qoS();
emit q_ptr->clientSubscribed(ctx->clientId, subscription.topicFilter(), subscription.qoS());
switch (subscription.qoS()) {
case Mqtt::QoS0:
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS0);
break;
case Mqtt::QoS1:
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS1);
break;
case Mqtt::QoS2:
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS2);
break;
}
}
client->write(response.serialize());
// Deliver any retained messages for this topic
foreach (MqttSubscription subscription, packet.subscriptions()) {
foreach (const QString &topic, retainedMessages.keys()) {
if (matchTopic(subscription.topicFilter(), topic)) {
foreach (MqttPacket packet, retainedMessages.value(topic)) {
packet.setRetain(true);
client->write(packet.serialize());
}
}
}
}
return;
}
if (packet.type() == MqttPacket::TypeUnsubscribe) {
MqttSubscriptions newSubscriptions;
foreach (const MqttSubscription &existingSubscription, ctx->subscriptions) {
bool matching = false;
foreach (const MqttSubscription &unsub, packet.subscriptions()) {
if (existingSubscription.topicFilter() == unsub.topicFilter()) {
qCDebug(dbgServer) << "Unsubscribing client" << ctx->clientId << "from" << unsub.topicFilter();
emit q_ptr->clientUnsubscribed(ctx->clientId, unsub.topicFilter());
matching = true;
break;
}
}
if (!matching) {
newSubscriptions.append(existingSubscription);
}
}
ctx->subscriptions = newSubscriptions;
MqttPacket response(MqttPacket::TypeUnsuback, packet.packetId());
client->write(response.serialize());
return;
}
if (packet.type() == MqttPacket::TypePingreq) {
// qCDebug(dbgServer).nospace() << ctx->clientId << ": Pingreq received";
MqttPacket response(MqttPacket::TypePingresp, packet.packetId());
client->write(response.serialize());
return;
}
if (packet.type() == MqttPacket::TypeDisconnect) {
ctx->willMessage.clear();
ctx->willTopic.clear();
return;
}
qCWarning(dbgServer).nospace().noquote() << "Unknown packet received from client \"" << ctx->clientId << "\": " << QString::number(packet.type(), 16);
Q_ASSERT(false);
cleanupClient(client);
}
bool MqttServerPrivate::validateTopicFilter(const QString &topicFilter)
{
if (topicFilter.length() < 1) {
return false;
}
QStringList parts = topicFilter.split('/');
for (int i = 0; i < parts.count(); i++) {
const QString &part = parts.at(i);
if (part.contains('#') && (part != '#' || i != parts.count() - 1)) {
return false;
}
if (part.contains('+') && part != '+') {
return false;
}
}
return true;
}
bool MqttServerPrivate::matchTopic(const QString &topicFilter, const QString &topic)
{
if (topic.startsWith('$')) {
return false;
}
QStringList filterParts = topicFilter.split('/');
QStringList topicParts = topic.split('/');
if (topicParts.count() < filterParts.count() - 1) {
return false;
}
for (int i = 0; i < filterParts.count(); i++) {
if (filterParts.at(i) == '+') {
continue;
}
if (filterParts.at(i) == '#') {
continue;
}
if (topicParts.at(i) == filterParts.at(i)) {
continue;
}
return false;
}
return filterParts.count() == topicParts.count() || topicFilter.endsWith('#');
}
quint16 MqttServerPrivate::newPacketId(ClientContext *ctx)
{
static quint16 packetId = 0;
do {
packetId++;
} while(ctx->unackedPacketList.contains(packetId));
return packetId;
}
void MqttServerPrivate::onClientReadyRead()
{
QTcpSocket* client = static_cast<QTcpSocket*>(sender());
clientBuffers[client].append(client->readAll());
do {
MqttPacket packet;
int ret = packet.parse(clientBuffers[client]);
if (ret == 0) {
qCDebug(dbgServer) << "Packet too short... Waiting for more...";
return;
}
// Ok, we've got a full packet (or garbage data). If this client is still pending
// we can stop the timer, the protocol will take it from here.
if (pendingConnections.contains(client)) {
pendingConnections.take(client)->deleteLater();
}
if (ret == -1) {
qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection";
cleanupClient(client);
return;
}
clientBuffers[client].remove(0, ret);
processPacket(packet, client);
} while (!clientBuffers.value(client).isEmpty());
}
void MqttServerPrivate::onClientError(QAbstractSocket::SocketError error)
{
// qCWarning(dbgServer) << "Client error:" << error;
}

View File

@ -0,0 +1,58 @@
#ifndef MQTTSERVER_H
#define MQTTSERVER_H
#include <QObject>
#include <QTcpServer>
#include <QTcpSocket>
#include <QTimer>
#include <QLoggingCategory>
#include "mqttpacket.h"
class MqttServerPrivate;
class Subscription;
class MqttUserValidator {
public:
virtual ~MqttUserValidator() = default;
virtual Mqtt::ConnectReturnCode validateConnect(const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) = 0;
virtual bool validateSubscribe(const QString &topicFilter, const QString &clientId, const QString &username) = 0;
};
class MqttServer : public QObject
{
Q_OBJECT
public:
explicit MqttServer(QObject *parent = nullptr);
Mqtt::QoS maximumSubscriptionsQoS() const;
void setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS);
bool listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, MqttUserValidator *userValidator = nullptr);
QStringList clients() const;
// allows publishing from the server, including topcis starting with $
QHash<QString, quint16> publish(const QString &topic, const QByteArray &payload = QByteArray());
signals:
// emitted whenever a client connects, after the mqtt connect handshake has been done.
void clientConnected(const QString &clientId, const QString &username, const QHostAddress &clientAddress);
// emitted whenever a client disconnects, that is, when a DISCONNECT message has been received or the keep alive timeout has been reached.
void clientDisconnected(const QString &clientId);
// emitted whenever a client has been seen, that is, a control message or a keep alive message has been received.
void clientAlive(const QString &clientId);
// emitted whenever a client subscribes, a client can also subscribe to topics starting with $ but those won't be relayed from other clients. Only internal server publishes to $ topcis will be sent to subscribed clients.
void clientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS);
// emitted whenever a client unsubscribes from a topic
void clientUnsubscribed(const QString &clientId, const QString &topicFiltr);
// emitted whenever a publish message is received from a client before the message is relayed to other clients. Topics starting with $ will be received here, but not relayed to other clients.
void publishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload, bool dup);
// emitted whenever a publish message is sent to a client. Note: this might be fired often if many clients are connected and subsribed to matching topic filters.
void published(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload);
private:
MqttServerPrivate *d_ptr;
};
#endif // MQTTSERVER_H

View File

@ -0,0 +1,73 @@
#ifndef MQTTSERVER_P_H
#define MQTTSERVER_P_H
#include <QObject>
#include <QTcpServer>
#include <QTcpSocket>
#include <QTimer>
#include <QLoggingCategory>
#include "mqttpacket.h"
#include "mqttserver.h"
Q_DECLARE_LOGGING_CATEGORY(dbgServer)
class ClientContext;
class Subscription;
class MqttServerPrivate: public QObject
{
Q_OBJECT
public:
explicit MqttServerPrivate(MqttServer *q);
QHash<QString, quint16> publish(const QString &topic, const QByteArray &payload = QByteArray());
public:
void cleanupClient(QTcpSocket *client);
void processPacket(const MqttPacket &packet, QTcpSocket *client);
bool validateTopicFilter(const QString &topicFilter);
bool matchTopic(const QString &topicFilter, const QString &topic);
quint16 newPacketId(ClientContext *ctx);
public slots:
void onNewConnection();
void onClientReadyRead();
void onClientError(QAbstractSocket::SocketError);
void onClientDisconnected();
public:
MqttServer *q_ptr;
QTcpServer *server = nullptr;
MqttUserValidator *userValidator = nullptr;
Mqtt::QoS maximumSubscriptionQoS = Mqtt::QoS2;
QHash<QTcpSocket*, QTimer*> pendingConnections;
QHash<QTcpSocket*, ClientContext*> clientList;
QHash<QTcpSocket*, QByteArray> clientBuffers;
QHash<QString, MqttPackets> retainedMessages;
};
class ClientContext {
public:
Mqtt::Protocol version = Mqtt::ProtocolUnknown;
quint16 keepAlive = 0;
QTimer keepAliveTimer;
QString clientId;
QString username;
QByteArray willTopic;
QByteArray willMessage;
Mqtt::QoS willQoS = Mqtt::QoS0;
bool willRetain = false;
QByteArray inputBuffer;
MqttSubscriptions subscriptions;
QVector<quint16> unackedPacketList;
QHash<quint16, MqttPacket> unackedPackets;
};
#endif // MQTTSERVER_P_H

View File

@ -0,0 +1,38 @@
#include "mqttsubscription.h"
MqttSubscription::MqttSubscription()
{
}
MqttSubscription::MqttSubscription(const QByteArray &topicFilter, Mqtt::QoS qoS):
m_topicFilter(topicFilter),
m_qoS(qoS)
{
}
QByteArray MqttSubscription::topicFilter() const
{
return m_topicFilter;
}
void MqttSubscription::setTopicFilter(const QByteArray &topicFilter)
{
m_topicFilter = topicFilter;
}
Mqtt::QoS MqttSubscription::qoS() const
{
return m_qoS;
}
void MqttSubscription::setQoS(Mqtt::QoS qoS)
{
m_qoS = qoS;
}
bool MqttSubscription::operator==(const MqttSubscription &other) const
{
return m_qoS == other.qoS() && m_topicFilter == other.topicFilter();
}

View File

@ -0,0 +1,36 @@
#ifndef MQTTSUBSCRIPTION_H
#define MQTTSUBSCRIPTION_H
#include "mqtt.h"
#include <QString>
#include <QtDebug>
class MqttSubscription
{
public:
MqttSubscription();
MqttSubscription(const QByteArray &topicFilter, Mqtt::QoS qoS = Mqtt::QoS0);
QByteArray topicFilter() const;
void setTopicFilter(const QByteArray &topicFilter);
Mqtt::QoS qoS() const;
void setQoS(Mqtt::QoS qoS);
bool operator==(const MqttSubscription &other) const;
private:
QByteArray m_topicFilter;
Mqtt::QoS m_qoS = Mqtt::QoS0;
};
Q_DECLARE_METATYPE(MqttSubscription)
typedef QVector<MqttSubscription> MqttSubscriptions;
Q_DECLARE_METATYPE(MqttSubscriptions)
inline QDebug operator<<(QDebug debug, const MqttSubscription &subscription) {
debug.nospace().noquote() << "\"" << subscription.topicFilter() << "\" QoS: " << subscription.qoS();
return debug;
}
#endif // MQTTSUBSCRIPTION_H

6
nymea-mqtt.pri Normal file
View File

@ -0,0 +1,6 @@
QMAKE_CXXFLAGS *= -Werror -std=c++11 -g
QMAKE_LFLAGS *= -std=c++11
top_srcdir=$$PWD
top_builddir=$$shadowed($$PWD)

6
nymea-mqtt.pro Normal file
View File

@ -0,0 +1,6 @@
TEMPLATE = subdirs
SUBDIRS += libnymea-mqtt server tests
server.depends = libnymea-mqtt
tests.depends = libnymea-mqtt

13
server/main.cpp Normal file
View File

@ -0,0 +1,13 @@
#include <QCoreApplication>
#include "mqttserver.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
MqttServer server;
server.listen(QHostAddress::AnyIPv4, 1883);
return a.exec();
}

12
server/server.pro Normal file
View File

@ -0,0 +1,12 @@
TEMPLATE = app
TARGET = nymea-mqttserver
include(../nymea-mqtt.pri)
QT += network
INCLUDEPATH += $$top_srcdir/libnymea-mqtt/
SOURCES += main.cpp
LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt

View File

@ -0,0 +1,16 @@
QT += testlib network
QT -= gui
CONFIG += qt console warn_on depend_includepath testcase
CONFIG -= app_bundle
TEMPLATE = app
include(../../nymea-mqtt.pri)
INCLUDEPATH += $$top_srcdir/libnymea-mqtt/
SOURCES += test_operation.cpp
LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt

View File

@ -0,0 +1,794 @@
#include "mqttserver.h"
#include "mqttclient.h"
#include "mqttclient_p.h"
#include <QTest>
#include <QSignalSpy>
class OperationTests: public QObject
{
Q_OBJECT
private slots:
void initTestCase();
void cleanup();
void connectAndDisconnect();
void keepAliveTimesOut();
void subscribeAndPublish_data();
void subscribeAndPublish();
void willIsSentOnClientDisappearing();
void willIsNotSentOnClientDisconnecting();
void testWillRetain();
void testAutoReconnect();
void testQoS1Retransmissions();
void testMultiSubscription();
void testSubscriptionTopicFilters_data();
void testSubscriptionTopicFilters();
void testSubscriptionTopicMatching_data();
void testSubscriptionTopicMatching();
void testSessionManagementDropOldSession();
void testSessionManagementResumeOldSession();
void testSessionManagementFailResumeOldSession();
void testQoS1PublishToServerIsAckedOnSessionResume();
void testQoS1PublishToClientIsDeliveredOnSessionResume();
void testQoS2PublishToServerIsCompletedOnSessionResume();
void testQoS2PublishToClientIsCompletedOnSessionResume();
void testRetain();
void testUnsubscribe();
void testEmptyClientId();
private:
// Connects and waits for the MQTT CONNECT to be finished
MqttClient *connectAndWait(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false);
// Just connects, returns the client and signalspy which has been created before calling connect. You must delete the spy yourself!
QPair<MqttClient*, QSignalSpy*> connectToServer(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false);
void disconnectAndWait(MqttClient* client);
bool subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos = Mqtt::QoS1);
private:
QString m_serverHost = "127.0.0.1";
quint16 m_serverPort = 5555;
MqttServer *m_server = nullptr;
QList<MqttClient*> m_clients;
};
MqttClient *OperationTests::connectAndWait(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain)
{
QPair<MqttClient*, QSignalSpy*> result = connectToServer(clientId, cleanSession, keepAlive, willTopic, willMessage, willQoS, willRetain);
if (result.second->count() == 0) {
result.second->wait();
}
if (result.second->count() == 0) {
qWarning() << "WARNING: Client didn't emit connected";
}
delete result.second;
return result.first;
}
QPair<MqttClient*, QSignalSpy*> OperationTests::connectToServer(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain)
{
MqttClient* client = new MqttClient(clientId, keepAlive, willTopic, willMessage.toUtf8(), willQoS, willRetain, this);
client->setAutoReconnect(false);
m_clients.append(client);
QSignalSpy *spy = new QSignalSpy(client, &MqttClient::connected);
client->connectToHost(m_serverHost, m_serverPort, cleanSession);
return qMakePair<MqttClient*, QSignalSpy*>(client, spy);
}
void OperationTests::disconnectAndWait(MqttClient* client)
{
QSignalSpy disconnectedSpy(client, &MqttClient::disconnected);
client->disconnectFromHost();
if (disconnectedSpy.count() == 0) {
disconnectedSpy.wait();
}
}
bool OperationTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos)
{
QSignalSpy subscribedSpy(client, &MqttClient::subscribed);
quint16 packetId = client->subscribe(topic, qos);
if (subscribedSpy.count() == 0) {
subscribedSpy.wait();
}
Mqtt::SubscribeReturnCode expectedSubscribeReturnCode = qos == Mqtt::QoS0 ? Mqtt::SubscribeReturnCodeSuccessQoS0 : qos == Mqtt::QoS1 ? Mqtt::SubscribeReturnCodeSuccessQoS1 : Mqtt::SubscribeReturnCodeSuccessQoS2;
return subscribedSpy.count() == 1 && subscribedSpy.first().at(0).toInt() == packetId && subscribedSpy.first().at(1).value<Mqtt::SubscribeReturnCodes>().first() == expectedSubscribeReturnCode;
}
void OperationTests::initTestCase()
{
// QLoggingCategory::setFilterRules("nymea.mqtt.protocol.debug=false");
m_server = new MqttServer(this);
bool registered = false;
quint16 attempts = 0;
do {
registered = m_server->listen(QHostAddress(m_serverHost), m_serverPort + attempts);
} while(!registered && attempts++ < 20);
QVERIFY2(registered, QString("Failed to register server on %1 from port %2 to %3. Tests won't work.").arg(m_serverHost).arg(m_serverPort).arg(m_serverPort+attempts).toUtf8().data());
m_serverPort += attempts;
}
void OperationTests::cleanup()
{
while (!m_clients.isEmpty()) {
MqttClient *client = m_clients.takeFirst();
client->disconnectFromHost();
client->deleteLater();
}
QTRY_COMPARE(m_server->clients().count(), 0);
}
void OperationTests::connectAndDisconnect()
{
QSignalSpy serverSpy(m_server, &MqttServer::clientConnected);
QString clientId = "connectAndDisconnect-client";
MqttClient* client = connectAndWait(clientId);
QVERIFY2(serverSpy.count() == 1, "Server didn't emit clientConnected");
QVERIFY2(serverSpy.at(0).first() == clientId, "ClientId not matching on server side.");
QSignalSpy serverSpyDisconnect(m_server, &MqttServer::clientDisconnected);
QSignalSpy clientSpy(client, &MqttClient::disconnected);
client->disconnectFromHost();
QTRY_VERIFY2(clientSpy.count() == 1, "client didn't emit disconnected");
QTRY_VERIFY2(serverSpyDisconnect.count() == 1, "Server didn't emit clientDisconnected");
QVERIFY2(serverSpyDisconnect.at(0).first() == clientId, "ClientId not matching on server side.");
}
void OperationTests::keepAliveTimesOut()
{
QSignalSpy keepAliveSpy(m_server, &MqttServer::clientAlive);
MqttClient *client = connectAndWait("keepAlive1sec-client", true, 1);
client->setAutoReconnect(false);
QTest::qWait(2000);
qDebug() << "Received" << keepAliveSpy.count() << "keep alive messages";
QVERIFY2(client->isConnected(), "Client connection dropped");
QVERIFY2(keepAliveSpy.count() > 0, "Keep alive not received");
client->disconnectFromHost();
keepAliveSpy.clear();
client = connectAndWait("timeout1sec-client", true, 1);
client->setAutoReconnect(false);
client->d_ptr->keepAliveTimer.stop(); // disable the keepalive timer
QTest::qWait(2000);
qDebug() << "Received" << keepAliveSpy.count() << "keep alive messages";
QVERIFY2(!client->isConnected(), "Client connection still alive but it should have been dropped");
}
void OperationTests::subscribeAndPublish_data()
{
QTest::addColumn<Mqtt::QoS>("qosClient1");
QTest::addColumn<Mqtt::QoS>("qosClient2");
QList<QList<Mqtt::QoS> > rows;
rows.append({Mqtt::QoS0, Mqtt::QoS0});
rows.append({Mqtt::QoS0, Mqtt::QoS1});
rows.append({Mqtt::QoS0, Mqtt::QoS2});
rows.append({Mqtt::QoS1, Mqtt::QoS0});
rows.append({Mqtt::QoS1, Mqtt::QoS1});
rows.append({Mqtt::QoS1, Mqtt::QoS2});
rows.append({Mqtt::QoS2, Mqtt::QoS0});
rows.append({Mqtt::QoS2, Mqtt::QoS1});
rows.append({Mqtt::QoS2, Mqtt::QoS2});
foreach (const QList<Mqtt::QoS> &row, rows) {
QTest::newRow(QString("Subscribe QoS%1 -> Publish QoS%2").arg(row.at(0)).arg(row.at(1)).toUtf8().data()) << row.at(0) << row.at(1);
}
}
void OperationTests::subscribeAndPublish()
{
QFETCH(Mqtt::QoS, qosClient1);
QFETCH(Mqtt::QoS, qosClient2);
QString clientId1 = QString("subQoS%1-client").arg(qosClient1);
MqttClient *client1 = connectAndWait(clientId1);
QString clientId2 = QString("pubQoS%1-client").arg(qosClient2);
MqttClient *client2 = connectAndWait(clientId2);
QSignalSpy serverSubscribeSpy(m_server, &MqttServer::clientSubscribed);
QSignalSpy clientSubscribeSpy(client1, &MqttClient::subscribed);
quint16 packetId = client1->subscribe("#", qosClient1);
QTRY_VERIFY2(serverSubscribeSpy.count() == 1, "Server did not emit clientSubscribed");
QVERIFY2(serverSubscribeSpy.first().first() == clientId1, "Client Id not matching");
QVERIFY2(serverSubscribeSpy.first().at(1) == "#", "Topic not matching");
QVERIFY2(serverSubscribeSpy.first().at(2) == qosClient1, "QoS not matching");
QTRY_VERIFY2(clientSubscribeSpy.count() == 1, "Client did not emit subscribed");
QVERIFY2(clientSubscribeSpy.first().first() == packetId, "Packet ID not matching");
QVERIFY2(clientSubscribeSpy.first().at(1).value<Mqtt::SubscribeReturnCodes>().count() == 1, "Subscribe return code count not matching");
QSignalSpy serverPublishReceivedSpy(m_server, &MqttServer::publishReceived);
QSignalSpy serverPublishedSpy(m_server, &MqttServer::published);
QSignalSpy client1PublishReceivedSpy(client1, &MqttClient::publishReceived);
QSignalSpy client2PublishedSpy(client2, &MqttClient::published);
packetId = client2->publish("/testtopic/", "Hello world", qosClient2);
QTRY_VERIFY2(serverPublishReceivedSpy.count() == 1, "Server did not emit publishReceived");
QVERIFY2(serverPublishReceivedSpy.first().at(0) == clientId2, "Server did emit publishReceived signal but client ID is not matching");
QVERIFY2(serverPublishReceivedSpy.first().at(1) == packetId, QString("Server did emit publishReceived signal but Packet ID is not matching:\nActual: %1\nExpected: %2").arg(serverPublishReceivedSpy.first().at(1).toInt()).arg(packetId).toUtf8().data());
QVERIFY2(serverPublishReceivedSpy.first().at(2) == "/testtopic/", "Server did emit publishReceived signal but topic is not matching");
QVERIFY2(serverPublishReceivedSpy.first().at(3) == "Hello world", "Server did emit publishReceived signal but payload is not matching");
QTRY_VERIFY2(serverPublishedSpy.count() == 1, "Server did not emit published");
QVERIFY2(serverPublishedSpy.first().at(0) == clientId1, "Server did emit published signal but client ID is not matching");
QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Subscribing client did not emit publishReceived signal");
QVERIFY2(client1PublishReceivedSpy.first().at(0) == "/testtopic/", "Subscribing client did emit publishReceived signal but topic is not matching");
QVERIFY2(client1PublishReceivedSpy.first().at(1) == "Hello world", "Subscribing client did emit publishReceived signal but payload is not matching");
QTRY_VERIFY2(client2PublishedSpy.count() == 1, "Publishing client did not emit published signal");
QVERIFY2(client2PublishedSpy.first().first() == packetId, "Publishing client did emit published signal but packet ID not matching");
}
void OperationTests::willIsSentOnClientDisappearing()
{
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye");
QSignalSpy publishSpy(client1, &MqttClient::publishReceived);
QVERIFY(subscribeAndWait(client1, "#"));
client2->d_ptr->socket->abort();
QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent");
QVERIFY2(publishSpy.first().at(0) == "/testtopic", "Will topic not matching");
QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching");
}
void OperationTests::willIsNotSentOnClientDisconnecting()
{
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye");
QSignalSpy subscribeSpy(client1, &MqttClient::subscribed);
QSignalSpy publishSpy(client1, &MqttClient::publishReceived);
client1->subscribe("#");
subscribeSpy.wait();
client2->disconnectFromHost();
publishSpy.wait(200);
QVERIFY2(publishSpy.count() == 0, "Will has been sent but it should not have been");
}
void OperationTests::testWillRetain()
{
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye", Mqtt::QoS1, true);
QSignalSpy subscribeSpy(client1, &MqttClient::subscribed);
QSignalSpy publishSpy(client1, &MqttClient::publishReceived);
client1->subscribe("#");
subscribeSpy.wait();
client2->setAutoReconnect(false);
client2->d_ptr->socket->abort();
QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent");
QVERIFY2(publishSpy.first().at(0) == "/testtopic", QString("Will topic not matching: %1").arg(publishSpy.first().at(0).toString()).toUtf8().data());
QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching");
QVERIFY2(publishSpy.first().at(2) == false, "Retain flag not matching");
MqttClient *client3 = connectAndWait("subWill-client2");
QSignalSpy retainedWillSpy(client3, &MqttClient::publishReceived);
client3->subscribe("#");
QTRY_VERIFY2(retainedWillSpy.count() == 1, "Retained Will has not been sent");
QVERIFY2(retainedWillSpy.first().at(0) == "/testtopic", "Will topic not matching");
QVERIFY2(retainedWillSpy.first().at(1) == "Bye bye", "Will message not matching");
QVERIFY2(retainedWillSpy.first().at(2) == true, "Retain flag not matching");
// Clear retain on /testtopic
QSignalSpy clearRetainSpy(client3, &MqttClient::published);
client3->publish("/testtopic", QByteArray(), Mqtt::QoS1, true);
QTRY_VERIFY2(clearRetainSpy.count() == 1, "Clearing retain message did not succeed");
}
void OperationTests::testAutoReconnect()
{
MqttClient *client1 = connectAndWait("client1");
client1->setAutoReconnect(true);
QSignalSpy disconnectedSpy(client1, &MqttClient::disconnected);
QSignalSpy connectedSpy(client1, &MqttClient::connected);
client1->d_ptr->socket->abort();
QTRY_VERIFY2(disconnectedSpy.count() == 1, "client did not emit disconnected");
QTRY_VERIFY2(connectedSpy.count() == 1, "client did not emit connected");
}
void OperationTests::testQoS1Retransmissions()
{
QSignalSpy serverSpy(m_server, &MqttServer::publishReceived);
MqttClient *client = connectAndWait("client1");
client->setAutoReconnect(true);
// publish a packet, flush the pipe and immediately drop the connection before we have a chance to receive the PUBACK
int packetId = client->publish("/testtopic", "Hello world", Mqtt::QoS1);
client->d_ptr->socket->flush();
QSignalSpy connectedSpy(client, &MqttClient::connected);
client->d_ptr->socket->abort();
// Wait for it to reconnect, it should then republish the packet
connectedSpy.wait();
QTRY_VERIFY2(serverSpy.count() == 2, "Server didn't receive the publication twice but it should have");
QCOMPARE(serverSpy.at(0).at(0).toString(), QString("client1"));
QCOMPARE(serverSpy.at(0).at(1).toInt(), packetId);
QCOMPARE(serverSpy.at(0).at(2).toString(), QString("/testtopic"));
QCOMPARE(serverSpy.at(0).at(3).toString(), QString("Hello world"));
QCOMPARE(serverSpy.at(0).at(4).toBool(), false);
QCOMPARE(serverSpy.at(1).at(0).toString(), QString("client1"));
QCOMPARE(serverSpy.at(1).at(1).toInt(), packetId);
QCOMPARE(serverSpy.at(1).at(2).toString(), QString("/testtopic"));
QCOMPARE(serverSpy.at(1).at(3).toString(), QString("Hello world"));
QCOMPARE(serverSpy.at(1).at(4).toBool(), true);
}
void OperationTests::testMultiSubscription()
{
MqttClient *client = connectAndWait("subscription-topics");
QSignalSpy subscribedSpy(client, &MqttClient::subscribed);
MqttSubscriptions subscriptions = { MqttSubscription("topic1"), MqttSubscription("topic2") , MqttSubscription("#invalid") };
Mqtt::SubscribeReturnCodes subscriptionReturnCodes = { Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeSuccessQoS0, Mqtt::SubscribeReturnCodeFailure};
client->subscribe(subscriptions);
QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received");
Mqtt::SubscribeReturnCodes retCodes = subscribedSpy.first().at(1).value<Mqtt::SubscribeReturnCodes>();
QCOMPARE(retCodes, subscriptionReturnCodes);
}
void OperationTests::testSubscriptionTopicFilters_data()
{
QTest::addColumn<QString>("topicFilter");
QTest::addColumn<Mqtt::SubscribeReturnCode>("subscriptionReturnCode");
QTest::newRow("a") << "a" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("/") << "/" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("/a") << "/a" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("//") << "//" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("/a/") << "/a/" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("/a/b") << "/a/b" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("//b") << "//b" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("#") << "#" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("a/#") << "a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("a/b#") << "a/b#" << Mqtt::SubscribeReturnCodeFailure;
QTest::newRow("a/b/#/c") << "a/b/#/c" << Mqtt::SubscribeReturnCodeFailure;
QTest::newRow("+") << "+" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("a+") << "a+" << Mqtt::SubscribeReturnCodeFailure;
QTest::newRow("a/+/b") << "a/+/b" << Mqtt::SubscribeReturnCodeSuccessQoS0;
QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0;
}
void OperationTests::testSubscriptionTopicFilters()
{
QFETCH(QString, topicFilter);
QFETCH(Mqtt::SubscribeReturnCode, subscriptionReturnCode);
MqttClient *client = connectAndWait("subscription-topics");
QSignalSpy subscribedSpy(client, &MqttClient::subscribed);
client->subscribe(topicFilter);
QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received");
Mqtt::SubscribeReturnCodes retCodes = subscribedSpy.first().at(1).value<Mqtt::SubscribeReturnCodes>();
QCOMPARE(retCodes.first(), subscriptionReturnCode);
}
void OperationTests::testSubscriptionTopicMatching_data()
{
QTest::addColumn<QString>("topicFilter");
QTest::addColumn<QString>("topic");
QTest::addColumn<int>("receivedPublishMessageCount");
QList<QStringList> rows;
rows.append({ "a", "a", "1" });
rows.append({ "a", "b", "0" });
rows.append({ "/", "/" , "1" });
rows.append({ "/", "/a" , "0" });
rows.append({ "#", "a", "1" });
rows.append({ "#", "a/b", "1" });
rows.append({ "+", "a", "1" });
rows.append({ "+", "a/", "0" });
rows.append({ "+", "/a" , "0" });
rows.append({ "+", "a/b", "0" });
rows.append({ "/#", "/" , "1" });
rows.append({ "/+", "/a" , "1" });
rows.append({ "/a", "/a" , "1" });
rows.append({ "/a", "/a" , "1" });
rows.append({ "a/+", "a", "0" });
rows.append({ "a/+", "a/", "1" });
rows.append({ "a/+", "a/b", "1" });
rows.append({ "a/+", "a/b/c", "0" });
rows.append({ "+/+", "/a" , "1" });
rows.append({ "+/+", "/a" , "1" });
rows.append({ "+/+", "a/" , "1" });
rows.append({ "a/#", "a/b", "1" });
rows.append({ "a/#", "a", "1" });
rows.append({ "a/#", "/", "0" });
rows.append({ "a/#", "a/b/c", "1" });
rows.append({ "a/#", "b/c", "0" });
rows.append({ "a//", "a//", "1" });
rows.append({ "a//#", "a//b", "1" });
rows.append({ "a/b/+", "a/b/c", "1" });
rows.append({ "a/b/+", "a/b/d", "1" });
rows.append({ "a/b/+", "a/b/c/d", "0" });
rows.append({ "+/a/#", "a/a/b", "1" });
rows.append({ "+/a/#", "a/a/b/c", "1" });
rows.append({ "+/a/#", "d/a/b/c", "1" });
rows.append({ "+/a/#", "a/b/c/d", "0" });
rows.append({ "a/b/#", "a/b/c", "1" });
rows.append({ "a//+/", "a//b/", "1" });
rows.append({ "a//+/", "a///", "1" });
rows.append({ "a//+/#", "a//b/c", "1" });
rows.append({ "a//+/#", "a/b/c/d", "0" });
rows.append({ "$SYS/", "$SYS/", "0" });
rows.append({ "#", "$SYS/", "0" });
rows.append({ "+/", "$SYS/", "0" });
foreach (const QStringList &row, rows) {
QTest::newRow(QString("%1, %2").arg(row.at(0), row.at(1)).toUtf8().data()) << row.at(0) << row.at(1) << row.at(2).toInt();
}
}
void OperationTests::testSubscriptionTopicMatching()
{
QFETCH(QString, topicFilter);
QFETCH(QString, topic);
QFETCH(int, receivedPublishMessageCount);
MqttClient *publisher = connectAndWait("publisher");
MqttClient *subscriber = connectAndWait("subscriber");
QSignalSpy subscribedSpy(subscriber, &MqttClient::subscribed);
QSignalSpy publishReceivedSpy(subscriber, &MqttClient::publishReceived);
QSignalSpy publishedSpy(publisher, &MqttClient::published);
subscriber->subscribe(topicFilter);
QTRY_VERIFY2(subscribedSpy.count() == 1, "Subscribed signal not received");
publisher->publish(topic, "testpayload");
QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not received");
if (receivedPublishMessageCount == 0) {
// Give it some time to wait for a publishReceived (It should not show up)
QTest::qWait(500);
} else if (publishReceivedSpy.count() == 0) {
publishReceivedSpy.wait();
}
QVERIFY2(publishReceivedSpy.count() == receivedPublishMessageCount, QString("PublishReceived signal not received the expected amount of time.\nActual: %1\nExpected: %2").arg(publishReceivedSpy.count()).arg(receivedPublishMessageCount).toUtf8().data());
}
void OperationTests::testSessionManagementDropOldSession()
{
MqttClient *client1Session1 = connectAndWait("client1");
client1Session1->setAutoReconnect(false);
QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed);
client1Session1->subscribe("/testtopic");
QTRY_VERIFY(subscribeSpy.count() == 1);
QSignalSpy disconnectedSpy(client1Session1, &MqttClient::disconnected);
QPair<MqttClient*, QSignalSpy*> client1Session2 = connectToServer("client1");
if (client1Session2.second->count() == 0) {
client1Session2.second->wait();
}
QVERIFY2(!client1Session2.second->first().at(0).value<Mqtt::ConnackFlags>().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be.");
QTRY_VERIFY2(disconnectedSpy.count() == 1, "First instance didn't get disconnected when new instance connected.");
// Now connect with another client and post to testtopic. Client 1 should not get it because he didn't resume the session and didn't resubscribe
QSignalSpy client1PublishReceivedSpy(client1Session2.first, &MqttClient::publishReceived);
MqttClient *client2 = connectAndWait("client2");
client2->publish("/testtopic", "Hello world");
QTest::qWait(500);
QVERIFY2(client1PublishReceivedSpy.count() == 0, "Client 1 did receive the publish but it should not have.");
}
void OperationTests::testSessionManagementResumeOldSession()
{
MqttClient *client1Session1 = connectAndWait("client1");
client1Session1->setAutoReconnect(false);
QSignalSpy subscribeSpy(client1Session1, &MqttClient::subscribed);
client1Session1->subscribe("/testtopic");
QTRY_VERIFY(subscribeSpy.count() == 1);
QSignalSpy disconnectedSpy(client1Session1, &MqttClient::disconnected);
QPair<MqttClient*, QSignalSpy*> client1Session2 = connectToServer("client1", false);
if (client1Session2.second->count() == 0) {
client1Session2.second->wait();
}
QVERIFY2(client1Session2.second->first().at(0).value<Mqtt::ConnectReturnCode>() == Mqtt::ConnectReturnCodeAccepted, "Session hasn't been accepted.");
QVERIFY2(client1Session2.second->first().at(1).value<Mqtt::ConnackFlags>().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is not set while it should be.");
QTRY_VERIFY2(disconnectedSpy.count() == 1, "First instance didn't get disconnected when new instance connected.");
// Now connect with another client and post to testtopic. Client 1 should not get it because he didn't resume the session and didn't resubscribe
QSignalSpy client1PublishReceivedSpy(client1Session2.first, &MqttClient::publishReceived);
MqttClient *client2 = connectAndWait("client2");
client2->publish("/testtopic", "Hello world");
QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Client 1 did not receive the publish but it should have.");
}
void OperationTests::testSessionManagementFailResumeOldSession()
{
// try to resume non existing session
QPair<MqttClient*, QSignalSpy*> client = connectToServer("client1", false);
if (client.second->count() == 0) {
client.second->wait();
}
QVERIFY2(!client.second->first().at(0).value<Mqtt::ConnackFlags>().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be.");
}
void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume()
{
MqttClient *client = connectAndWait("client1", true);
client->setAutoReconnect(true);
QSignalSpy reconnectedSpy(client, &MqttClient::connected);
QSignalSpy publishedSpy(client, &MqttClient::published);
client->publish("/testtopic", "Hello world", Mqtt::QoS1);
client->d_ptr->socket->flush();
client->d_ptr->socket->abort();
QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad.");
QTRY_VERIFY2(reconnectedSpy.count() == 1, "client didn't reconnect");
QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect");
}
void OperationTests::testQoS1PublishToClientIsDeliveredOnSessionResume()
{
MqttClient *oldClient1 = connectAndWait("client1", true);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed);
oldClient1->subscribe("/testtopic", Mqtt::QoS1);
QTRY_VERIFY(subscribedSpy.count() == 1);
// prevent the client from receiving anything
oldClient1->d_ptr->socket->blockSignals(true);
// pbulish something with a second client
MqttClient *client2 = connectAndWait("client2");
QSignalSpy publishedSpy(client2, &MqttClient::published);
client2->publish("/testtopic", "Hello world", Mqtt::QoS1);
QTRY_VERIFY(publishedSpy.count() == 1);
// Resume (take over) old session and make sure we got the publish
MqttClient *newClient1 = new MqttClient("client1", this);
m_clients.append(newClient1); // let cleanupTestcase() clean it up
QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived);
newClient1->connectToHost(m_serverHost, m_serverPort, false);
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume");
}
void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume()
{
MqttClient *client = connectAndWait("client1", true);
client->setAutoReconnect(true);
QSignalSpy reconnectedSpy(client, &MqttClient::connected);
QSignalSpy publishedSpy(client, &MqttClient::published);
client->publish("/testtopic", "Hello world", Mqtt::QoS2);
client->d_ptr->socket->flush();
client->d_ptr->socket->abort();
QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad.");
QTRY_VERIFY2(reconnectedSpy.count() == 1, "client didn't reconnect");
QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect");
}
void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume()
{
MqttClient *oldClient1 = connectAndWait("client1", true);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribed);
oldClient1->subscribe("/testtopic", Mqtt::QoS2);
QTRY_VERIFY(subscribedSpy.count() == 1);
// prevent the client from receiving anything
oldClient1->d_ptr->socket->blockSignals(true);
// pbulish something with a second client
MqttClient *client2 = connectAndWait("client2");
QSignalSpy publishedSpy(client2, &MqttClient::published);
client2->publish("/testtopic", "Hello world", Mqtt::QoS2);
QTRY_VERIFY(publishedSpy.count() == 1);
// Resume (take over) old session and make sure we got the publish
MqttClient *newClient1 = new MqttClient("client1", this);
m_clients.append(newClient1); // let cleanupTestcase() clean it up
QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived);
newClient1->connectToHost(m_serverHost, m_serverPort, false);
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume");
}
void OperationTests::testRetain()
{
MqttClient *client1 = connectAndWait("client1", true);
// post a retained message
QSignalSpy publishedSpy(client1, &MqttClient::published);
client1->publish("/retaintopic", "Message 1", Mqtt::QoS1, true);
QTRY_VERIFY(publishedSpy.count() == 1);
// Connect a second client
MqttClient *client2 = connectAndWait("client2");
// subscribe to topic and verify we received the retained message
QSignalSpy publishReceivedSpy(client2, &MqttClient::publishReceived);
client2->subscribe("/retaintopic", Mqtt::QoS1);
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive retained topic on subscribe.");
QVERIFY2(publishReceivedSpy.first().at(2).toBool() == true, "Retain flag not set");
publishReceivedSpy.clear();
// Post another retained message from client1 and make sure we receive it
client1->publish("/retaintopic", "Message 2", Mqtt::QoS1, true);
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive published meessage.");
QVERIFY2(publishReceivedSpy.first().at(2).toBool() == false, "Retain flag is set");
// Disconnect client, and connect again, verify we get 2 retained messages now
disconnectAndWait(client2);
client2 = connectAndWait("client2");
QSignalSpy publishReceivedSpy2(client2, &MqttClient::publishReceived);
client2->subscribe("/retaintopic", Mqtt::QoS1);
QTRY_VERIFY2(publishReceivedSpy2.count() == 2, "Did not receive retained topic on subscribe.");
QVERIFY2(publishReceivedSpy2.at(0).at(2).toBool() == true, "Retain flag not set");
QVERIFY2(publishReceivedSpy2.at(1).at(2).toBool() == true, "Retain flag not set");
publishReceivedSpy2.clear();
// Post a message with 0 paylod, it should be delivered as normal but discard any retained messages
client1->publish("/retaintopic", QByteArray(), Mqtt::QoS1, true);
QTRY_VERIFY2(publishReceivedSpy2.count() == 1, "Did not receive published message.");
QVERIFY2(publishReceivedSpy.first().at(2).toBool() == false, "Retain flag is set");
disconnectAndWait(client2);
client2 = connectAndWait("client2");
QSignalSpy publishReceivedSpy3(client2, &MqttClient::publishReceived);
client2->subscribe("/retaintopic", Mqtt::QoS1);
QTest::qWait(500);
QVERIFY2(publishReceivedSpy3.count() == 0, "Did receive retained messages on subscribe but should not have.");
// post another 2 retained messages (and some others), reconnect and verify they're there again
client1->publish("/retaintopic", "Message 3", Mqtt::QoS1, true);
client1->publish("/retaintopic", "Message 4", Mqtt::QoS1, false);
client1->publish("/retaintopic", "Message 5", Mqtt::QoS1, false);
client1->publish("/retaintopic", "Message 6", Mqtt::QoS1, true);
client1->publish("/retaintopic", "Message 7", Mqtt::QoS1, false);
QTRY_VERIFY(publishReceivedSpy3.count() == 5);
disconnectAndWait(client2);
client2 = connectAndWait("client2");
QSignalSpy publishReceivedSpy4(client2, &MqttClient::publishReceived);
client2->subscribe("/retaintopic", Mqtt::QoS1);
QTRY_VERIFY2(publishReceivedSpy4.count() == 2, "Did not receive retained messages.");
publishReceivedSpy4.clear();
// post a QoS0 message to this topic. it should discard previously retained messages but stay retained
client1->publish("/retaintopic", "Message 8", Mqtt::QoS0, true);
QTRY_VERIFY2(publishReceivedSpy4.count() == 1, "Did not receive retained messages.");
disconnectAndWait(client2);
client2 = connectAndWait("client2");
QSignalSpy publishReceivedSpy5(client2, &MqttClient::publishReceived);
client2->subscribe("/retaintopic", Mqtt::QoS1);
QTRY_VERIFY2(publishReceivedSpy5.count() == 1, "Did not receive exactly 1 retained message.");
}
void OperationTests::testUnsubscribe()
{
MqttClient *client1 = connectAndWait("client1");
QVERIFY(subscribeAndWait(client1, "testtopic"));
QSignalSpy publishReceivedSpy(client1, &MqttClient::publishReceived);
MqttClient *client2 = connectAndWait("client2");
client2->publish("testtopic", "Hello world");
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Did not receive publish message");
QSignalSpy unsubscribedSpy(client1, &MqttClient::unsubscribed);
QSignalSpy serverSideUnsubscribedSpy(m_server, &MqttServer::clientUnsubscribed);
quint16 packetId = client1->unsubscribe("testtopic");
QTRY_VERIFY2(serverSideUnsubscribedSpy.count() == 1, "Server side unsubscribed signal not received");
QVERIFY2(serverSideUnsubscribedSpy.first().at(0).toString() == "client1", "ClientId not matching");
QVERIFY2(serverSideUnsubscribedSpy.first().at(1).toString() == "testtopic", "topicFilter not matching");
QTRY_VERIFY2(unsubscribedSpy.count() == 1, "Unsubscibed signal not emitted");
QVERIFY2(unsubscribedSpy.first().at(0).toInt() == packetId, "packet id not matching");
publishReceivedSpy.clear();
client2->publish("testtopic", "Hello world 2");
QTest::qWait(500);
QVERIFY2(publishReceivedSpy.count() == 0, "Received publish packet even though we should not have");
}
void OperationTests::testEmptyClientId()
{
MqttClient *client1 = connectAndWait("");
QVERIFY2(client1->isConnected(), "Client did not connect");
MqttClient *client2 = connectAndWait("");
QVERIFY2(client2->isConnected(), "Client did not connect");
QPair<MqttClient*, QSignalSpy*> client3 = connectToServer("", false);
QTRY_VERIFY2(client3.second->count() == 1, "Client did not emit connected signal");
QTRY_COMPARE(client3.second->first().at(0).value<Mqtt::ConnectReturnCode>(), Mqtt::ConnectReturnCodeIdentifierRejected);
}
QTEST_MAIN(OperationTests)
#include "test_operation.moc"

3
tests/tests.pro Normal file
View File

@ -0,0 +1,3 @@
TEMPLATE = subdirs
SUBDIRS += operation