From 751359c52861dce737c6936df3f8735839f1f8a0 Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Tue, 20 Nov 2018 17:20:16 +0100 Subject: [PATCH] added missing files --- libnymea-core/servers/mqttbroker.cpp | 216 +++++++++++++++++++++++++++ libnymea-core/servers/mqttbroker.h | 61 ++++++++ 2 files changed, 277 insertions(+) create mode 100644 libnymea-core/servers/mqttbroker.cpp create mode 100644 libnymea-core/servers/mqttbroker.h diff --git a/libnymea-core/servers/mqttbroker.cpp b/libnymea-core/servers/mqttbroker.cpp new file mode 100644 index 00000000..a0dad374 --- /dev/null +++ b/libnymea-core/servers/mqttbroker.cpp @@ -0,0 +1,216 @@ +#include "mqttbroker.h" +#include "loggingcategories.h" + +#include "nymea-mqtt/mqttserver.h" + +namespace nymeaserver { + +class NymeaMqttAuthorizer: public MqttAuthorizer +{ +public: + NymeaMqttAuthorizer(MqttBroker* broker) : m_broker(broker) {} + + Mqtt::ConnectReturnCode authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) override { + Q_UNUSED(peerAddress) + if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) { + qCDebug(dcMqtt) << "Accepting client" << clientId << ". Server configuration does not require authentication."; + return Mqtt::ConnectReturnCodeAccepted; + } + if (!m_broker->m_policies.contains(clientId)) { + qCDebug(dcMqtt) << "Rejecting client" << clientId << ". No policy for this client installed."; + return Mqtt::ConnectReturnCodeIdentifierRejected; + } + MqttPolicy policy = m_broker->m_policies.value(clientId); + if (policy.username != username || policy.password != password) { + qCDebug(dcMqtt) << "Rejecting client" << clientId << ". Bad username or password."; + return Mqtt::ConnectReturnCodeBadUsernameOrPassword; + } + qCDebug(dcMqtt) << "Accepting client" << clientId << ". Login successful."; + return Mqtt::ConnectReturnCodeAccepted; + } + + bool authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter) override { + if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) { + return true; + } + if (!m_broker->m_policies.contains(clientId)) { + return false; + } + MqttPolicy policy = m_broker->m_policies.value(clientId); + return matchPolicy(policy.allowedSubscribeTopicFilters, topicFilter); + } + + bool authorizePublish(int serverAddressId, const QString &clientId, const QString &topic) override { + if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) { + return true; + } + if (!m_broker->m_policies.contains(clientId)) { + return false; + } + MqttPolicy policy = m_broker->m_policies.value(clientId); + return matchPolicy(policy.allowedPublishTopicFilters, topic); + } + + bool matchPolicy(const QStringList &policies, const QString &topic) { + foreach (const QString &policyFilter, policies) { + QStringList policyParts = policyFilter.split('/'); + QStringList topicParts = topic.split('/'); + if (topicParts.count() < policyParts.count() - 1) { + // Nope... actual topic is shorter than filter + continue; + } + bool bad = false; + for (int i = 0; i < policyParts.length(); i++) { + if (policyParts.at(i) == QStringLiteral("+")) { + continue; + } + if (policyParts.at(i) == QStringLiteral("#")) { + continue; + } + if (policyParts.at(i) == topicParts.at(i)) { + continue; + } + // Nope... this part does neither match nor is covered by a wildcard in the policy + bad = true; + } + if (bad) { + continue; + } + if (policyParts.count() == topicParts.count() || policyFilter.endsWith('#')) { + // OK, either the policy is matching or the topicFilter is longer as the policy and policy ends with # + return true; + } + } + // Nope... none of the policies matched... + return false; + } + +private: + MqttBroker *m_broker; +}; + +MqttBroker::MqttBroker(QObject *parent) : QObject(parent) +{ + m_server = new MqttServer(this); + m_authorizer = new NymeaMqttAuthorizer(this); + m_server->setAuthorizer(m_authorizer); + + connect(m_server, &MqttServer::clientConnected, this, &MqttBroker::onClientConnected); + connect(m_server, &MqttServer::clientDisconnected, this, &MqttBroker::onClientDisconnected); + connect(m_server, &MqttServer::publishReceived, this, &MqttBroker::onPublishReceived); + connect(m_server, &MqttServer::clientSubscribed, this, &MqttBroker::onClientSubscribed); + connect(m_server, &MqttServer::clientUnsubscribed, this, &MqttBroker::onClientUnsubscribed); +} + +MqttBroker::~MqttBroker() +{ + delete m_server; + delete m_authorizer; +} + +bool MqttBroker::startServer(const ServerConfiguration &config, const QSslConfiguration &sslConfiguration) +{ + int serverAddressId = m_server->listen(config.address, config.port, config.sslEnabled ? sslConfiguration : QSslConfiguration()); + if (serverAddressId == -1) { + qCWarning(dcMqtt) << "Error starting MQTT server on port" << config.port; + return false; + } + qCDebug(dcMqtt) << "MQTT server running on" << config.address << ":" << config.port; + m_configs.insert(serverAddressId, config); + return true; +} + +bool MqttBroker::isRunning(const QString &configId) const +{ + foreach (const ServerConfiguration &config, m_configs){ + if (config.id == configId) { + return true; + } + } + return false; +} + +void MqttBroker::stopServer(const QString &configId) +{ + int serverAddressId = -1; + foreach (const ServerConfiguration &config, m_configs) { + if (config.id == configId) { + serverAddressId = m_configs.key(config); + break; + } + } + if (serverAddressId == -1) { + qCWarning(dcMqtt) << "Config" << configId << "unknown to MQTT server. Cannot stop server"; + return; + } + m_server->close(serverAddressId); + qCDebug(dcMqtt) << "MQTT server stopped on" << m_configs.value(serverAddressId).address << ":" << m_configs.value(serverAddressId).port; + m_configs.remove(serverAddressId); +} + +QList MqttBroker::policies() +{ + return m_policies.values(); +} + +MqttPolicy MqttBroker::policy(const QString &clientId) +{ + return m_policies.value(clientId); +} + +void MqttBroker::updatePolicy(const MqttPolicy &policy) +{ + if (m_policies.contains(policy.clientId)) { + m_policies[policy.clientId] = policy; + qCDebug(dcMqtt) << "Policy for client" << policy.clientId << "updated."; + emit policyChanged(policy); + return; + } + qCDebug(dcMqtt) << "Policy for client" << policy.clientId << "added."; + m_policies.insert(policy.clientId, policy); + emit policyAdded(policy); +} + +bool MqttBroker::removePolicy(const QString &clientId) +{ + if (m_policies.contains(clientId)) { + qCDebug(dcMqtt) << "Policy for client" << clientId << "removed"; + emit policyRemoved(m_policies.take(clientId)); + return true; + } + return false; +} + +void MqttBroker::onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress) +{ + Q_UNUSED(serverAddressId) + qCDebug(dcMqtt) << "Client" << clientId << "connected with username" << username << "from" << clientAddress.toString(); + emit clientConnected(clientId); +} + +void MqttBroker::onClientDisconnected(const QString &clientId) +{ + qCDebug(dcMqtt) << "Client" << clientId << "disconnected"; + emit clientDisconnected(clientId); +} + +void MqttBroker::onPublishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload) +{ + Q_UNUSED(packetId) + qCDebug(dcMqtt) << "Publish received from client" << clientId << ":" << topic << ">" << payload; + emit publishReceived(clientId, topic, payload); +} + +void MqttBroker::onClientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS) +{ + qCDebug(dcMqtt) << "Client" << clientId << "subscribed to" << topicFilter << "(QoS:" << requestedQoS << ")"; + emit clientSubscribed(clientId, topicFilter); +} + +void MqttBroker::onClientUnsubscribed(const QString &clientId, const QString &topicFilter) +{ + qCDebug(dcMqtt) << "Client" << clientId << "unsubscribed from" << topicFilter; + emit clientUnsubscribed(clientId, topicFilter); +} + +} diff --git a/libnymea-core/servers/mqttbroker.h b/libnymea-core/servers/mqttbroker.h new file mode 100644 index 00000000..168e7c32 --- /dev/null +++ b/libnymea-core/servers/mqttbroker.h @@ -0,0 +1,61 @@ +#ifndef MQTTBROKER_H +#define MQTTBROKER_H + +#include +#include +#include + +#include "nymea-mqtt/mqtt.h" +#include "nymeaconfiguration.h" + +class MqttServer; + +namespace nymeaserver { + +class NymeaMqttAuthorizer; + +class MqttBroker : public QObject +{ + Q_OBJECT +public: + explicit MqttBroker(QObject *parent = nullptr); + ~MqttBroker(); + + bool startServer(const ServerConfiguration &config, const QSslConfiguration &sslConfiguration = QSslConfiguration()); + bool isRunning(const QString &configId) const; + void stopServer(const QString &configId); + + QList policies(); + MqttPolicy policy(const QString &clientId); + void updatePolicy(const MqttPolicy &policy); + bool removePolicy(const QString &clientId); + +private slots: + void onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress); + void onClientDisconnected(const QString &clientId); + void onPublishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload); + void onClientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS); + void onClientUnsubscribed(const QString &clientId, const QString &topicFilter); + +signals: + void clientConnected(const QString &clientId); + void clientDisconnected(const QString &clientId); + void publishReceived(const QString &clientId, const QString &topic, const QByteArray &payload); + void clientSubscribed(const QString &clientId, const QString &topicFilter); + void clientUnsubscribed(const QString &clientId, const QString &topicFilter); + void policyAdded(const MqttPolicy &policy); + void policyChanged(const MqttPolicy &policy); + void policyRemoved(const MqttPolicy &policy); + +private: + MqttServer* m_server = nullptr; + NymeaMqttAuthorizer *m_authorizer = nullptr; + QHash m_configs; + QHash m_policies; + + + friend class NymeaMqttAuthorizer; +}; +} + +#endif // MQTTBROKER_H