add support for an MQTT hardware resource

This commit is contained in:
Michael Zanetti 2018-11-23 10:21:18 +01:00
parent 751359c528
commit 5847d6831d
20 changed files with 404 additions and 11 deletions

View File

@ -0,0 +1,45 @@
#include "mqttchannelimplementation.h"
namespace nymeaserver {
MqttChannelImplementation::MqttChannelImplementation() : MqttChannel()
{
}
QString MqttChannelImplementation::clientId() const
{
return m_clientId;
}
QString MqttChannelImplementation::username() const
{
return m_username;
}
QString MqttChannelImplementation::password() const
{
return m_password;
}
QHostAddress MqttChannelImplementation::serverAddress() const
{
return m_serverAddress;
}
quint16 MqttChannelImplementation::serverPort() const
{
return m_serverPort;
}
QString MqttChannelImplementation::topicPrefix() const
{
return m_topicPrefix;
}
void MqttChannelImplementation::publish(const QString &topic, const QByteArray &payload)
{
emit pluginPublished(topic, payload);
}
}

View File

@ -0,0 +1,39 @@
#ifndef MQTTCHANNELIMPLEMENTATION_H
#define MQTTCHANNELIMPLEMENTATION_H
#include "network/mqtt/mqttchannel.h"
namespace nymeaserver {
class MqttChannelImplementation : public MqttChannel
{
Q_OBJECT
public:
explicit MqttChannelImplementation();
QString clientId() const override;
QString username() const override;
QString password() const override;
QHostAddress serverAddress() const override;
quint16 serverPort() const override;
QString topicPrefix() const override;
void publish(const QString &topic, const QByteArray &payload) override;
signals:
void pluginPublished(const QString &topic, const QByteArray &payload);
private:
QString m_clientId;
QString m_username;
QString m_password;
QHostAddress m_serverAddress;
quint16 m_serverPort;
QString m_topicPrefix;
friend class MqttProviderImplementation;
};
}
#endif // MQTTCHANNELIMPLEMENTATION_H

View File

@ -0,0 +1,130 @@
#include "mqttproviderimplementation.h"
#include "mqttchannelimplementation.h"
#include "loggingcategories.h"
#include <QtDebug>
#include <QUuid>
#include <QNetworkInterface>
namespace nymeaserver {
MqttProviderImplementation::MqttProviderImplementation(MqttBroker *broker, QObject *parent):
MqttProvider(parent),
m_broker(broker)
{
connect(broker, &MqttBroker::clientConnected, this, &MqttProviderImplementation::onClientConnected);
connect(broker, &MqttBroker::clientDisconnected, this, &MqttProviderImplementation::onClientDisconnected);
connect(broker, &MqttBroker::publishReceived, this, &MqttProviderImplementation::onPublishReceived);
}
MqttChannel *MqttProviderImplementation::createChannel(const DeviceId &deviceId, const QHostAddress &clientAddress)
{
MqttChannelImplementation* channel = new MqttChannelImplementation();
channel->m_clientId = deviceId.toString().remove(QRegExp("[{}-]"));
channel->m_username = QUuid::createUuid().toString().remove(QRegExp("[{}-]"));
channel->m_password = QUuid::createUuid().toString().remove(QRegExp("[{}-]"));
channel->m_topicPrefix = channel->m_clientId;
foreach (const QNetworkInterface &interface, QNetworkInterface::allInterfaces()) {
qCDebug(dcMqtt) << "### Interface:" << interface.name();
foreach (const QNetworkAddressEntry &addressEntry, interface.addressEntries()) {
qCDebug(dcMqtt) << "#### Address entry:" << addressEntry.ip();
if (clientAddress.isInSubnet(addressEntry.ip(), addressEntry.prefixLength())) {
qCDebug(dcMqtt) << "##### Is in subnet";
foreach (const ServerConfiguration &config, m_broker->configurations()) {
if (config.address == QHostAddress("0.0.0.0") || clientAddress.isInSubnet(config.address, addressEntry.prefixLength())) {
channel->m_serverAddress = addressEntry.ip();
channel->m_serverPort = config.port;
break;
}
}
}
}
}
if (channel->serverAddress().isNull()) {
qCWarning(dcMqtt) << "Unable to find a matching MQTT server port for client address" << clientAddress.toString();
delete channel;
return nullptr;
}
qCDebug(dcMqtt).nospace() << "Found matching MQTT server interface " << channel->m_serverAddress << ":" << channel->m_serverPort << " for client IP " << clientAddress;
connect(channel, &MqttChannelImplementation::pluginPublished, this, &MqttProviderImplementation::onPluginPublished);
m_createdChannels.insert(channel->clientId(), channel);
// Create a policy for this client
MqttPolicy policy;
policy.clientId = channel->clientId();
policy.username = channel->username();
policy.password = channel->password();
policy.allowedPublishTopicFilters.append(QString("%1/#").arg(channel->m_topicPrefix));
policy.allowedSubscribeTopicFilters.append(QString("%1/#").arg(channel->m_topicPrefix));
m_broker->updatePolicy(policy);
return channel;
}
void MqttProviderImplementation::releaseChannel(MqttChannel *channel)
{
if (!m_createdChannels.contains(channel->clientId())) {
qCWarning(dcMqtt) << "ReleaseChannel called for a channel we don't manage. Potential memory leak!";
return;
}
m_createdChannels.take(channel->clientId());
m_broker->removePolicy(channel->clientId());
qCDebug(dcMqtt) << "Released MQTT channel for client ID" << channel->clientId();
delete channel;
}
bool MqttProviderImplementation::available() const
{
return m_broker->isRunning();
}
bool MqttProviderImplementation::enabled() const
{
return available();
}
void MqttProviderImplementation::setEnabled(bool enabled)
{
Q_UNUSED(enabled)
qCWarning(dcMqtt) << "MQTT hardware resource cannot be disabled";
}
void MqttProviderImplementation::onClientConnected(const QString &clientId)
{
if (m_createdChannels.contains(clientId)) {
MqttChannel* channel = m_createdChannels.value(clientId);
emit channel->clientConnected(channel);
}
}
void MqttProviderImplementation::onClientDisconnected(const QString &clientId)
{
if (m_createdChannels.contains(clientId)) {
MqttChannel *channel = m_createdChannels.value(clientId);
emit channel->clientDisconnected(channel);
}
}
void MqttProviderImplementation::onPublishReceived(const QString &clientId, const QString &topic, const QByteArray &payload)
{
if (m_createdChannels.contains(clientId)) {
MqttChannel* channel = m_createdChannels.value(clientId);
emit channel->publishReceived(channel, topic, payload);
}
}
void MqttProviderImplementation::onPluginPublished(const QString &topic, const QByteArray &payload)
{
MqttChannelImplementation *channel = static_cast<MqttChannelImplementation*>(sender());
if (!topic.startsWith(channel->topicPrefix())) {
qCWarning(dcMqtt) << "Attempt to publish to MQTT channel for client" << channel->clientId() << "but topic is not within allowed topic prefix. Discarding message.";
return;
}
m_broker->publish(topic, payload);
}
}

View File

@ -0,0 +1,38 @@
#ifndef MQTTPROVIDERIMPLEMENTATION_H
#define MQTTPROVIDERIMPLEMENTATION_H
#include <QObject>
#include "servers/mqttbroker.h"
#include "network/mqtt/mqttprovider.h"
namespace nymeaserver {
class MqttProviderImplementation : public MqttProvider
{
Q_OBJECT
public:
explicit MqttProviderImplementation(MqttBroker *broker, QObject *parent = nullptr);
MqttChannel* createChannel(const DeviceId &deviceId, const QHostAddress &clientAddress) override;
void releaseChannel(MqttChannel* channel) override;
bool available() const override;
bool enabled() const override;
void setEnabled(bool enabled) override;
private slots:
void onClientConnected(const QString &clientId);
void onClientDisconnected(const QString &clientId);
void onPublishReceived(const QString &clientId, const QString &topic, const QByteArray &payload);
void onPluginPublished(const QString &topic, const QByteArray &payload);
private:
MqttBroker* m_broker = nullptr;
QHash<QString, MqttChannel*> m_createdChannels;
};
}
#endif // MQTTPROVIDERIMPLEMENTATION_H

View File

@ -29,10 +29,11 @@
#include "hardware/radio433/radio433brennenstuhl.h"
#include "hardware/bluetoothlowenergy/bluetoothlowenergymanagerimplementation.h"
#include "hardware/network/avahi/qtavahiservicebrowserimplementation.h"
#include "hardware/network/mqtt/mqttproviderimplementation.h"
namespace nymeaserver {
HardwareManagerImplementation::HardwareManagerImplementation(QObject *parent) :
HardwareManagerImplementation::HardwareManagerImplementation(MqttBroker *mqttBroker, QObject *parent) :
HardwareManager(parent)
{
// Create network access manager for all resources, centralized
@ -74,6 +75,8 @@ HardwareManagerImplementation::HardwareManagerImplementation(QObject *parent) :
if (m_bluetoothLowEnergyManager->available())
setResourceEnabled(m_bluetoothLowEnergyManager, true);
m_mqttProvider = new MqttProviderImplementation(mqttBroker, this);
}
HardwareManagerImplementation::~HardwareManagerImplementation()
@ -110,4 +113,9 @@ BluetoothLowEnergyManager *HardwareManagerImplementation::bluetoothLowEnergyMana
return m_bluetoothLowEnergyManager;
}
MqttProvider *HardwareManagerImplementation::mqttProvider()
{
return m_mqttProvider;
}
}

View File

@ -37,16 +37,17 @@ class UpnpDeviceDescriptor;
class QtAvahiServiceBrowser;
class BluetoothLowEnergyManager;
namespace nymeaserver {
class MqttBroker;
class HardwareManagerImplementation : public HardwareManager
{
Q_OBJECT
public:
explicit HardwareManagerImplementation(QObject *parent = nullptr);
~HardwareManagerImplementation();
explicit HardwareManagerImplementation(MqttBroker *mqttBroker, QObject *parent = nullptr);
~HardwareManagerImplementation() override;
Radio433 *radio433() override;
PluginTimerManager *pluginTimerManager() override;
@ -54,6 +55,7 @@ public:
UpnpDiscovery *upnpDiscovery() override;
QtAvahiServiceBrowser *avahiBrowser() override;
BluetoothLowEnergyManager *bluetoothLowEnergyManager() override;
MqttProvider *mqttProvider() override;
private:
QNetworkAccessManager *m_networkAccessManager = nullptr;
@ -65,6 +67,7 @@ private:
UpnpDiscovery *m_upnpDiscovery = nullptr;
QtAvahiServiceBrowser *m_avahiBrowser = nullptr;
BluetoothLowEnergyManager *m_bluetoothLowEnergyManager = nullptr;
MqttProvider *m_mqttProvider = nullptr;
};
}

View File

@ -380,7 +380,9 @@ QVariantMap NetworkManagerHandler::packNetworkManagerStatus()
void NetworkManagerHandler::onNetworkManagerStatusChanged()
{
emit NetworkStatusChanged(packNetworkManagerStatus());
QVariantMap notification;
notification.insert("status", packNetworkManagerStatus());
emit NetworkStatusChanged(notification);
}
void NetworkManagerHandler::onWirelessNetworkDeviceAdded(WirelessNetworkDevice *networkDevice)

View File

@ -31,7 +31,7 @@ class NetworkManagerHandler : public JsonHandler
{
Q_OBJECT
public:
explicit NetworkManagerHandler(QObject *parent = 0);
explicit NetworkManagerHandler(QObject *parent = nullptr);
QString name() const;

View File

@ -93,6 +93,8 @@ HEADERS += nymeacore.h \
hardware/network/avahi/qtavahiservice_p.h \
hardware/network/avahi/qtavahiservicebrowserimplementation.h \
hardware/network/avahi/qtavahiservicebrowserimplementation_p.h \
hardware/network/mqtt/mqttproviderimplementation.h \
hardware/network/mqtt/mqttchannelimplementation.h \
debugserverhandler.h \
tagging/tagsstorage.h \
tagging/tag.h \
@ -176,9 +178,11 @@ SOURCES += nymeacore.cpp \
hardware/network/avahi/qtavahiservice_p.cpp \
hardware/network/avahi/qtavahiservicebrowserimplementation.cpp \
hardware/network/avahi/qtavahiservicebrowserimplementation_p.cpp \
hardware/network/mqtt/mqttproviderimplementation.cpp \
hardware/network/mqtt/mqttchannelimplementation.cpp \
debugserverhandler.cpp \
tagging/tagsstorage.cpp \
tagging/tag.cpp \
jsonrpc/tagshandler.cpp \
cloud/cloudtransport.cpp \
debugreportgenerator.cpp
debugreportgenerator.cpp \

View File

@ -188,6 +188,19 @@ NymeaConfiguration::NymeaConfiguration(QObject *parent) :
storeServerConfig("MqttServer", config);
}
NymeaSettings mqttPolicies(NymeaSettings::SettingsRoleMqttPolicies);
foreach (const QString &clientId, mqttPolicies.childGroups()) {
mqttPolicies.beginGroup(clientId);
MqttPolicy policy;
policy.clientId = clientId;
policy.username = mqttPolicies.value("username").toString();
policy.password = mqttPolicies.value("password").toString();
policy.allowedPublishTopicFilters = mqttPolicies.value("allowedPublishTopicFilters").toStringList();
policy.allowedSubscribeTopicFilters = mqttPolicies.value("allowedSubscribeTopicFilters").toStringList();
m_mqttPolicies.insert(clientId, policy);
mqttPolicies.endGroup();
}
// Write defaults for log settings
settings.beginGroup("Logs");
settings.setValue("logDBDriver", logDBDriver());

View File

@ -524,8 +524,11 @@ void NymeaCore::init() {
qCDebug(dcApplication) << "Creating Log Engine";
m_logger = new LogEngine(m_configuration->logDBDriver(), m_configuration->logDBName(), m_configuration->logDBHost(), m_configuration->logDBUser(), m_configuration->logDBPassword(), m_configuration->logDBMaxEntries(), this);
qCDebug(dcApplication) << "Creating Server Manager";
m_serverManager = new ServerManager(m_configuration, this);
qCDebug(dcApplication) << "Creating Hardware Manager";
m_hardwareManager = new HardwareManagerImplementation(this);
m_hardwareManager = new HardwareManagerImplementation(m_serverManager->mqttBroker(), this);
qCDebug(dcApplication) << "Creating Device Manager (locale:" << m_configuration->locale() << ")";
m_deviceManager = new DeviceManager(m_hardwareManager, m_configuration->locale(), this);
@ -539,9 +542,6 @@ void NymeaCore::init() {
qCDebug(dcApplication()) << "Creating Tags Storage";
m_tagsStorage = new TagsStorage(m_deviceManager, m_ruleEngine, this);
qCDebug(dcApplication) << "Creating Server Manager";
m_serverManager = new ServerManager(m_configuration, this);
qCDebug(dcApplication) << "Creating Network Manager";
m_networkManager = new NetworkManager(this);

View File

@ -134,6 +134,7 @@ ServerManager::ServerManager(NymeaConfiguration *configuration, QObject *parent)
foreach (const ServerConfiguration &config, configuration->mqttServerConfigurations()) {
m_mqttBroker->startServer(config);
}
m_mqttBroker->updatePolicies(configuration->mqttPolicies().values());
connect(configuration, &NymeaConfiguration::tcpServerConfigurationChanged, this, &ServerManager::tcpServerConfigurationChanged);
connect(configuration, &NymeaConfiguration::tcpServerConfigurationRemoved, this, &ServerManager::tcpServerConfigurationRemoved);

View File

@ -130,6 +130,16 @@ bool MqttBroker::isRunning(const QString &configId) const
return false;
}
bool MqttBroker::isRunning() const
{
return !m_configs.isEmpty();
}
QList<ServerConfiguration> MqttBroker::configurations() const
{
return m_configs.values();
}
void MqttBroker::stopServer(const QString &configId)
{
int serverAddressId = -1;
@ -171,9 +181,21 @@ void MqttBroker::updatePolicy(const MqttPolicy &policy)
emit policyAdded(policy);
}
void MqttBroker::updatePolicies(const QList<MqttPolicy> &policies)
{
foreach (const MqttPolicy &policy, policies) {
updatePolicy(policy);
}
}
bool MqttBroker::removePolicy(const QString &clientId)
{
if (m_policies.contains(clientId)) {
// Is there a client connected for this policy?
if (m_server->clients().contains(clientId)) {
m_server->disconnectClient(clientId);
}
qCDebug(dcMqtt) << "Policy for client" << clientId << "removed";
emit policyRemoved(m_policies.take(clientId));
return true;
@ -181,6 +203,11 @@ bool MqttBroker::removePolicy(const QString &clientId)
return false;
}
void MqttBroker::publish(const QString &topic, const QByteArray &payload)
{
m_server->publish(topic, payload);
}
void MqttBroker::onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress)
{
Q_UNUSED(serverAddressId)

View File

@ -23,13 +23,18 @@ public:
bool startServer(const ServerConfiguration &config, const QSslConfiguration &sslConfiguration = QSslConfiguration());
bool isRunning(const QString &configId) const;
bool isRunning() const;
QList<ServerConfiguration> configurations() const;
void stopServer(const QString &configId);
QList<MqttPolicy> policies();
MqttPolicy policy(const QString &clientId);
void updatePolicy(const MqttPolicy &policy);
void updatePolicies(const QList<MqttPolicy> &policies);
bool removePolicy(const QString &clientId);
void publish(const QString &topic, const QByteArray &payload);
private slots:
void onClientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress);
void onClientDisconnected(const QString &clientId);

View File

@ -32,6 +32,7 @@ class NetworkAccessManager;
class UpnpDeviceDescriptor;
class QtAvahiServiceBrowser;
class BluetoothLowEnergyManager;
class MqttProvider;
class HardwareResource;
class HardwareManager : public QObject
@ -48,6 +49,7 @@ public:
virtual UpnpDiscovery *upnpDiscovery() = 0;
virtual QtAvahiServiceBrowser *avahiBrowser() = 0;
virtual BluetoothLowEnergyManager *bluetoothLowEnergyManager() = 0;
virtual MqttProvider *mqttProvider() = 0;
protected:
void setResourceEnabled(HardwareResource* resource, bool enabled);

View File

@ -72,6 +72,8 @@ HEADERS += devicemanager.h \
plugintimer.h \
hardwaremanager.h \
nymeadbusservice.h \
network/mqtt/mqttprovider.h \
network/mqtt/mqttchannel.h
SOURCES += devicemanager.cpp \
loggingcategories.cpp \
@ -129,6 +131,8 @@ SOURCES += devicemanager.cpp \
plugintimer.cpp \
hardwaremanager.cpp \
nymeadbusservice.cpp \
network/mqtt/mqttprovider.cpp \
network/mqtt/mqttchannel.cpp
# install plugininfo python script for libnymea-dev

View File

@ -0,0 +1,12 @@
#include "mqttchannel.h"
MqttChannel::MqttChannel(QObject *parent): QObject(parent)
{
}
MqttChannel::~MqttChannel()
{
}

View File

@ -0,0 +1,31 @@
#ifndef MQTTCHANNEL_H
#define MQTTCHANNEL_H
#include <QObject>
#include <QHostAddress>
#include "libnymea.h"
class LIBNYMEA_EXPORT MqttChannel: public QObject
{
Q_OBJECT
public:
MqttChannel(QObject *parent = nullptr);
virtual ~MqttChannel();
virtual QString clientId() const = 0;
virtual QString username() const = 0;
virtual QString password() const = 0;
virtual QHostAddress serverAddress() const = 0;
virtual quint16 serverPort() const = 0;
virtual QString topicPrefix() const = 0;
virtual void publish(const QString &topic, const QByteArray &payload) = 0;
signals:
void clientConnected(MqttChannel* channel);
void clientDisconnected(MqttChannel* channel);
void publishReceived(MqttChannel* channel, const QString &topic, const QByteArray &payload);
};
#endif // MQTTCHANNEL_H

View File

@ -0,0 +1,7 @@
#include "mqttprovider.h"
#include "mqttchannel.h"
MqttProvider::MqttProvider(QObject *parent) : HardwareResource("MQTT", parent)
{
}

View File

@ -0,0 +1,22 @@
#ifndef MQTTPROVIDER_H
#define MQTTPROVIDER_H
#include <QObject>
#include <QHostAddress>
#include "typeutils.h"
#include "hardwareresource.h"
class MqttChannel;
class MqttProvider : public HardwareResource
{
Q_OBJECT
public:
explicit MqttProvider(QObject *parent = nullptr);
virtual MqttChannel* createChannel(const DeviceId &deviceId, const QHostAddress &clientAddress) = 0;
virtual void releaseChannel(MqttChannel *channel) = 0;
};
#endif // MQTTPROVIDER_H