add API to request a client connected to the internal mqtt broker to mqttprovier
This commit is contained in:
parent
5847d6831d
commit
5bf1fb5bc4
@ -19,18 +19,21 @@ MqttProviderImplementation::MqttProviderImplementation(MqttBroker *broker, QObje
|
||||
|
||||
MqttChannel *MqttProviderImplementation::createChannel(const DeviceId &deviceId, const QHostAddress &clientAddress)
|
||||
{
|
||||
if (m_broker->configurations().isEmpty()) {
|
||||
qCWarning(dcMqtt) << "MQTT broker not running. Cannot create a channel for device" << deviceId;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
MqttChannelImplementation* channel = new MqttChannelImplementation();
|
||||
channel->m_clientId = deviceId.toString().remove(QRegExp("[{}-]"));
|
||||
channel->m_username = QUuid::createUuid().toString().remove(QRegExp("[{}-]"));
|
||||
channel->m_password = QUuid::createUuid().toString().remove(QRegExp("[{}-]"));
|
||||
channel->m_topicPrefix = channel->m_clientId;
|
||||
|
||||
|
||||
foreach (const QNetworkInterface &interface, QNetworkInterface::allInterfaces()) {
|
||||
qCDebug(dcMqtt) << "### Interface:" << interface.name();
|
||||
foreach (const QNetworkAddressEntry &addressEntry, interface.addressEntries()) {
|
||||
qCDebug(dcMqtt) << "#### Address entry:" << addressEntry.ip();
|
||||
if (clientAddress.isInSubnet(addressEntry.ip(), addressEntry.prefixLength())) {
|
||||
qCDebug(dcMqtt) << "##### Is in subnet";
|
||||
foreach (const ServerConfiguration &config, m_broker->configurations()) {
|
||||
if (config.address == QHostAddress("0.0.0.0") || clientAddress.isInSubnet(config.address, addressEntry.prefixLength())) {
|
||||
channel->m_serverAddress = addressEntry.ip();
|
||||
@ -38,7 +41,6 @@ MqttChannel *MqttProviderImplementation::createChannel(const DeviceId &deviceId,
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -47,7 +49,7 @@ MqttChannel *MqttProviderImplementation::createChannel(const DeviceId &deviceId,
|
||||
delete channel;
|
||||
return nullptr;
|
||||
}
|
||||
qCDebug(dcMqtt).nospace() << "Found matching MQTT server interface " << channel->m_serverAddress << ":" << channel->m_serverPort << " for client IP " << clientAddress;
|
||||
qCDebug(dcMqtt) << "Suitable MQTT server for" << clientAddress.toString() << "found at" << channel->m_serverAddress.toString() << "on port" << channel->m_serverPort;
|
||||
|
||||
connect(channel, &MqttChannelImplementation::pluginPublished, this, &MqttProviderImplementation::onPluginPublished);
|
||||
|
||||
@ -77,6 +79,48 @@ void MqttProviderImplementation::releaseChannel(MqttChannel *channel)
|
||||
delete channel;
|
||||
}
|
||||
|
||||
MqttClient *MqttProviderImplementation::createInternalClient(const DeviceId &deviceId)
|
||||
{
|
||||
|
||||
ServerConfiguration preferredConfig;
|
||||
foreach (const ServerConfiguration &config, m_broker->configurations()) {
|
||||
if (config.address == QHostAddress::Any
|
||||
|| config.address == QHostAddress::AnyIPv4
|
||||
|| config.address == QHostAddress::LocalHost) {
|
||||
preferredConfig = config;
|
||||
break;
|
||||
}
|
||||
preferredConfig = config;
|
||||
}
|
||||
if (preferredConfig.id.isEmpty()) {
|
||||
qCWarning(dcMqtt) << "Unable to find a matching MQTT server port for internal client";
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
QString clientId = deviceId.toString().remove(QRegExp("[{}-]"));
|
||||
MqttPolicy policy;
|
||||
policy.clientId = clientId;
|
||||
policy.username = QUuid::createUuid().toString().remove(QRegExp("[{}-]"));
|
||||
policy.password = QUuid::createUuid().toString().remove(QRegExp("[{}-]"));
|
||||
policy.allowedPublishTopicFilters.append("#");
|
||||
policy.allowedSubscribeTopicFilters.append("#");
|
||||
m_broker->updatePolicy(policy);
|
||||
|
||||
MqttClient* client = new MqttClient(clientId, this);
|
||||
client->setUsername(policy.username);
|
||||
client->setPassword(policy.password);
|
||||
client->setAutoReconnect(false);
|
||||
|
||||
if (preferredConfig.address == QHostAddress::Any
|
||||
|| preferredConfig.address == QHostAddress::AnyIPv4
|
||||
|| preferredConfig.address == QHostAddress::LocalHost) {
|
||||
client->connectToHost("127.0.0.1", preferredConfig.port);
|
||||
} else {
|
||||
client->connectToHost(preferredConfig.address.toString(), preferredConfig.port);
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
bool MqttProviderImplementation::available() const
|
||||
{
|
||||
return m_broker->isRunning();
|
||||
|
||||
@ -17,6 +17,8 @@ public:
|
||||
MqttChannel* createChannel(const DeviceId &deviceId, const QHostAddress &clientAddress) override;
|
||||
void releaseChannel(MqttChannel* channel) override;
|
||||
|
||||
MqttClient* createInternalClient(const DeviceId &deviceId) override;
|
||||
|
||||
bool available() const override;
|
||||
bool enabled() const override;
|
||||
void setEnabled(bool enabled) override;
|
||||
|
||||
@ -6,6 +6,7 @@
|
||||
|
||||
#include "typeutils.h"
|
||||
#include "hardwareresource.h"
|
||||
#include "nymea-mqtt/mqttclient.h"
|
||||
|
||||
class MqttChannel;
|
||||
|
||||
@ -17,6 +18,8 @@ public:
|
||||
|
||||
virtual MqttChannel* createChannel(const DeviceId &deviceId, const QHostAddress &clientAddress) = 0;
|
||||
virtual void releaseChannel(MqttChannel *channel) = 0;
|
||||
|
||||
virtual MqttClient* createInternalClient(const DeviceId &deviceId) = 0;
|
||||
};
|
||||
|
||||
#endif // MQTTPROVIDER_H
|
||||
|
||||
Reference in New Issue
Block a user