switch from aws sdk to qmqtt
This commit is contained in:
parent
637a30700b
commit
096a285212
2
debian/control
vendored
2
debian/control
vendored
@ -23,7 +23,7 @@ Build-Depends: debhelper (>= 9.0.0),
|
||||
libavahi-client-dev,
|
||||
libavahi-common-dev,
|
||||
libssl-dev,
|
||||
libaws-iot-device-sdk-cpp,
|
||||
libqmqtt-dev,
|
||||
dbus-test-runner,
|
||||
|
||||
Package: nymea
|
||||
|
||||
@ -30,35 +30,21 @@
|
||||
#include <QSettings>
|
||||
#include <QSslCertificate>
|
||||
#include <QFile>
|
||||
|
||||
using namespace awsiotsdk;
|
||||
using namespace awsiotsdk::network;
|
||||
using namespace awsiotsdk::mqtt;
|
||||
|
||||
QHash<quint16, AWSConnector*> AWSConnector::s_requestMap;
|
||||
|
||||
// Somehow the linker fails to find this... missing in awsiotsdk?
|
||||
DisconnectCallbackContextData::~DisconnectCallbackContextData() {}
|
||||
#include <QSslKey>
|
||||
|
||||
AWSConnector::AWSConnector(QObject *parent) : QObject(parent)
|
||||
{
|
||||
// Enable some AWS logging (does not regard our logging categories)
|
||||
std::shared_ptr<awsiotsdk::util::Logging::ConsoleLogSystem> p_log_system =
|
||||
std::make_shared<awsiotsdk::util::Logging::ConsoleLogSystem>(awsiotsdk::util::Logging::LogLevel::Info);
|
||||
awsiotsdk::util::Logging::InitializeAWSLogging(p_log_system);
|
||||
m_disconnectContextData = std::shared_ptr<awsiotsdk::DisconnectCallbackContextData>(new DisconnectContext(this));
|
||||
m_subscriptionContextData = std::shared_ptr<awsiotsdk::mqtt::SubscriptionHandlerContextData>(new SubscriptionContext(this));
|
||||
|
||||
qRegisterMetaType<AWSConnector::PushNotificationsEndpoint>();
|
||||
m_clientName = readSyncedNameCache();
|
||||
}
|
||||
|
||||
AWSConnector::~AWSConnector()
|
||||
{
|
||||
qCDebug(dcAWS()) << "Stopping AWS connection. This might take a while...";
|
||||
m_client.reset();
|
||||
m_networkConnection.reset();
|
||||
qCDebug(dcAWS()) << "AWS connection stopped.";
|
||||
if (m_client) {
|
||||
m_client->disconnectFromHost();
|
||||
qCDebug(dcAWS()) << "Disconnected from AWS.";
|
||||
emit disconnected();
|
||||
}
|
||||
}
|
||||
|
||||
void AWSConnector::connect2AWS(const QString &endpoint, const QString &clientId, const QString &clientName, const QString &caFile, const QString &clientCertFile, const QString &clientPrivKeyFile)
|
||||
@ -71,8 +57,12 @@ void AWSConnector::connect2AWS(const QString &endpoint, const QString &clientId,
|
||||
m_clientId = clientId;
|
||||
m_clientName = clientName;
|
||||
|
||||
m_client.reset();
|
||||
m_networkConnection.reset();
|
||||
if (m_client) {
|
||||
m_shouldReconnect = true;
|
||||
m_client->disconnectFromHost();
|
||||
qCDebug(dcAWS()) << "Disconnecting from AWS";
|
||||
return;
|
||||
}
|
||||
|
||||
doConnect();
|
||||
}
|
||||
@ -81,35 +71,42 @@ void AWSConnector::doConnect()
|
||||
{
|
||||
m_setupInProgress = true;
|
||||
m_subscriptionCache.clear();
|
||||
m_networkConnection = std::shared_ptr<OpenSSLConnection>(new OpenSSLConnection(
|
||||
m_currentEndpoint.toStdString(),
|
||||
8883,
|
||||
m_caFile.toStdString(),
|
||||
m_clientCertFile.toStdString(),
|
||||
m_clientPrivKeyFile.toStdString(),
|
||||
std::chrono::milliseconds(3000),
|
||||
std::chrono::milliseconds(3000),
|
||||
std::chrono::milliseconds(3000),
|
||||
true
|
||||
));
|
||||
m_networkConnection->Initialize();
|
||||
m_client = MqttClient::Create(m_networkConnection, std::chrono::milliseconds(2800), &onDisconnectedCallback, m_disconnectContextData);
|
||||
|
||||
m_client->SetAutoReconnectEnabled(false);
|
||||
m_client->SetMaxReconnectBackoffTimeout(std::chrono::seconds(10));
|
||||
QSslConfiguration sslConfig = QSslConfiguration::defaultConfiguration();
|
||||
QFile certFile(m_clientCertFile);
|
||||
certFile.open(QFile::ReadOnly);
|
||||
QSslCertificate certificate(certFile.readAll());
|
||||
|
||||
qCDebug(dcAWS()) << "Connecting to AWS with ID:" << m_clientId << "endpoint:" << m_currentEndpoint << "Certificate file:" << m_clientCertFile << "Fingerprint:" << getCertificateFingerprint(m_clientCertFile);
|
||||
m_connectingFuture = QtConcurrent::run([&]() {
|
||||
ResponseCode rc = m_client->Connect(std::chrono::milliseconds(10000), true, mqtt::Version::MQTT_3_1_1, std::chrono::seconds(30), Utf8String::Create(m_clientId.toStdString()), nullptr, nullptr, nullptr);
|
||||
if (rc == ResponseCode::MQTT_CONNACK_CONNECTION_ACCEPTED) {
|
||||
staticMetaObject.invokeMethod(this, "onConnected", Qt::QueuedConnection);
|
||||
} else {
|
||||
qCWarning(dcAWS) << "Error connecting to AWS. Response code:" << QString::fromStdString(ResponseHelper::ToString(rc));
|
||||
m_client.reset();
|
||||
m_networkConnection.reset();
|
||||
QTimer::singleShot(10000, this, &AWSConnector::doConnect);
|
||||
}
|
||||
QFile keyFile(m_clientPrivKeyFile);
|
||||
keyFile.open(QFile::ReadOnly);
|
||||
QSslKey key(keyFile.readAll(), QSsl::Rsa);
|
||||
|
||||
sslConfig.setLocalCertificate(certificate);
|
||||
sslConfig.setPrivateKey(key);
|
||||
sslConfig.setPeerVerifyMode(QSslSocket::VerifyNone);
|
||||
|
||||
QFile caCertFile(m_caFile);
|
||||
caCertFile.open(QFile::ReadOnly);
|
||||
QSslCertificate caCertificate(caCertFile.readAll());
|
||||
|
||||
sslConfig.setCaCertificates({caCertificate});
|
||||
|
||||
m_client = new QMQTT::Client(m_currentEndpoint, 8883, sslConfig, true, this);
|
||||
m_client->setClientId(m_clientId);
|
||||
m_client->setVersion(QMQTT::V3_1_1);
|
||||
m_client->setKeepAlive(30*60);
|
||||
m_client->setCleanSession(true);
|
||||
m_client->connectToHost();
|
||||
|
||||
connect(m_client, &QMQTT::Client::connected, this, &AWSConnector::onConnected);
|
||||
connect(m_client, &QMQTT::Client::disconnected, this, &AWSConnector::onDisconnected);
|
||||
connect(m_client, &QMQTT::Client::error, this, [](const QMQTT::ClientError error){
|
||||
qDebug() << "error" << error;
|
||||
});
|
||||
|
||||
connect(m_client, &QMQTT::Client::subscribed, this, &AWSConnector::onSubscribed);
|
||||
connect(m_client, &QMQTT::Client::received, this, &AWSConnector::onSubscriptionReceived);
|
||||
connect(m_client, &QMQTT::Client::published, this, &AWSConnector::onPublished);
|
||||
}
|
||||
|
||||
void AWSConnector::onConnected()
|
||||
@ -135,7 +132,7 @@ void AWSConnector::registerDevice()
|
||||
m_createDeviceId = QUuid::createUuid().toString().remove(QRegExp("[{}]*"));
|
||||
|
||||
// first subscribe to this tmp id topic
|
||||
m_createDeviceSubscriptionId = subscribe({QString("create/device/%1").arg(m_createDeviceId)});
|
||||
subscribe({QString("create/device/%1").arg(m_createDeviceId)});
|
||||
}
|
||||
|
||||
void AWSConnector::onDeviceRegistered(bool needsReconnect)
|
||||
@ -144,12 +141,7 @@ void AWSConnector::onDeviceRegistered(bool needsReconnect)
|
||||
|
||||
if (needsReconnect) {
|
||||
qCDebug(dcAWS()) << "Disconnecting from AWS and reconnecting to use new policies";
|
||||
QtConcurrent::run([&]() {
|
||||
m_client->Disconnect(std::chrono::milliseconds(500));
|
||||
m_client.reset();
|
||||
m_networkConnection.reset();
|
||||
QTimer::singleShot(1000, this, &AWSConnector::doConnect);
|
||||
});
|
||||
m_client->disconnectFromHost();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -183,6 +175,9 @@ void AWSConnector::fetchPairings()
|
||||
|
||||
void AWSConnector::onPairingsRetrieved(const QVariantMap &pairings)
|
||||
{
|
||||
m_setupInProgress = false;
|
||||
emit connected();
|
||||
|
||||
qCDebug(dcAWS) << pairings.value("users").toList().count() << "devices paired in cloud.";
|
||||
if (pairings.value("users").toList().count() > 0) {
|
||||
QStringList topics;
|
||||
@ -209,14 +204,12 @@ void AWSConnector::onPairingsRetrieved(const QVariantMap &pairings)
|
||||
}
|
||||
}
|
||||
}
|
||||
emit pushNotificationEndpointsUpdated(pushNotificationEndpoints);
|
||||
|
||||
if (readSyncedNameCache() != m_clientName) {
|
||||
setName();
|
||||
}
|
||||
|
||||
m_setupInProgress = false;
|
||||
emit connected();
|
||||
emit pushNotificationEndpointsUpdated(pushNotificationEndpoints);
|
||||
|
||||
requestTURNCredentials();
|
||||
}
|
||||
@ -225,17 +218,14 @@ void AWSConnector::disconnectAWS()
|
||||
{
|
||||
m_shouldReconnect = false;
|
||||
if (isConnected()) {
|
||||
m_client->Disconnect(std::chrono::milliseconds(2000));
|
||||
m_client.reset();
|
||||
m_networkConnection.reset();
|
||||
qCDebug(dcAWS()) << "Disconnected from AWS.";
|
||||
emit disconnected();
|
||||
m_client->disconnectFromHost();
|
||||
qCDebug(dcAWS()) << "Disconnecting from AWS.";
|
||||
}
|
||||
}
|
||||
|
||||
bool AWSConnector::isConnected() const
|
||||
{
|
||||
return m_connectingFuture.isFinished() && m_networkConnection && m_client && m_client->IsConnected() && !m_setupInProgress;
|
||||
return m_client && m_client->isConnectedToHost() && !m_setupInProgress;
|
||||
}
|
||||
|
||||
void AWSConnector::setDeviceName(const QString &deviceName)
|
||||
@ -304,21 +294,19 @@ quint16 AWSConnector::publish(const QString &topic, const QVariantMap &message)
|
||||
qCWarning(dcAWS()) << "Can't publish to AWS: Not connected.";
|
||||
return -1;
|
||||
}
|
||||
QString fullTopic = topic;
|
||||
QJsonDocument jsonDoc = QJsonDocument::fromVariant(message);
|
||||
|
||||
uint16_t packetId = 0;
|
||||
ResponseCode res = m_client->PublishAsync(Utf8String::Create(fullTopic.toStdString()), false, false, mqtt::QoS::QOS1, jsonDoc.toJson(QJsonDocument::Compact).toStdString(), &publishCallback, packetId);
|
||||
qCDebug(dcAWSTraffic()) << "publish call queued with status:" << QString::fromStdString(ResponseHelper::ToString(res)) << packetId << "for topic" << topic << jsonDoc.toJson();
|
||||
s_requestMap.insert(packetId, this);
|
||||
QMQTT::Message msg(0, topic, jsonDoc.toJson(QJsonDocument::Compact), 1);
|
||||
qCDebug(dcAWSTraffic()) << "Publishing:" << topic << jsonDoc.toJson(QJsonDocument::Compact);
|
||||
quint16 packetId = m_client->publish(msg);
|
||||
return packetId;
|
||||
}
|
||||
|
||||
void AWSConnector::onDisconnected()
|
||||
{
|
||||
qCDebug(dcAWS) << "AWS disconnected.";
|
||||
m_client.reset();
|
||||
m_networkConnection.reset();
|
||||
m_client->deleteLater();
|
||||
m_client = nullptr;
|
||||
emit disconnected();
|
||||
|
||||
bool needReRegistering = false;
|
||||
@ -373,138 +361,99 @@ void AWSConnector::setName()
|
||||
publish(QString("%1/device/name").arg(m_clientId), params);
|
||||
}
|
||||
|
||||
quint16 AWSConnector::subscribe(const QStringList &topics)
|
||||
void AWSConnector::subscribe(const QStringList &topics)
|
||||
{
|
||||
util::Vector<std::shared_ptr<mqtt::Subscription>> subscription_list;
|
||||
foreach (const QString &topic, topics) {
|
||||
if (m_subscriptionCache.contains(topic)) {
|
||||
qCDebug(dcAWS()) << "Already subscribed to topic:" << topic << ". Not resubscribing";
|
||||
continue;
|
||||
}
|
||||
qCDebug(dcAWSTraffic()) << "Topic to subscribe is" << topic;
|
||||
if (!Subscription::IsValidTopicName(topic.toStdString())) {
|
||||
qCWarning(dcAWS()) << "Trying to subscribe to invalid topic:" << topic;
|
||||
continue;
|
||||
}
|
||||
auto subscription = mqtt::Subscription::Create(Utf8String::Create(topic.toStdString()), mqtt::QoS::QOS1, &onSubscriptionReceivedCallback, m_subscriptionContextData);
|
||||
subscription_list.push_back(subscription);
|
||||
m_client->subscribe(topic, 1);
|
||||
m_subscriptionCache.append(topic);
|
||||
}
|
||||
if (subscription_list.size() == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint16_t packetId;
|
||||
ResponseCode res = m_client->SubscribeAsync(subscription_list, subscribeCallback, packetId);
|
||||
qCDebug(dcAWSTraffic()) << "Subscribe call queued with status:" << QString::fromStdString(ResponseHelper::ToString(res)) << "Packet ID:" << packetId;
|
||||
s_requestMap.insert(packetId, this);
|
||||
return packetId;
|
||||
}
|
||||
|
||||
void AWSConnector::publishCallback(uint16_t actionId, ResponseCode rc)
|
||||
void AWSConnector::onPublished(const QMQTT::Message& message, quint16 msgid)
|
||||
{
|
||||
AWSConnector* obj = s_requestMap.take(actionId);
|
||||
if (!obj) {
|
||||
qCWarning(dcAWS())<< "Received a response callback but don't have an object waiting for it.";
|
||||
return;
|
||||
}
|
||||
|
||||
switch (rc) {
|
||||
case ResponseCode::SUCCESS:
|
||||
qCDebug(dcAWSTraffic()) << "Successfully published" << actionId;
|
||||
break;
|
||||
default:
|
||||
qCDebug(dcAWS())<< "Error publishing data to AWS:" << QString::fromStdString(ResponseHelper::ToString(rc));
|
||||
}
|
||||
qCDebug(dcAWS()) << "Published message:" << message.topic() << msgid;
|
||||
}
|
||||
|
||||
void AWSConnector::subscribeCallback(uint16_t actionId, ResponseCode rc)
|
||||
void AWSConnector::onSubscribed(const QString& topic, const quint8 qos)
|
||||
{
|
||||
if (rc != ResponseCode::SUCCESS) {
|
||||
qCWarning(dcAWS()) << "Error subscribing to" << actionId << QString::fromStdString(ResponseHelper::ToString(rc));
|
||||
return;
|
||||
}
|
||||
qCDebug(dcAWSTraffic()) << "Subscribed to topic:" << topic << qos;
|
||||
|
||||
AWSConnector *connector = s_requestMap.take(actionId);
|
||||
if (!connector) {
|
||||
qCWarning(dcAWS()) << "Received a subscribe callback but don't have a request id for it.";
|
||||
return;
|
||||
}
|
||||
|
||||
if (actionId == connector->m_createDeviceSubscriptionId) {
|
||||
qCDebug(dcAWS()) << "Subscribed to create/device/response";
|
||||
if (topic.startsWith("create/device/")) {
|
||||
qCDebug(dcAWS()) << "Subscribed to create/device/";
|
||||
// We might get this callback even if we didn't explicitly ask for it as the
|
||||
// library automatically resubscribes to all the topics upon reconnect.
|
||||
if (!connector->readRegisteredFlag()) {
|
||||
if (!readRegisteredFlag()) {
|
||||
QVariantMap params;
|
||||
params.insert("id", connector->m_createDeviceId);
|
||||
params.insert("UUID", connector->m_clientId);
|
||||
connector->publish("create/device", params);
|
||||
params.insert("id", m_createDeviceId);
|
||||
params.insert("UUID", m_clientId);
|
||||
publish("create/device", params);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
qCDebug(dcAWSTraffic()) << "Successfully subscribed (actionId:" << actionId << ")";
|
||||
}
|
||||
|
||||
ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_name, util::String payload, std::shared_ptr<SubscriptionHandlerContextData> p_app_handler_data)
|
||||
void AWSConnector::onSubscriptionReceived(const QMQTT::Message &message)
|
||||
{
|
||||
QJsonParseError error;
|
||||
QJsonDocument jsonDoc = QJsonDocument::fromJson(QByteArray::fromStdString(payload), &error);
|
||||
QJsonDocument jsonDoc = QJsonDocument::fromJson(message.payload(), &error);
|
||||
if (error.error != QJsonParseError::NoError) {
|
||||
qCDebug(dcAWS()) << "Failed to parse JSON from AWS subscription on topic" << QString::fromStdString(topic_name) << ":" << error.errorString() << "\n" << QString::fromStdString(payload);
|
||||
return ResponseCode::JSON_PARSING_ERROR;
|
||||
qCDebug(dcAWS()) << "Failed to parse JSON from AWS subscription on topic" << message.topic() << ":" << error.errorString() << "\n" << message.payload();
|
||||
return;
|
||||
}
|
||||
|
||||
qCDebug(dcAWSTraffic()) << "Subscription received: Topic:" << QString::fromStdString(topic_name) << "payload:" << QString::fromStdString(payload);
|
||||
qCDebug(dcAWSTraffic()) << "Subscription received: Topic:" << message.topic() << "payload:" << message.payload();
|
||||
|
||||
AWSConnector *connector = dynamic_cast<SubscriptionContext*>(p_app_handler_data.get())->c;
|
||||
QString topic = QString::fromStdString(topic_name);
|
||||
QString topic = message.topic();
|
||||
if (topic.startsWith("create/device/")) {
|
||||
int statusCode = jsonDoc.toVariant().toMap().value("result").toMap().value("code").toInt();
|
||||
switch (statusCode) {
|
||||
case 201:
|
||||
qCDebug(dcAWS()) << "Device successfully registered to the cloud server:" << statusCode << jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString();
|
||||
connector->staticMetaObject.invokeMethod(connector, "onDeviceRegistered", Qt::QueuedConnection, Q_ARG(bool, true));
|
||||
return ResponseCode::SUCCESS;
|
||||
onDeviceRegistered(true);
|
||||
return;
|
||||
case 200:
|
||||
qCDebug(dcAWS()) << "Device already known to the cloud server:" << statusCode << jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString();
|
||||
// Ok, we have confirmation that everything went fine and we can proceed, let's remember that to minimize traffic.
|
||||
connector->staticMetaObject.invokeMethod(connector, "onDeviceRegistered", Qt::QueuedConnection, Q_ARG(bool, false));
|
||||
onDeviceRegistered(false);
|
||||
break;
|
||||
default:
|
||||
qCWarning(dcAWS()) << "Error registering device in the cloud. AWS connetion will not work:" << statusCode << jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString();
|
||||
return ResponseCode::SUCCESS;
|
||||
return;
|
||||
}
|
||||
} else if (topic == QString("%1/pair/response").arg(connector->m_clientId)) {
|
||||
} else if (topic == QString("%1/pair/response").arg(m_clientId)) {
|
||||
int statusCode = jsonDoc.toVariant().toMap().value("status").toInt();
|
||||
int id = jsonDoc.toVariant().toMap().value("id").toInt();
|
||||
quint16 id = jsonDoc.toVariant().toMap().value("id").toUInt();
|
||||
QString message = jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString();
|
||||
QString userId = connector->m_pairingRequests.take(id);
|
||||
QString userId = m_pairingRequests.take(id);
|
||||
if (statusCode != 200) {
|
||||
qCWarning(dcAWS()) << "Pairing failed:" << statusCode << message;
|
||||
emit connector->devicePaired(userId, statusCode, message);
|
||||
emit devicePaired(userId, statusCode, message);
|
||||
} else if (!userId.isEmpty()) {
|
||||
qCDebug(dcAWS()) << "Pairing response for id:" << userId << statusCode;
|
||||
emit connector->devicePaired(userId, statusCode, message);
|
||||
connector->staticMetaObject.invokeMethod(connector, "fetchPairings", Qt::QueuedConnection);
|
||||
emit devicePaired(userId, statusCode, message);
|
||||
fetchPairings();
|
||||
} else {
|
||||
qCWarning(dcAWS()) << "Received a pairing response for a transaction we didn't start";
|
||||
}
|
||||
} else if (topic == QString("%1/device/users/response").arg(connector->m_clientId)) {
|
||||
connector->staticMetaObject.invokeMethod(connector, "onPairingsRetrieved", Qt::QueuedConnection, Q_ARG(QVariantMap, jsonDoc.toVariant().toMap()));
|
||||
} else if (topic == QString("%1/device/name/response").arg(connector->m_clientId)) {
|
||||
} else if (topic == QString("%1/device/users/response").arg(m_clientId)) {
|
||||
onPairingsRetrieved(jsonDoc.toVariant().toMap());
|
||||
} else if (topic == QString("%1/device/name/response").arg(m_clientId)) {
|
||||
qCDebug(dcAWS) << "Set device name in cloud with status:" << jsonDoc.toVariant().toMap().value("status").toInt();
|
||||
if (jsonDoc.toVariant().toMap().value("status").toInt() == 200) {
|
||||
connector->storeSyncedNameCache(connector->m_clientName);
|
||||
storeSyncedNameCache(m_clientName);
|
||||
}
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && !topic.contains("reply") && !topic.contains("proxy")) {
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(m_clientId)) && !topic.contains("reply") && !topic.contains("proxy")) {
|
||||
static QHash<QString, QDateTime> dupes;
|
||||
QString id = jsonDoc.toVariant().toMap().value("id").toString();
|
||||
QString type = jsonDoc.toVariant().toMap().value("type").toString();
|
||||
if (dupes.contains(id+type)) {
|
||||
qCDebug(dcAWS()) << "Dropping duplicate packet";
|
||||
return ResponseCode::SUCCESS;
|
||||
return;
|
||||
}
|
||||
dupes.insert(id+type, QDateTime::currentDateTime());
|
||||
foreach (const QString &dupe, dupes.keys()) {
|
||||
@ -513,17 +462,17 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam
|
||||
}
|
||||
}
|
||||
qCDebug(dcAWS) << "received webrtc handshake message.";
|
||||
connector->webRtcHandshakeMessageReceived(topic, jsonDoc.toVariant().toMap());
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && topic.contains("reply")) {
|
||||
webRtcHandshakeMessageReceived(topic, jsonDoc.toVariant().toMap());
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(m_clientId)) && topic.contains("reply")) {
|
||||
// silently drop our own things (should not be subscribed to that in the first place)
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && topic.contains("proxy")) {
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(m_clientId)) && topic.contains("proxy")) {
|
||||
QString token = jsonDoc.toVariant().toMap().value("token").toString();
|
||||
qlonglong timestamp = jsonDoc.toVariant().toMap().value("timestamp").toLongLong();
|
||||
static QHash<QString, QDateTime> dupes;
|
||||
QString packetId = topic + token + QString::number(timestamp);
|
||||
if (dupes.contains(packetId)) {
|
||||
qCDebug(dcAWS()) << "Dropping duplicate packet";
|
||||
return ResponseCode::SUCCESS;
|
||||
return;
|
||||
}
|
||||
dupes.insert(packetId, QDateTime::currentDateTime());
|
||||
foreach (const QString &dupe, dupes.keys()) {
|
||||
@ -532,13 +481,13 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam
|
||||
}
|
||||
}
|
||||
qCDebug(dcAWS) << "Proxy remote connection request received";
|
||||
connector->staticMetaObject.invokeMethod(connector, "proxyConnectionRequestReceived", Qt::QueuedConnection, Q_ARG(QString, token));
|
||||
} else if (topic == QString("%1/notify/response").arg(connector->m_clientId)) {
|
||||
proxyConnectionRequestReceived(token);
|
||||
} else if (topic == QString("%1/notify/response").arg(m_clientId)) {
|
||||
int transactionId = jsonDoc.toVariant().toMap().value("id").toInt();
|
||||
int status = jsonDoc.toVariant().toMap().value("status").toInt();
|
||||
qCDebug(dcAWS()) << "Push notification reply for transaction" << transactionId << " Status:" << status << jsonDoc.toVariant().toMap().value("message").toString();
|
||||
emit connector->pushNotificationSent(transactionId, status);
|
||||
} else if (topic == QString("%1/notify/info/endpoint").arg(connector->m_clientId)) {
|
||||
emit pushNotificationSent(transactionId, status);
|
||||
} else if (topic == QString("%1/notify/info/endpoint").arg(m_clientId)) {
|
||||
QVariantMap endpoint = jsonDoc.toVariant().toMap().value("newPushNotificationsEndpoint").toMap();
|
||||
Q_ASSERT(endpoint.keys().count() == 1);
|
||||
QString cognitoId = endpoint.keys().first();
|
||||
@ -546,26 +495,17 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam
|
||||
ep.userId = cognitoId;
|
||||
ep.endpointId = endpoint.value(cognitoId).toMap().value("endpointId").toString();
|
||||
ep.displayName = endpoint.value(cognitoId).toMap().value("displayName").toString();
|
||||
emit connector->pushNotificationEndpointAdded(ep);
|
||||
} else if (topic == QString("%1/services/turn/response").arg(connector->m_clientId)) {
|
||||
emit pushNotificationEndpointAdded(ep);
|
||||
} else if (topic == QString("%1/services/turn/response").arg(m_clientId)) {
|
||||
QVariantMap turnCreds = jsonDoc.toVariant().toMap();
|
||||
if (turnCreds.value("result").toMap().value("code").toInt() != 201) {
|
||||
qCWarning(dcAWS()) << "Error retrieving TURN credentials:" << turnCreds.value("result").toMap().value("code").toInt() << turnCreds.value("result").toMap().value("message").toString();
|
||||
return ResponseCode::SUCCESS;
|
||||
return;
|
||||
}
|
||||
connector->staticMetaObject.invokeMethod(connector, "onTurnCredentialsReceived", Qt::QueuedConnection, Q_ARG(QVariantMap, turnCreds.value("turnCredentials").toMap()));
|
||||
onTurnCredentialsReceived(turnCreds.value("turnCredentials").toMap());
|
||||
} else {
|
||||
qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << QString::fromStdString(payload);
|
||||
qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << message.payload();
|
||||
}
|
||||
return ResponseCode::SUCCESS;
|
||||
}
|
||||
|
||||
ResponseCode AWSConnector::onDisconnectedCallback(util::String mqtt_client_id, std::shared_ptr<DisconnectCallbackContextData> p_app_handler_data)
|
||||
{
|
||||
Q_UNUSED(mqtt_client_id)
|
||||
AWSConnector* connector = dynamic_cast<DisconnectContext*>(p_app_handler_data.get())->c;
|
||||
connector->staticMetaObject.invokeMethod(connector, "onDisconnected", Qt::QueuedConnection);
|
||||
return ResponseCode::SUCCESS;
|
||||
}
|
||||
|
||||
void AWSConnector::storeRegisteredFlag(bool registered)
|
||||
@ -596,7 +536,7 @@ QString AWSConnector::getCertificateFingerprint(const QString &certificateFile)
|
||||
{
|
||||
QFile certFile(certificateFile);
|
||||
if (!certFile.open(QFile::ReadOnly)) {
|
||||
qCWarning(dcAWS()) << "Error opening certificate file" << certificateFile;
|
||||
qCWarning(dcAWS()) << "Error openi<ng certificate file" << certificateFile;
|
||||
return QString();
|
||||
}
|
||||
QSslCertificate crt = QSslCertificate(certFile.readAll());
|
||||
|
||||
@ -25,18 +25,20 @@
|
||||
#include <QFuture>
|
||||
#include <QDateTime>
|
||||
|
||||
#include "OpenSSL/OpenSSLConnection.hpp"
|
||||
#include <mqtt/Client.hpp>
|
||||
#include <mqtt/Common.hpp>
|
||||
#include "util/logging/Logging.hpp"
|
||||
#include "util/logging/LogMacros.hpp"
|
||||
#include "util/logging/ConsoleLogSystem.hpp"
|
||||
#include <qmqtt/qmqtt.h>
|
||||
|
||||
class AWSConnector : public QObject, public awsiotsdk::mqtt::SubscriptionHandlerContextData, public awsiotsdk::DisconnectCallbackContextData
|
||||
//#include "OpenSSL/OpenSSLConnection.hpp"
|
||||
//#include <mqtt/Client.hpp>
|
||||
//#include <mqtt/Common.hpp>
|
||||
//#include "util/logging/Logging.hpp"
|
||||
//#include "util/logging/LogMacros.hpp"
|
||||
//#include "util/logging/ConsoleLogSystem.hpp"
|
||||
|
||||
class AWSConnector : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
explicit AWSConnector(QObject *parent = 0);
|
||||
explicit AWSConnector(QObject *parent = nullptr);
|
||||
~AWSConnector();
|
||||
|
||||
class PushNotificationsEndpoint {
|
||||
@ -74,36 +76,29 @@ signals:
|
||||
private slots:
|
||||
void doConnect();
|
||||
void onConnected();
|
||||
void onDisconnected();
|
||||
void onPublished(const QMQTT::Message &message, quint16 msgid);
|
||||
void onSubscribed(const QString& topic, const quint8 qos);
|
||||
void onSubscriptionReceived(const QMQTT::Message &message);
|
||||
|
||||
void registerDevice();
|
||||
void onDeviceRegistered(bool needsReconnect);
|
||||
void setupSubscriptions();
|
||||
void fetchPairings();
|
||||
void onPairingsRetrieved(const QVariantMap &pairings);
|
||||
void setName();
|
||||
void onDisconnected();
|
||||
void onTurnCredentialsReceived(const QVariantMap &turnCredentials);
|
||||
|
||||
|
||||
private:
|
||||
class SubscriptionContext: public awsiotsdk::mqtt::SubscriptionHandlerContextData
|
||||
{
|
||||
public:
|
||||
SubscriptionContext(AWSConnector *connector): c(connector) {}
|
||||
AWSConnector *c;
|
||||
};
|
||||
class DisconnectContext: public awsiotsdk::DisconnectCallbackContextData
|
||||
{
|
||||
public:
|
||||
DisconnectContext(AWSConnector *connector): c(connector) {}
|
||||
AWSConnector *c;
|
||||
};
|
||||
quint16 publish(const QString &topic, const QVariantMap &message);
|
||||
quint16 subscribe(const QStringList &topics);
|
||||
static void publishCallback(uint16_t actionId, awsiotsdk::ResponseCode rc);
|
||||
static void subscribeCallback(uint16_t actionId, awsiotsdk::ResponseCode rc);
|
||||
static awsiotsdk::ResponseCode onSubscriptionReceivedCallback(awsiotsdk::util::String topic_name, awsiotsdk::util::String payload,
|
||||
std::shared_ptr<awsiotsdk::mqtt::SubscriptionHandlerContextData> p_app_handler_data);
|
||||
static awsiotsdk::ResponseCode onDisconnectedCallback(awsiotsdk::util::String mqtt_client_id,
|
||||
std::shared_ptr<awsiotsdk::DisconnectCallbackContextData> p_app_handler_data);
|
||||
void subscribe(const QStringList &topics);
|
||||
// static void publishCallback(uint16_t actionId, awsiotsdk::ResponseCode rc);
|
||||
// static void subscribeCallback(uint16_t actionId, awsiotsdk::ResponseCode rc);
|
||||
// static awsiotsdk::ResponseCode onSubscriptionReceivedCallback(awsiotsdk::util::String topic_name, awsiotsdk::util::String payload,
|
||||
// std::shared_ptr<awsiotsdk::mqtt::SubscriptionHandlerContextData> p_app_handler_data);
|
||||
// static awsiotsdk::ResponseCode onDisconnectedCallback(awsiotsdk::util::String mqtt_client_id,
|
||||
// std::shared_ptr<awsiotsdk::DisconnectCallbackContextData> p_app_handler_data);
|
||||
|
||||
void storeRegisteredFlag(bool registered);
|
||||
bool readRegisteredFlag() const;
|
||||
@ -114,8 +109,9 @@ private:
|
||||
QString getCertificateFingerprint(const QString &certificateFilePath) const;
|
||||
|
||||
private:
|
||||
std::shared_ptr<awsiotsdk::network::OpenSSLConnection> m_networkConnection;
|
||||
std::shared_ptr<awsiotsdk::MqttClient> m_client;
|
||||
// std::shared_ptr<awsiotsdk::network::OpenSSLConnection> m_networkConnection;
|
||||
// std::shared_ptr<awsiotsdk::MqttClient> m_client;
|
||||
QMQTT::Client *m_client = nullptr;
|
||||
QString m_currentEndpoint;
|
||||
QString m_caFile;
|
||||
QString m_clientCertFile;
|
||||
@ -137,11 +133,11 @@ private:
|
||||
QStringList m_subscriptionCache;
|
||||
QPair<QVariantMap, QDateTime> m_cachedTURNCredentials;
|
||||
|
||||
std::shared_ptr<awsiotsdk::mqtt::SubscriptionHandlerContextData> m_subscriptionContextData;
|
||||
std::shared_ptr<awsiotsdk::DisconnectCallbackContextData> m_disconnectContextData;
|
||||
// std::shared_ptr<awsiotsdk::mqtt::SubscriptionHandlerContextData> m_subscriptionContextData;
|
||||
// std::shared_ptr<awsiotsdk::DisconnectCallbackContextData> m_disconnectContextData;
|
||||
|
||||
static AWSConnector* s_instance;
|
||||
static QHash<quint16, AWSConnector*> s_requestMap;
|
||||
// static AWSConnector* s_instance;
|
||||
// QHash<quint16, AWSConnector*> m_requestMap;
|
||||
};
|
||||
Q_DECLARE_METATYPE(AWSConnector::PushNotificationsEndpoint)
|
||||
|
||||
|
||||
@ -3,7 +3,7 @@ TARGET = nymea-core
|
||||
|
||||
include(../nymea.pri)
|
||||
|
||||
QT += sql
|
||||
QT += sql qmqtt
|
||||
INCLUDEPATH += $$top_srcdir/libnymea
|
||||
LIBS += -L$$top_builddir/libnymea/ -lnymea -lssl -lcrypto -lavahi-common -lavahi-client
|
||||
|
||||
@ -97,7 +97,7 @@ HEADERS += nymeacore.h \
|
||||
tagging/tagsstorage.h \
|
||||
tagging/tag.h \
|
||||
jsonrpc/tagshandler.h \
|
||||
cloud/cloudtransport.h
|
||||
cloud/cloudtransport.h \
|
||||
|
||||
SOURCES += nymeacore.cpp \
|
||||
tcpserver.cpp \
|
||||
@ -154,7 +154,7 @@ SOURCES += nymeacore.cpp \
|
||||
cloud/awsconnector.cpp \
|
||||
cloud/cloudmanager.cpp \
|
||||
cloud/cloudnotifications.cpp \
|
||||
cloud/OpenSSL/OpenSSLConnection.cpp \
|
||||
# cloud/OpenSSL/OpenSSLConnection.cpp \
|
||||
cloud/janusconnector.cpp \
|
||||
pushbuttondbusservice.cpp \
|
||||
hardwaremanagerimplementation.cpp \
|
||||
@ -179,4 +179,4 @@ SOURCES += nymeacore.cpp \
|
||||
tagging/tagsstorage.cpp \
|
||||
tagging/tag.cpp \
|
||||
jsonrpc/tagshandler.cpp \
|
||||
cloud/cloudtransport.cpp
|
||||
cloud/cloudtransport.cpp \
|
||||
|
||||
@ -8,11 +8,11 @@ INCLUDEPATH += ../libnymea ../libnymea-core
|
||||
target.path = /usr/bin
|
||||
INSTALLS += target
|
||||
|
||||
QT *= sql xml websockets bluetooth dbus network
|
||||
QT *= sql xml websockets bluetooth dbus network qmqtt
|
||||
|
||||
LIBS += -L$$top_builddir/libnymea/ -lnymea \
|
||||
-L$$top_builddir/libnymea-core -lnymea-core \
|
||||
-lssl -lcrypto -laws-iot-sdk-cpp -lnymea-remoteproxyclient
|
||||
-lssl -lcrypto -lnymea-remoteproxyclient
|
||||
|
||||
# Server files
|
||||
include(qtservice/qtservice.pri)
|
||||
|
||||
@ -8,7 +8,7 @@ INCLUDEPATH += $$top_srcdir/libnymea \
|
||||
LIBS += -L$$top_builddir/libnymea/ -lnymea \
|
||||
-L$$top_builddir/libnymea-core/ -lnymea-core \
|
||||
-L$$top_builddir/plugins/mock/ \
|
||||
-lssl -lcrypto -laws-iot-sdk-cpp -lavahi-common -lavahi-client -lnymea-remoteproxyclient
|
||||
-lssl -lcrypto -lavahi-common -lavahi-client -lnymea-remoteproxyclient
|
||||
|
||||
SOURCES += ../nymeatestbase.cpp \
|
||||
|
||||
|
||||
Reference in New Issue
Block a user