/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 *                                                                         *
 *  Copyright (C) 2018 Michael Zanetti                                     *
 *                                                                         *
 *  This file is part of nymea.                                            *
 *                                                                         *
 *  nymea is free software: you can redistribute it and/or modify          *
 *  it under the terms of the GNU General Public License as published by   *
 *  the Free Software Foundation, version 2 of the License.                *
 *                                                                         *
 *  nymea is distributed in the hope that it will be useful,               *
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of         *
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the           *
 *  GNU General Public License for more details.                           *
 *                                                                         *
 *  You should have received a copy of the GNU General Public License      *
 *  along with nymea. If not, see <http://www.gnu.org/licenses/>.          *
 *                                                                         *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#include "mqttbroker.h"
#include "loggingcategories.h"

#include "nymea-mqtt/mqttserver.h"

namespace nymeaserver {

// Authorizer installed on the MqttServer by MqttBroker. It answers the
// server's connect/subscribe/publish authorization requests by consulting the
// broker's per-server configurations (m_configs) and per-client policies
// (m_policies).
class NymeaMqttAuthorizer: public MqttAuthorizer
{
public:
    explicit NymeaMqttAuthorizer(MqttBroker* broker):
        m_broker(broker)
    {
    }

    // Decides whether a client may connect.
    //  - Accepted right away if the server configuration stored for
    //    serverAddressId does not have authentication enabled.
    //  - Rejected (identifier rejected) if no policy is installed for clientId.
    //  - Rejected (bad username or password) if the credentials differ from
    //    the ones stored in the client's policy.
    //  - Accepted otherwise.
    // peerAddress is not used for the decision.
    Mqtt::ConnectReturnCode authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) override
    {
        Q_UNUSED(peerAddress)
        if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) {
            qCDebug(dcMqtt) << "Accepting client" << clientId << ". Server configuration does not require authentication.";
            return Mqtt::ConnectReturnCodeAccepted;
        }
        if (!m_broker->m_policies.contains(clientId)) {
            qCDebug(dcMqtt) << "Rejecting client" << clientId << ". No policy for this client installed.";
            return Mqtt::ConnectReturnCodeIdentifierRejected;
        }
        const MqttPolicy policy = m_broker->m_policies.value(clientId);
        if (policy.username != username || policy.password != password) {
            qCDebug(dcMqtt) << "Rejecting client" << clientId << ". Bad username or password.";
            return Mqtt::ConnectReturnCodeBadUsernameOrPassword;
        }
        qCDebug(dcMqtt) << "Accepting client" << clientId << ". Login successful.";
        return Mqtt::ConnectReturnCodeAccepted;
    }

    // Decides whether clientId may subscribe to topicFilter. Always allowed if
    // the server configuration does not have authentication enabled; otherwise
    // the filter must be covered by the client's allowedSubscribeTopicFilters.
    bool authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter) override
    {
        if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) {
            return true;
        }
        if (!m_broker->m_policies.contains(clientId)) {
            return false;
        }
        const MqttPolicy policy = m_broker->m_policies.value(clientId);
        return matchPolicy(policy.allowedSubscribeTopicFilters, topicFilter);
    }

    // Decides whether clientId may publish to topic. Always allowed if the
    // server configuration does not have authentication enabled; otherwise the
    // topic must be covered by the client's allowedPublishTopicFilters.
    bool authorizePublish(int serverAddressId, const QString &clientId, const QString &topic) override
    {
        if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) {
            return true;
        }
        if (!m_broker->m_policies.contains(clientId)) {
            return false;
        }
        const MqttPolicy policy = m_broker->m_policies.value(clientId);
        return matchPolicy(policy.allowedPublishTopicFilters, topic);
    }

    // Returns true if topic is covered by at least one filter in policies.
    //
    // Both the filter and the topic are split at '/' and compared level by
    // level. A filter level of "+" or "#" accepts any topic level at that
    // position. After all levels matched, the filter is accepted if it has the
    // same number of levels as the topic, or if it ends with '#' (in which case
    // the topic may be longer, or exactly one level shorter, than the filter).
    bool matchPolicy(const QStringList &policies, const QString &topic)
    {
        // The topic does not change between policies, split it only once.
        const QStringList topicParts = topic.split('/');

        for (const QString &policyFilter : policies) {
            const QStringList policyParts = policyFilter.split('/');
            if (topicParts.count() < policyParts.count() - 1) {
                // Nope... actual topic is shorter than filter
                continue;
            }

            bool bad = false;
            for (int i = 0; i < policyParts.count(); i++) {
                const QString &policyPart = policyParts.at(i);
                if (policyPart == QStringLiteral("+")) {
                    continue;
                }
                if (policyPart == QStringLiteral("#")) {
                    continue;
                }
                // The topic may be one level shorter than the filter (see the
                // length check above), so make sure the index is valid before
                // reading the topic level. A literal filter level without a
                // matching topic level is a mismatch.
                if (i < topicParts.count() && policyPart == topicParts.at(i)) {
                    continue;
                }
                // Nope... this part does neither match nor is covered by a wildcard in the policy
                bad = true;
                break;
            }
            if (bad) {
                continue;
            }

            if (policyParts.count() == topicParts.count() || policyFilter.endsWith('#')) {
                // OK, either the policy is matching or the topic is longer than the policy and the policy ends with #
                return true;
            }
        }
        // Nope... none of the policies matched...
        return false;
    }

private:
    MqttBroker *m_broker;
};

// Creates the MQTT server, installs the authorizer on it and forwards the
// server's client/publish/subscription signals to the broker's own slots.
MqttBroker::MqttBroker(QObject *parent):
    QObject(parent)
{
    m_server = new MqttServer(this);
    m_authorizer = new NymeaMqttAuthorizer(this);
    m_server->setAuthorizer(m_authorizer);

    connect(m_server, &MqttServer::clientConnected, this, &MqttBroker::onClientConnected);
    connect(m_server, &MqttServer::clientDisconnected, this, &MqttBroker::onClientDisconnected);
    connect(m_server, &MqttServer::publishReceived, this, &MqttBroker::onPublishReceived);
    connect(m_server, &MqttServer::clientSubscribed, this, &MqttBroker::onClientSubscribed);
    connect(m_server, &MqttServer::clientUnsubscribed, this, &MqttBroker::onClientUnsubscribed);
}

MqttBroker::~MqttBroker()
{
    // The server is deleted before the authorizer it was configured with.
    delete m_server;
    delete m_authorizer;
}

// Starts listening on config.address:config.port. The given SSL configuration
// is only used if config.sslEnabled is set. On success the configuration is
// remembered under the server address id returned by the server.
// Returns false if the server could not start listening.
bool MqttBroker::startServer(const ServerConfiguration &config, const QSslConfiguration &sslConfiguration)
{
    int serverAddressId = m_server->listen(config.address, config.port, config.sslEnabled ? sslConfiguration : QSslConfiguration());
    if (serverAddressId == -1) {
        qCWarning(dcMqtt) << "Error starting MQTT server on port" << config.port;
        return false;
    }
    qCDebug(dcMqtt) << "MQTT server running on" << config.address << ":" << config.port;
    m_configs.insert(serverAddressId, config);
    return true;
}

// Returns true if a server has been started for the configuration with the
// given id (and has not been stopped since).
bool MqttBroker::isRunning(const QString &configId) const
{
    for (const ServerConfiguration &config : m_configs) {
        if (config.id == configId) {
            return true;
        }
    }
    return false;
}

// Returns true if at least one server configuration is currently active.
bool MqttBroker::isRunning() const
{
    return !m_configs.isEmpty();
}

// Returns the configurations of all currently active servers.
QList<ServerConfiguration> MqttBroker::configurations() const
{
    return m_configs.values();
}

// Stops the server that was started for the configuration with the given id
// and forgets that configuration. Logs a warning and does nothing if no such
// configuration is known.
void MqttBroker::stopServer(const QString &configId)
{
    // Find the server address id in a single pass over the active configs.
    int serverAddressId = -1;
    for (auto it = m_configs.constBegin(); it != m_configs.constEnd(); ++it) {
        if (it.value().id == configId) {
            serverAddressId = it.key();
            break;
        }
    }
    if (serverAddressId == -1) {
        qCWarning(dcMqtt) << "Config" << configId << "unknown to MQTT server. Cannot stop server";
        return;
    }

    m_server->close(serverAddressId);
    const ServerConfiguration stoppedConfig = m_configs.value(serverAddressId);
    qCDebug(dcMqtt) << "MQTT server stopped on" << stoppedConfig.address << ":" << stoppedConfig.port;
    m_configs.remove(serverAddressId);
}

// Returns all installed client policies.
QList<MqttPolicy> MqttBroker::policies()
{
    return m_policies.values();
}

// Returns the policy installed for clientId, or a default-constructed
// MqttPolicy if there is none.
MqttPolicy MqttBroker::policy(const QString &clientId)
{
    return m_policies.value(clientId);
}

// Installs the given policy under policy.clientId. Emits policyChanged() if a
// policy for that client already existed, policyAdded() otherwise.
void MqttBroker::updatePolicy(const MqttPolicy &policy)
{
    if (m_policies.contains(policy.clientId)) {
        m_policies[policy.clientId] = policy;
        qCDebug(dcMqtt) << "Policy for client" << policy.clientId << "updated.";
        emit policyChanged(policy);
        return;
    }
    qCDebug(dcMqtt) << "Policy for client" << policy.clientId << "added.";
    m_policies.insert(policy.clientId, policy);
    emit policyAdded(policy);
}

// Installs every policy in the list, see updatePolicy().
void MqttBroker::updatePolicies(const QList<MqttPolicy> &policies)
{
    for (const MqttPolicy &policy : policies) {
        updatePolicy(policy);
    }
}

// Removes the policy installed for clientId and emits policyRemoved(). A
// client currently connected with that id is disconnected first.
// Returns false if there is no policy for clientId.
bool MqttBroker::removePolicy(const QString &clientId)
{
    if (!m_policies.contains(clientId)) {
        return false;
    }

    // Is there a client connected for this policy?
    if (m_server->clients().contains(clientId)) {
        m_server->disconnectClient(clientId);
    }
    qCDebug(dcMqtt) << "Policy for client" << clientId << "removed";
    emit policyRemoved(m_policies.take(clientId));
    return true;
}

// Publishes payload to topic through the MQTT server.
void MqttBroker::publish(const QString &topic, const QByteArray &payload)
{
    m_server->publish(topic, payload);
}

// Logs the new connection and re-emits it as clientConnected(clientId).
void MqttBroker::onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress)
{
    Q_UNUSED(serverAddressId)
    qCDebug(dcMqtt) << "Client" << clientId << "connected with username" << username << "from" << clientAddress.toString();
    emit clientConnected(clientId);
}

// Logs the disconnect and re-emits it as clientDisconnected(clientId).
void MqttBroker::onClientDisconnected(const QString &clientId)
{
    qCDebug(dcMqtt) << "Client" << clientId << "disconnected";
    emit clientDisconnected(clientId);
}

// Logs the received publish and re-emits it as publishReceived(); the packet
// id is not forwarded.
void MqttBroker::onPublishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload)
{
    Q_UNUSED(packetId)
    qCDebug(dcMqtt) << "Publish received from client" << clientId << ":" << topic << ">" << payload;
    emit publishReceived(clientId, topic, payload);
}

// Logs the subscription and re-emits it as clientSubscribed(); the requested
// QoS is only logged, not forwarded.
void MqttBroker::onClientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS)
{
    qCDebug(dcMqtt) << "Client" << clientId << "subscribed to" << topicFilter << "(QoS:" << requestedQoS << ")";
    emit clientSubscribed(clientId, topicFilter);
}

// Logs the unsubscription and re-emits it as clientUnsubscribed().
void MqttBroker::onClientUnsubscribed(const QString &clientId, const QString &topicFilter)
{
    qCDebug(dcMqtt) << "Client" << clientId << "unsubscribed from" << topicFilter;
    emit clientUnsubscribed(clientId, topicFilter);
}

}