mirror of https://github.com/nymea/nymea-mqtt
730 lines
29 KiB
C++
730 lines
29 KiB
C++
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
|
|
*
|
|
* Copyright 2013 - 2020, nymea GmbH
|
|
* Contact: contact@nymea.io
|
|
*
|
|
* This file is part of nymea.
|
|
* This project including source code and documentation is protected by copyright law, and
|
|
* remains the property of nymea GmbH. All rights, including reproduction, publication,
|
|
* editing and translation, are reserved. The use of this project is subject to the terms of a
|
|
* license agreement to be concluded with nymea GmbH in accordance with the terms
|
|
* of use of nymea GmbH, available under https://nymea.io/license
|
|
*
|
|
* GNU Lesser General Public License Usage
|
|
* Alternatively, this project may be redistributed and/or modified under the terms of the GNU
|
|
* Lesser General Public License as published by the Free Software Foundation; version 3.
|
|
* this project is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
|
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
* See the GNU Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License along with this project.
|
|
* If not, see <https://www.gnu.org/licenses/>.
|
|
*
|
|
* For any further details and any questions please contact us under contact@nymea.io
|
|
* or see our FAQ/Licensing Information on https://nymea.io/license/faq
|
|
*
|
|
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
|
|
|
|
/*!
|
|
\class MqttServer
|
|
\brief A MQTT server implementation
|
|
\inmodule nymea-mqtt
|
|
\ingroup mqtt
|
|
|
|
MqttServer is used to expose a MQTT server interface in the network. The currently supported
|
|
MQTT protocol version is 3.1.1 including SSL encryption support.
|
|
Note: Just starting up such a MqttServer does not provide a full MQTT broker. A MqttServer
|
|
listens on the network for incoming connections, accepts them and parses the network payload into a
|
|
MqttPacket.
|
|
|
|
A MQTT broker implementation, in addition requires to handle permissions and dispatch MQTT messages
|
|
between multiple MqttServer interfaces as well as handle will messages across clients.
|
|
|
|
When implementing a MQTT broker, reimplement MqttAuthorizer. Then instantiate one or many MqttServer objects
|
|
(depending on the network interfaces to be exposed). Using \l setAuthorizer, the custom authorizer is
|
|
registered at the server and the server will ask back to the authorizer for any incoming connection to
|
|
be authorized.
|
|
*/
|
|
|
|
/*!
|
|
\class MqttAuthorizer
|
|
\brief Authorizer base class for authorizing incoming client connections on an \l MqttServer
|
|
\inmodule nymea-mqtt
|
|
\ingroup mqtt
|
|
|
|
MqttAuthorizer is the base class for authorization handlers in \l MqttServer interfaces.
|
|
The \l MqttServer will call the authorizer methods on any incoming connect, publish or subscribe
|
|
packets. This can be used to check any user/policy database and authorize/reject such requests
|
|
for particular clients.
|
|
*/
|
|
|
|
|
|
#include "mqttserver.h"
|
|
#include "mqttserver_p.h"
|
|
#include "mqttpacket.h"
|
|
|
|
#include <QDebug>
|
|
#include <QDataStream>
|
|
#include <QUuid>
|
|
#include <QtGlobal>
|
|
#include <QRegExp>
|
|
|
|
// Logging category "nymea.mqtt.server", used by the qCDebug/qCWarning output of the server code below.
Q_LOGGING_CATEGORY(dbgServer, "nymea.mqtt.server")
|
|
|
|
// Creates the private implementation object for the public server \a q.
// The private object becomes a QObject child of \a q and keeps a back pointer to it.
MqttServerPrivate::MqttServerPrivate(MqttServer *q)
    : QObject(q)
    , q_ptr(q)
{
    // Register Mqtt::QoS with Qt's meta type system.
    qRegisterMetaType<Mqtt::QoS>();
}
|
|
|
|
// Relays a message with the given \a topic and \a payload to every connected client that
// holds at least one subscription matching the topic.
//
// If several subscriptions of one client match, the message is sent once, using the highest
// QoS among the matching subscriptions. Returns a hash mapping each receiving client's ID to
// the packet ID used for that client.
QHash<QString, quint16> MqttServerPrivate::publish(const QString &topic, const QByteArray &payload)
{
    // Pass 1: collect the receiving sockets together with the effective QoS.
    QHash<QTcpSocket*, Mqtt::QoS> targets;
    for (auto clientIt = clientList.constBegin(); clientIt != clientList.constEnd(); ++clientIt) {
        QTcpSocket *socket = clientIt.key();
        foreach (const MqttSubscription &candidate, clientIt.value()->subscriptions) {
            if (!matchTopic(candidate.topicFilter(), topic)) {
                continue;
            }
            // Keep an already recorded QoS unless this subscription asks for a higher one.
            if (targets.contains(socket) && targets.value(socket) >= candidate.qoS()) {
                continue;
            }
            targets[socket] = candidate.qoS();
        }
    }

    // Pass 2: send one PUBLISH per receiver.
    QHash<QString, quint16> sentPacketIds;
    for (auto targetIt = targets.constBegin(); targetIt != targets.constEnd(); ++targetIt) {
        QTcpSocket *socket = targetIt.key();
        ClientContext *ctx = clientList.value(socket);
        qCDebug(dbgServer) << "Relaying packet to subscribed client:" << ctx->clientId;
        Mqtt::QoS qos = targetIt.value();

        // NOTE(review): this condition holds for every QoS value shown in this file, so a
        // packet ID is allocated for QoS 0 messages too. Kept as is; confirm it is intended.
        MqttPacket packet(MqttPacket::TypePublish, qos >= Mqtt::QoS0 ? newPacketId(ctx) : 0, qos);
        packet.setTopic(topic.toUtf8());
        packet.setPayload(payload);
        socket->write(packet.serialize());
        sentPacketIds.insert(ctx->clientId, packet.packetId());

        if (packet.qos() != Mqtt::QoS0) {
            // Acknowledged delivery: remember the packet until the client acks it.
            ctx->unackedPackets.insert(packet.packetId(), packet);
            ctx->unackedPacketList.append(packet.packetId());
            continue;
        }

        // QoS 0 gets no acknowledgement; announce it as published from the event loop.
        QString clientId = ctx->clientId;
        QTimer::singleShot(0, this, [this, clientId, packet](){
            emit q_ptr->published(clientId, packet.packetId(), packet.topic(), packet.payload());
        });
    }
    return sentPacketIds;
}
|
|
|
|
// Constructs a MqttServer with the given \a parent and creates its private implementation.
MqttServer::MqttServer(QObject *parent)
    : QObject(parent)
    , d_ptr(new MqttServerPrivate(this))
{
}
|
|
|
|
// Returns the configured upper limit for the QoS of subscriptions.
Mqtt::QoS MqttServer::maximumSubscriptionsQoS() const
{
    Mqtt::QoS limit = d_ptr->maximumSubscriptionQoS;
    return limit;
}
|
|
|
|
// Stores \a maximumSubscriptionQoS as the upper limit for the QoS of subscriptions.
// Subscriptions requesting a higher QoS are capped to it when they are processed.
void MqttServer::setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS)
{
    MqttServerPrivate *priv = d_ptr;
    priv->maximumSubscriptionQoS = maximumSubscriptionQoS;
}
|
|
|
|
// Registers \a authorizer as the object consulted for connect, publish and subscribe requests.
// Only the pointer is stored here.
void MqttServer::setAuthorizer(MqttAuthorizer *authorizer)
{
    MqttServerPrivate *priv = d_ptr;
    priv->authorizer = authorizer;
}
|
|
|
|
int MqttServer::listen(const QHostAddress &address, quint16 port, const QSslConfiguration &sslConfiguration)
|
|
{
|
|
SslServer *server = new SslServer(sslConfiguration, this);
|
|
connect(server, &SslServer::clientConnected, d_ptr, &MqttServerPrivate::onClientConnected);
|
|
connect(server, &SslServer::clientDisconnected, d_ptr, &MqttServerPrivate::onClientDisconnected);
|
|
connect(server, &SslServer::dataAvailable, d_ptr, &MqttServerPrivate::onDataAvailable);
|
|
|
|
if (!server->listen(address, port)) {
|
|
qCWarning(dbgServer) << "Error listening on port" << port;
|
|
server->deleteLater();
|
|
return -1;
|
|
}
|
|
static int addressId = -1;
|
|
d_ptr->servers.insert(++addressId, server);
|
|
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port << "( Address ID" << addressId << ")";
|
|
return addressId;
|
|
}
|
|
|
|
bool MqttServer::isListening(const QHostAddress &address, quint16 port) const
|
|
{
|
|
foreach (SslServer *server, d_ptr->servers) {
|
|
if (server->serverAddress() == address && server->serverPort() == port && server->isListening()) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Returns the address IDs of all servers currently registered through listen().
QList<int> MqttServer::listeningAddressIds() const
{
    QList<int> ids = d_ptr->servers.keys();
    return ids;
}
|
|
|
|
void MqttServer::close(int interfaceId)
|
|
{
|
|
if (!d_ptr->servers.contains(interfaceId)) {
|
|
qCWarning(dbgServer) << "No such server address ID" << interfaceId;
|
|
return;
|
|
}
|
|
SslServer *server = d_ptr->servers.take(interfaceId);
|
|
while (!d_ptr->clientServerMap.keys(server).isEmpty()) {
|
|
d_ptr->cleanupClient(d_ptr->clientServerMap.keys(server).first());
|
|
}
|
|
server->close();
|
|
server->deleteLater();
|
|
}
|
|
|
|
// Returns the client IDs of all clients that currently have a client context.
QStringList MqttServer::clients() const
{
    QStringList ids;
    for (auto it = d_ptr->clientList.constBegin(); it != d_ptr->clientList.constEnd(); ++it) {
        ids.append(it.value()->clientId);
    }
    return ids;
}
|
|
|
|
void MqttServer::disconnectClient(const QString &clientId)
|
|
{
|
|
foreach (ClientContext *ctx, d_ptr->clientList) {
|
|
if (ctx->clientId == clientId) {
|
|
d_ptr->cleanupClient(d_ptr->clientList.key(ctx));
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publishes \a payload on \a topic to all subscribed clients by forwarding to the private
// implementation. Returns a hash mapping receiving client IDs to the packet IDs used.
QHash<QString, quint16> MqttServer::publish(const QString &topic, const QByteArray &payload)
{
    QHash<QString, quint16> packetIds = d_ptr->publish(topic, payload);
    return packetIds;
}
|
|
|
|
// Called when a SslServer announces a new client socket.
//
// Remembers which server the client belongs to and starts a 10 second timer that drops the
// connection if no complete packet (or garbage) arrives until then. The timer is stopped in
// onDataAvailable() once data has been parsed.
void MqttServerPrivate::onClientConnected(QSslSocket *client)
{
    SslServer *server = static_cast<SslServer*>(sender());

    // Start a 10 second timer to clean up the connection if we don't get data until then.
    QTimer *timeoutTimer = new QTimer(this);
    timeoutTimer->setSingleShot(true);
    connect(timeoutTimer, &QTimer::timeout, client, [this, client]() {
        qCWarning(dbgServer) << "A client connected but did not send data in 10 seconds. Dropping connection.";

        // Detach the timer from the bookkeeping *before* touching the socket. abort() may emit
        // disconnected() synchronously, which ends up in cleanupClient(); that must neither
        // destroy the timer whose timeout we are handling right now nor leave us with a null
        // entry to dereference afterwards.
        if (QTimer *timer = pendingConnections.take(client)) {
            timer->deleteLater();
        }

        client->abort();

        // Make sure all bookkeeping for this socket is gone even if abort() did not emit
        // disconnected(). cleanupClient() is safe to call again for an unknown socket and
        // schedules the socket for deletion.
        cleanupClient(client);
    });
    timeoutTimer->start(10000);
    clientServerMap.insert(client, server);
    pendingConnections.insert(client, timeoutTimer);
}
|
|
|
|
// Called with raw bytes received from \a client.
//
// Appends \a data to the client's receive buffer and then parses and processes as many
// complete packets as the buffer holds. Incomplete data stays buffered until more arrives;
// malformed data drops the connection.
void MqttServerPrivate::onDataAvailable(QSslSocket *client, const QByteArray &data)
{
    clientBuffers[client].append(data);

    while (true) {
        MqttPacket packet;
        const int consumed = packet.parse(clientBuffers[client]);
        if (consumed == 0) {
            qCDebug(dbgServer) << "Packet too short... Waiting for more...";
            return;
        }

        // A full packet (or garbage) arrived. If the client was still pending, the connect
        // timeout is no longer needed; the protocol handling takes over from here.
        if (pendingConnections.contains(client)) {
            pendingConnections.take(client)->deleteLater();
        }

        if (consumed == -1) {
            qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection" << packet.serialize().toHex();
            cleanupClient(client);
            return;
        }

        clientBuffers[client].remove(0, consumed);

        processPacket(packet, client);

        // processPacket() may have cleaned up the client; a missing buffer reads as empty.
        if (clientBuffers.value(client).isEmpty()) {
            break;
        }
    }
}
|
|
|
|
// Called when a SslServer reports that \a client has disconnected; releases everything
// tracked for that socket.
void MqttServerPrivate::onClientDisconnected(QSslSocket *client)
{
    QTcpSocket *socket = client;
    cleanupClient(socket);
}
|
|
|
|
// Releases everything tracked for \a client and closes the socket.
//
// In order: drops the receive buffer and a pending connect timeout, publishes the client's
// will message (if any), emits clientUnsubscribed() for every subscription and
// clientDisconnected(), deletes the client context, and finally flushes, closes and schedules
// the socket for deletion. Calling this for a socket that is not (or no longer) tracked only
// closes and schedules the socket.
void MqttServerPrivate::cleanupClient(QTcpSocket *client)
{
    // remove() is a no-op for unknown keys, no need to check contains() first.
    clientBuffers.remove(client);

    // This function can be reached from the pending timer's own timeout handler (via the
    // socket's disconnected signal), so the timer must not be destroyed synchronously.
    if (QTimer *pendingTimer = pendingConnections.take(client)) {
        pendingTimer->stop();
        pendingTimer->deleteLater();
    }

    if (clientList.contains(client)) {
        ClientContext *ctx = clientList.value(client);
        qCDebug(dbgServer) << "Client" << ctx->clientId << "disconnected.";
        ctx->keepAliveTimer.stop();

        if (!ctx->willTopic.isEmpty()) {
            qCDebug(dbgServer) << "Publishing will message for client" << ctx->clientId << "on topic" << ctx->willTopic << "( Retain:" << ctx->willRetain << ")";
            MqttPacket willPacket(MqttPacket::TypePublish, ctx->willQoS >= Mqtt::QoS1 ? newPacketId(ctx) : 0, ctx->willQoS, ctx->willRetain);
            willPacket.setTopic(ctx->willTopic);
            willPacket.setPayload(ctx->willMessage);
            // The client must still be present in clientList and clientServerMap here:
            // processPacket() looks up the context and resolves the server address ID for
            // the publish authorization from those maps.
            processPacket(willPacket, client);
        }

        while (!ctx->subscriptions.isEmpty()) {
            emit q_ptr->clientUnsubscribed(ctx->clientId, ctx->subscriptions.takeFirst().topicFilter());
        }

        emit q_ptr->clientDisconnected(ctx->clientId);

        clientList.remove(client);
        delete ctx;
    }

    // Only forget the client/server association once the will has been processed.
    clientServerMap.remove(client);

    if (client->isOpen()) {
        client->flush();
        client->close();
    }
    client->deleteLater();
}
|
|
|
|
void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *client)
|
|
{
|
|
if (packet.type() == MqttPacket::TypeConnect) {
|
|
if (clientList.contains(client)) {
|
|
ClientContext *ctx = clientList.value(client);
|
|
qCWarning(dbgServer) << "Client" << ctx->clientId << "sends duplicate CONNECT packets. Dropping connection.";
|
|
cleanupClient(client);
|
|
return;
|
|
}
|
|
|
|
MqttPacket response(MqttPacket::TypeConnack, packet.packetId());
|
|
|
|
if (packet.protocolLevel() != Mqtt::Protocol310 && packet.protocolLevel() != Mqtt::Protocol311) {
|
|
qCWarning(dbgServer) << "This MQTT broker only supports Protocol version 3.1.0 and 3.1.1 but client is" << packet.protocolLevel();
|
|
response.setConnectReturnCode(Mqtt::ConnectReturnCodeUnacceptableProtocolVersion);
|
|
client->write(response.serialize());
|
|
cleanupClient(client);
|
|
return;
|
|
}
|
|
|
|
QString clientId = packet.clientId();
|
|
if (clientId.isEmpty()) {
|
|
if (!packet.cleanSession()) {
|
|
qCWarning(dbgServer) << "Empty client id provided but clean session flag not set. Rejecting connection.";
|
|
response.setConnectReturnCode(Mqtt::ConnectReturnCodeIdentifierRejected);
|
|
client->write(response.serialize());
|
|
cleanupClient(client);
|
|
return;
|
|
}
|
|
clientId = QUuid::createUuid().toString().remove(QRegExp("[{}-]*"));
|
|
}
|
|
|
|
if (authorizer) {
|
|
QString username;
|
|
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
|
|
username = packet.username();
|
|
}
|
|
QString password;
|
|
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
|
|
password = packet.password();
|
|
}
|
|
SslServer *server = clientServerMap.value(client);
|
|
int serverAddressId = servers.key(server);
|
|
Mqtt::ConnectReturnCode userValidationReturnCode = authorizer->authorizeConnect(serverAddressId, clientId, username, password, client->peerAddress());
|
|
if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) {
|
|
qCWarning(dbgServer) << "Rejecting connection due to user validation.";
|
|
response.setConnectReturnCode(userValidationReturnCode);
|
|
client->write(response.serialize());
|
|
cleanupClient(client);
|
|
return;
|
|
}
|
|
}
|
|
|
|
ClientContext *ctx = nullptr;
|
|
|
|
QList<QTcpSocket*> existingSockets = clientList.keys();
|
|
for (int i = 0; i < existingSockets.count(); i++) {
|
|
QTcpSocket *existingClient = existingSockets.at(i);
|
|
if (clientId == clientList.value(existingClient)->clientId) {
|
|
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
|
|
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Taking over existing session.";
|
|
|
|
response.setConnackFlags(Mqtt::ConnackFlagSessionPresent);
|
|
ctx = clientList.value(existingClient);
|
|
|
|
// remove old client manually, we don't want to clean up the context, nor send any will message or emit disconnected signals
|
|
clientList.remove(existingClient);
|
|
clientBuffers.remove(existingClient);
|
|
existingClient->flush();
|
|
existingClient->deleteLater();
|
|
} else {
|
|
qCDebug(dbgServer).nospace() << clientId << ": Already have a session for this client ID. Dropping old session.";
|
|
cleanupClient(existingClient);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!ctx) {
|
|
if (!packet.connectFlags().testFlag(Mqtt::ConnectFlagCleanSession)) {
|
|
qCWarning(dbgServer).nospace() << clientId << ": Request to take over existing session but we don't have an existing session.";
|
|
}
|
|
|
|
ctx = new ClientContext();
|
|
ctx->clientId = clientId;
|
|
|
|
connect(&ctx->keepAliveTimer, &QTimer::timeout, this, [this, client](){
|
|
qCWarning(dbgServer) << "Keep alive timeout reached for client:" << clientList.value(client)->clientId;
|
|
cleanupClient(client);
|
|
});
|
|
}
|
|
|
|
ctx->keepAlive = packet.keepAlive();
|
|
ctx->version = packet.protocolLevel();
|
|
|
|
|
|
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWill)) {
|
|
ctx->willTopic = packet.willTopic();
|
|
ctx->willMessage = packet.willMessage();
|
|
ctx->willRetain = packet.willRetain();
|
|
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS2)) {
|
|
ctx->willQoS = Mqtt::QoS2;
|
|
} else if (packet.connectFlags().testFlag(Mqtt::ConnectFlagWillQoS1)) {
|
|
ctx->willQoS = Mqtt::QoS1;
|
|
}
|
|
}
|
|
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
|
|
ctx->username = packet.username();
|
|
}
|
|
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
|
|
}
|
|
|
|
qCDebug(dbgServer).nospace().noquote()
|
|
<< "New MQTT client: \"" << clientId << '\"'
|
|
<< ", Protocol: " << packet.protocolName() << " (" << packet.protocolLevel() << ')'
|
|
<< ", Flags: " << packet.connectFlags()
|
|
<< ", KeepAlive: " << packet.keepAlive()
|
|
<< ", Will Topic: \"" << packet.willTopic() << '\"'
|
|
<< ", Will Message: \"" << packet.willMessage() << '\"'
|
|
<< ", Will Retain: " << packet.willRetain()
|
|
<< ", Username: " << packet.username()
|
|
<< ", Password: " << QString(packet.password()).replace(QRegExp("."), "*");
|
|
|
|
if (ctx->keepAlive > 0) {
|
|
ctx->keepAliveTimer.start(ctx->keepAlive * 1500);
|
|
}
|
|
|
|
clientList.insert(client, ctx);
|
|
response.setConnectReturnCode(Mqtt::ConnectReturnCodeAccepted);
|
|
client->write(response.serialize());
|
|
emit q_ptr->clientConnected(servers.key(clientServerMap.value(client)), ctx->clientId, ctx->username, client->peerAddress());
|
|
|
|
foreach (quint16 retryPacketId, ctx->unackedPacketList) {
|
|
qCDebug(dbgServer) << "Resending unacked packet" << retryPacketId << "to" << ctx->clientId;;
|
|
MqttPacket retryPacket = ctx->unackedPackets.value(retryPacketId);
|
|
retryPacket.setDup(true);
|
|
client->write(retryPacket.serialize());
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (!clientList.contains(client)) {
|
|
qCWarning(dbgServer) << "Protocol error: Client connection did not send CONNECT yet. Dropping connection.";
|
|
client->close();
|
|
return;
|
|
}
|
|
|
|
ClientContext *ctx = clientList.value(client);
|
|
if (ctx->keepAlive > 0) {
|
|
ctx->keepAliveTimer.start();
|
|
}
|
|
emit q_ptr->clientAlive(ctx->clientId);
|
|
|
|
if (packet.type() == MqttPacket::TypePublish) {
|
|
qCDebug(dbgServer).nospace() << "Publish received from client " << ctx->clientId << ": Topic: " << packet.topic() << ", Payload: " << packet.payload() << " (Packet ID: " << packet.packetId() << ", DUP: " << packet.dup() << ", QoS: " << packet.qos() << ", Retain: " << packet.retain() << ')';
|
|
switch (packet.qos()) {
|
|
case Mqtt::QoS0:
|
|
break;
|
|
case Mqtt::QoS1: {
|
|
MqttPacket response(MqttPacket::TypePuback, packet.packetId());
|
|
client->write(response.serialize());
|
|
break;
|
|
}
|
|
case Mqtt::QoS2: {
|
|
if (packet.dup() && ctx->unackedPacketList.contains(packet.packetId())) {
|
|
// We received this message before but the client keeps on trying... Just send a PUBREC and stop processing
|
|
client->write(ctx->unackedPackets.value(packet.packetId()).serialize());
|
|
return;
|
|
} else if (ctx->unackedPacketList.contains(packet.packetId())) {
|
|
// Hmm... Client says this is a new packet, but the ID is not released yet! Drop client connection.
|
|
qCWarning(dbgServer()).nospace() << "Received a bad packet from \"" << ctx->clientId << "\". DUP is not set but packet ID is already used and not released. Dropping client connection.";
|
|
cleanupClient(client);
|
|
return;
|
|
}
|
|
// Ok, a new packet, ack it with a PUBREC and store the number
|
|
MqttPacket response(MqttPacket::TypePubrec, packet.packetId());
|
|
ctx->unackedPackets.insert(response.packetId(), response);
|
|
ctx->unackedPacketList.append(packet.packetId());
|
|
client->write(response.serialize());
|
|
break;
|
|
}
|
|
}
|
|
if (packet.retain()) {
|
|
if (packet.payload().isEmpty()) {
|
|
qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic();
|
|
retainedMessages.remove(packet.topic());
|
|
} else {
|
|
if (packet.qos() == Mqtt::QoS0) {
|
|
qCDebug(dbgServer) << "Clearing retained messages for topic" << packet.topic();
|
|
retainedMessages.remove(packet.topic());
|
|
}
|
|
qCDebug(dbgServer) << "Adding retained message for topic" << packet.topic();
|
|
retainedMessages[packet.topic()].append(packet);
|
|
}
|
|
}
|
|
|
|
if (authorizer && !authorizer->authorizePublish(servers.key(clientServerMap.value(client)), ctx->clientId, packet.topic())) {
|
|
qCDebug(dbgServer) << "Client not authorized to publish to this topic. Discarding packet";
|
|
return;
|
|
}
|
|
|
|
emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload());
|
|
publish(packet.topic(), packet.payload());
|
|
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypePuback) {
|
|
ctx->unackedPacketList.removeAll(packet.packetId());
|
|
MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId());
|
|
emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload());
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypePubrec) {
|
|
MqttPacket publishedPacket = ctx->unackedPackets.take(packet.packetId());
|
|
emit q_ptr->published(ctx->clientId, packet.packetId(), publishedPacket.topic(), publishedPacket.payload());
|
|
MqttPacket pubrel(MqttPacket::TypePubrel, packet.packetId());
|
|
ctx->unackedPackets.insert(packet.packetId(), pubrel);
|
|
client->write(pubrel.serialize());
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypePubrel) {
|
|
ctx->unackedPackets.remove(packet.packetId());
|
|
ctx->unackedPacketList.removeAll(packet.packetId());
|
|
MqttPacket response(MqttPacket::TypePubcomp, packet.packetId());
|
|
client->write(response.serialize());
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypePubcomp) {
|
|
ctx->unackedPackets.remove(packet.packetId());
|
|
ctx->unackedPacketList.removeAll(packet.packetId());
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypeSubscribe) {
|
|
// qCDebug(dbgServer).nospace() << ctx->clientId ": Subscribe packet received.";
|
|
MqttPacket response(MqttPacket::TypeSuback, packet.packetId());
|
|
QByteArray payload;
|
|
MqttSubscriptions effectiveSubscriptions;
|
|
foreach (MqttSubscription subscription, packet.subscriptions()) {
|
|
if (authorizer && !authorizer->authorizeSubscribe(servers.key(clientServerMap.value(client)), ctx->clientId, subscription.topicFilter())) {
|
|
qCWarning(dbgServer).nospace().noquote() << "Subscription topic filter not allowed for client \"" << ctx->clientId << "\": \"" << subscription.topicFilter() << '\"';
|
|
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
|
|
continue;
|
|
}
|
|
if (!validateTopicFilter(subscription.topicFilter())) {
|
|
qCWarning(dbgServer).nospace() << "Subscription topic filter not valid for client \"" << ctx->clientId << "\": " << subscription.topicFilter();
|
|
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
|
|
continue;
|
|
}
|
|
subscription.setQoS(qMin(subscription.qoS(), maximumSubscriptionQoS));
|
|
bool updated = false;
|
|
for (int i = 0; i < ctx->subscriptions.count(); i++) {
|
|
if (ctx->subscriptions.at(i).topicFilter() == subscription.topicFilter()) {
|
|
qCDebug(dbgServer).noquote().nospace() << "Client \"" << ctx->clientId << "\" subscribed with a duplicate topic filter. Replacing subcription with new QoS" << subscription.qoS();
|
|
ctx->subscriptions.replace(i, subscription);
|
|
updated = true;
|
|
}
|
|
}
|
|
if (!updated) {
|
|
ctx->subscriptions.append(subscription);
|
|
}
|
|
qCDebug(dbgServer).noquote().nospace() << "Subscribed client \"" << ctx->clientId << "\" to topic filter: \"" << subscription.topicFilter() << "\" with QoS " << subscription.qoS();
|
|
effectiveSubscriptions << subscription;
|
|
emit q_ptr->clientSubscribed(ctx->clientId, subscription.topicFilter(), subscription.qoS());
|
|
switch (subscription.qoS()) {
|
|
case Mqtt::QoS0:
|
|
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS0);
|
|
break;
|
|
case Mqtt::QoS1:
|
|
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS1);
|
|
break;
|
|
case Mqtt::QoS2:
|
|
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeSuccessQoS2);
|
|
break;
|
|
}
|
|
}
|
|
client->write(response.serialize());
|
|
|
|
// Deliver any retained messages for this topic
|
|
foreach (MqttSubscription subscription, effectiveSubscriptions) {
|
|
foreach (const QString &topic, retainedMessages.keys()) {
|
|
if (matchTopic(subscription.topicFilter(), topic)) {
|
|
foreach (MqttPacket packet, retainedMessages.value(topic)) {
|
|
packet.setRetain(true);
|
|
client->write(packet.serialize());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypeUnsubscribe) {
|
|
MqttSubscriptions newSubscriptions;
|
|
foreach (const MqttSubscription &existingSubscription, ctx->subscriptions) {
|
|
bool matching = false;
|
|
foreach (const MqttSubscription &unsub, packet.subscriptions()) {
|
|
if (existingSubscription.topicFilter() == unsub.topicFilter()) {
|
|
qCDebug(dbgServer) << "Unsubscribing client" << ctx->clientId << "from" << unsub.topicFilter();
|
|
emit q_ptr->clientUnsubscribed(ctx->clientId, unsub.topicFilter());
|
|
matching = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!matching) {
|
|
newSubscriptions.append(existingSubscription);
|
|
}
|
|
}
|
|
ctx->subscriptions = newSubscriptions;
|
|
MqttPacket response(MqttPacket::TypeUnsuback, packet.packetId());
|
|
client->write(response.serialize());
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypePingreq) {
|
|
// qCDebug(dbgServer).nospace() << ctx->clientId << ": Pingreq received";
|
|
MqttPacket response(MqttPacket::TypePingresp, packet.packetId());
|
|
client->write(response.serialize());
|
|
return;
|
|
}
|
|
if (packet.type() == MqttPacket::TypeDisconnect) {
|
|
ctx->willMessage.clear();
|
|
ctx->willTopic.clear();
|
|
return;
|
|
}
|
|
qCWarning(dbgServer).nospace().noquote() << "Unknown packet received from client \"" << ctx->clientId << "\": " << QString::number(packet.type(), 16);
|
|
Q_ASSERT(false);
|
|
cleanupClient(client);
|
|
|
|
}
|
|
|
|
bool MqttServerPrivate::validateTopicFilter(const QString &topicFilter)
|
|
{
|
|
if (topicFilter.length() < 1) {
|
|
return false;
|
|
}
|
|
QStringList parts = topicFilter.split('/');
|
|
for (int i = 0; i < parts.count(); i++) {
|
|
const QString &part = parts.at(i);
|
|
if (part.contains(QStringLiteral("#")) && (part != QStringLiteral("#") || i != parts.count() - 1)) {
|
|
return false;
|
|
}
|
|
if (part.contains(QStringLiteral("+")) && part != QStringLiteral("+")) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool MqttServerPrivate::matchTopic(const QString &topicFilter, const QString &topic)
|
|
{
|
|
if (topic.startsWith('$')) {
|
|
return false;
|
|
}
|
|
|
|
QStringList filterParts = topicFilter.split('/');
|
|
QStringList topicParts = topic.split('/');
|
|
|
|
if (topicParts.count() < filterParts.count() - 1) {
|
|
return false;
|
|
}
|
|
|
|
for (int i = 0; i < filterParts.count(); i++) {
|
|
if (filterParts.at(i) == QStringLiteral("+")) {
|
|
continue;
|
|
}
|
|
if (filterParts.at(i) == QStringLiteral("#")) {
|
|
continue;
|
|
}
|
|
if (topicParts.length() <= i) {
|
|
return false;
|
|
}
|
|
if (topicParts.at(i) == filterParts.at(i)) {
|
|
continue;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
return filterParts.count() == topicParts.count() || topicFilter.endsWith('#');
|
|
}
|
|
|
|
// Returns a packet ID that is not in the unacknowledged packet list of \a ctx.
//
// IDs are taken from a single counter shared by all clients (function-level static). The
// returned ID is never 0: the 16 bit counter wraps around, and 0 is not a valid MQTT packet
// identifier, so it is skipped.
quint16 MqttServerPrivate::newPacketId(ClientContext *ctx)
{
    static quint16 packetId = 0;
    do {
        packetId++;
    } while (packetId == 0 || ctx->unackedPacketList.contains(packetId));
    return packetId;
}
|
|
|
|
// Creates a QSslSocket for the incoming connection identified by \a socketDescriptor.
//
// If an SSL configuration is set, server side encryption is started and clientConnected() is
// emitted once the socket reports encrypted(). Without a configuration clientConnected() is
// emitted right away.
void SslServer::incomingConnection(qintptr socketDescriptor)
{
    QSslSocket *socket = new QSslSocket(this);

    qCDebug(dbgServer) << "New client socket connection:" << socket;

    connect(socket, &QSslSocket::encrypted, [this, socket](){ emit clientConnected(socket); });
    connect(socket, &QSslSocket::readyRead, this, &SslServer::onSocketReadyRead);
    connect(socket, &QSslSocket::disconnected, this, &SslServer::onClientDisconnected);

    const bool descriptorAccepted = socket->setSocketDescriptor(socketDescriptor);
    if (!descriptorAccepted) {
        qCWarning(dbgServer) << "Failed to set SSL socket descriptor.";
        delete socket;
        return;
    }

    if (m_config.isNull()) {
        // No SSL configuration: the connection is usable immediately.
        emit clientConnected(socket);
        return;
    }

    socket->setSslConfiguration(m_config);
    socket->startServerEncryption();
}
|
|
|
|
void SslServer::onClientDisconnected()
|
|
{
|
|
QSslSocket *socket = static_cast<QSslSocket*>(sender());
|
|
qCDebug(dbgServer) << "Client socket disconnected:" << socket;
|
|
emit clientDisconnected(socket);
|
|
socket->deleteLater();
|
|
}
|
|
|
|
void SslServer::onSocketReadyRead()
|
|
{
|
|
QSslSocket *socket = static_cast<QSslSocket*>(sender());
|
|
QByteArray data = socket->readAll();
|
|
emit dataAvailable(socket, data);
|
|
}
|