Merge PR #22: Add support for websocket connections

This commit is contained in:
jenkins 2022-06-07 19:42:55 +02:00
commit 3507067252
43 changed files with 2433 additions and 342 deletions

View File

@ -1,5 +1,13 @@
# nymea-mqtt
Nymea MQTT broker
The nymea MQTT broker consists of a library containing a MqttClient and a MqttServer implementation.
The nymea MQTT broker consists of a Qt library containing a MqttClient and a MqttServer implementation.
It can be used standalone or integrated in other applications.
The currently supported MQTT protocol versions are 3.1.0 and 3.1.1.
Both, the client and the server support raw MQTT over TCP as well as MQTT over web socket. Both transports
can be used with or without SSL encryption.
Please refer to the server and client directories for minimalistic, yet fully featured examples on how to
use the library.

16
client/client.pro Normal file
View File

@ -0,0 +1,16 @@
TEMPLATE = app
TARGET = nymea-mqtt-client
include(../nymea-mqtt.pri)
QT += network
QT -= gui
INCLUDEPATH += $$top_srcdir/libnymea-mqtt/
SOURCES += main.cpp
LIBS += -L$${top_builddir}/libnymea-mqtt -lnymea-mqtt
target.path = $$[QT_INSTALL_PREFIX]/bin
INSTALLS += target

178
client/main.cpp Normal file
View File

@ -0,0 +1,178 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttclient.h"
#include <QCoreApplication>
#include <QCommandLineParser>
#include <QUuid>
#include <QHostAddress>
int main(int argc, char *argv[]) {
QCoreApplication *app = new QCoreApplication(argc, argv);
QCommandLineParser parser;
parser.addPositionalArgument("server", "The server address.");
parser.addOptions({
{{"clientid", "c"}, "The client ID to use for the connection (default: autogenerated).", "clientId", QUuid::createUuid().toString()},
{{"username", "u"}, "The user name to use for the connection.", "username", ""},
{{"password", "P"}, "The password to use for the connection.", "password", ""},
{{"subscribe", "s"}, "Subscribe to a topic filter.", "topicfilter"},
{{"publish", "p"}, "Publish to topic.", "topic"},
{{"payload", "l"}, "Publish payload (requires -p).", "payload"},
{{"retain", "r"}, "Retain flag for publishes (default: no retaining)."},
{{"qos", "q"}, "QoS (default: 1).", "QoS", "1"},
{{"ssl", "S"}, "Use SSL for TCP connections (default: off) (Websocket connections will determine the use of SSL using the url scheme)."},
{{"accept-self-signed-certificate", "A"}, "Ignore self signed certificate errors"},
{{"verbose", "v"}, "Be more verbose"}
});
parser.addHelpOption();
parser.process(app->arguments());
if (parser.positionalArguments().count() < 1) {
parser.showHelp(-1);
}
QString verbose = parser.isSet("verbose") ? "true" : "false";
QLoggingCategory::setFilterRules(QString("nymea.mqtt.client.debug=%1\nnymea.mqtt.client.warning=%2\n").arg(verbose).arg(verbose));
bool ok;
int qosInt = parser.value("qos").toInt(&ok);
if (!ok || qosInt < 0 || qosInt > 2) {
qCritical() << "Invalid QoS option. Options are 0, 1 or 2.";
exit(EXIT_FAILURE);
}
Mqtt::QoS qos = static_cast<Mqtt::QoS>(qosInt);
bool retain = parser.isSet("retain");
MqttClient client(parser.value("clientid"), app);
QObject::connect(&client, &MqttClient::error, app, [&client, app](QAbstractSocket::SocketError socketError){
qCritical() << "Connection error:" << socketError;
client.setAutoReconnect(false);
app->quit();
});
QObject::connect(&client, &MqttClient::sslErrors, app, [&client, &parser](const QList<QSslError> &sslErrors){
QSslCertificate certificate = sslErrors.first().certificate();
if (parser.isSet("accept-self-signed-certificate")) {
QList<QSslError> copy = sslErrors;
copy.removeOne(QSslError(QSslError::HostNameMismatch, certificate));
copy.removeAll(QSslError(QSslError::SelfSignedCertificate, certificate));
if (copy.isEmpty()) {
qInfo() << "Accepting self signed certificate";
client.ignoreSslErrors();
return;
}
}
qWarning() << "SSL errors for certificate:" << qUtf8Printable(certificate.toText()) << sslErrors;
});
QObject::connect(&client, &MqttClient::connected, app, [&parser, &client, qos, retain](Mqtt::ConnectReturnCode connectReturnCode, Mqtt::ConnackFlags /*connackFlags*/){
switch (connectReturnCode) {
case Mqtt::ConnectReturnCodeIdentifierRejected:
qCritical() << "Connection failed: Identifier rejected";
exit(EXIT_FAILURE);
case Mqtt::ConnectReturnCodeUnacceptableProtocolVersion:
qCritical() << "Connection failed: Unsupported MQTT protocol version";
exit(EXIT_FAILURE);
case Mqtt::ConnectReturnCodeBadUsernameOrPassword:
qCritical() << "Connection failed: Bad username or password";
exit(EXIT_FAILURE);
case Mqtt::ConnectReturnCodeNotAuthorized:
qCritical() << "Connection failed: Not authorized";
exit(EXIT_FAILURE);
case Mqtt::ConnectReturnCodeServerUnavailable:
qCritical() << "Connection failed: Server unavailable";
exit(EXIT_FAILURE);
default:
qInfo() << "Connected to server.";
}
foreach (const QString &topicFilter, parser.values("subscribe")) {
qDebug() << "Subscribing to" << topicFilter;
client.subscribe(topicFilter, qos);
}
for (int i = 0; i < parser.values("publish").length(); i++) {
QString topic = parser.values("publish").at(0);
QByteArray payload;
if (parser.values("payload").length() > i) {
payload = parser.values("payload").at(i).toUtf8();
}
qDebug().nospace() << "Publishing to " << topic << (!payload.isEmpty() ? ": " + payload : "");
client.publish(parser.value("publish"), parser.value("payload").toUtf8(), qos, retain);
}
});
QObject::connect(&client, &MqttClient::subscribed, app, [](const QString &topic, Mqtt::SubscribeReturnCode subscribeReturnCode){
if (subscribeReturnCode == Mqtt::SubscribeReturnCodeFailure) {
qWarning() << "Subscribing to topic" << topic << "failed.";
} else {
qInfo() << "Subscribed to topic filter" << topic << "with QoS" << subscribeReturnCode;
}
});
QObject::connect(&client, &MqttClient::published, app, [](quint16 /*packetId*/, const QString &topic){
qInfo() << "Published to topic" << topic;
});
QObject::connect(&client, &MqttClient::publishReceived, app, [](const QString &topic, const QByteArray &payload, bool retained){
qInfo().nospace() << "Publish received on topic " << topic << ": " << payload << (retained ? " (retained message)" : "");
});
QString server = parser.positionalArguments().at(0);
if (!server.startsWith("ws://") && !server.startsWith("wss://")) {
server.prepend("mqtt://");
}
QUrl serverUrl = QUrl(server);
if (!serverUrl.isValid() || !QStringList({"mqtt", "ws", "wss"}).contains(serverUrl.scheme())) {
qCritical() << "Invalid server argument. Examples:";
qCritical() << "192.168.0.1:1883";
qCritical() << "example.com:1883";
qCritical() << "ws://192.168.0.1:80";
qCritical() << "wss://example.com:443";
exit(EXIT_FAILURE);
}
if (parser.isSet("username")) {
client.setUsername(parser.value("username"));
}
if (parser.isSet("password")) {
client.setPassword(parser.value("password"));
}
if (serverUrl.scheme() == "mqtt") {
qDebug() << "Connecting to server" << serverUrl.host() << serverUrl.port(1883);
client.connectToHost(serverUrl.host(), serverUrl.port(1883), true, parser.isSet("ssl"));
} else {
qDebug() << "Connecting to web socket server" << serverUrl.toString();
QNetworkRequest request(serverUrl);
client.connectToHost(request);
}
return app->exec();
}

23
debian/control vendored
View File

@ -1,8 +1,10 @@
Source: nymea-mqtt
Section: comm
Priority: optional
Maintainer: Michael Zanetti <michael.zanetti@guh.io>
Maintainer: nymea GmbH <developer@nymea.io>
Build-Depends: debhelper,
libssl-dev,
libqt5websockets5-dev,
qtbase5-dev,
Standards-Version: 4.0.0
Homepage: http://nymea.io
@ -13,8 +15,8 @@ Multi-Arch: same
Depends:
${misc:Depends},
${shlibs:Depends},
Description: nymea-mqtt libraries
nymea-mqtt is a mqtt broker implementation
Description: nymea-mqtt library
nymeas mqtt implementation for mqtt client and server development.
Package: libnymea-mqtt-dev
Section: devel
@ -24,8 +26,8 @@ Depends:
libnymea-mqtt (=${binary:Version}),
${misc:Depends},
Description: nymea-mqtt libaries - development files
nymea-mqtt is a mqtt broker implementation. This package contains
related development files.
nymeas mqtt implementation for mqtt client and server development.
This package contains related development files.
Package: nymea-mqtt-server
Architecture: any
@ -33,5 +35,12 @@ Depends:
${misc:Depends},
${shlibs:Depends},
Description: nymea-mqtt standalone server
nymea-mqtt is a mqtt broker implementation. This package contains
a standalone mqtt server.
nymeas mqtt implementation. This package contains a standalone mqtt server.
Package: nymea-mqtt-client
Architecture: any
Depends:
${misc:Depends},
${shlibs:Depends},
Description: nymea-mqtt command line client
nymeas mqtt implementation. This package contains a command line mqtt client.

1
debian/nymea-mqtt-client.install vendored Normal file
View File

@ -0,0 +1 @@
/usr/bin/nymea-mqtt-client

View File

@ -1 +1 @@
/usr/bin/nymea-mqttserver
/usr/bin/nymea-mqtt-server

View File

@ -1,28 +1,47 @@
# Include this file in your project if you want to
# statically link to libnymea-mqtt
TEMPLATE = lib
TARGET = nymea-mqtt
QT -= gui
QT += network
QT += network websockets
CONFIG += c++11 console static
CONFIG -= app_bundle
SOURCES += \
mqttserver.cpp \
mqttpacket.cpp \
mqttsubscription.cpp \
mqttclient.cpp
mqttserver.cpp \
mqttclient.cpp \
transports/mqttservertransport.cpp \
transports/mqtttcpservertransport.cpp \
transports/mqttwebsocketservertransport.cpp \
transports/mqttclienttransport.cpp \
transports/mqtttcpclienttransport.cpp \
transports/mqttwebsocketclienttransport.cpp \
PRIVATE_HEADERS = \
mqttpacket_p.h \
mqttclient_p.h \
mqttserver_p.h
mqttserver_p.h \
transports/mqttservertransport.h \
transports/mqtttcpservertransport.h \
transports/mqttwebsocketservertransport.h \
transports/mqttclienttransport.h \
transports/mqtttcpclienttransport.h \
transports/mqttwebsocketclienttransport.h \
PUBLIC_HEADERS = \
mqttserver.h \
mqttpacket.h \
mqtt.h \
mqttpacket.h \
mqttsubscription.h \
mqttserver.h \
mqttclient.h \
HEADERS += $$PRIVATE_HEADERS $$PUBLIC_HEADERS
# https://bugreports.qt.io/browse/QTBUG-83165
android: {
DESTDIR = $${ANDROID_TARGET_ARCH}
OBJECTS_DIR = $${ANDROID_TARGET_ARCH}
}

View File

@ -70,6 +70,7 @@ enum ConnectReturnCode {
ConnectReturnCodeBadUsernameOrPassword = 0x04,
ConnectReturnCodeNotAuthorized = 0x05
};
enum SubscribeReturnCode {
SubscribeReturnCodeSuccessQoS0 = 0x00,
SubscribeReturnCodeSuccessQoS1 = 0x01,

View File

@ -1,6 +1,6 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2020, nymea GmbH
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
@ -31,19 +31,24 @@
\inmodule nymea-mqtt
\ingroup mqtt
MqttClient is used to connect to MQTT servers/brokers. The currently supported
MQTT protocol version is 3.1.1 including SSL encryption support.
MqttClient is used to connect to MQTT servers/brokers.
The currently supported MQTT protocol version is 3.1.1.
The currently supported transports are TCP socket and WebSocket with SSL encryption.
*/
#include "mqttclient.h"
#include "mqttclient_p.h"
#include "mqttpacket.h"
#include "transports/mqtttcpclienttransport.h"
#include "transports/mqttwebsocketclienttransport.h"
Q_LOGGING_CATEGORY(dbgClient, "nymea.mqtt.client")
MqttClientPrivate::MqttClientPrivate(MqttClient *q):
QObject(q),
q_ptr(q)
{
qRegisterMetaType<Mqtt::ConnectReturnCode>();
qRegisterMetaType<Mqtt::SubscribeReturnCodes>();
qRegisterMetaType<Mqtt::ConnackFlags>();
reconnectTimer.setSingleShot(true);
@ -53,49 +58,54 @@ MqttClientPrivate::MqttClientPrivate(MqttClient *q):
void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration)
{
if (serverHostname != hostName || serverPort != port || this->useSsl != useSsl || sslConfiguration != this->sslConfiguration) {
serverHostname = hostName;
serverPort = port;
this->useSsl = useSsl;
this->sslConfiguration = sslConfiguration;
MqttTcpClientTransport *tcpTransport = new MqttTcpClientTransport(hostName, port, useSsl, sslConfiguration, this);
connectToHost(tcpTransport, cleanSession);
}
void MqttClientPrivate::connectToHost(const QNetworkRequest &request, bool cleanSession)
{
MqttWebSocketClientTransport *webSocketTransport = new MqttWebSocketClientTransport(request, this);
connectToHost(webSocketTransport, cleanSession);
}
void MqttClientPrivate::connectToHost(MqttClientTransport *transport, bool cleanSession)
{
if (this->transport != transport) {
reconnectAttempt = 1;
reconnectTimer.stop();
if (this->transport) {
this->transport->abort();
this->transport->deleteLater();
}
this->transport = transport;
connect(transport, &MqttClientTransport::connected, this, &MqttClientPrivate::onConnected);
connect(transport, &MqttClientTransport::disconnected, this, &MqttClientPrivate::onDisconnected);
connect(transport, &MqttClientTransport::dataReceived, this, &MqttClientPrivate::onDataReceived);
connect(transport, &MqttClientTransport::stateChanged, this, &MqttClientPrivate::onSocketStateChanged);
connect(transport, &MqttClientTransport::errorSignal, this, &MqttClientPrivate::onSocketError);
connect(transport, &MqttClientTransport::sslErrors, this, &MqttClientPrivate::onSslErrors);
}
this->cleanSession = cleanSession;
sessionActive = true;
if (socket) {
socket->abort();
socket->deleteLater();
}
socket = new QSslSocket(this);
socket->setSslConfiguration(sslConfiguration);
connect(socket, &QTcpSocket::connected, this, &MqttClientPrivate::onConnected);
connect(socket, &QTcpSocket::disconnected, this, &MqttClientPrivate::onDisconnected);
connect(socket, &QTcpSocket::readyRead, this, &MqttClientPrivate::onReadyRead);
connect(socket, &QTcpSocket::stateChanged, this, &MqttClientPrivate::onSocketStateChanged);
typedef void (QSslSocket:: *sslErrorsSignal)(const QList<QSslError> &);
connect(socket, static_cast<sslErrorsSignal>(&QSslSocket::sslErrors), this, &MqttClientPrivate::onSslErrors);
typedef void (QSslSocket:: *errorSignal)(QAbstractSocket::SocketError);
connect(socket, static_cast<errorSignal>(&QSslSocket::error), this, &MqttClientPrivate::onSocketError);
if (useSsl) {
socket->connectToHostEncrypted(hostName, port);
} else {
socket->connectToHost(hostName, port);
}
transport->connectToHost();
}
void MqttClientPrivate::disconnectFromHost()
{
sessionActive = false;
if (!socket || !socket->isOpen()) {
if (!transport || !transport->isOpen()) {
return;
}
MqttPacket packet(MqttPacket::TypeDisconnect);
socket->write(packet.serialize());
socket->flush();
socket->disconnectFromHost();
transport->write(packet.serialize());
transport->flush();
transport->disconnectFromHost();
}
/*!
@ -229,6 +239,11 @@ void MqttClient::connectToHost(const QString &hostName, quint16 port, bool clean
d_ptr->connectToHost(hostName, port, cleanSession, useSsl, sslConfiguration);
}
void MqttClient::connectToHost(const QNetworkRequest &request, bool cleanSession)
{
d_ptr->connectToHost(request, cleanSession);
}
void MqttClient::disconnectFromHost()
{
d_ptr->disconnectFromHost();
@ -236,7 +251,12 @@ void MqttClient::disconnectFromHost()
bool MqttClient::isConnected() const
{
return d_ptr->socket && d_ptr->socket->state() == QAbstractSocket::ConnectedState && d_ptr->keepAliveTimer.isActive();
return d_ptr->transport && d_ptr->transport->state() == QAbstractSocket::ConnectedState && d_ptr->keepAliveTimer.isActive();
}
void MqttClient::ignoreSslErrors()
{
d_ptr->transport->ignoreSslErrors();
}
quint16 MqttClient::subscribe(const MqttSubscription &subscription)
@ -257,7 +277,7 @@ quint16 MqttClient::subscribe(const MqttSubscriptions &subscriptions)
packet.setSubscriptions(subscriptions);
d_ptr->unackedPackets.insert(packet.packetId(), packet);
d_ptr->unackedPacketList.append(packet.packetId());
d_ptr->socket->write(packet.serialize());
d_ptr->transport->write(packet.serialize());
return packet.packetId();
}
@ -278,7 +298,7 @@ quint16 MqttClient::unsubscribe(const MqttSubscriptions &subscriptions)
packet.setSubscriptions(subscriptions);
d_ptr->unackedPackets.insert(packet.packetId(), packet);
d_ptr->unackedPacketList.append(packet.packetId());
d_ptr->socket->write(packet.serialize());
d_ptr->transport->write(packet.serialize());
return packet.packetId();
}
@ -288,7 +308,7 @@ quint16 MqttClient::publish(const QString &topic, const QByteArray &payload, Mqt
MqttPacket packet(MqttPacket::TypePublish, packetId, qos, retain, false);
packet.setTopic(topic.toUtf8());
packet.setPayload(payload);
d_ptr->socket->write(packet.serialize());
d_ptr->transport->write(packet.serialize());
if (qos == Mqtt::QoS0) {
QTimer::singleShot(0, this, [this, packet](){
emit published(packet.packetId(), packet.topic());
@ -313,7 +333,7 @@ void MqttClientPrivate::onConnected()
packet.setWillRetain(willRetain);
packet.setUsername(username.toUtf8());
packet.setPassword(password.toUtf8());
socket->write(packet.serialize());
transport->write(packet.serialize());
}
void MqttClientPrivate::onDisconnected()
@ -322,30 +342,29 @@ void MqttClientPrivate::onDisconnected()
emit q_ptr->disconnected();
if (sessionActive && autoReconnect) {
reconnectAttempt = qMin(maxReconnectTimeout / 60 / 60, reconnectAttempt * 2);
qCDebug(dbgClient) << "Reconnecing in" << reconnectAttempt << "seconds";
qCDebug(dbgClient) << "Reconnecting in" << reconnectAttempt << "seconds";
reconnectTimer.setInterval(reconnectAttempt * 1000);
reconnectTimer.start();
}
}
void MqttClientPrivate::onReadyRead()
void MqttClientPrivate::onDataReceived(const QByteArray &data)
{
static QByteArray data;
data.append(socket->readAll());
inputBuffer.append(data);
// qCDebug(dbgClient) << "Received data from server:" << data.toHex() << "\n" << data;
MqttPacket packet;
int ret = packet.parse(data);
int ret = packet.parse(inputBuffer);
if (ret == -1) {
qCDebug(dbgClient) << "Bad data from server. Dropping connection.";
data.clear();
socket->abort();
inputBuffer.clear();
transport->abort();
return;
}
if (ret == 0) {
qCDebug(dbgClient) << "Not enough data from server...";
return;
}
data.remove(0, ret);
inputBuffer.remove(0, ret);
switch (packet.type()) {
case MqttPacket::TypeConnack:
@ -353,7 +372,7 @@ void MqttClientPrivate::onReadyRead()
qCWarning(dbgClient) << "MQTT connection refused:" << packet.connectReturnCode();
// Always emit connected, even if just to indicate a "ClientRefusedError"
emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags());
socket->abort();
transport->abort();
emit q_ptr->error(QAbstractSocket::ConnectionRefusedError);
return;
}
@ -362,7 +381,7 @@ void MqttClientPrivate::onReadyRead()
if (retryPacket.type() == MqttPacket::TypePublish) {
retryPacket.setDup(true);
}
socket->write(retryPacket.serialize());
transport->write(retryPacket.serialize());
}
restartKeepAliveTimer();
// Make sure we emit connected after having handled all the retransmission queue
@ -377,13 +396,13 @@ void MqttClientPrivate::onReadyRead()
case Mqtt::QoS1: {
emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain());
MqttPacket response(MqttPacket::TypePuback, packet.packetId());
socket->write(response.serialize());
transport->write(response.serialize());
break;
}
case Mqtt::QoS2: {
if (!packet.dup() && unackedPacketList.contains(packet.packetId())) {
// Hmm... Server says it's not a duplicate, but packet id is not released yet... Drop connection.
socket->disconnectFromHost();
transport->disconnectFromHost();
return;
}
@ -394,7 +413,7 @@ void MqttClientPrivate::onReadyRead()
unackedPacketList.append(packet.packetId());
emit q_ptr->publishReceived(packet.topic(), packet.payload(), packet.retain());
}
socket->write(response.serialize());
transport->write(response.serialize());
break;
}
}
@ -410,7 +429,7 @@ void MqttClientPrivate::onReadyRead()
MqttPacket publishPacket = unackedPackets.value(packet.packetId());
MqttPacket response(MqttPacket::TypePubrel, packet.packetId());
unackedPackets[packet.packetId()] = response;
socket->write(response.serialize());
transport->write(response.serialize());
emit q_ptr->published(packet.packetId(), publishPacket.topic());
restartKeepAliveTimer();
break;
@ -418,7 +437,7 @@ void MqttClientPrivate::onReadyRead()
case MqttPacket::TypePubrel: {
MqttPacket response(MqttPacket::TypePubcomp, packet.packetId());
unackedPackets[packet.packetId()] = response;
socket->write(response.serialize());
transport->write(response.serialize());
restartKeepAliveTimer();
break;
}
@ -433,7 +452,7 @@ void MqttClientPrivate::onReadyRead()
if (subscribePacket.subscriptions().count() != packet.subscribeReturnCodes().count()) {
qCWarning(dbgClient) << "Subscription return code count not matching subscribe packet!";
socket->abort();
transport->abort();
return;
}
@ -450,7 +469,7 @@ void MqttClientPrivate::onReadyRead()
case MqttPacket::TypeUnsuback:
if (!unackedPackets.contains(packet.packetId())) {
qCWarning(dbgClient) << "UNSUBACK received but not waiting for it. Dropping connection. Packet ID:" << packet.packetId();
socket->abort();
transport->abort();
return;
}
unackedPackets.remove(packet.packetId());
@ -465,8 +484,8 @@ void MqttClientPrivate::onReadyRead()
Q_ASSERT(false);
}
if (!data.isEmpty()) {
onReadyRead();
if (!inputBuffer.isEmpty()) {
onDataReceived(QByteArray());
}
}
@ -484,6 +503,7 @@ void MqttClientPrivate::onSocketError(QAbstractSocket::SocketError error)
void MqttClientPrivate::onSslErrors(const QList<QSslError> &errors)
{
qCWarning(dbgClient) << "SSL error in MQTT connection:" << errors;
emit q_ptr->sslErrors(errors);
}
quint16 MqttClientPrivate::newPacketId()
@ -498,7 +518,7 @@ quint16 MqttClientPrivate::newPacketId()
void MqttClientPrivate::sendPingreq()
{
MqttPacket packet(MqttPacket::TypePingreq);
socket->write(packet.serialize());
transport->write(packet.serialize());
}
void MqttClientPrivate::restartKeepAliveTimer()
@ -510,8 +530,9 @@ void MqttClientPrivate::restartKeepAliveTimer()
void MqttClientPrivate::reconnectTimerTimeout()
{
qCDebug(dbgClient()) << "Reconnecting now...";
if (!autoReconnect) {
return;
}
connectToHost(serverHostname, serverPort, false, useSsl, sslConfiguration);
connectToHost(transport, false);
}

View File

@ -1,6 +1,6 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2020, nymea GmbH
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
@ -31,6 +31,7 @@
#include <QObject>
#include <QAbstractSocket>
#include <QSslConfiguration>
#include <QNetworkRequest>
#include "mqttpacket.h"
#include "mqttsubscription.h"
@ -72,10 +73,13 @@ public:
void setPassword(const QString &password);
void connectToHost(const QString &hostName, quint16 port, bool cleanSession = true, bool useSsl = false, const QSslConfiguration &sslConfiguration = QSslConfiguration());
void connectToHost(const QNetworkRequest &request, bool cleanSession = true);
void disconnectFromHost();
bool isConnected() const;
void ignoreSslErrors();
public slots:
quint16 subscribe(const MqttSubscription &subscription);
quint16 subscribe(const QString &topicFilter, Mqtt::QoS qos = Mqtt::QoS0);
@ -92,6 +96,7 @@ signals:
void disconnected();
void stateChanged(QAbstractSocket::SocketState state);
void error(QAbstractSocket::SocketError socketError);
void sslErrors(const QList<QSslError> &sslErrors);
void subscribeResult(quint16 packetId, const Mqtt::SubscribeReturnCodes &subscribeReturnCodes);
void subscribed(const QString &topic, Mqtt::SubscribeReturnCode subscribeReturnCode);
@ -101,7 +106,7 @@ signals:
private:
MqttClientPrivate *d_ptr;
friend class OperationTests;
friend class MqttTests;
};
#endif // MQTTCLIENT_H

View File

@ -30,12 +30,14 @@
#include <QObject>
#include <QTcpSocket>
#include <QWebSocket>
#include <QTimer>
#include <QLoggingCategory>
#include "mqttpacket.h"
#include "mqttclient.h"
#include "mqttsubscription.h"
#include "mqttclient.h"
#include "transports/mqttclienttransport.h"
Q_DECLARE_LOGGING_CATEGORY(dbgClient)
@ -47,12 +49,14 @@ public:
MqttClient *q_ptr;
void connectToHost(const QString &hostName, quint16 port, bool cleanSession, bool useSsl, const QSslConfiguration &sslConfiguration);
void connectToHost(const QNetworkRequest &request, bool cleanSession);
void connectToHost(MqttClientTransport *transport, bool cleanSession = true);
void disconnectFromHost();
public slots:
void onConnected();
void onDisconnected();
void onReadyRead();
void onDataReceived(const QByteArray &data);
void onSocketStateChanged(QAbstractSocket::SocketState socketState);
void onSocketError(QAbstractSocket::SocketError error);
void onSslErrors(const QList<QSslError> &errors);
@ -64,14 +68,10 @@ public slots:
void reconnectTimerTimeout();
public:
QString serverHostname;
quint16 serverPort = 0;
bool useSsl = false;
QSslConfiguration sslConfiguration;
bool autoReconnect = true;
bool sessionActive = false;
bool cleanSession = true;
QSslSocket *socket = nullptr;
MqttClientTransport *transport = nullptr;
QTimer reconnectTimer;
int reconnectAttempt = 0;
quint16 maxReconnectTimeout = 36000;
@ -88,6 +88,8 @@ public:
QVector<quint16> unackedPacketList;
QHash<quint16, MqttPacket> unackedPackets;
QByteArray inputBuffer;
};
#endif // MQTTCLIENT_P_H

View File

@ -61,6 +61,8 @@
#include "mqttserver.h"
#include "mqttserver_p.h"
#include "transports/mqtttcpservertransport.h"
#include "transports/mqttwebsocketservertransport.h"
#include "mqttpacket.h"
#include <QDebug>
@ -78,10 +80,25 @@ MqttServerPrivate::MqttServerPrivate(MqttServer *q):
qRegisterMetaType<Mqtt::QoS>();
}
int MqttServerPrivate::listen(MqttServerTransport *transport, const QHostAddress &address, quint16 port)
{
connect(transport, &MqttServerTransport::clientConnected, this, &MqttServerPrivate::onClientConnected);
if (!transport->listen(address, port)) {
qCWarning(dbgServer) << "Error listening on port" << port;
transport->deleteLater();
return -1;
}
static int addressId = -1;
servers.insert(++addressId, transport);
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port << "( Address ID" << addressId << ")";
return addressId;
}
QHash<QString, quint16> MqttServerPrivate::publish(const QString &topic, const QByteArray &payload)
{
QHash<QTcpSocket*, Mqtt::QoS> receivers;
foreach (QTcpSocket *c, clientList.keys()) {
QHash<MqttServerClient*, Mqtt::QoS> receivers;
foreach (MqttServerClient *c, clientList.keys()) {
foreach (const MqttSubscription &subscription, clientList.value(c)->subscriptions) {
if (matchTopic(subscription.topicFilter(), topic)) {
if (!receivers.contains(c) || receivers.value(c) < subscription.qoS()) {
@ -92,7 +109,7 @@ QHash<QString, quint16> MqttServerPrivate::publish(const QString &topic, const Q
}
QHash<QString, quint16> packets;
foreach (QTcpSocket *receiver, receivers.keys()) {
foreach (MqttServerClient *receiver, receivers.keys()) {
ClientContext *ctx = clientList.value(receiver);
qCDebug(dbgServer) << "Relaying packet to subscribed client:" << ctx->clientId;
Mqtt::QoS qos = receivers.value(receiver);
@ -139,26 +156,22 @@ void MqttServer::setAuthorizer(MqttAuthorizer *authorizer)
int MqttServer::listen(const QHostAddress &address, quint16 port, const QSslConfiguration &sslConfiguration)
{
SslServer *server = new SslServer(sslConfiguration, this);
connect(server, &SslServer::clientConnected, d_ptr, &MqttServerPrivate::onClientConnected);
connect(server, &SslServer::clientDisconnected, d_ptr, &MqttServerPrivate::onClientDisconnected);
connect(server, &SslServer::dataAvailable, d_ptr, &MqttServerPrivate::onDataAvailable);
qCDebug(dbgServer) << "Starting nymea MQTT server on TCP";
MqttServerTransport *transport = new MqttTcpServerTransport(sslConfiguration, this);
return d_ptr->listen(transport, address, port);
}
if (!server->listen(address, port)) {
qCWarning(dbgServer) << "Error listening on port" << port;
server->deleteLater();
return -1;
}
static int addressId = -1;
d_ptr->servers.insert(++addressId, server);
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port << "( Address ID" << addressId << ")";
return addressId;
int MqttServer::listenWebSocket(const QHostAddress &address, quint16 port, const QSslConfiguration &sslConfiguration)
{
qCDebug(dbgServer) << "Starting nymea MQTT server on WebSocket";
MqttServerTransport *transport = new MqttWebSocketServerTransport(sslConfiguration, this);
return d_ptr->listen(transport, address, port);
}
bool MqttServer::isListening(const QHostAddress &address, quint16 port) const
{
foreach (SslServer *server, d_ptr->servers) {
if (server->serverAddress() == address && server->serverPort() == port && server->isListening()) {
foreach (MqttServerTransport *transport, d_ptr->servers) {
if (transport->serverAddress() == address && transport->serverPort() == port && transport->isListening()) {
return true;
}
}
@ -176,12 +189,12 @@ void MqttServer::close(int interfaceId)
qCWarning(dbgServer) << "No such server address ID" << interfaceId;
return;
}
SslServer *server = d_ptr->servers.take(interfaceId);
while (!d_ptr->clientServerMap.keys(server).isEmpty()) {
d_ptr->cleanupClient(d_ptr->clientServerMap.keys(server).first());
MqttServerTransport *transport = d_ptr->servers.take(interfaceId);
while (!d_ptr->clientServerMap.keys(transport).isEmpty()) {
d_ptr->cleanupClient(d_ptr->clientServerMap.keys(transport).first());
}
server->close();
server->deleteLater();
transport->close();
transport->deleteLater();
}
QStringList MqttServer::clients() const
@ -208,9 +221,12 @@ QHash<QString, quint16> MqttServer::publish(const QString &topic, const QByteArr
return d_ptr->publish(topic, payload);
}
void MqttServerPrivate::onClientConnected(QSslSocket *client)
void MqttServerPrivate::onClientConnected(MqttServerClient *client)
{
SslServer *server = static_cast<SslServer*>(sender());
connect(client, &MqttServerClient::dataAvailable, this, &MqttServerPrivate::onDataAvailable);
connect(client, &MqttServerClient::disconnected, this, &MqttServerPrivate::onClientDisconnected);
MqttServerTransport *transport = static_cast<MqttServerTransport*>(sender());
// Start a 10 second timer to clean up the connection if we don't get data until then.
QTimer *timeoutTimer = new QTimer(this);
@ -221,12 +237,14 @@ void MqttServerPrivate::onClientConnected(QSslSocket *client)
client->deleteLater();
});
timeoutTimer->start(10000);
clientServerMap.insert(client, server);
clientServerMap.insert(client, transport);
pendingConnections.insert(client, timeoutTimer);
}
void MqttServerPrivate::onDataAvailable(QSslSocket *client, const QByteArray &data)
void MqttServerPrivate::onDataAvailable(const QByteArray &data)
{
MqttServerClient *client = qobject_cast<MqttServerClient*>(sender());
clientBuffers[client].append(data);
do {
@ -256,12 +274,13 @@ void MqttServerPrivate::onDataAvailable(QSslSocket *client, const QByteArray &da
} while (!clientBuffers.value(client).isEmpty());
}
void MqttServerPrivate::onClientDisconnected(QSslSocket *client)
void MqttServerPrivate::onClientDisconnected()
{
MqttServerClient *client = qobject_cast<MqttServerClient*>(sender());
cleanupClient(client);
}
void MqttServerPrivate::cleanupClient(QTcpSocket *client)
void MqttServerPrivate::cleanupClient(MqttServerClient *client)
{
if (clientBuffers.contains(client)) {
clientBuffers.remove(client);
@ -302,7 +321,7 @@ void MqttServerPrivate::cleanupClient(QTcpSocket *client)
client->deleteLater();
}
void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *client)
void MqttServerPrivate::processPacket(const MqttPacket &packet, MqttServerClient *client)
{
if (packet.type() == MqttPacket::TypeConnect) {
if (clientList.contains(client)) {
@ -343,8 +362,8 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
password = packet.password();
}
SslServer *server = clientServerMap.value(client);
int serverAddressId = servers.key(server);
MqttServerTransport *transport = clientServerMap.value(client);
int serverAddressId = servers.key(transport);
Mqtt::ConnectReturnCode userValidationReturnCode = authorizer->authorizeConnect(serverAddressId, clientId, username, password, client->peerAddress());
if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) {
qCWarning(dbgServer) << "Rejecting connection due to user validation.";
@ -357,9 +376,9 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
ClientContext *ctx = nullptr;
QList<QTcpSocket*> existingSockets = clientList.keys();
for (int i = 0; i < existingSockets.count(); i++) {
QTcpSocket *existingClient = existingSockets.at(i);
QList<MqttServerClient*> existingClients = clientList.keys();
for (int i = 0; i < existingClients.count(); i++) {
MqttServerClient *existingClient = existingClients.at(i);
if (clientId == clientList.value(existingClient)->clientId) {
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Taking over existing session.";
@ -371,6 +390,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
clientList.remove(existingClient);
clientBuffers.remove(existingClient);
existingClient->flush();
existingClient->abort();
existingClient->deleteLater();
} else {
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Dropping old session.";
@ -690,40 +710,3 @@ quint16 MqttServerPrivate::newPacketId(ClientContext *ctx)
return packetId;
}
void SslServer::incomingConnection(qintptr socketDescriptor)
{
QSslSocket *sslSocket = new QSslSocket(this);
qCDebug(dbgServer) << "New client socket connection:" << sslSocket;
connect(sslSocket, &QSslSocket::encrypted, [this, sslSocket](){ emit clientConnected(sslSocket); });
connect(sslSocket, &QSslSocket::readyRead, this, &SslServer::onSocketReadyRead);
connect(sslSocket, &QSslSocket::disconnected, this, &SslServer::onClientDisconnected);
if (!sslSocket->setSocketDescriptor(socketDescriptor)) {
qCWarning(dbgServer) << "Failed to set SSL socket descriptor.";
delete sslSocket;
return;
}
if (!m_config.isNull()) {
sslSocket->setSslConfiguration(m_config);
sslSocket->startServerEncryption();
} else {
emit clientConnected(sslSocket);
}
}
void SslServer::onClientDisconnected()
{
QSslSocket *socket = static_cast<QSslSocket*>(sender());
qCDebug(dbgServer) << "Client socket disconnected:" << socket;
emit clientDisconnected(socket);
socket->deleteLater();
}
void SslServer::onSocketReadyRead()
{
QSslSocket *socket = static_cast<QSslSocket*>(sender());
QByteArray data = socket->readAll();
emit dataAvailable(socket, data);
}

View File

@ -29,9 +29,8 @@
#define MQTTSERVER_H
#include <QObject>
#include <QTcpServer>
#include <QTcpSocket>
#include <QTimer>
#include <QHostAddress>
#include <QLoggingCategory>
#include <QSslConfiguration>
@ -60,6 +59,7 @@ public:
void setAuthorizer(MqttAuthorizer *authorizer);
int listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, const QSslConfiguration &sslConfiguration = QSslConfiguration());
int listenWebSocket(const QHostAddress &address = QHostAddress::Any, quint16 port = 80, const QSslConfiguration &sslConfiguration = QSslConfiguration());
QList<int> listeningAddressIds() const;
QPair<QHostAddress, quint16> listeningAddress(int addressId);
void close(int addressId);

View File

@ -41,7 +41,8 @@ Q_DECLARE_LOGGING_CATEGORY(dbgServer)
class ClientContext;
class Subscription;
class SslServer;
class MqttServerTransport;
class MqttServerClient;
class MqttServerPrivate: public QObject
{
@ -49,34 +50,33 @@ class MqttServerPrivate: public QObject
public:
explicit MqttServerPrivate(MqttServer *q);
int listen(MqttServerTransport *transport, const QHostAddress &address, quint16 port);
QHash<QString, quint16> publish(const QString &topic, const QByteArray &payload = QByteArray());
void cleanupClient(MqttServerClient *client);
public:
void cleanupClient(QTcpSocket *client);
void processPacket(const MqttPacket &packet, QTcpSocket *client);
void processPacket(const MqttPacket &packet, MqttServerClient *client);
bool validateTopicFilter(const QString &topicFilter);
bool matchTopic(const QString &topicFilter, const QString &topic);
quint16 newPacketId(ClientContext *ctx);
public slots:
void onClientConnected(QSslSocket *client);
void onDataAvailable(QSslSocket *client, const QByteArray &data);
void onClientDisconnected(QSslSocket *client);
void onClientConnected(MqttServerClient *client);
void onDataAvailable(const QByteArray &data);
void onClientDisconnected();
public:
MqttServer *q_ptr;
QHash<int, SslServer*> servers;
QHash<int, MqttServerTransport*> servers;
MqttAuthorizer *authorizer = nullptr;
Mqtt::QoS maximumSubscriptionQoS = Mqtt::QoS2;
QHash<QTcpSocket*, QTimer*> pendingConnections;
QHash<QTcpSocket*, ClientContext*> clientList;
QHash<QTcpSocket*, QByteArray> clientBuffers;
QHash<MqttServerClient*, QTimer*> pendingConnections;
QHash<MqttServerClient*, ClientContext*> clientList;
QHash<MqttServerClient*, QByteArray> clientBuffers;
QHash<QString, MqttPackets> retainedMessages;
QHash<QTcpSocket*, SslServer*> clientServerMap;
QHash<MqttServerClient*, MqttServerTransport*> clientServerMap;
};
class ClientContext {
@ -98,31 +98,4 @@ public:
QHash<quint16, MqttPacket> unackedPackets;
};
class SslServer: public QTcpServer
{
Q_OBJECT
public:
SslServer(const QSslConfiguration &config, QObject *parent = nullptr):
QTcpServer(parent),
m_config(config)
{
}
signals:
void clientConnected(QSslSocket *socket);
void clientDisconnected(QSslSocket *socket);
void dataAvailable(QSslSocket *socket, const QByteArray &data);
protected:
void incomingConnection(qintptr socketDescriptor) override;
private slots:
void onClientDisconnected();
void onSocketReadyRead();
private:
QSslConfiguration m_config;
};
#endif // MQTTSERVER_P_H

View File

@ -0,0 +1,34 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttclienttransport.h"
MqttClientTransport::MqttClientTransport(QObject *parent)
: QObject(parent)
{
}

View File

@ -0,0 +1,66 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTCLIENTTRANSPORT_H
#define MQTTCLIENTTRANSPORT_H
#include <QObject>
#include <QSslSocket>
#include <QSslConfiguration>
#include <QWebSocket>
#include <QNetworkRequest>
#include <QLoggingCategory>
Q_DECLARE_LOGGING_CATEGORY(dbgClient)
class MqttClientTransport : public QObject
{
Q_OBJECT
public:
explicit MqttClientTransport(QObject *parent = nullptr);
virtual ~MqttClientTransport() = default;
virtual void connectToHost() = 0;
virtual void abort() = 0;
virtual bool isOpen() const = 0;
virtual bool write(const QByteArray &data) = 0;
virtual void flush() = 0;
virtual void disconnectFromHost() = 0;
virtual QAbstractSocket::SocketState state() const = 0;
virtual void ignoreSslErrors() = 0;
signals:
void connected();
void disconnected();
void dataReceived(const QByteArray &data);
void stateChanged(QAbstractSocket::SocketState state);
void errorSignal(QAbstractSocket::SocketError error);
void sslErrors(const QList<QSslError> &sslErrors);
};
#endif // MQTTCLIENTTRANSPORT_H

View File

@ -0,0 +1,47 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttservertransport.h"
#include <QTcpServer>
#include <QLoggingCategory>
Q_DECLARE_LOGGING_CATEGORY(dbgServer)
MqttServerClient::MqttServerClient(QObject *parent):
QObject(parent)
{
}
MqttServerTransport::MqttServerTransport(QObject *parent):
QObject(parent)
{
}

View File

@ -0,0 +1,73 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTSERVERTRANSPORT_H
#define MQTTSERVERTRANSPORT_H
#include <QObject>
#include <QSslConfiguration>
class QTcpServer;
class MqttServerClient: public QObject
{
Q_OBJECT
public:
explicit MqttServerClient(QObject *parent = nullptr);
virtual ~MqttServerClient() = default;
virtual bool write(const QByteArray &data) = 0;
virtual void abort() = 0;
virtual bool isOpen() const = 0;
virtual void flush() = 0;
virtual void close() = 0;
virtual QHostAddress peerAddress() const = 0;
signals:
void dataAvailable(const QByteArray &data);
void disconnected();
};
class MqttServerTransport : public QObject
{
Q_OBJECT
public:
explicit MqttServerTransport(QObject *parent = nullptr);
virtual ~MqttServerTransport() = default;
virtual bool listen(const QHostAddress &address, int port) = 0;
virtual bool isListening() const = 0;
virtual QHostAddress serverAddress() const = 0;
virtual int serverPort() const = 0;
virtual void close() = 0;
signals:
void clientConnected(MqttServerClient *client);
};
#endif // MQTTSERVERTRANSPORT_H

View File

@ -0,0 +1,99 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqtttcpclienttransport.h"
MqttTcpClientTransport::MqttTcpClientTransport(const QString &hostName, quint16 port, bool useSsl, const QSslConfiguration &sslConfiguration, QObject *parent):
MqttClientTransport(parent),
m_hostName(hostName),
m_port(port),
m_useSsl(useSsl)
{
m_socket = new QSslSocket(this);
m_socket->setSslConfiguration(sslConfiguration);
connect(m_socket, &QTcpSocket::connected, this, &MqttClientTransport::connected);
connect(m_socket, &QTcpSocket::disconnected, this, &MqttClientTransport::disconnected);
connect(m_socket, &QTcpSocket::stateChanged, this, &MqttClientTransport::stateChanged);
typedef void (QSslSocket:: *sslErrorsSignal)(const QList<QSslError> &);
connect(m_socket, static_cast<sslErrorsSignal>(&QSslSocket::sslErrors), this, &MqttClientTransport::sslErrors);
typedef void (QSslSocket:: *errorSignal)(QAbstractSocket::SocketError);
connect(m_socket, static_cast<errorSignal>(&QSslSocket::error), this, &MqttClientTransport::errorSignal);
connect(m_socket, &QTcpSocket::readyRead, this, &MqttTcpClientTransport::onReadyRead);
}
void MqttTcpClientTransport::connectToHost()
{
if (m_useSsl) {
m_socket->connectToHostEncrypted(m_hostName, m_port);
} else {
m_socket->connectToHost(m_hostName, m_port);
}
}
void MqttTcpClientTransport::abort()
{
m_socket->abort();
}
bool MqttTcpClientTransport::isOpen() const
{
return m_socket->isOpen();
}
bool MqttTcpClientTransport::write(const QByteArray &data)
{
int ret = m_socket->write(data);
return ret == data.length();
}
void MqttTcpClientTransport::flush()
{
m_socket->flush();
}
void MqttTcpClientTransport::disconnectFromHost()
{
m_socket->disconnectFromHost();
}
QAbstractSocket::SocketState MqttTcpClientTransport::state() const
{
return m_socket->state();
}
void MqttTcpClientTransport::ignoreSslErrors()
{
m_socket->ignoreSslErrors();
}
void MqttTcpClientTransport::onReadyRead()
{
QByteArray data = m_socket->readAll();
emit dataReceived(data);
}

View File

@ -0,0 +1,59 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTTCPCLIENTTRANSPORT_H
#define MQTTTCPCLIENTTRANSPORT_H
#include "mqttclienttransport.h"
class MqttTcpClientTransport: public MqttClientTransport
{
Q_OBJECT
public:
explicit MqttTcpClientTransport(const QString &hostName, quint16 port, bool useSsl, const QSslConfiguration &sslConfiguration, QObject *parent = nullptr);
void connectToHost() override;
void abort() override;
bool isOpen() const override;
bool write(const QByteArray &data) override;
void flush() override;
void disconnectFromHost() override;
QAbstractSocket::SocketState state() const override;
void ignoreSslErrors() override;
private slots:
void onReadyRead();
private:
QString m_hostName;
quint16 m_port;
bool m_useSsl = false;
QSslSocket *m_socket = nullptr;
};
#endif

View File

@ -0,0 +1,153 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqtttcpservertransport.h"
#include <QLoggingCategory>
Q_DECLARE_LOGGING_CATEGORY(dbgServer)
SslServer::SslServer(const QSslConfiguration &config, QObject *parent):
QTcpServer(parent),
m_config(config)
{
}
void SslServer::incomingConnection(qintptr socketDescriptor)
{
QSslSocket *sslSocket = new QSslSocket(this);
qCDebug(dbgServer) << "New client socket connection:" << sslSocket;
connect(sslSocket, &QSslSocket::encrypted, [this, sslSocket](){ emit clientConnected(sslSocket); });
connect(sslSocket, &QSslSocket::disconnected, this, &SslServer::onClientDisconnected);
if (!sslSocket->setSocketDescriptor(socketDescriptor)) {
qCWarning(dbgServer) << "Failed to set SSL socket descriptor.";
delete sslSocket;
return;
}
if (!m_config.isNull()) {
sslSocket->setSslConfiguration(m_config);
sslSocket->startServerEncryption();
} else {
emit clientConnected(sslSocket);
}
}
void SslServer::onClientDisconnected()
{
QSslSocket *socket = static_cast<QSslSocket*>(sender());
qCDebug(dbgServer) << "Client socket disconnected:" << socket;
emit clientDisconnected(socket);
socket->deleteLater();
}
MqttTcpServerClient::MqttTcpServerClient(QTcpSocket *socket, QObject *parent):
MqttServerClient(parent),
m_socket(socket)
{
m_socket->setParent(this);
connect(socket, &QTcpSocket::readyRead, this, &MqttTcpServerClient::onSocketReadyRead);
connect(socket, &QTcpSocket::disconnected, this, &MqttTcpServerClient::disconnected);
}
void MqttTcpServerClient::onSocketReadyRead()
{
emit dataAvailable(m_socket->readAll());
}
bool MqttTcpServerClient::write(const QByteArray &data)
{
qint64 len = m_socket->write(data);
return len == data.length();
}
void MqttTcpServerClient::abort()
{
m_socket->abort();
}
bool MqttTcpServerClient::isOpen() const
{
return m_socket->isOpen();
}
void MqttTcpServerClient::flush()
{
m_socket->flush();
}
void MqttTcpServerClient::close()
{
m_socket->close();
}
QHostAddress MqttTcpServerClient::peerAddress() const
{
return m_socket->peerAddress();
}
MqttTcpServerTransport::MqttTcpServerTransport(const QSslConfiguration &config, QObject *parent):
MqttServerTransport(parent),
m_sslServer(new SslServer(config, this))
{
connect(m_sslServer, &SslServer::clientConnected, this, &MqttTcpServerTransport::onClientConnected);
}
bool MqttTcpServerTransport::listen(const QHostAddress &address, int port)
{
return m_sslServer->listen(address, port);
}
bool MqttTcpServerTransport::isListening() const
{
return m_sslServer->isListening();
}
QHostAddress MqttTcpServerTransport::serverAddress() const
{
return m_sslServer->serverAddress();
}
int MqttTcpServerTransport::serverPort() const
{
return m_sslServer->serverPort();
}
void MqttTcpServerTransport::close()
{
return m_sslServer->close();
}
void MqttTcpServerTransport::onClientConnected(QTcpSocket *socket)
{
MqttTcpServerClient *client = new MqttTcpServerClient(socket, this);
emit clientConnected(client);
}

View File

@ -0,0 +1,96 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTTCPSERVERTRANSPORT_H
#define MQTTTCPSERVERTRANSPORT_H
#include "mqttservertransport.h"
#include <QObject>
#include <QTcpServer>
class SslServer: public QTcpServer
{
Q_OBJECT
public:
SslServer(const QSslConfiguration &config, QObject *parent = nullptr);
signals:
void clientConnected(QSslSocket *socket);
void clientDisconnected(QSslSocket *socket);
void dataAvailable(QSslSocket *socket, const QByteArray &data);
protected:
void incomingConnection(qintptr socketDescriptor) override;
private slots:
void onClientDisconnected();
private:
QSslConfiguration m_config;
};
class MqttTcpServerClient: public MqttServerClient
{
Q_OBJECT
public:
explicit MqttTcpServerClient(QTcpSocket *socket, QObject *parent = nullptr);
bool write(const QByteArray &data) override;
void abort() override;
bool isOpen() const override;
void flush() override;
void close() override;
QHostAddress peerAddress() const override;
private slots:
void onSocketReadyRead();
private:
QTcpSocket *m_socket = nullptr;
};
class MqttTcpServerTransport: public MqttServerTransport
{
Q_OBJECT
public:
explicit MqttTcpServerTransport(const QSslConfiguration &config, QObject *parent = nullptr);
bool listen(const QHostAddress &address, int port) override;
bool isListening() const override;
QHostAddress serverAddress() const override;
int serverPort() const override;
void close() override;
private slots:
void onClientConnected(QTcpSocket *socket);
private:
SslServer *m_sslServer = nullptr;
};
#endif // MQTTTCPSERVERTRANSPORT_H

View File

@ -0,0 +1,105 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttwebsocketclienttransport.h"
MqttWebSocketClientTransport::MqttWebSocketClientTransport(const QNetworkRequest &request, QObject *parent):
MqttClientTransport(parent),
m_request(request)
{
m_socket = new QWebSocket(QString(), QWebSocketProtocol::VersionLatest, this);
connect(m_socket, &QWebSocket::connected, this, &MqttClientTransport::connected);
connect(m_socket, &QWebSocket::disconnected, this, &MqttClientTransport::disconnected);
connect(m_socket, &QWebSocket::stateChanged, this, &MqttClientTransport::stateChanged);
connect(m_socket, &QWebSocket::sslErrors, this, &MqttClientTransport::sslErrors);
typedef void (QWebSocket:: *errorSignal)(QAbstractSocket::SocketError);
connect(m_socket, static_cast<errorSignal>(&QWebSocket::error), this, &MqttClientTransport::errorSignal);
connect(m_socket, &QWebSocket::textMessageReceived, this, &MqttWebSocketClientTransport::onTextMessageReceived);
connect(m_socket, &QWebSocket::binaryMessageReceived, this, &MqttWebSocketClientTransport::onBinaryMessageReceived);
}
void MqttWebSocketClientTransport::connectToHost()
{
#if QT_VERSION < QT_VERSION_CHECK(5, 6, 0)
if (!m_request.rawHeaderList().isEmpty()) {
qCWarning(dbgClient) << "Qt versions older than 5.6 do not support HTTP request headers with web sockets. The connection may fail.";
}
m_socket->open(m_request.url());
#else
m_socket->open(m_request);
#endif
}
void MqttWebSocketClientTransport::abort()
{
m_socket->abort();
}
bool MqttWebSocketClientTransport::isOpen() const
{
return m_socket->isValid();
}
bool MqttWebSocketClientTransport::write(const QByteArray &data)
{
int ret = m_socket->sendBinaryMessage(data);
return ret == data.length();
}
void MqttWebSocketClientTransport::flush()
{
m_socket->flush();
}
void MqttWebSocketClientTransport::disconnectFromHost()
{
m_socket->close();
}
QAbstractSocket::SocketState MqttWebSocketClientTransport::state() const
{
return m_socket->state();
}
void MqttWebSocketClientTransport::ignoreSslErrors()
{
m_socket->ignoreSslErrors();
}
void MqttWebSocketClientTransport::onTextMessageReceived(const QString &message)
{
qCDebug(dbgClient()) << "Text message received:" << message;
qCWarning(dbgClient()) << "Received a text message from the server. Doesn't look like MQTT. Closing connection.";
m_socket->close(QWebSocketProtocol::CloseCodeProtocolError);
}
void MqttWebSocketClientTransport::onBinaryMessageReceived(const QByteArray &message)
{
emit dataReceived(message);
}

View File

@ -0,0 +1,58 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTWEBSOCKETCLIENTTRANSPORT_H
#define MQTTWEBSOCKETCLIENTTRANSPORT_H
#include "mqttclienttransport.h"
class MqttWebSocketClientTransport: public MqttClientTransport
{
Q_OBJECT
public:
explicit MqttWebSocketClientTransport(const QNetworkRequest &request, QObject *parent = nullptr);
void connectToHost() override;
void abort() override;
bool isOpen() const override;
bool write(const QByteArray &data) override;
void flush() override;
void disconnectFromHost() override;
QAbstractSocket::SocketState state() const override;
void ignoreSslErrors() override;
private slots:
void onTextMessageReceived(const QString &message);
void onBinaryMessageReceived(const QByteArray &message);
private:
QNetworkRequest m_request;
QWebSocket *m_socket = nullptr;
};
#endif

View File

@ -0,0 +1,134 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttwebsocketservertransport.h"
#include <QWebSocket>
#include <QLoggingCategory>
Q_DECLARE_LOGGING_CATEGORY(dbgServer)
MqttWebSocketServerClient::MqttWebSocketServerClient(QWebSocket *socket, QObject *parent):
MqttServerClient(parent),
m_socket(socket)
{
m_socket->setParent(this);
connect(m_socket, &QWebSocket::textMessageReceived, this, &MqttWebSocketServerClient::onTextMessageReceived);
connect(m_socket, &QWebSocket::binaryMessageReceived, this, &MqttWebSocketServerClient::onBinaryMessageReceived);
connect(m_socket, &QWebSocket::disconnected, this, &MqttServerClient::disconnected);
}
bool MqttWebSocketServerClient::write(const QByteArray &data)
{
qint64 len = m_socket->sendBinaryMessage(data);
return len == data.length();
}
void MqttWebSocketServerClient::abort()
{
m_socket->abort();
}
bool MqttWebSocketServerClient::isOpen() const
{
return m_socket->isValid();
}
void MqttWebSocketServerClient::flush()
{
m_socket->flush();
}
void MqttWebSocketServerClient::close()
{
m_socket->close();
}
QHostAddress MqttWebSocketServerClient::peerAddress() const
{
return m_socket->peerAddress();
}
void MqttWebSocketServerClient::onTextMessageReceived(const QString &message)
{
qCWarning(dbgServer).nospace() << "WebSocket received a text message from " << peerAddress() << ": " << message << ". This is not valid. Closing connection.";
m_socket->abort();
}
void MqttWebSocketServerClient::onBinaryMessageReceived(const QByteArray &data)
{
emit dataAvailable(data);
}
MqttWebSocketServerTransport::MqttWebSocketServerTransport(const QSslConfiguration &sslConfiguration, QObject *parent):
MqttServerTransport(parent)
{
if (sslConfiguration.isNull()) {
m_server = new QWebSocketServer("nymea-mqtt", QWebSocketServer::NonSecureMode, this);
} else {
m_server = new QWebSocketServer("nymea-mqtt", QWebSocketServer::SecureMode, this);
m_server->setSslConfiguration(sslConfiguration);
}
connect(m_server, &QWebSocketServer::newConnection, this, &MqttWebSocketServerTransport::onNewConnection);
}
bool MqttWebSocketServerTransport::listen(const QHostAddress &address, int port)
{
return m_server->listen(address, port);
}
bool MqttWebSocketServerTransport::isListening() const
{
return m_server->isListening();
}
QHostAddress MqttWebSocketServerTransport::serverAddress() const
{
return m_server->serverAddress();
}
int MqttWebSocketServerTransport::serverPort() const
{
return m_server->serverPort();
}
void MqttWebSocketServerTransport::close()
{
m_server->close();
}
void MqttWebSocketServerTransport::onNewConnection()
{
QWebSocket *webSocket = m_server->nextPendingConnection();
if (!webSocket) {
qCWarning(dbgServer()) << "New connection signalled but no pending socket available";
return;
}
MqttWebSocketServerClient *client = new MqttWebSocketServerClient(webSocket, this);
emit clientConnected(client);
}

View File

@ -0,0 +1,79 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTWEBSOCKETSERVERTRANSPORT_H
#define MQTTWEBSOCKETSERVERTRANSPORT_H
#include "mqttservertransport.h"
#include <QWebSocketServer>
class MqttWebSocketServerClient: public MqttServerClient
{
Q_OBJECT
public:
explicit MqttWebSocketServerClient(QWebSocket *socket, QObject *parent = nullptr);
bool write(const QByteArray &data) override;
void abort() override;
bool isOpen() const override;
void flush() override;
void close() override;
QHostAddress peerAddress() const override;
private slots:
void onTextMessageReceived(const QString &message);
void onBinaryMessageReceived(const QByteArray &data);
private:
QWebSocket *m_socket = nullptr;
};
class MqttWebSocketServerTransport : public MqttServerTransport
{
Q_OBJECT
public:
explicit MqttWebSocketServerTransport(const QSslConfiguration &sslConfiguration, QObject *parent = nullptr);
bool listen(const QHostAddress &address, int port) override;
bool isListening() const override;
QHostAddress serverAddress() const override;
int serverPort() const override;
void close() override;
signals:
private slots:
void onNewConnection();
private:
QWebSocketServer *m_server = nullptr;
};
#endif // MQTTWEBSOCKETSERVERTRANSPORT_H

View File

@ -1,6 +1,7 @@
TEMPLATE = subdirs
SUBDIRS += libnymea-mqtt server tests
SUBDIRS += libnymea-mqtt server client tests
server.depends = libnymea-mqtt
client.depends = libnymea-mqtt
tests.depends = libnymea-mqtt

126
server/authorizer.cpp Normal file
View File

@ -0,0 +1,126 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "authorizer.h"
#include <QFile>
#include <QSettings>
Authorizer::Authorizer(const QString &policyFile, QObject *parent):
QObject{parent},
m_settingsFile(policyFile)
{
if (QFile::exists(policyFile)) {
qInfo() << "Using policy file:" << policyFile;
}
}
Mqtt::ConnectReturnCode Authorizer::authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress)
{
Q_UNUSED(serverAddressId)
Q_UNUSED(peerAddress);
if (!QFile::exists(m_settingsFile)) {
return Mqtt::ConnectReturnCodeServerUnavailable;
}
MqttPolicy policy = loadPolicy(clientId);
if (!policy.isValid()) {
return Mqtt::ConnectReturnCodeNotAuthorized;
}
if (policy.username() != username || policy.password() != password) {
return Mqtt::ConnectReturnCodeBadUsernameOrPassword;
}
return Mqtt::ConnectReturnCodeAccepted;
}
bool Authorizer::authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter)
{
Q_UNUSED(serverAddressId)
qCritical() << "sub filters" << topicFilter;
if (!QFile::exists(m_settingsFile)) {
return false;
}
MqttPolicy policy = loadPolicy(clientId);
if (!policy.isValid()) {
return false;
}
qCritical() << "policy" << policy.allowedSubscribeTopicFilters();
if (policy.allowedSubscribeTopicFilters().contains(topicFilter)) {
return true;
}
return false;
}
bool Authorizer::authorizePublish(int serverAddressId, const QString &clientId, const QString &topic)
{
Q_UNUSED(serverAddressId)
if (!QFile::exists(m_settingsFile)) {
return false;
}
MqttPolicy policy = loadPolicy(clientId);
if (!policy.isValid()) {
return false;
}
if (policy.allowedPublishTopicFilters().contains(topic)) {
return true;
}
return false;
}
void Authorizer::addPolicy(const QString &clientId, const QString &username, const QString &password, const QStringList &allowedSubscribeTopicFilters, const QStringList &allowedPublishTopicFilters)
{
QSettings settings(m_settingsFile, QSettings::IniFormat);
settings.beginGroup(clientId);
settings.setValue("username", username);
settings.setValue("password", password);
settings.setValue("allowedSubscribeTopicFilters", allowedSubscribeTopicFilters);
settings.setValue("allowedPublishTopicFilters", allowedPublishTopicFilters);
}
void Authorizer::removePolicy(const QString &clientId)
{
QSettings settings(m_settingsFile, QSettings::IniFormat);
settings.remove(clientId);
}
MqttPolicy Authorizer::loadPolicy(const QString &clientId)
{
QSettings settings(m_settingsFile, QSettings::IniFormat);
if (!settings.childGroups().contains(clientId)) {
return MqttPolicy();
}
settings.beginGroup(clientId);
MqttPolicy policy(clientId,
settings.value("username").toString(),
settings.value("password").toString(),
settings.value("allowedSubscribeTopicFilters").toStringList(),
settings.value("allowedPublishTopicFilters").toStringList());
return policy;
}

59
server/authorizer.h Normal file
View File

@ -0,0 +1,59 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef AUTHORIZER_H
#define AUTHORIZER_H
#include "mqttpolicy.h"
#include <mqttserver.h>
#include <QObject>
class Authorizer : public QObject, public MqttAuthorizer
{
Q_OBJECT
public:
explicit Authorizer(const QString &policyFile, QObject *parent = nullptr);
Mqtt::ConnectReturnCode authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) override;
bool authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter) override;
bool authorizePublish(int serverAddressId, const QString &clientId, const QString &topic) override;
void addPolicy(const QString &clientId, const QString &username, const QString &password, const QStringList &allowedSubscribeTopicFilters, const QStringList &allowedPublishTopicFilters);
void removePolicy(const QString &clientId);
private:
MqttPolicy loadPolicy(const QString &clientId);
private:
QString m_settingsFile;
};
#endif // AUTHORIZER_H

View File

@ -0,0 +1,215 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "certificateloader.h"
#include <openssl/ssl.h>
#include <QFileInfo>
#include <QDir>
#include <QDebug>
#include <QSaveFile>
#include <QUuid>
// Disabling deprecation warnings for openssl 3.0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
CertificateLoader::CertificateLoader()
{
}
bool CertificateLoader::loadCertificate(const QString &certificateKeyFileName, const QString &certificateFileName)
{
QFile certificateKeyFile(certificateKeyFileName);
if (!certificateKeyFile.exists()) {
qWarning() << "Could not load certificate key file" << certificateKeyFile.fileName() << "because the file does not exist.";
return false;
}
if (!certificateKeyFile.open(QIODevice::ReadOnly)) {
qWarning() << "Could not open" << certificateKeyFile.fileName() << ":" << certificateKeyFile.errorString();
return false;
}
m_certificateKey = QSslKey(&certificateKeyFile, QSsl::Rsa);
if (m_certificateKey.isNull()) {
qWarning() << "SSL certificate key" << certificateFileName << "is not valid.";
return false;
}
qDebug() << "Loaded private certificate key" << certificateKeyFileName;
certificateKeyFile.close();
QFile certificateFile(certificateFileName);
if (!certificateFile.exists()) {
qWarning() << "Could not load certificate file" << certificateFile.fileName() << "because the file does not exist.";
return false;
}
if (!certificateFile.open(QIODevice::ReadOnly)) {
qWarning() << "Could not open" << certificateFile.fileName() << ":" << certificateFile.errorString();
return false;
}
m_certificate = QSslCertificate(&certificateFile);
if (m_certificate.isNull()) {
qWarning() << "SSL certificate" << certificateFileName << "is not valid.";;
return false;
}
qDebug() << "Loaded certificate file" << certificateFileName;
certificateFile.close();
return true;
}
bool CertificateLoader::generateCertificate(const QString &certificateKeyFileName, const QString &certificateFileName)
{
EVP_PKEY * pkey = nullptr;
BIGNUM *bne = NULL;
RSA * rsa = nullptr;
X509 * x509 = nullptr;
X509_NAME * name = nullptr;
BIO * bp_public = nullptr, * bp_private = nullptr;
const char * keyBuffer = nullptr;
const char * certBuffer = nullptr;
QFileInfo certFi(certificateFileName);
QFileInfo keyFi(certificateKeyFileName);
QDir dir;
if (!dir.mkpath(certFi.absolutePath()) || !dir.mkpath(keyFi.absolutePath())) {
qWarning() << "Error creating SSL certificate destination folder";
return false;
}
QSaveFile certfile(certificateFileName);
QSaveFile keyFile(certificateKeyFileName);
if (!certfile.open(QFile::WriteOnly | QFile::Truncate | QFile::Unbuffered) || !keyFile.open(QFile::WriteOnly | QFile::Truncate | QFile::Unbuffered)) {
qWarning() << "Error opening SSL certificate files";
return false;
}
bne = BN_new();
BN_set_word(bne, RSA_F4);
q_check_ptr(bne);
rsa = RSA_new();
RSA_generate_key_ex(rsa, 2048, bne, nullptr);
q_check_ptr(rsa);
pkey = EVP_PKEY_new();
q_check_ptr(pkey);
EVP_PKEY_assign_RSA(pkey, rsa);
x509 = X509_new();
q_check_ptr(x509);
// Randomize serial number in case a previous one is stuck in a browser (Chromium
// completely rejects reused serial numbers and doesn't even allow to bypass it by an exception)
qsrand(QUuid::createUuid().toString().remove(QRegExp("[a-zA-Z{}-]")).left(5).toInt());
ASN1_INTEGER_set(X509_get_serialNumber(x509), qrand());
X509_gmtime_adj(X509_get_notBefore(x509), 0); // not before current time
X509_gmtime_adj(X509_get_notAfter(x509), 31536000L*10); // not after 10 years from this point
X509_set_pubkey(x509, pkey);
name = X509_get_subject_name(x509);
q_check_ptr(name);
X509_NAME_add_entry_by_txt(name, "E", MBSTRING_ASC, (unsigned char *)"nymea", -1, -1, 0);
X509_NAME_add_entry_by_txt(name, "CN", MBSTRING_ASC, (unsigned char *)"nymea.io", -1, -1, 0);
X509_NAME_add_entry_by_txt(name, "OU", MBSTRING_ASC, (unsigned char *)"home", -1, -1, 0);
X509_NAME_add_entry_by_txt(name, "O", MBSTRING_ASC, (unsigned char *)"nymea GmbH", -1, -1, 0);
X509_NAME_add_entry_by_txt(name, "L", MBSTRING_ASC, (unsigned char *)"Vienna", -1, -1, 0);
X509_NAME_add_entry_by_txt(name, "C", MBSTRING_ASC, (unsigned char *)"AT", -1, -1, 0);
X509_set_issuer_name(x509, name);
X509_sign(x509, pkey, EVP_sha256());
bp_private = BIO_new(BIO_s_mem());
q_check_ptr(bp_private);
if(PEM_write_bio_PrivateKey(bp_private, pkey, nullptr, nullptr, 0, nullptr, nullptr) != 1)
{
BN_free(bne);
EVP_PKEY_free(pkey);
X509_free(x509);
BIO_free_all(bp_private);
qWarning() << "PEM_write_bio_PrivateKey";
return false;
}
bp_public = BIO_new(BIO_s_mem());
q_check_ptr(bp_public);
if(PEM_write_bio_X509(bp_public, x509) != 1)
{
BN_free(bne);
EVP_PKEY_free(pkey);
X509_free(x509);
BIO_free_all(bp_public);
BIO_free_all(bp_private);
qWarning() << "PEM_write_bio_X509";
return false;
}
long pubSize = BIO_get_mem_data(bp_public, &certBuffer);
q_check_ptr(certBuffer);
long privSize = BIO_get_mem_data(bp_private, &keyBuffer);
q_check_ptr(keyBuffer);
if (certfile.write(certBuffer, pubSize) == pubSize && keyFile.write(keyBuffer, privSize) == privSize) {
certfile.commit();
keyFile.commit();
qDebug() << "Generated new SSL certificate";
} else {
qWarning() << "Error writing SSL certificate files" << certificateKeyFileName << certificateFileName;
certfile.cancelWriting();
keyFile.cancelWriting();
BN_free(bne);
EVP_PKEY_free(pkey); // this will also free the rsa key
X509_free(x509);
BIO_free_all(bp_public);
BIO_free_all(bp_private);
return false;
}
BN_free(bne);
EVP_PKEY_free(pkey); // this will also free the rsa key
X509_free(x509);
BIO_free_all(bp_public);
BIO_free_all(bp_private);
return true;
}
QSslKey CertificateLoader::certificateKey() const
{
return m_certificateKey;
}
QSslCertificate CertificateLoader::certificate() const
{
return m_certificate;
}
#pragma GCC diagnostic pop

View File

@ -0,0 +1,51 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef CERTIFICATELOADER_H
#define CERTIFICATELOADER_H
#include <QString>
#include <QSslKey>
#include <QSslCertificate>
class CertificateLoader
{
public:
CertificateLoader();
bool loadCertificate(const QString &certificateKeyFileName, const QString &certificateFileName);
bool generateCertificate(const QString &certificateKeyFileName, const QString &certificateFileName);
QSslKey certificateKey() const;
QSslCertificate certificate() const;
private:
QSslKey m_certificateKey;
QSslCertificate m_certificate;
};
#endif // CERTIFICATELOADER_H

View File

@ -1,6 +1,6 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2020, nymea GmbH
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
@ -25,16 +25,126 @@
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <QCoreApplication>
#include "mqttserver.h"
#include "authorizer.h"
#include "certificateloader.h"
#include <QCoreApplication>
#include <QCommandLineParser>
#include <QStandardPaths>
#include <QSettings>
#include <iostream>
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
QString defaultConfigFile = QStandardPaths::writableLocation(QStandardPaths::ConfigLocation) + "/nymea/nymea-mqtt-server.conf";
QString defaultPolicyFile = QStandardPaths::writableLocation(QStandardPaths::ConfigLocation) + "/nymea/mqttpolicies.conf";
quint16 defaultTcpPort = 1883;
quint16 defaultWsPort = 0;
bool useSslDefault = false;
QString defaultCertKeyFileName = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation) + "/certs/certificate.key";
QString defaultCertFileName = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation) + "/certs/certificate.crt";
QCommandLineParser parser;
parser.addOptions({
{{"config", "c"}, QString("The configuration file to use (default: %1").arg(defaultConfigFile), "file", defaultConfigFile},
{{"policy-file", "p"}, QString("The policy configuration file to use (default: %1)").arg(defaultPolicyFile), "file", defaultPolicyFile},
{{"insecure", "i"}, "Run in insecure mode (allow all connections, publishes and subscribes)"},
{{"tcp-port", "t"}, QString("The port for the TCP server (default: %1, 0 to disable)").arg(defaultTcpPort), "port", QString::number(defaultTcpPort)},
{{"ws-port", "w"}, "The port for the web socket server (default: disabled)", "port", QString::number(defaultWsPort)},
{{"add-policy", "a"}, "Add a new client policy"},
{{"remove-policy", "r"}, "Remove a client policy", "clientId"},
{{"ssl", "S"}, "Enable SSL encryption (default: disabled)"},
{{"certificate", "C"}, QString("The SSL certificate to use (default: %1)").arg(defaultCertFileName), "crt file", defaultCertFileName},
{{"certificate-key", "K"}, QString("The SSL certificate key to use (default: %1)").arg(defaultCertKeyFileName), "key file", defaultCertKeyFileName},
});
parser.setApplicationDescription("nymea-mqtt-server is a standalone MQTT broker with support for TCP and web socket connections.\n\n"
"Every command line argument which can be passed, can also be set into the configuration file by specifing the long name for it followed by = and the desired value."
"For example:\n\ntcp-port=1883\nssl=true\n\n"
"Note that any passed command line arguments will still override any values set in the configuration file.\n\n"
"Enabling SSL requires an SSL ertificate which can be configured with the certificate and certificate-key options. If no certificate is found in the given locations, a new self-signed certificate will be generated.\n\n"
"Invoking the application with \"add-policy\" or \"remove-policy\" will allow changing the policies at run time, no broker restart is required. However, existing clients won't be disconnected immediately when a policy is removed but subsequent connect, subscribe or publish operations will be blocked.");
parser.addHelpOption();
parser.process(a.arguments());
qDebug() << "Using configuration file:" << parser.value("config");
QSettings settings(parser.value("config"), QSettings::IniFormat);
QString policyFile = parser.isSet("policy-file") ? parser.value("policy-file") : settings.value("policy-file", defaultPolicyFile).toString();
bool insecure = parser.isSet("insecure") ? true : settings.value("insecure", false).toBool();
quint16 tcpPort = parser.isSet("tcp-port") ? parser.value("tcp-port").toUInt() : settings.value("tcp-port", defaultTcpPort).toUInt();
quint16 wsPort = parser.isSet("ws-port") ? parser.value("ws-port").toUInt() : settings.value("ws-port", defaultWsPort).toUInt();
bool useSsl = parser.isSet("ssl") || settings.value("ssl", useSslDefault).toBool();
QString certificateKeyFile = parser.isSet("certificate-key") ? parser.value("certificate-key") : settings.value("certificate-key", defaultCertKeyFileName).toString();
QString certificateFile = parser.isSet("certificate") ? parser.value("certificate") : settings.value("certificate", defaultCertFileName).toString();
if (parser.isSet("add-policy")) {
Authorizer authorizer(policyFile);
std::string line;
std::cout << "Client ID: ";
std::getline(std::cin, line);
QString clientId = QString::fromStdString(line);
std::cout << "Username: ";
std::getline(std::cin, line);
QString username = QString::fromStdString(line);
std::cout << "Password: ";
std::getline(std::cin, line);
QString password = QString::fromStdString(line);
std::cout << "Subscribe topic filters (comma separated): ";
std::getline(std::cin, line);
QStringList allowedSubscribeTopicFilters = QString::fromStdString(line).split(",");
std::cout << "Publish topic filters (comma separated): ";
std::getline(std::cin, line);
QStringList allowedPublishTopicFilters = QString::fromStdString(line).split(",");
authorizer.addPolicy(clientId, username, password, allowedSubscribeTopicFilters, allowedPublishTopicFilters);
exit(EXIT_SUCCESS);
}
if (parser.isSet("remove-policy")) {
Authorizer authorizer(policyFile);
authorizer.removePolicy(parser.value("remove-policy"));
exit(EXIT_SUCCESS);
}
MqttServer server;
server.listen(QHostAddress::AnyIPv4, 1883);
Authorizer *authorizer = nullptr;
if (!insecure) {
authorizer = new Authorizer(policyFile);
server.setAuthorizer(authorizer);
}
QSslConfiguration sslConfiguration;
if (useSsl) {
CertificateLoader certLoader;
bool loaded = certLoader.loadCertificate(certificateKeyFile, certificateFile);
if (!loaded) {
certLoader.generateCertificate(certificateKeyFile, certificateFile);
loaded = certLoader.loadCertificate(certificateKeyFile, certificateFile);
}
if (!loaded) {
qCritical() << "Certificate files not found and unable to generate a new one.";
exit(EXIT_FAILURE);
}
sslConfiguration.setProtocol(QSsl::TlsV1_2OrLater);
sslConfiguration.setPrivateKey(certLoader.certificateKey());
sslConfiguration.setLocalCertificate(certLoader.certificate());
}
if (tcpPort != 0) {
int serverId = server.listen(QHostAddress::AnyIPv4, tcpPort, sslConfiguration);
if (serverId == -1) {
exit(EXIT_FAILURE);
}
}
if (wsPort != 0) {
int serverId = server.listenWebSocket(QHostAddress::AnyIPv4, wsPort, sslConfiguration);
if (serverId == -1) {
exit(EXIT_FAILURE);
}
}
return a.exec();
}

73
server/mqttpolicy.cpp Normal file
View File

@ -0,0 +1,73 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttpolicy.h"
MqttPolicy::MqttPolicy()
{
}
MqttPolicy::MqttPolicy(const QString &clientId, const QString &username, const QString &password, const QStringList &allowedSubscribeTopicFilters, const QStringList &allowedPublishTopicFilters):
m_clientId(clientId),
m_username(username),
m_password(password),
m_allowedSubscribeTopicFilters(allowedSubscribeTopicFilters),
m_allowedPublishTopicFilters(allowedPublishTopicFilters)
{
}
QString MqttPolicy::clientId() const
{
return m_clientId;
}
QString MqttPolicy::username() const
{
return m_username;
}
QString MqttPolicy::password() const
{
return m_password;
}
QStringList MqttPolicy::allowedSubscribeTopicFilters() const
{
return m_allowedSubscribeTopicFilters;
}
QStringList MqttPolicy::allowedPublishTopicFilters() const
{
return m_allowedPublishTopicFilters;
}
bool MqttPolicy::isValid() const
{
return !m_clientId.isEmpty();
}

56
server/mqttpolicy.h Normal file
View File

@ -0,0 +1,56 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2022, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU General Public License Usage
* Alternatively, this project may be redistributed and/or modified under
* the terms of the GNU General Public License as published by the Free Software Foundation,
* GNU version 3. this project is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef MQTTPOLICY_H
#define MQTTPOLICY_H
#include <QString>
#include <QStringList>
class MqttPolicy
{
public:
MqttPolicy();
MqttPolicy(const QString &clientId, const QString &username, const QString &password, const QStringList &allowedSubscribeTopicFilters, const QStringList &allowedPublishTopicFilters);
QString clientId() const;
QString username() const;
QString password() const;
QStringList allowedSubscribeTopicFilters() const;
QStringList allowedPublishTopicFilters() const;
bool isValid() const;
private:
QString m_clientId;
QString m_username;
QString m_password;
QStringList m_allowedSubscribeTopicFilters;
QStringList m_allowedPublishTopicFilters;
};
#endif // MQTTPOLICY_H

View File

@ -1,5 +1,5 @@
TEMPLATE = app
TARGET = nymea-mqttserver
TARGET = nymea-mqtt-server
include(../nymea-mqtt.pri)
@ -8,9 +8,18 @@ QT -= gui
INCLUDEPATH += $$top_srcdir/libnymea-mqtt/
SOURCES += main.cpp
HEADERS += \
authorizer.h \
certificateloader.h \
mqttpolicy.h
LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt
SOURCES += main.cpp \
authorizer.cpp \
certificateloader.cpp \
mqttpolicy.cpp
target.path = /usr/bin/
LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt -lssl -lcrypto
target.path = $$[QT_INSTALL_PREFIX]/bin
INSTALLS += target

View File

@ -1,4 +1,4 @@
QT += testlib network
QT += testlib network websockets
QT -= gui
CONFIG += qt console warn_on depend_includepath testcase
@ -11,6 +11,8 @@ include(../../nymea-mqtt.pri)
INCLUDEPATH += $$top_srcdir/libnymea-mqtt/
SOURCES += test_operation.cpp
HEADERS += $${top_srcdir}/tests/common/mqtttests.h
SOURCES += $${top_srcdir}/tests/common/mqtttests.cpp
LIBS += -L$$top_builddir/libnymea-mqtt/ -lnymea-mqtt

View File

@ -32,81 +32,11 @@
#include <QTest>
#include <QSignalSpy>
class OperationTests: public QObject
{
Q_OBJECT
#include "mqtttests.h"
#if (QT_VERSION >= QT_VERSION_CHECK(5, 7, 0))
private slots:
void initTestCase();
void cleanup();
void connectAndDisconnect();
void keepAliveTimesOut();
void subscribeAndPublish_data();
void subscribeAndPublish();
void willIsSentOnClientDisappearing();
void willIsNotSentOnClientDisconnecting();
void testWillRetain();
void testAutoReconnect();
void testQoS1Retransmissions();
void testMultiSubscription();
void testSubscriptionTopicFilters_data();
void testSubscriptionTopicFilters();
void testSubscriptionTopicMatching_data();
void testSubscriptionTopicMatching();
void testSessionManagementDropOldSession();
void testSessionManagementResumeOldSession();
void testSessionManagementFailResumeOldSession();
void testQoS1PublishToServerIsAckedOnSessionResume();
void testQoS1PublishToClientIsDeliveredOnSessionResume();
void testQoS2PublishToServerIsCompletedOnSessionResume();
void testQoS2PublishToClientIsCompletedOnSessionResume();
void testRetain();
void testUnsubscribe();
void testEmptyClientId();
void testBinaryPaylaod();
private:
// Connects and waits for the MQTT CONNECT to be finished
MqttClient *connectAndWait(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false);
// Just connects, returns the client and signalspy which has been created before calling connect. You must delete the spy yourself!
QPair<MqttClient*, QSignalSpy*> connectToServer(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false);
void disconnectAndWait(MqttClient* client);
bool subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos = Mqtt::QoS1);
private:
QString m_serverHost = "127.0.0.1";
quint16 m_serverPort = 5555;
MqttServer *m_server = nullptr;
QList<MqttClient*> m_clients;
#endif
};
#if (QT_VERSION >= QT_VERSION_CHECK(5, 7, 0))
MqttClient *OperationTests::connectAndWait(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain)
MqttClient *MqttTests::connectAndWait(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain)
{
QPair<MqttClient*, QSignalSpy*> result = connectToServer(clientId, cleanSession, keepAlive, willTopic, willMessage, willQoS, willRetain);
if (result.second->count() == 0) {
@ -120,7 +50,7 @@ MqttClient *OperationTests::connectAndWait(const QString &clientId, bool cleanSe
return result.first;
}
QPair<MqttClient*, QSignalSpy*> OperationTests::connectToServer(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain)
QPair<MqttClient*, QSignalSpy*> MqttTests::connectToServer(const QString &clientId, bool cleanSession, quint16 keepAlive, const QString &willTopic, const QString &willMessage, Mqtt::QoS willQoS, bool willRetain)
{
MqttClient* client = new MqttClient(clientId, keepAlive, willTopic, willMessage.toUtf8(), willQoS, willRetain, this);
client->setAutoReconnect(false);
@ -128,11 +58,13 @@ QPair<MqttClient*, QSignalSpy*> OperationTests::connectToServer(const QString &c
m_clients.append(client);
QSignalSpy *spy = new QSignalSpy(client, &MqttClient::connected);
client->connectToHost(m_serverHost, m_serverPort, cleanSession);
connectClientToServer(client, cleanSession);
return qMakePair<MqttClient*, QSignalSpy*>(client, spy);
}
void OperationTests::disconnectAndWait(MqttClient* client)
void MqttTests::disconnectAndWait(MqttClient* client)
{
QSignalSpy disconnectedSpy(client, &MqttClient::disconnected);
client->disconnectFromHost();
@ -141,7 +73,7 @@ void OperationTests::disconnectAndWait(MqttClient* client)
}
}
bool OperationTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos)
bool MqttTests::subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos)
{
QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult);
quint16 packetId = client->subscribe(topic, qos);
@ -152,24 +84,18 @@ bool OperationTests::subscribeAndWait(MqttClient* client, const QString &topic,
return subscribedSpy.count() == 1 && subscribedSpy.first().at(0).toInt() == packetId && subscribedSpy.first().at(1).value<Mqtt::SubscribeReturnCodes>().first() == expectedSubscribeReturnCode;
}
void OperationTests::initTestCase()
void MqttTests::initTestCase()
{
// QLoggingCategory::setFilterRules("nymea.mqtt.protocol.debug=false");
m_server = new MqttServer(this);
bool registered = false;
quint16 attempts = 0;
do {
registered = m_server->listen(QHostAddress(m_serverHost), m_serverPort + attempts);
} while(!registered && attempts++ < 20);
m_serverId = startServer(m_server);
QVERIFY2(registered, QString("Failed to register server on %1 from port %2 to %3. Tests won't work.").arg(m_serverHost).arg(m_serverPort).arg(m_serverPort+attempts).toUtf8().data());
m_serverPort += attempts;
QVERIFY2(m_serverId >= 0, "Failed to register server. Tests won't work.");
}
void OperationTests::cleanup()
void MqttTests::cleanup()
{
while (!m_clients.isEmpty()) {
MqttClient *client = m_clients.takeFirst();
@ -179,7 +105,13 @@ void OperationTests::cleanup()
QTRY_COMPARE(m_server->clients().count(), 0);
}
void OperationTests::connectAndDisconnect()
void MqttTests::cleanupTestCase()
{
m_server->close(m_serverId);
delete m_server;
}
void MqttTests::connectAndDisconnect()
{
QSignalSpy serverSpy(m_server, &MqttServer::clientConnected);
@ -206,7 +138,7 @@ void OperationTests::connectAndDisconnect()
QVERIFY2(serverSpyDisconnect.at(0).first() == clientId, "ClientId not matching on server side.");
}
void OperationTests::keepAliveTimesOut()
void MqttTests::keepAliveTimesOut()
{
QSignalSpy keepAliveSpy(m_server, &MqttServer::clientAlive);
MqttClient *client = connectAndWait("keepAlive1sec-client", true, 1);
@ -228,7 +160,7 @@ void OperationTests::keepAliveTimesOut()
QVERIFY2(!client->isConnected(), "Client connection still alive but it should have been dropped");
}
void OperationTests::subscribeAndPublish_data()
void MqttTests::subscribeAndPublish_data()
{
QTest::addColumn<Mqtt::QoS>("qosClient1");
QTest::addColumn<Mqtt::QoS>("qosClient2");
@ -249,7 +181,7 @@ void OperationTests::subscribeAndPublish_data()
}
}
void OperationTests::subscribeAndPublish()
void MqttTests::subscribeAndPublish()
{
QFETCH(Mqtt::QoS, qosClient1);
QFETCH(Mqtt::QoS, qosClient2);
@ -298,7 +230,7 @@ void OperationTests::subscribeAndPublish()
}
void OperationTests::willIsSentOnClientDisappearing()
void MqttTests::willIsSentOnClientDisappearing()
{
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye");
@ -307,14 +239,14 @@ void OperationTests::willIsSentOnClientDisappearing()
QVERIFY(subscribeAndWait(client1, "#"));
client2->d_ptr->socket->abort();
client2->d_ptr->transport->abort();
QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent");
QVERIFY2(publishSpy.first().at(0) == "/testtopic", "Will topic not matching");
QVERIFY2(publishSpy.first().at(1) == "Bye bye", "Will message not matching");
}
void OperationTests::willIsNotSentOnClientDisconnecting()
void MqttTests::willIsNotSentOnClientDisconnecting()
{
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye");
@ -331,7 +263,7 @@ void OperationTests::willIsNotSentOnClientDisconnecting()
QVERIFY2(publishSpy.count() == 0, "Will has been sent but it should not have been");
}
void OperationTests::testWillRetain()
void MqttTests::testWillRetain()
{
MqttClient *client1 = connectAndWait("subWill-client");
MqttClient *client2 = connectAndWait("pubWill-client", true, 300, "/testtopic", "Bye bye", Mqtt::QoS1, true);
@ -343,7 +275,7 @@ void OperationTests::testWillRetain()
subscribeSpy.wait();
client2->setAutoReconnect(false);
client2->d_ptr->socket->abort();
client2->d_ptr->transport->abort();
QTRY_VERIFY2(publishSpy.count() == 1, "Will has not been sent");
QVERIFY2(publishSpy.first().at(0) == "/testtopic", QString("Will topic not matching: %1").arg(publishSpy.first().at(0).toString()).toUtf8().data());
@ -365,7 +297,7 @@ void OperationTests::testWillRetain()
QTRY_VERIFY2(clearRetainSpy.count() == 1, "Clearing retain message did not succeed");
}
void OperationTests::testAutoReconnect()
void MqttTests::testAutoReconnect()
{
MqttClient *client1 = connectAndWait("client1");
client1->setAutoReconnect(true);
@ -373,13 +305,13 @@ void OperationTests::testAutoReconnect()
QSignalSpy disconnectedSpy(client1, &MqttClient::disconnected);
QSignalSpy connectedSpy(client1, &MqttClient::connected);
client1->d_ptr->socket->abort();
client1->d_ptr->transport->abort();
QTRY_VERIFY2(disconnectedSpy.count() == 1, "client did not emit disconnected");
QTRY_VERIFY2(connectedSpy.count() == 1, "client did not emit connected");
}
void OperationTests::testQoS1Retransmissions()
void MqttTests::testQoS1Retransmissions()
{
QSignalSpy serverSpy(m_server, &MqttServer::publishReceived);
@ -388,9 +320,9 @@ void OperationTests::testQoS1Retransmissions()
// publish a packet, flush the pipe and immediately drop the connection before we have a chance to receive the PUBACK
int packetId = client->publish("/testtopic", "Hello world", Mqtt::QoS1);
client->d_ptr->socket->flush();
client->d_ptr->transport->flush();
QSignalSpy connectedSpy(client, &MqttClient::connected);
client->d_ptr->socket->abort();
client->d_ptr->transport->abort();
// Wait for it to reconnect, it should then republish the packet
connectedSpy.wait();
@ -407,7 +339,7 @@ void OperationTests::testQoS1Retransmissions()
QCOMPARE(serverSpy.at(1).at(3).toString(), QString("Hello world"));
}
void OperationTests::testMultiSubscription()
void MqttTests::testMultiSubscription()
{
MqttClient *client = connectAndWait("subscription-topics");
QSignalSpy subscribedSpy(client, &MqttClient::subscribeResult);
@ -422,7 +354,7 @@ void OperationTests::testMultiSubscription()
QCOMPARE(retCodes, subscriptionReturnCodes);
}
void OperationTests::testSubscriptionTopicFilters_data()
void MqttTests::testSubscriptionTopicFilters_data()
{
QTest::addColumn<QString>("topicFilter");
QTest::addColumn<Mqtt::SubscribeReturnCode>("subscriptionReturnCode");
@ -445,7 +377,7 @@ void OperationTests::testSubscriptionTopicFilters_data()
QTest::newRow("+/a/#") << "+/a/#" << Mqtt::SubscribeReturnCodeSuccessQoS0;
}
void OperationTests::testSubscriptionTopicFilters()
void MqttTests::testSubscriptionTopicFilters()
{
QFETCH(QString, topicFilter);
QFETCH(Mqtt::SubscribeReturnCode, subscriptionReturnCode);
@ -459,7 +391,7 @@ void OperationTests::testSubscriptionTopicFilters()
QCOMPARE(retCodes.first(), subscriptionReturnCode);
}
void OperationTests::testSubscriptionTopicMatching_data()
void MqttTests::testSubscriptionTopicMatching_data()
{
QTest::addColumn<QString>("topicFilter");
QTest::addColumn<QString>("topic");
@ -518,7 +450,7 @@ void OperationTests::testSubscriptionTopicMatching_data()
}
}
void OperationTests::testSubscriptionTopicMatching()
void MqttTests::testSubscriptionTopicMatching()
{
QFETCH(QString, topicFilter);
QFETCH(QString, topic);
@ -546,7 +478,7 @@ void OperationTests::testSubscriptionTopicMatching()
QVERIFY2(publishReceivedSpy.count() == receivedPublishMessageCount, QString("PublishReceived signal not received the expected amount of time.\nActual: %1\nExpected: %2").arg(publishReceivedSpy.count()).arg(receivedPublishMessageCount).toUtf8().data());
}
void OperationTests::testSessionManagementDropOldSession()
void MqttTests::testSessionManagementDropOldSession()
{
MqttClient *client1Session1 = connectAndWait("client1");
client1Session1->setAutoReconnect(false);
@ -578,7 +510,7 @@ void OperationTests::testSessionManagementDropOldSession()
QVERIFY2(client1PublishReceivedSpy.count() == 0, "Client 1 did receive the publish but it should not have.");
}
void OperationTests::testSessionManagementResumeOldSession()
void MqttTests::testSessionManagementResumeOldSession()
{
MqttClient *client1Session1 = connectAndWait("client1");
client1Session1->setAutoReconnect(false);
@ -609,7 +541,7 @@ void OperationTests::testSessionManagementResumeOldSession()
QTRY_VERIFY2(client1PublishReceivedSpy.count() == 1, "Client 1 did not receive the publish but it should have.");
}
void OperationTests::testSessionManagementFailResumeOldSession()
void MqttTests::testSessionManagementFailResumeOldSession()
{
// try to resume non existing session
QPair<MqttClient*, QSignalSpy*> client = connectToServer("client1", false);
@ -619,7 +551,7 @@ void OperationTests::testSessionManagementFailResumeOldSession()
QVERIFY2(!client.second->first().at(0).value<Mqtt::ConnackFlags>().testFlag(Mqtt::ConnackFlagSessionPresent), "Session present flag is set while it should not be.");
}
void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume()
void MqttTests::testQoS1PublishToServerIsAckedOnSessionResume()
{
MqttClient *client = connectAndWait("client1", true);
client->setAutoReconnect(true);
@ -628,8 +560,8 @@ void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume()
QSignalSpy publishedSpy(client, &MqttClient::published);
client->publish("/testtopic", "Hello world", Mqtt::QoS1);
client->d_ptr->socket->flush();
client->d_ptr->socket->abort();
client->d_ptr->transport->flush();
client->d_ptr->transport->abort();
QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad.");
@ -639,7 +571,7 @@ void OperationTests::testQoS1PublishToServerIsAckedOnSessionResume()
}
void OperationTests::testQoS1PublishToClientIsDeliveredOnSessionResume()
void MqttTests::testQoS1PublishToClientIsDeliveredOnSessionResume()
{
MqttClient *oldClient1 = connectAndWait("client1", true);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult);
@ -647,25 +579,22 @@ void OperationTests::testQoS1PublishToClientIsDeliveredOnSessionResume()
QTRY_VERIFY(subscribedSpy.count() == 1);
// prevent the client from receiving anything
oldClient1->d_ptr->socket->blockSignals(true);
oldClient1->d_ptr->transport->blockSignals(true);
// pbulish something with a second client
// publish something with a second client
MqttClient *client2 = connectAndWait("client2");
QSignalSpy publishedSpy(client2, &MqttClient::published);
client2->publish("/testtopic", "Hello world", Mqtt::QoS1);
QTRY_VERIFY(publishedSpy.count() == 1);
// Resume (take over) old session and make sure we got the publish
MqttClient *newClient1 = new MqttClient("client1", this);
m_clients.append(newClient1); // let cleanupTestcase() clean it up
MqttClient *newClient1 = connectToServer("client1", false).first;;
QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived);
newClient1->connectToHost(m_serverHost, m_serverPort, false);
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume");
}
void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume()
void MqttTests::testQoS2PublishToServerIsCompletedOnSessionResume()
{
MqttClient *client = connectAndWait("client1", true);
client->setAutoReconnect(true);
@ -674,8 +603,8 @@ void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume()
QSignalSpy publishedSpy(client, &MqttClient::published);
client->publish("/testtopic", "Hello world", Mqtt::QoS2);
client->d_ptr->socket->flush();
client->d_ptr->socket->abort();
client->d_ptr->transport->flush();
client->d_ptr->transport->abort();
QVERIFY2(publishedSpy.count() == 0, "Should not have received the PUBACK yet... Test is bad.");
@ -684,7 +613,7 @@ void OperationTests::testQoS2PublishToServerIsCompletedOnSessionResume()
QTRY_VERIFY2(publishedSpy.count() == 1, "Published signal not emitted after reconnect");
}
void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume()
void MqttTests::testQoS2PublishToClientIsCompletedOnSessionResume()
{
MqttClient *oldClient1 = connectAndWait("client1", true);
QSignalSpy subscribedSpy(oldClient1, &MqttClient::subscribeResult);
@ -692,7 +621,7 @@ void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume()
QTRY_VERIFY(subscribedSpy.count() == 1);
// prevent the client from receiving anything
oldClient1->d_ptr->socket->blockSignals(true);
oldClient1->d_ptr->transport->blockSignals(true);
// pbulish something with a second client
MqttClient *client2 = connectAndWait("client2");
@ -701,16 +630,13 @@ void OperationTests::testQoS2PublishToClientIsCompletedOnSessionResume()
QTRY_VERIFY(publishedSpy.count() == 1);
// Resume (take over) old session and make sure we got the publish
MqttClient *newClient1 = new MqttClient("client1", this);
m_clients.append(newClient1); // let cleanupTestcase() clean it up
MqttClient *newClient1 = connectToServer("client1", false).first;
QSignalSpy publishReceivedSpy(newClient1, &MqttClient::publishReceived);
newClient1->connectToHost(m_serverHost, m_serverPort, false);
QTRY_VERIFY2(publishReceivedSpy.count() == 1, "Client did not receive publish packet upon session resume");
}
void OperationTests::testRetain()
void MqttTests::testRetain()
{
MqttClient *client1 = connectAndWait("client1", true);
@ -798,7 +724,7 @@ void OperationTests::testRetain()
}
void OperationTests::testUnsubscribe()
void MqttTests::testUnsubscribe()
{
MqttClient *client1 = connectAndWait("client1");
QVERIFY(subscribeAndWait(client1, "testtopic"));
@ -830,7 +756,7 @@ void OperationTests::testUnsubscribe()
QVERIFY2(publishReceivedSpy.count() == 0, "Received publish packet even though we should not have");
}
void OperationTests::testEmptyClientId()
void MqttTests::testEmptyClientId()
{
MqttClient *client1 = connectAndWait("");
QVERIFY2(client1->isConnected(), "Client did not connect");
@ -844,7 +770,7 @@ void OperationTests::testEmptyClientId()
QTRY_VERIFY2(client3.first->isConnected() == false, "Connection should have been dropped");
}
void OperationTests::testBinaryPaylaod()
void MqttTests::testBinaryPaylaod()
{
MqttClient *client = connectAndWait("");
QVERIFY2(client->isConnected(), "Client did not connect");
@ -858,6 +784,3 @@ void OperationTests::testBinaryPaylaod()
}
#endif
QTEST_MAIN(OperationTests)
#include "test_operation.moc"

108
tests/common/mqtttests.h Normal file
View File

@ -0,0 +1,108 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2020, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttserver.h"
#include "mqttclient.h"
#include "mqttclient_p.h"
#include <QTest>
#include <QSignalSpy>
class MqttTests: public QObject
{
Q_OBJECT
#if (QT_VERSION >= QT_VERSION_CHECK(5, 7, 0))
private slots:
void initTestCase();
void cleanup();
void cleanupTestCase();
void connectAndDisconnect();
void keepAliveTimesOut();
void subscribeAndPublish_data();
void subscribeAndPublish();
void willIsSentOnClientDisappearing();
void willIsNotSentOnClientDisconnecting();
void testWillRetain();
void testAutoReconnect();
void testQoS1Retransmissions();
void testMultiSubscription();
void testSubscriptionTopicFilters_data();
void testSubscriptionTopicFilters();
void testSubscriptionTopicMatching_data();
void testSubscriptionTopicMatching();
void testSessionManagementDropOldSession();
void testSessionManagementResumeOldSession();
void testSessionManagementFailResumeOldSession();
void testQoS1PublishToServerIsAckedOnSessionResume();
void testQoS1PublishToClientIsDeliveredOnSessionResume();
void testQoS2PublishToServerIsCompletedOnSessionResume();
void testQoS2PublishToClientIsCompletedOnSessionResume();
void testRetain();
void testUnsubscribe();
void testEmptyClientId();
void testBinaryPaylaod();
#endif
private:
// Connects and waits for the MQTT CONNECT to be finished
MqttClient *connectAndWait(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false);
// Just connects, returns the client and signalspy which has been created before calling connect. You must delete the spy yourself!
QPair<MqttClient*, QSignalSpy*> connectToServer(const QString &clientId, bool cleanSession = true, quint16 keepAlive = 300, const QString &willTopic = QString(), const QString &willMessage = QString(), Mqtt::QoS willQoS = Mqtt::QoS0, bool willRetain = false);
void disconnectAndWait(MqttClient* client);
bool subscribeAndWait(MqttClient* client, const QString &topic, Mqtt::QoS qos = Mqtt::QoS1);
virtual int startServer(MqttServer *server) = 0;
virtual void connectClientToServer(MqttClient *client, bool cleanSession) = 0;
private:
MqttServer *m_server = nullptr;
int m_serverId = -1;
QList<MqttClient*> m_clients;
};

4
tests/tcp/tcp.pro Normal file
View File

@ -0,0 +1,4 @@
include(../common/common.pri)
SOURCES += test_tcp.cpp

63
tests/tcp/test_tcp.cpp Normal file
View File

@ -0,0 +1,63 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2020, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttserver.h"
#include "mqttclient.h"
#include "mqttclient_p.h"
#include <QTest>
#include <QSignalSpy>
#include "../common/mqtttests.h"
class TcpTests: public MqttTests
{
Q_OBJECT
private:
int startServer(MqttServer *server) override;
void connectClientToServer(MqttClient *client, bool cleanSession) override;
QString m_serverHost = "127.0.0.1";
quint16 m_serverPort = 5555;
};
int TcpTests::startServer(MqttServer *server)
{
return server->listen(QHostAddress(m_serverHost), m_serverPort);
}
void TcpTests::connectClientToServer(MqttClient *client, bool cleanSession)
{
qDebug() << "Connecting to TCP";
client->connectToHost(m_serverHost, m_serverPort, cleanSession);
}
QTEST_MAIN(TcpTests)
#include "test_tcp.moc"

View File

@ -1,3 +1,3 @@
TEMPLATE = subdirs
SUBDIRS += operation
SUBDIRS += tcp websocket

View File

@ -0,0 +1,68 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Copyright 2013 - 2020, nymea GmbH
* Contact: contact@nymea.io
*
* This file is part of nymea.
* This project including source code and documentation is protected by copyright law, and
* remains the property of nymea GmbH. All rights, including reproduction, publication,
* editing and translation, are reserved. The use of this project is subject to the terms of a
* license agreement to be concluded with nymea GmbH in accordance with the terms
* of use of nymea GmbH, available under https://nymea.io/license
*
* GNU Lesser General Public License Usage
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation; version 3.
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this project.
* If not, see <https://www.gnu.org/licenses/>.
*
* For any further details and any questions please contact us under contact@nymea.io
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttserver.h"
#include "mqttclient.h"
#include "mqttclient_p.h"
#include <QTest>
#include <QSignalSpy>
#include "../common/mqtttests.h"
class WebSocketTests: public MqttTests
{
Q_OBJECT
private:
int startServer(MqttServer *server) override;
void connectClientToServer(MqttClient *client, bool cleanSession) override;
QString m_serverHost = "127.0.0.1";
quint16 m_serverPort = 5556;
};
int WebSocketTests::startServer(MqttServer *server)
{
return server->listenWebSocket(QHostAddress(m_serverHost), m_serverPort);
}
void WebSocketTests::connectClientToServer(MqttClient *client, bool cleanSession)
{
QUrl url;
url.setScheme("ws");
url.setHost(m_serverHost);
url.setPort(m_serverPort);
QNetworkRequest request(url);
qDebug() << "Connecting to WebSocket";
client->connectToHost(request, cleanSession);
}
QTEST_MAIN(WebSocketTests)
#include "test_websocket.moc"

View File

@ -0,0 +1,4 @@
include(../common/common.pri)
SOURCES += test_websocket.cpp