This repository has been archived on 2026-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
powersync-mqtt/libnymea-mqtt/mqttserver.cpp
2018-11-14 02:37:23 +01:00

600 lines
24 KiB
C++

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* *
* Copyright (C) 2018 Michael Zanetti <michael.zanetti@guh.io> *
* *
* This file is part of nymea. *
* *
* nymea is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, version 2 of the License. *
* *
* nymea is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with nymea. If not, see <http://www.gnu.org/licenses/>. *
* *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include "mqttserver.h"
#include "mqttserver_p.h"
#include "mqttpacket.h"
#include <QDebug>
#include <QDataStream>
#include <QUuid>
#include <QtGlobal>
#include <QRegExp>
Q_LOGGING_CATEGORY(dbgServer, "nymea.mqtt.server")
MqttServerPrivate::MqttServerPrivate(MqttServer *q):
QObject(q),
q_ptr(q)
{
qRegisterMetaType<Mqtt::QoS>();
server = new QTcpServer(this);
connect(server, &QTcpServer::newConnection, this, &MqttServerPrivate::onNewConnection);
}
QHash<QString, quint16> MqttServerPrivate::publish(const QString &topic, const QByteArray &payload)
{
QHash<QTcpSocket*, Mqtt::QoS> receivers;
foreach (QTcpSocket *c, clientList.keys()) {
foreach (const MqttSubscription &subscription, clientList.value(c)->subscriptions) {
if (matchTopic(subscription.topicFilter(), topic)) {
if (!receivers.contains(c) || receivers.value(c) < subscription.qoS()) {
receivers[c] = subscription.qoS();
}
}
}
}
QHash<QString, quint16> packets;
foreach (QTcpSocket *receiver, receivers.keys()) {
ClientContext *ctx = clientList.value(receiver);
qCDebug(dbgServer) << "Relaying packet to subscribed client:" << ctx->clientId;
Mqtt::QoS qos = receivers.value(receiver);
MqttPacket packet(MqttPacket::TypePublish, qos >= Mqtt::QoS0 ? newPacketId(ctx) : 0, qos);
packet.setTopic(topic.toUtf8());
packet.setPayload(payload);
receiver->write(packet.serialize());
packets.insert(ctx->clientId, packet.packetId());
if (packet.qos() == Mqtt::QoS0) {
QString clientId = ctx->clientId;
QTimer::singleShot(0, this, [this, clientId, packet](){
emit q_ptr->published(clientId, packet.packetId(), packet.topic(), packet.payload());
});
} else {
ClientContext *ctx = clientList.value(receiver);
ctx->unackedPackets.insert(packet.packetId(), packet);
ctx->unackedPacketList.append(packet.packetId());
}
}
return packets;
}
MqttServer::MqttServer(QObject *parent):
QObject(parent),
d_ptr(new MqttServerPrivate(this))
{
}
Mqtt::QoS MqttServer::maximumSubscriptionsQoS() const
{
return d_ptr->maximumSubscriptionQoS;
}
void MqttServer::setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS)
{
d_ptr->maximumSubscriptionQoS = maximumSubscriptionQoS;
}
bool MqttServer::listen(const QHostAddress &address, quint16 port, MqttUserValidator *userValidator)
{
d_ptr->userValidator = userValidator;
if (!d_ptr->server->listen(address, port)) {
qCWarning(dbgServer()) << "Error listening on port" << port;
return false;
}
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port;
return true;
}
bool MqttServer::isListening() const
{
return d_ptr->server->isListening();
}
void MqttServer::close()
{
d_ptr->server->close();
}
QStringList MqttServer::clients() const
{
QStringList clientIds;
foreach (ClientContext *ctx, d_ptr->clientList) {
clientIds << ctx->clientId;
}
return clientIds;
}
QHash<QString, quint16> MqttServer::publish(const QString &topic, const QByteArray &payload)
{
return d_ptr->publish(topic, payload);
}
void MqttServerPrivate::onNewConnection()
{
QTcpSocket *client = server->nextPendingConnection();
// Start a 10 second timer to clean up the connection if we don't get data until then.
QTimer *timeoutTimer = new QTimer(this);
connect(timeoutTimer, &QTimer::timeout, this, [this, client]() {
qCWarning(dbgServer) << "A client connected but did not send data in 10 seconds. Dropping connection.";
client->abort();
pendingConnections.take(client)->deleteLater();
client->deleteLater();
});
timeoutTimer->start(10000);
pendingConnections.insert(client, timeoutTimer);
connect(client, &QTcpSocket::readyRead, this, &MqttServerPrivate::onClientReadyRead);
connect(client, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(onClientError(QAbstractSocket::SocketError)));
connect(client, &QTcpSocket::disconnected, this, &MqttServerPrivate::onClientDisconnected);
}
void MqttServerPrivate::onClientDisconnected()
{
QTcpSocket *client = static_cast<QTcpSocket*>(sender());
cleanupClient(client);
}
void MqttServerPrivate::cleanupClient(QTcpSocket *client)
{
if (clientBuffers.contains(client)) {
clientBuffers.remove(client);
}
if (clientList.contains(client)) {
ClientContext *ctx = clientList.value(client);
qCDebug(dbgServer) << "Client" << ctx->clientId << "disconnected.";
ctx->keepAliveTimer.stop();
if (!ctx->willTopic.isEmpty()) {
qCDebug(dbgServer) << "Publishing will message for client" << ctx->clientId << "on topic" << ctx->willTopic << "( Retain:" << ctx->willRetain << ")";
MqttPacket willPacket(MqttPacket::TypePublish, ctx->willQoS >= Mqtt::QoS1 ? newPacketId(ctx) : 0, ctx->willQoS, ctx->willRetain);
willPacket.setTopic(ctx->willTopic);
willPacket.setPayload(ctx->willMessage);
processPacket(willPacket, client);
}
emit q_ptr->clientDisconnected(ctx->clientId);
clientList.remove(client);
delete ctx;
}
client->flush();
client->deleteLater();
}
void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *client)
{
if (packet.type() == MqttPacket::TypeConnect) {
if (clientList.contains(client)) {
ClientContext *ctx = clientList.value(client);
qCWarning(dbgServer) << "Client" << ctx->clientId << "sends duplicate CONNECT packets. Dropping connection.";
cleanupClient(client);
return;
}
MqttPacket response(MqttPacket::TypeConnack, packet.packetId());
if (packet.protocolLevel() != Mqtt::Protocol311) {
qCWarning(dbgServer) << "This MQTT broker only supports Protocol version 3.1.1";
response.setConnectReturnCode(Mqtt::ConnectReturnCodeUnacceptableProtocolVersion);
client->write(response.serialize());
cleanupClient(client);
return;
}
QString clientId = packet.clientId();
if (clientId.isEmpty()) {
if (!packet.cleanSession()) {
qCWarning(dbgServer) << "Empty client id provided but clean session flag not set. Rejecting connection.";
response.setConnectReturnCode(Mqtt::ConnectReturnCodeIdentifierRejected);
client->write(response.serialize());
cleanupClient(client);
return;
}
clientId = QUuid::createUuid().toString().remove(QRegExp("[{}-]*"));
}
if (userValidator) {
QString username;
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
username = packet.username();
}
QString password;
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
password = packet.password();
}
Mqtt::ConnectReturnCode userValidationReturnCode = userValidator->validateConnect(clientId, username, password, client->peerAddress());
if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) {
qCWarning(dbgServer) << "Rejecting connection due to user validation.";
response.setConnectReturnCode(userValidationReturnCode);
client->write(response.serialize());
cleanupClient(client);
return;
}
}
ClientContext *ctx = nullptr;
QList<QTcpSocket*> existingSockets = clientList.keys();
for (int i = 0; i < existingSockets.count(); i++) {
QTcpSocket *existingClient = existingSockets.at(i);
if (clientId == clientList.value(existingClient)->clientId) {
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Taking over existing session.";
response.setConnackFlags(Mqtt::ConnackFlagSessionPresent);
ctx = clientList.value(existingClient);
// remove old client manually, we don't want to clean up the context, nor send any will message or emit disconnected signals
clientList.remove(existingClient);
clientBuffers.remove(existingClient);
existingClient->flush();
existingClient->deleteLater();
} else {
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Dropping old session.";
cleanupClient(existingClient);
}
break;
}
}
if (!ctx) {
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
qCWarning(dbgServer).nospace() << clientId << ": Request to take over existing session but we don't have an existing session.";
}
ctx = new ClientContext();
ctx->clientId = clientId;
connect(&ctx->keepAliveTimer, &QTimer::timeout, this, [this, client](){
qCWarning(dbgServer) << "Keep alive timeout reached for client:" << clientList.value(client)->clientId;
cleanupClient(client);
});
}
ctx->keepAlive = packet.keepAlive();
ctx->version = packet.protocolLevel();
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWill)) {
ctx->willTopic = packet.willTopic();
ctx->willMessage = packet.willMessage();
ctx->willRetain = packet.willRetain();
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS2)) {
ctx->willQoS = Mqtt::QoS2;
} else if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS1)) {
ctx->willQoS = Mqtt::QoS1;
}
}
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
ctx->username = packet.username();
}
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
}
qCDebug(dbgServer).nospace().noquote()
<< "New MQTT client: \"" << clientId << '\"'
<< ", Protocol: " << packet.protocolName() << " (" << packet.protocolLevel() << ')'
<< ", Flags: " << packet.connectFlags()
<< ", KeepAlive: " << packet.keepAlive()
<< ", Will Topic: \"" << packet.willTopic() << '\"'
<< ", Will Message: \"" << packet.willMessage() << '\"'
<< ", Will Retain: " << packet.willRetain()
<< ", Username: " << packet.username()
<< ", Password: " << QString(packet.password()).replace(QRegExp("."), "*");
if (ctx->keepAlive > 0) {
ctx->keepAliveTimer.start(ctx->keepAlive * 1500);
}
clientList.insert(client, ctx);
response.setConnectReturnCode(Mqtt::ConnectReturnCodeAccepted);
client->write(response.serialize());
emit q_ptr->clientConnected(ctx->clientId, ctx->username, client->peerAddress());
foreach (quint16 retryPacketId, ctx->unackedPacketList) {
qCDebug(dbgServer) << "Resending unacked packet" << retryPacketId << "to" << ctx->clientId;;
MqttPacket retryPacket = ctx->unackedPackets.value(retryPacketId);
retryPacket.setDup(true);
client->write(retryPacket.serialize());
}
return;
}
if (!clientList.contains(client)) {
qCWarning(dbgServer) << "Protocol error: Client connection did not send CONNECT yet. Dropping connection.";
client->close();
return;
}
ClientContext *ctx = clientList.value(client);
if (ctx->keepAlive > 0) {
ctx->keepAliveTimer.start();
}
emit q_ptr->clientAlive(ctx->clientId);
if (packet.type() == MqttPacket::TypePublish) {
qCDebug(dbgServer).nospace() << "Publish received from client " << ctx->clientId << ": Topic: " << packet.topic() << ", Payload: " << packet.payload() << " (Packet ID: " << packet.packetId() << ", DUP: " << packet.dup() << ", QoS: " << packet.qos() << ", Retain: " << packet.retain() << ')';
emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload(), packet.dup());
switch (packet.qos()) {
case Mqtt::QoS0:
break;
case Mqtt::QoS1: {
MqttPacket response(MqttPacket::TypePuback, packet.packetId());
client->write(response.serialize());
break;
}
case Mqtt::QoS2: {
if (packet.dup() && ctx->unackedPacketList.contains(packet.packetId())) {
// We received this message before but the client keeps on trying... Just send a PUBREC and stop processing
client->write(ctx->unackedPackets.value(packet.packetId()).serialize());
return;
} else if (ctx->unackedPacketList.contains(packet.packetId())) {
// Hmm... Client says this is a new packet, but the ID is not released yet! Drop client connection.
qCWarning(dbgServer()).nospace() << "Received a bad packet from \"" << ctx->clientId << "\". DUP is not set but packet ID is already used and not released. Dropping client connection.";
cleanupClient(client);
return;
}
// Ok, a new packet, ack it with a PUBREC and store the number
MqttPacket response(MqttPacket::TypePubrec, packet.packetId());
ctx->unackedPackets.insert(response.packetId(), response);
ctx->unackedPacketList.append(packet.packetId());
client->write(response.serialize());
break;
}
}
if (packet.retain()) {
if (packet.payload().isEmpty()) {
qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic();
retainedMessages.remove(packet.topic());
} else {
if (packet.qos() == Mqtt::QoS0) {
qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic();
retainedMessages.remove(packet.topic());
}
qCDebug(dbgServer) << "Adding retained message for topic" << packet.topic();
retainedMessages[packet.topic()].append(packet);
}
}
publish(packet.topic(), packet.payload());
return;
}
if (packet.type() == MqttPacket::TypePuback) {
ctx->unackedPacketList.removeAll(packet.packetId());
MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId());
emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload());
return;
}
if (packet.type() == MqttPacket::TypePubrec) {
MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId());
emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload());
MqttPacket pubrel(MqttPacket::TypePubrel, packet.packetId());
ctx->unackedPackets.insert(packet.packetId(), pubrel);
client->write(pubrel.serialize());
return;
}
if (packet.type() == MqttPacket::TypePubrel) {
ctx->unackedPackets.remove(packet.packetId());
ctx->unackedPacketList.removeAll(packet.packetId());
MqttPacket response(MqttPacket::TypePubcomp, packet.packetId());
client->write(response.serialize());
return;
}
if (packet.type() == MqttPacket::TypePubcomp) {
ctx->unackedPackets.remove(packet.packetId());
ctx->unackedPacketList.removeAll(packet.packetId());
return;
}
if (packet.type() == MqttPacket::TypeSubscribe) {
// qCDebug(dbgServer).nospace() << ctx->clientId ": Subscribe packet received.";
MqttPacket response(MqttPacket::TypeSuback, packet.packetId());
QByteArray payload;
foreach (MqttSubscription subscription, packet.subscriptions()) {
if (userValidator && !userValidator->validateSubscribe(subscription.topicFilter(), ctx->clientId, ctx->username)) {
qCWarning(dbgServer).nospace().noquote() << "Subscription topic filter not allowed for client \"" << ctx->clientId << "\": \"" << subscription.topicFilter() << '\"';
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
continue;
}
if (!validateTopicFilter(subscription.topicFilter())) {
qCWarning(dbgServer).nospace() << "Subscription topic filter not valid for client \"" << ctx->clientId << "\": " << subscription.topicFilter();
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
continue;
}
subscription.setQoS(qMin(subscription.qoS(), maximumSubscriptionQoS));
bool updated = false;
for (int i = 0; i < ctx->subscriptions.count(); i++) {
if (ctx->subscriptions.at(i).topicFilter() == subscription.topicFilter()) {
qCDebug(dbgServer).noquote().nospace() << "Client \"" << ctx->clientId << "\" subscribed with a duplicate topic filter. Replacing subcription with new QoS" << subscription.qoS();
ctx->subscriptions.replace(i, subscription);
updated = true;
}
}
if (!updated) {
ctx->subscriptions.append(subscription);
}
qCDebug(dbgServer).noquote().nospace() << "Subscribed client \"" << ctx->clientId << "\" to topic filter: \"" << subscription.topicFilter() << "\" with QoS " << subscription.qoS();
emit q_ptr->clientSubscribed(ctx->clientId, subscription.topicFilter(), subscription.qoS());
switch (subscription.qoS()) {
case Mqtt::QoS0:
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS0);
break;
case Mqtt::QoS1:
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS1);
break;
case Mqtt::QoS2:
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS2);
break;
}
}
client->write(response.serialize());
// Deliver any retained messages for this topic
foreach (MqttSubscription subscription, packet.subscriptions()) {
foreach (const QString &topic, retainedMessages.keys()) {
if (matchTopic(subscription.topicFilter(), topic)) {
foreach (MqttPacket packet, retainedMessages.value(topic)) {
packet.setRetain(true);
client->write(packet.serialize());
}
}
}
}
return;
}
if (packet.type() == MqttPacket::TypeUnsubscribe) {
MqttSubscriptions newSubscriptions;
foreach (const MqttSubscription &existingSubscription, ctx->subscriptions) {
bool matching = false;
foreach (const MqttSubscription &unsub, packet.subscriptions()) {
if (existingSubscription.topicFilter() == unsub.topicFilter()) {
qCDebug(dbgServer) << "Unsubscribing client" << ctx->clientId << "from" << unsub.topicFilter();
emit q_ptr->clientUnsubscribed(ctx->clientId, unsub.topicFilter());
matching = true;
break;
}
}
if (!matching) {
newSubscriptions.append(existingSubscription);
}
}
ctx->subscriptions = newSubscriptions;
MqttPacket response(MqttPacket::TypeUnsuback, packet.packetId());
client->write(response.serialize());
return;
}
if (packet.type() == MqttPacket::TypePingreq) {
// qCDebug(dbgServer).nospace() << ctx->clientId << ": Pingreq received";
MqttPacket response(MqttPacket::TypePingresp, packet.packetId());
client->write(response.serialize());
return;
}
if (packet.type() == MqttPacket::TypeDisconnect) {
ctx->willMessage.clear();
ctx->willTopic.clear();
return;
}
qCWarning(dbgServer).nospace().noquote() << "Unknown packet received from client \"" << ctx->clientId << "\": " << QString::number(packet.type(), 16);
Q_ASSERT(false);
cleanupClient(client);
}
bool MqttServerPrivate::validateTopicFilter(const QString &topicFilter)
{
if (topicFilter.length() < 1) {
return false;
}
QStringList parts = topicFilter.split('/');
for (int i = 0; i < parts.count(); i++) {
const QString &part = parts.at(i);
if (part.contains(QStringLiteral("#")) && (part != QStringLiteral("#") || i != parts.count() - 1)) {
return false;
}
if (part.contains(QStringLiteral("+")) && part != QStringLiteral("+")) {
return false;
}
}
return true;
}
bool MqttServerPrivate::matchTopic(const QString &topicFilter, const QString &topic)
{
if (topic.startsWith('$')) {
return false;
}
QStringList filterParts = topicFilter.split('/');
QStringList topicParts = topic.split('/');
if (topicParts.count() < filterParts.count() - 1) {
return false;
}
for (int i = 0; i < filterParts.count(); i++) {
if (filterParts.at(i) == QStringLiteral("+")) {
continue;
}
if (filterParts.at(i) == QStringLiteral("#")) {
continue;
}
if (topicParts.at(i) == filterParts.at(i)) {
continue;
}
return false;
}
return filterParts.count() == topicParts.count() || topicFilter.endsWith('#');
}
quint16 MqttServerPrivate::newPacketId(ClientContext *ctx)
{
static quint16 packetId = 0;
do {
packetId++;
} while(ctx->unackedPacketList.contains(packetId));
return packetId;
}
void MqttServerPrivate::onClientReadyRead()
{
QTcpSocket* client = static_cast<QTcpSocket*>(sender());
clientBuffers[client].append(client->readAll());
do {
MqttPacket packet;
int ret = packet.parse(clientBuffers[client]);
if (ret == 0) {
qCDebug(dbgServer) << "Packet too short... Waiting for more...";
return;
}
// Ok, we've got a full packet (or garbage data). If this client is still pending
// we can stop the timer, the protocol will take it from here.
if (pendingConnections.contains(client)) {
pendingConnections.take(client)->deleteLater();
}
if (ret == -1) {
qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection";
cleanupClient(client);
return;
}
clientBuffers[client].remove(0, ret);
processPacket(packet, client);
} while (!clientBuffers.value(client).isEmpty());
}
void MqttServerPrivate::onClientError(QAbstractSocket::SocketError error)
{
// qCWarning(dbgServer) << "Client error:" << error;
}