added missing files

This commit is contained in:
Michael Zanetti 2018-11-20 17:20:16 +01:00
parent 8d05f984c0
commit 751359c528
2 changed files with 277 additions and 0 deletions

View File

@ -0,0 +1,216 @@
#include "mqttbroker.h"
#include "loggingcategories.h"
#include "nymea-mqtt/mqttserver.h"
namespace nymeaserver {
class NymeaMqttAuthorizer: public MqttAuthorizer
{
public:
NymeaMqttAuthorizer(MqttBroker* broker) : m_broker(broker) {}
Mqtt::ConnectReturnCode authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) override {
Q_UNUSED(peerAddress)
if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) {
qCDebug(dcMqtt) << "Accepting client" << clientId << ". Server configuration does not require authentication.";
return Mqtt::ConnectReturnCodeAccepted;
}
if (!m_broker->m_policies.contains(clientId)) {
qCDebug(dcMqtt) << "Rejecting client" << clientId << ". No policy for this client installed.";
return Mqtt::ConnectReturnCodeIdentifierRejected;
}
MqttPolicy policy = m_broker->m_policies.value(clientId);
if (policy.username != username || policy.password != password) {
qCDebug(dcMqtt) << "Rejecting client" << clientId << ". Bad username or password.";
return Mqtt::ConnectReturnCodeBadUsernameOrPassword;
}
qCDebug(dcMqtt) << "Accepting client" << clientId << ". Login successful.";
return Mqtt::ConnectReturnCodeAccepted;
}
bool authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter) override {
if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) {
return true;
}
if (!m_broker->m_policies.contains(clientId)) {
return false;
}
MqttPolicy policy = m_broker->m_policies.value(clientId);
return matchPolicy(policy.allowedSubscribeTopicFilters, topicFilter);
}
bool authorizePublish(int serverAddressId, const QString &clientId, const QString &topic) override {
if (!m_broker->m_configs.value(serverAddressId).authenticationEnabled) {
return true;
}
if (!m_broker->m_policies.contains(clientId)) {
return false;
}
MqttPolicy policy = m_broker->m_policies.value(clientId);
return matchPolicy(policy.allowedPublishTopicFilters, topic);
}
bool matchPolicy(const QStringList &policies, const QString &topic) {
foreach (const QString &policyFilter, policies) {
QStringList policyParts = policyFilter.split('/');
QStringList topicParts = topic.split('/');
if (topicParts.count() < policyParts.count() - 1) {
// Nope... actual topic is shorter than filter
continue;
}
bool bad = false;
for (int i = 0; i < policyParts.length(); i++) {
if (policyParts.at(i) == QStringLiteral("+")) {
continue;
}
if (policyParts.at(i) == QStringLiteral("#")) {
continue;
}
if (policyParts.at(i) == topicParts.at(i)) {
continue;
}
// Nope... this part does neither match nor is covered by a wildcard in the policy
bad = true;
}
if (bad) {
continue;
}
if (policyParts.count() == topicParts.count() || policyFilter.endsWith('#')) {
// OK, either the policy is matching or the topicFilter is longer as the policy and policy ends with #
return true;
}
}
// Nope... none of the policies matched...
return false;
}
private:
MqttBroker *m_broker;
};
MqttBroker::MqttBroker(QObject *parent) : QObject(parent)
{
m_server = new MqttServer(this);
m_authorizer = new NymeaMqttAuthorizer(this);
m_server->setAuthorizer(m_authorizer);
connect(m_server, &MqttServer::clientConnected, this, &MqttBroker::onClientConnected);
connect(m_server, &MqttServer::clientDisconnected, this, &MqttBroker::onClientDisconnected);
connect(m_server, &MqttServer::publishReceived, this, &MqttBroker::onPublishReceived);
connect(m_server, &MqttServer::clientSubscribed, this, &MqttBroker::onClientSubscribed);
connect(m_server, &MqttServer::clientUnsubscribed, this, &MqttBroker::onClientUnsubscribed);
}
MqttBroker::~MqttBroker()
{
delete m_server;
delete m_authorizer;
}
bool MqttBroker::startServer(const ServerConfiguration &config, const QSslConfiguration &sslConfiguration)
{
int serverAddressId = m_server->listen(config.address, config.port, config.sslEnabled ? sslConfiguration : QSslConfiguration());
if (serverAddressId == -1) {
qCWarning(dcMqtt) << "Error starting MQTT server on port" << config.port;
return false;
}
qCDebug(dcMqtt) << "MQTT server running on" << config.address << ":" << config.port;
m_configs.insert(serverAddressId, config);
return true;
}
bool MqttBroker::isRunning(const QString &configId) const
{
foreach (const ServerConfiguration &config, m_configs){
if (config.id == configId) {
return true;
}
}
return false;
}
void MqttBroker::stopServer(const QString &configId)
{
int serverAddressId = -1;
foreach (const ServerConfiguration &config, m_configs) {
if (config.id == configId) {
serverAddressId = m_configs.key(config);
break;
}
}
if (serverAddressId == -1) {
qCWarning(dcMqtt) << "Config" << configId << "unknown to MQTT server. Cannot stop server";
return;
}
m_server->close(serverAddressId);
qCDebug(dcMqtt) << "MQTT server stopped on" << m_configs.value(serverAddressId).address << ":" << m_configs.value(serverAddressId).port;
m_configs.remove(serverAddressId);
}
QList<MqttPolicy> MqttBroker::policies()
{
return m_policies.values();
}
MqttPolicy MqttBroker::policy(const QString &clientId)
{
return m_policies.value(clientId);
}
void MqttBroker::updatePolicy(const MqttPolicy &policy)
{
if (m_policies.contains(policy.clientId)) {
m_policies[policy.clientId] = policy;
qCDebug(dcMqtt) << "Policy for client" << policy.clientId << "updated.";
emit policyChanged(policy);
return;
}
qCDebug(dcMqtt) << "Policy for client" << policy.clientId << "added.";
m_policies.insert(policy.clientId, policy);
emit policyAdded(policy);
}
bool MqttBroker::removePolicy(const QString &clientId)
{
if (m_policies.contains(clientId)) {
qCDebug(dcMqtt) << "Policy for client" << clientId << "removed";
emit policyRemoved(m_policies.take(clientId));
return true;
}
return false;
}
void MqttBroker::onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress)
{
Q_UNUSED(serverAddressId)
qCDebug(dcMqtt) << "Client" << clientId << "connected with username" << username << "from" << clientAddress.toString();
emit clientConnected(clientId);
}
void MqttBroker::onClientDisconnected(const QString &clientId)
{
qCDebug(dcMqtt) << "Client" << clientId << "disconnected";
emit clientDisconnected(clientId);
}
void MqttBroker::onPublishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload)
{
Q_UNUSED(packetId)
qCDebug(dcMqtt) << "Publish received from client" << clientId << ":" << topic << ">" << payload;
emit publishReceived(clientId, topic, payload);
}
void MqttBroker::onClientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS)
{
qCDebug(dcMqtt) << "Client" << clientId << "subscribed to" << topicFilter << "(QoS:" << requestedQoS << ")";
emit clientSubscribed(clientId, topicFilter);
}
void MqttBroker::onClientUnsubscribed(const QString &clientId, const QString &topicFilter)
{
qCDebug(dcMqtt) << "Client" << clientId << "unsubscribed from" << topicFilter;
emit clientUnsubscribed(clientId, topicFilter);
}
}

View File

@ -0,0 +1,61 @@
#ifndef MQTTBROKER_H
#define MQTTBROKER_H
#include <QObject>
#include <QHostAddress>
#include <QSslConfiguration>
#include "nymea-mqtt/mqtt.h"
#include "nymeaconfiguration.h"
class MqttServer;
namespace nymeaserver {
class NymeaMqttAuthorizer;
class MqttBroker : public QObject
{
Q_OBJECT
public:
explicit MqttBroker(QObject *parent = nullptr);
~MqttBroker();
bool startServer(const ServerConfiguration &config, const QSslConfiguration &sslConfiguration = QSslConfiguration());
bool isRunning(const QString &configId) const;
void stopServer(const QString &configId);
QList<MqttPolicy> policies();
MqttPolicy policy(const QString &clientId);
void updatePolicy(const MqttPolicy &policy);
bool removePolicy(const QString &clientId);
private slots:
void onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress);
void onClientDisconnected(const QString &clientId);
void onPublishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload);
void onClientSubscribed(const QString &clientId, const QString &topicFilter, Mqtt::QoS requestedQoS);
void onClientUnsubscribed(const QString &clientId, const QString &topicFilter);
signals:
void clientConnected(const QString &clientId);
void clientDisconnected(const QString &clientId);
void publishReceived(const QString &clientId, const QString &topic, const QByteArray &payload);
void clientSubscribed(const QString &clientId, const QString &topicFilter);
void clientUnsubscribed(const QString &clientId, const QString &topicFilter);
void policyAdded(const MqttPolicy &policy);
void policyChanged(const MqttPolicy &policy);
void policyRemoved(const MqttPolicy &policy);
private:
MqttServer* m_server = nullptr;
NymeaMqttAuthorizer *m_authorizer = nullptr;
QHash<int, ServerConfiguration> m_configs;
QHash<QString, MqttPolicy> m_policies;
friend class NymeaMqttAuthorizer;
};
}
#endif // MQTTBROKER_H