replace emqtt with nymea-mqtt

This commit is contained in:
Michael Zanetti 2018-11-28 12:19:34 +01:00
parent 5bf1fb5bc4
commit 1d903862ed
5 changed files with 31 additions and 34 deletions

2
debian/control vendored
View File

@ -23,7 +23,7 @@ Build-Depends: debhelper (>= 9.0.0),
libavahi-client-dev,
libavahi-common-dev,
libssl-dev,
libqmqtt-dev,
libnymea-mqtt-dev,
dbus-test-runner,
Package: nymea

View File

@ -91,23 +91,21 @@ void AWSConnector::doConnect()
sslConfig.setCaCertificates({caCertificate});
m_client = new QMQTT::Client(m_currentEndpoint, 8883, sslConfig, true, this);
m_client->setClientId(m_clientId);
m_client->setVersion(QMQTT::V3_1_1);
m_client = new MqttClient(m_clientId, this);
m_client->setKeepAlive(30);
m_client->setCleanSession(true);
m_client->setAutoReconnect(true);
m_client->connectToHost();
qWarning() << "Connecting MQTT to" << m_currentEndpoint;
m_client->connectToHost(m_currentEndpoint, 8883, true, true, sslConfig);
connect(m_client, &QMQTT::Client::connected, this, &AWSConnector::onConnected);
connect(m_client, &QMQTT::Client::disconnected, this, &AWSConnector::onDisconnected);
connect(m_client, &QMQTT::Client::error, this, [](const QMQTT::ClientError error){
connect(m_client, &MqttClient::connected, this, &AWSConnector::onConnected);
connect(m_client, &MqttClient::disconnected, this, &AWSConnector::onDisconnected);
connect(m_client, &MqttClient::error, this, [](const QAbstractSocket::SocketError error){
qCWarning(dcAWS()) << "An error happened in the MQTT transport" << error;
});
connect(m_client, &QMQTT::Client::subscribed, this, &AWSConnector::onSubscribed);
connect(m_client, &QMQTT::Client::received, this, &AWSConnector::onSubscriptionReceived);
connect(m_client, &QMQTT::Client::published, this, &AWSConnector::onPublished);
connect(m_client, &MqttClient::subscribed, this, &AWSConnector::onSubscribed);
connect(m_client, &MqttClient::publishReceived, this, &AWSConnector::onPublishReceived);
connect(m_client, &MqttClient::published, this, &AWSConnector::onPublished);
}
void AWSConnector::onConnected()
@ -228,7 +226,7 @@ void AWSConnector::disconnectAWS()
bool AWSConnector::isConnected() const
{
return m_client && m_client->isConnectedToHost() && !m_setupInProgress;
return m_client && m_client->isConnected() && !m_setupInProgress;
}
void AWSConnector::setDeviceName(const QString &deviceName)
@ -297,9 +295,8 @@ quint16 AWSConnector::publish(const QString &topic, const QVariantMap &message)
}
QJsonDocument jsonDoc = QJsonDocument::fromVariant(message);
QMQTT::Message msg(0, topic, jsonDoc.toJson(QJsonDocument::Compact), 1);
qCDebug(dcAWSTraffic()) << "Publishing:" << topic << jsonDoc.toJson(QJsonDocument::Compact);
quint16 packetId = m_client->publish(msg);
quint16 packetId = m_client->publish(topic, jsonDoc.toJson(QJsonDocument::Compact), Mqtt::QoS1, false);
return packetId;
}
@ -335,7 +332,7 @@ void AWSConnector::onDisconnected()
if (m_shouldReconnect) {
qCDebug(dcAWS()) << "Reconnecting to AWS...";
doConnect();
QTimer::singleShot(1000, this, &AWSConnector::doConnect);
}
}
@ -370,19 +367,20 @@ void AWSConnector::subscribe(const QStringList &topics)
continue;
}
qCDebug(dcAWSTraffic()) << "Topic to subscribe is" << topic;
m_client->subscribe(topic, 1);
MqttSubscription subscription(topic.toUtf8(), Mqtt::QoS1);
m_client->subscribe(subscription);
m_subscriptionCache.append(topic);
}
}
void AWSConnector::onPublished(const QMQTT::Message& message, quint16 msgid)
void AWSConnector::onPublished(quint16 msgid, const QString &topic)
{
qCDebug(dcAWS()) << "Published message:" << message.topic() << msgid;
qCDebug(dcAWS()) << "Published message:" << msgid << topic;
}
void AWSConnector::onSubscribed(const QString& topic, const quint8 qos)
void AWSConnector::onSubscribed(const QString &topic, Mqtt::SubscribeReturnCode returnCode)
{
qCDebug(dcAWSTraffic()) << "Subscribed to topic:" << topic << qos;
qCDebug(dcAWSTraffic()) << "Subscribed to topic:" << topic << returnCode;
if (topic.startsWith("create/device/")) {
qCDebug(dcAWS()) << "Subscribed to create/device/";
@ -398,18 +396,17 @@ void AWSConnector::onSubscribed(const QString& topic, const quint8 qos)
}
}
void AWSConnector::onSubscriptionReceived(const QMQTT::Message &message)
void AWSConnector::onPublishReceived(const QString &topic, const QByteArray &payload)
{
QJsonParseError error;
QJsonDocument jsonDoc = QJsonDocument::fromJson(message.payload(), &error);
QJsonDocument jsonDoc = QJsonDocument::fromJson(payload, &error);
if (error.error != QJsonParseError::NoError) {
qCDebug(dcAWS()) << "Failed to parse JSON from AWS subscription on topic" << message.topic() << ":" << error.errorString() << "\n" << message.payload();
qCDebug(dcAWS()) << "Failed to parse JSON from AWS subscription on topic" << topic << ":" << error.errorString() << "\n" << payload;
return;
}
qCDebug(dcAWSTraffic()) << "Subscription received: Topic:" << message.topic() << "payload:" << message.payload();
qCDebug(dcAWSTraffic()) << "Subscription received: Topic:" << topic << "payload:" << payload;
QString topic = message.topic();
if (topic.startsWith("create/device/")) {
int statusCode = jsonDoc.toVariant().toMap().value("result").toMap().value("code").toInt();
switch (statusCode) {
@ -505,7 +502,7 @@ void AWSConnector::onSubscriptionReceived(const QMQTT::Message &message)
}
onTurnCredentialsReceived(turnCreds.value("turnCredentials").toMap());
} else {
qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << message.payload();
qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << payload;
}
}

View File

@ -25,7 +25,7 @@
#include <QFuture>
#include <QDateTime>
#include <qmqtt/qmqtt.h>
#include <nymea-mqtt/mqttclient.h>
class AWSConnector : public QObject
{
@ -70,9 +70,9 @@ private slots:
void doConnect();
void onConnected();
void onDisconnected();
void onPublished(const QMQTT::Message &message, quint16 msgid);
void onSubscribed(const QString& topic, const quint8 qos);
void onSubscriptionReceived(const QMQTT::Message &message);
void onPublished(quint16 msgid, const QString &topic);
void onSubscribed(const QString &topic, Mqtt::SubscribeReturnCode returnCode);
void onPublishReceived(const QString &topic, const QByteArray &payload);
void registerDevice();
void onDeviceRegistered(bool needsReconnect);
@ -96,7 +96,7 @@ private:
QString getCertificateFingerprint(const QString &certificateFilePath) const;
private:
QMQTT::Client *m_client = nullptr;
MqttClient *m_client = nullptr;
QString m_currentEndpoint;
QString m_caFile;
QString m_clientCertFile;

View File

@ -3,7 +3,7 @@ TARGET = nymea-core
include(../nymea.pri)
QT += sql qmqtt
QT += sql
INCLUDEPATH += $$top_srcdir/libnymea
LIBS += -L$$top_builddir/libnymea/ -lnymea -lssl -lcrypto -lavahi-common -lavahi-client -lnymea-mqtt

View File

@ -8,7 +8,7 @@ INCLUDEPATH += ../libnymea ../libnymea-core
target.path = /usr/bin
INSTALLS += target
QT *= sql xml websockets bluetooth dbus network qmqtt
QT *= sql xml websockets bluetooth dbus network
LIBS += -L$$top_builddir/libnymea/ -lnymea \
-L$$top_builddir/libnymea-core -lnymea-core \