diff --git a/debian/control b/debian/control index 593c38bb..b9ff6750 100644 --- a/debian/control +++ b/debian/control @@ -23,7 +23,7 @@ Build-Depends: debhelper (>= 9.0.0), libavahi-client-dev, libavahi-common-dev, libssl-dev, - libqmqtt-dev, + libnymea-mqtt-dev, dbus-test-runner, Package: nymea diff --git a/libnymea-core/cloud/awsconnector.cpp b/libnymea-core/cloud/awsconnector.cpp index 338a77cb..880f9c3f 100644 --- a/libnymea-core/cloud/awsconnector.cpp +++ b/libnymea-core/cloud/awsconnector.cpp @@ -91,23 +91,21 @@ void AWSConnector::doConnect() sslConfig.setCaCertificates({caCertificate}); - m_client = new QMQTT::Client(m_currentEndpoint, 8883, sslConfig, true, this); - m_client->setClientId(m_clientId); - m_client->setVersion(QMQTT::V3_1_1); + m_client = new MqttClient(m_clientId, this); m_client->setKeepAlive(30); - m_client->setCleanSession(true); m_client->setAutoReconnect(true); - m_client->connectToHost(); + qWarning() << "Connecting MQTT to" << m_currentEndpoint; + m_client->connectToHost(m_currentEndpoint, 8883, true, true, sslConfig); - connect(m_client, &QMQTT::Client::connected, this, &AWSConnector::onConnected); - connect(m_client, &QMQTT::Client::disconnected, this, &AWSConnector::onDisconnected); - connect(m_client, &QMQTT::Client::error, this, [](const QMQTT::ClientError error){ + connect(m_client, &MqttClient::connected, this, &AWSConnector::onConnected); + connect(m_client, &MqttClient::disconnected, this, &AWSConnector::onDisconnected); + connect(m_client, &MqttClient::error, this, [](const QAbstractSocket::SocketError error){ qCWarning(dcAWS()) << "An error happened in the MQTT transport" << error; }); - connect(m_client, &QMQTT::Client::subscribed, this, &AWSConnector::onSubscribed); - connect(m_client, &QMQTT::Client::received, this, &AWSConnector::onSubscriptionReceived); - connect(m_client, &QMQTT::Client::published, this, &AWSConnector::onPublished); + connect(m_client, &MqttClient::subscribed, this, &AWSConnector::onSubscribed); + connect(m_client, &MqttClient::publishReceived, this, &AWSConnector::onPublishReceived); + connect(m_client, &MqttClient::published, this, &AWSConnector::onPublished); } void AWSConnector::onConnected() @@ -228,7 +226,7 @@ void AWSConnector::disconnectAWS() bool AWSConnector::isConnected() const { - return m_client && m_client->isConnectedToHost() && !m_setupInProgress; + return m_client && m_client->isConnected() && !m_setupInProgress; } void AWSConnector::setDeviceName(const QString &deviceName) @@ -297,9 +295,8 @@ quint16 AWSConnector::publish(const QString &topic, const QVariantMap &message) } QJsonDocument jsonDoc = QJsonDocument::fromVariant(message); - QMQTT::Message msg(0, topic, jsonDoc.toJson(QJsonDocument::Compact), 1); qCDebug(dcAWSTraffic()) << "Publishing:" << topic << jsonDoc.toJson(QJsonDocument::Compact); - quint16 packetId = m_client->publish(msg); + quint16 packetId = m_client->publish(topic, jsonDoc.toJson(QJsonDocument::Compact), Mqtt::QoS1, false); return packetId; } @@ -335,7 +332,7 @@ void AWSConnector::onDisconnected() if (m_shouldReconnect) { qCDebug(dcAWS()) << "Reconnecting to AWS..."; - doConnect(); + QTimer::singleShot(1000, this, &AWSConnector::doConnect); } } @@ -370,19 +367,20 @@ void AWSConnector::subscribe(const QStringList &topics) continue; } qCDebug(dcAWSTraffic()) << "Topic to subscribe is" << topic; - m_client->subscribe(topic, 1); + MqttSubscription subscription(topic.toUtf8(), Mqtt::QoS1); + m_client->subscribe(subscription); m_subscriptionCache.append(topic); } } -void AWSConnector::onPublished(const QMQTT::Message& message, quint16 msgid) +void AWSConnector::onPublished(quint16 msgid, const QString &topic) { - qCDebug(dcAWS()) << "Published message:" << message.topic() << msgid; + qCDebug(dcAWS()) << "Published message:" << msgid << topic; } -void AWSConnector::onSubscribed(const QString& topic, const quint8 qos) +void AWSConnector::onSubscribed(const QString &topic, Mqtt::SubscribeReturnCode returnCode) { - qCDebug(dcAWSTraffic()) << "Subscribed to topic:" << topic << qos; + qCDebug(dcAWSTraffic()) << "Subscribed to topic:" << topic << returnCode; if (topic.startsWith("create/device/")) { qCDebug(dcAWS()) << "Subscribed to create/device/"; @@ -398,18 +396,17 @@ void AWSConnector::onSubscribed(const QString& topic, const quint8 qos) } } -void AWSConnector::onSubscriptionReceived(const QMQTT::Message &message) +void AWSConnector::onPublishReceived(const QString &topic, const QByteArray &payload) { QJsonParseError error; - QJsonDocument jsonDoc = QJsonDocument::fromJson(message.payload(), &error); + QJsonDocument jsonDoc = QJsonDocument::fromJson(payload, &error); if (error.error != QJsonParseError::NoError) { - qCDebug(dcAWS()) << "Failed to parse JSON from AWS subscription on topic" << message.topic() << ":" << error.errorString() << "\n" << message.payload(); + qCDebug(dcAWS()) << "Failed to parse JSON from AWS subscription on topic" << topic << ":" << error.errorString() << "\n" << payload; return; } - qCDebug(dcAWSTraffic()) << "Subscription received: Topic:" << message.topic() << "payload:" << message.payload(); + qCDebug(dcAWSTraffic()) << "Subscription received: Topic:" << topic << "payload:" << payload; - QString topic = message.topic(); if (topic.startsWith("create/device/")) { int statusCode = jsonDoc.toVariant().toMap().value("result").toMap().value("code").toInt(); switch (statusCode) { @@ -505,7 +502,7 @@ void AWSConnector::onSubscriptionReceived(const QMQTT::Message &message) } onTurnCredentialsReceived(turnCreds.value("turnCredentials").toMap()); } else { - qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << message.payload(); + qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << payload; } } diff --git a/libnymea-core/cloud/awsconnector.h b/libnymea-core/cloud/awsconnector.h index 817e36cb..fcaeb1c3 100644 --- a/libnymea-core/cloud/awsconnector.h +++ b/libnymea-core/cloud/awsconnector.h @@ -25,7 +25,7 @@ #include #include -#include +#include class AWSConnector : public QObject { @@ -70,9 +70,9 @@ private slots: void doConnect(); void onConnected(); void onDisconnected(); - void onPublished(const QMQTT::Message &message, quint16 msgid); - void onSubscribed(const QString& topic, const quint8 qos); - void onSubscriptionReceived(const QMQTT::Message &message); + void onPublished(quint16 msgid, const QString &topic); + void onSubscribed(const QString &topic, Mqtt::SubscribeReturnCode returnCode); + void onPublishReceived(const QString &topic, const QByteArray &payload); void registerDevice(); void onDeviceRegistered(bool needsReconnect); @@ -96,7 +96,7 @@ private: QString getCertificateFingerprint(const QString &certificateFilePath) const; private: - QMQTT::Client *m_client = nullptr; + MqttClient *m_client = nullptr; QString m_currentEndpoint; QString m_caFile; QString m_clientCertFile; diff --git a/libnymea-core/libnymea-core.pro b/libnymea-core/libnymea-core.pro index 8ba45769..b76a9780 100644 --- a/libnymea-core/libnymea-core.pro +++ b/libnymea-core/libnymea-core.pro @@ -3,7 +3,7 @@ TARGET = nymea-core include(../nymea.pri) -QT += sql qmqtt +QT += sql INCLUDEPATH += $$top_srcdir/libnymea LIBS += -L$$top_builddir/libnymea/ -lnymea -lssl -lcrypto -lavahi-common -lavahi-client -lnymea-mqtt diff --git a/server/server.pro b/server/server.pro index 66db0702..0ae74e99 100644 --- a/server/server.pro +++ b/server/server.pro @@ -8,7 +8,7 @@ INCLUDEPATH += ../libnymea ../libnymea-core target.path = /usr/bin INSTALLS += target -QT *= sql xml websockets bluetooth dbus network qmqtt +QT *= sql xml websockets bluetooth dbus network LIBS += -L$$top_builddir/libnymea/ -lnymea \ -L$$top_builddir/libnymea-core -lnymea-core \