From 6aa8ad8a5d41e57604147c072841959e1c8f1fb3 Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Sat, 7 Oct 2017 15:37:20 +0200 Subject: [PATCH] rework AWS connection setup should be much more robust now --- libguh-core/awsconnector.cpp | 252 ++++++++++++++++++++++++----------- libguh-core/awsconnector.h | 35 ++++- libguh-core/cloudmanager.cpp | 5 + libguh-core/cloudmanager.h | 1 + 4 files changed, 211 insertions(+), 82 deletions(-) diff --git a/libguh-core/awsconnector.cpp b/libguh-core/awsconnector.cpp index e124d145..3234e21d 100644 --- a/libguh-core/awsconnector.cpp +++ b/libguh-core/awsconnector.cpp @@ -20,12 +20,14 @@ #include "awsconnector.h" #include "loggingcategories.h" +#include "guhsettings.h" #include #include #include #include #include +#include using namespace awsiotsdk; using namespace awsiotsdk::network; @@ -33,19 +35,28 @@ using namespace awsiotsdk::mqtt; QHash AWSConnector::s_requestMap; +// Somehow the linker fails to find this... missing in awsiotsdk? +DisconnectCallbackContextData::~DisconnectCallbackContextData() {} + AWSConnector::AWSConnector(QObject *parent) : QObject(parent) { connect(this, &AWSConnector::connected, this, &AWSConnector::onConnected, Qt::QueuedConnection); connect(this, &AWSConnector::disconnected, this, &AWSConnector::onDisconnected, Qt::QueuedConnection); // Enable some AWS logging (does not regard our logging categories) - std::shared_ptr p_log_system = - std::make_shared(awsiotsdk::util::Logging::LogLevel::Info); - awsiotsdk::util::Logging::InitializeAWSLogging(p_log_system); +// std::shared_ptr p_log_system = +// std::make_shared(awsiotsdk::util::Logging::LogLevel::Info); +// awsiotsdk::util::Logging::InitializeAWSLogging(p_log_system); + m_disconnectContextData = std::shared_ptr(new DisconnectContext(this)); + m_subscriptionContextData = std::shared_ptr(new SubscriptionContext(this)); } AWSConnector::~AWSConnector() { + qCDebug(dcAWS()) << "Stopping AWS connection. 
This might take a while..."; + m_client.reset(); + m_networkConnection.reset(); + qCDebug(dcAWS()) << "AWS connection stopped."; } void AWSConnector::connect2AWS(const QString &endpoint, const QString &clientId, const QString &clientName, const QString &caFile, const QString &clientCertFile, const QString &clientPrivKeyFile) @@ -54,39 +65,123 @@ void AWSConnector::connect2AWS(const QString &endpoint, const QString &clientId, m_caFile = caFile; m_clientCertFile = clientCertFile; m_clientPrivKeyFile = clientPrivKeyFile; - m_reconnect = true; + m_clientId = clientId; + m_clientName = clientName; + + m_client.reset(); + m_networkConnection.reset(); + + doConnect(); +} + +void AWSConnector::doConnect() +{ + m_setupInProgress = true; m_networkConnection = std::shared_ptr(new MbedTLSConnection( - endpoint.toStdString(), + m_currentEndpoint.toStdString(), 8883, - caFile.toStdString(), - clientCertFile.toStdString(), - clientPrivKeyFile.toStdString(), + m_caFile.toStdString(), + m_clientCertFile.toStdString(), + m_clientPrivKeyFile.toStdString(), std::chrono::milliseconds(30000), std::chrono::milliseconds(30000), std::chrono::milliseconds(30000), true )); - m_client = MqttClient::Create(m_networkConnection, std::chrono::milliseconds(30000), &onDisconnectedCallback, std::shared_ptr(this)); - m_clientId = clientId; - m_clientName = clientName; + m_client = MqttClient::Create(m_networkConnection, std::chrono::milliseconds(30000), &onDisconnectedCallback, m_disconnectContextData); m_client->SetAutoReconnectEnabled(true); m_client->SetMaxReconnectBackoffTimeout(std::chrono::seconds(10)); - qCDebug(dcAWS()) << "Connecting to AWS with ID:" << m_clientId << "endpoint:" << endpoint << m_client->GetMinReconnectBackoffTimeout().count() << (quint32)m_client->GetMaxReconnectBackoffTimeout().count(); + qCDebug(dcAWS()) << "Connecting to AWS with ID:" << m_clientId << "endpoint:" << m_currentEndpoint << "Min reconnect timeout:" << m_client->GetMinReconnectBackoffTimeout().count() << "Max 
reconnect timeout:" << (quint32)m_client->GetMaxReconnectBackoffTimeout().count(); m_connectingFuture = QtConcurrent::run([&]() { - ResponseCode rc = m_client->Connect(std::chrono::milliseconds(30000), true, mqtt::Version::MQTT_3_1_1, std::chrono::seconds(60), Utf8String::Create(m_clientId.toStdString()), nullptr, nullptr, nullptr); + ResponseCode rc = m_client->Connect(std::chrono::milliseconds(10000), true, mqtt::Version::MQTT_3_1_1, std::chrono::seconds(60), Utf8String::Create(m_clientId.toStdString()), nullptr, nullptr, nullptr); if (rc == ResponseCode::MQTT_CONNACK_CONNECTION_ACCEPTED) { emit connected(); } else { qCWarning(dcAWS) << "Error connecting to AWS. Response code:" << QString::fromStdString(ResponseHelper::ToString(rc)); m_client.reset(); m_networkConnection.reset(); + QTimer::singleShot(10000, this, &AWSConnector::doConnect); } }); } -DisconnectCallbackContextData::~DisconnectCallbackContextData() {} +void AWSConnector::onConnected() +{ + if (!readRegisteredFlag()) { + qCDebug(dcAWS()) << "AWS connected. Device not registered yet. Registering..."; + registerDevice(); + return; + } + qCDebug(dcAWS()) << "AWS connected. Device already registered in cloud."; + + // OK, we're registerd already, go straight to pairing setup + setupPairing(); + +// // TODO: remove this again. just using this for testing now +// QStringList subscriptions; +// subscriptions.append(QString("eu-west-1:%1/%2/#").arg("7127d36f-9644-455c-bb14-4a23bfac65fe").arg(m_clientId)); +// subscribe(subscriptions); +} + +void AWSConnector::registerDevice() +{ + // We create a temporary UUID for which will be used by the server to post the reply to our create/device call. + // Before the first create/device call the cloud doesn't know about us. In order to receive the reply for the + // call we need to subscribe to a topic every device can subscribe to. If we'd use our deviceId, a potential + // black hat could snoop in all the devices we register on the system. 
So in case someone actually does that + // let's give him meaningless IDs instead of real device ids. + m_createDeviceId = QUuid::createUuid().toString().remove(QRegExp("[{}]*")); + + // first subscribe to this tmp id topic + m_createDeviceSubscriptionId = subscribe({QString("create/device/%1").arg(m_createDeviceId)}); +} + +void AWSConnector::onDeviceRegistered(bool needsReconnect) +{ + storeRegisteredFlag(true); + + if (needsReconnect) { + qCDebug(dcAWS()) << "Disconnecting from AWS and reconnecting to use new policies"; + QtConcurrent::run([&]() { + m_client->Disconnect(std::chrono::milliseconds(500)); + m_client.reset(); + m_networkConnection.reset(); + staticMetaObject.invokeMethod(this, "doConnect", Qt::QueuedConnection); + }); + return; + } + + setupPairing(); +} + +void AWSConnector::setupPairing() +{ + // Subscribe to pairing info topics + QStringList subscriptions; + subscriptions.append(QString("%1/device/users/response").arg(m_clientId)); + subscriptions.append(QString("%1/pair/response").arg(m_clientId)); + subscribe(subscriptions); + + // fetch previous pairings + QVariantMap params; + params.insert("timestamp", QDateTime::currentMSecsSinceEpoch()); + params.insert("id", ++m_transactionId); + params.insert("command", "getUsers"); + publish(QString("%1/device/users").arg(m_clientId), params); +} + +void AWSConnector::onPairingsRetrieved(const QVariantList &pairings) +{ + QStringList topics; + foreach (const QVariant &pairing, pairings) { + topics << QString("eu-west-1:%1/%2/#").arg(pairing.toMap().value("cognitoIdIdentityId").toString()).arg(m_clientId); + } + subscribe(topics); + + m_setupInProgress = false; +} void AWSConnector::disconnectAWS() { @@ -118,7 +213,6 @@ void AWSConnector::sendWebRtcHandshakeMessage(const QString &sessionId, const QV publish(sessionId + "/reply", map); } - quint16 AWSConnector::publish(const QString &topic, const QVariantMap &message) { if (!isConnected()) { @@ -135,48 +229,35 @@ quint16 AWSConnector::publish(const 
QString &topic, const QVariantMap &message) return packetId; } -void AWSConnector::onConnected() -{ - qCDebug(dcAWS()) << "AWS connected"; - m_client->SetAutoReconnectEnabled(true); - registerDevice(); - - // TODO: remove this again. just using this for testing now to skip the registerDevice step - QStringList subscriptions; - subscriptions.append(QString("%1/pair/response").arg(m_clientId)); - subscriptions.append(QString("%1/device/users/response").arg(m_clientId)); - subscriptions.append(QString("eu-west-1:%1/%2/#").arg("7127d36f-9644-455c-bb14-4a23bfac65fe").arg(m_clientId)); - subscribe(subscriptions); - retrievePairedDeviceInfo(); -} - void AWSConnector::onDisconnected() { + bool needReRegistering = false; + if (m_setupInProgress) { + qCWarning(dcAWS()) << "Setup process interrupted by disconnect."; + needReRegistering = true; + } else { + if (m_lastConnectionDrop.addSecs(60) > QDateTime::currentDateTime()) { + m_reconnectCounter++; + } else { + m_reconnectCounter = 0; + } + m_lastConnectionDrop = QDateTime::currentDateTime(); + if (m_reconnectCounter > 5) { + qCWarning(dcAWS()) << "Connection dropped 5 times in a row within a minute."; + needReRegistering = true; + } + // No early return here: fall through so a flagged re-registration and the disconnect log still run. + } + + if (needReRegistering) { + qCDebug(dcAWS) << "Trying to reregister the device in the cloud"; + storeRegisteredFlag(false); + doConnect(); + } + qCDebug(dcAWS) << "AWS disconnected. (should reconnect on it's own)"; } -void AWSConnector::retrievePairedDeviceInfo() -{ - QVariantMap params; - params.insert("timestamp", QDateTime::currentMSecsSinceEpoch()); - params.insert("id", ++m_transactionId); - params.insert("command", "getUsers"); - publish(QString("%1/device/users").arg(m_clientId), params); -} - -void AWSConnector::registerDevice() -{ - // We create a temporary UUID for which will be used by the server to post the reply to our create/device call. - // Before the first create/device call the cloud doesn't know about us. 
In order to receive the reply for the - // call we need to subscribe to a topic every device can subscribe to. If we'd use our deviceId, a potential - // black hat could snoop in all the devices we register on the system. So in case someone actually does that - // let's give him meaningless IDs instead of real device ids. - m_createDeviceId = QUuid::createUuid().toString().remove(QRegExp("[{}]*")); - - // first subscribe to this tmp id topic - m_createDeviceSubscriptionId = subscribe({QString("create/device/%1").arg(m_createDeviceId)}); -} - void AWSConnector::setName() { QVariantMap params; @@ -191,15 +272,18 @@ quint16 AWSConnector::subscribe(const QStringList &topics) { util::Vector> subscription_list; foreach (const QString &topic, topics) { - qCDebug(dcAWSTraffic()) << "topic to subscribe is" << topic << "is valid topic:" << Subscription::IsValidTopicName(topic.toStdString()); - auto subscription = mqtt::Subscription::Create(Utf8String::Create(topic.toStdString()), mqtt::QoS::QOS1, &onSubscriptionReceivedCallback, std::shared_ptr(this)); + qCDebug(dcAWSTraffic()) << "Topic to subscribe is" << topic; + if (!Subscription::IsValidTopicName(topic.toStdString())) { + qCWarning(dcAWS()) << "Trying to subscribe to invalid topic:" << topic; + continue; + } + auto subscription = mqtt::Subscription::Create(Utf8String::Create(topic.toStdString()), mqtt::QoS::QOS1, &onSubscriptionReceivedCallback, m_subscriptionContextData); subscription_list.push_back(subscription); } uint16_t packetId; ResponseCode res = m_client->SubscribeAsync(subscription_list, subscribeCallback, packetId); - qCDebug(dcAWSTraffic()) << "subscribe call queued with status:" << QString::fromStdString(ResponseHelper::ToString(res)) << packetId; - qWarning() << "'''" << s_requestMap.count(); + qCDebug(dcAWSTraffic()) << "Subscribe call queued with status:" << QString::fromStdString(ResponseHelper::ToString(res)) << "Packet ID:" << packetId; s_requestMap.insert(packetId, this); return packetId; } @@ 
-236,10 +320,14 @@ void AWSConnector::subscribeCallback(uint16_t actionId, ResponseCode rc) if (actionId == connector->m_createDeviceSubscriptionId) { qCDebug(dcAWS()) << "subscribed to create/device/response"; - QVariantMap params; - params.insert("id", connector->m_createDeviceId); - params.insert("UUID", connector->m_clientId); - connector->publish("create/device", params); + // We might get this callback even if we didn't explicitly ask for it as the + // library automatically resubscribes to all the topics upon reconnect. + if (!connector->readRegisteredFlag()) { + QVariantMap params; + params.insert("id", connector->m_createDeviceId); + params.insert("UUID", connector->m_clientId); + connector->publish("create/device", params); + } return; } @@ -255,24 +343,24 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam return ResponseCode::JSON_PARSING_ERROR; } - AWSConnector *connector = dynamic_cast(p_app_handler_data.get()); + AWSConnector *connector = dynamic_cast(p_app_handler_data.get())->c; QString topic = QString::fromStdString(topic_name); if (topic.startsWith("create/device/")) { int statusCode = jsonDoc.toVariant().toMap().value("result").toMap().value("code").toInt(); - if (statusCode != 200) { + switch (statusCode) { + case 201: + qCDebug(dcAWS()) << "Device successfully registered to the cloud server:" << statusCode << jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString(); + connector->staticMetaObject.invokeMethod(connector, "onDeviceRegistered", Qt::QueuedConnection, Q_ARG(bool, true)); + return ResponseCode::SUCCESS; + case 200: + qCDebug(dcAWS()) << "Device already known to the cloud server:" << statusCode << jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString(); + // Ok, we have confirmation that everything went fine and we can proceed, let's remember that to minimize traffic. 
+ connector->staticMetaObject.invokeMethod(connector, "onDeviceRegistered", Qt::QueuedConnection, Q_ARG(bool, false)); + break; + default: qCWarning(dcAWS()) << "Error registering device in the cloud. AWS connetion will not work:" << statusCode << jsonDoc.toVariant().toMap().value("result").toMap().value("message").toString(); return ResponseCode::SUCCESS; } - qCDebug(dcAWS()) << "Device registered in cloud"; - - QStringList subscriptions; - subscriptions.append(QString("%1/pair/response").arg(connector->m_clientId)); - subscriptions.append(QString("%1/device/users/response").arg(connector->m_clientId)); - connector->subscribe(subscriptions); - - connector->retrievePairedDeviceInfo(); - connector->setName(); - } else if (topic == QString("%1/pair/response").arg(connector->m_clientId)) { int statusCode = jsonDoc.toVariant().toMap().value("status").toInt(); int id = jsonDoc.toVariant().toMap().value("id").toInt(); @@ -290,11 +378,7 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam qCDebug(dcAWS()) << "No devices paired yet..."; return ResponseCode::SUCCESS; } - QStringList topics; - foreach (const QVariant &pairing, jsonDoc.toVariant().toMap().value("pairings").toList()) { - topics << QString("eu-west-1:%1/%2/#").arg(pairing.toMap().value("cognitoIdIdentityId").toString()).arg(connector->m_clientId); - } - connector->subscribe(topics); + connector->staticMetaObject.invokeMethod(connector, "onPairingsRetrieved", Qt::QueuedConnection, Q_ARG(QVariantList, jsonDoc.toVariant().toMap().value("pairings").toList())); } else if (topic == QString("%1/device/name/response").arg(connector->m_clientId)) { qCDebug(dcAWS) << "Set device name in cloud with status:" << jsonDoc.toVariant().toMap().value("status").toInt(); } else if (topic.startsWith("eu-west-1:") && !topic.contains("reply")) { @@ -320,7 +404,19 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam ResponseCode 
AWSConnector::onDisconnectedCallback(util::String mqtt_client_id, std::shared_ptr p_app_handler_data) { Q_UNUSED(mqtt_client_id) - AWSConnector* connector = dynamic_cast(p_app_handler_data.get()); + AWSConnector* connector = dynamic_cast(p_app_handler_data.get())->c; emit connector->disconnected(); return ResponseCode::SUCCESS; } + +void AWSConnector::storeRegisteredFlag(bool registered) +{ + QSettings settings(GuhSettings::storagePath() + "/cloudstatus.conf", QSettings::IniFormat); + settings.setValue("registered", registered); +} + +bool AWSConnector::readRegisteredFlag() const +{ + QSettings settings(GuhSettings::storagePath() + "/cloudstatus.conf", QSettings::IniFormat); + return settings.value("registered", false).toBool(); +} diff --git a/libguh-core/awsconnector.h b/libguh-core/awsconnector.h index 8c77b31a..b56e3a6d 100644 --- a/libguh-core/awsconnector.h +++ b/libguh-core/awsconnector.h @@ -23,6 +23,7 @@ #include #include +#include #include "MbedTLS/MbedTLSConnection.hpp" #include @@ -53,21 +54,39 @@ signals: void webRtcHandshakeMessageReceived(const QString &transactionId, const QVariantMap &data); private slots: + void doConnect(); void onConnected(); - void onDisconnected(); - void retrievePairedDeviceInfo(); void registerDevice(); + void onDeviceRegistered(bool needsReconnect); + void setupPairing(); + void onPairingsRetrieved(const QVariantList &pairings); void setName(); + void onDisconnected(); private: + class SubscriptionContext: public awsiotsdk::mqtt::SubscriptionHandlerContextData + { + public: + SubscriptionContext(AWSConnector *connector): c(connector) {} + AWSConnector *c; + }; + class DisconnectContext: public awsiotsdk::DisconnectCallbackContextData + { + public: + DisconnectContext(AWSConnector *connector): c(connector) {} + AWSConnector *c; + }; quint16 publish(const QString &topic, const QVariantMap &message); quint16 subscribe(const QStringList &topics); static void publishCallback(uint16_t actionId, awsiotsdk::ResponseCode rc); static 
void subscribeCallback(uint16_t actionId, awsiotsdk::ResponseCode rc); static awsiotsdk::ResponseCode onSubscriptionReceivedCallback(awsiotsdk::util::String topic_name, awsiotsdk::util::String payload, - std::shared_ptr p_app_handler_data); + std::shared_ptr p_app_handler_data); static awsiotsdk::ResponseCode onDisconnectedCallback(awsiotsdk::util::String mqtt_client_id, - std::shared_ptr p_app_handler_data); + std::shared_ptr p_app_handler_data); + + void storeRegisteredFlag(bool registered); + bool readRegisteredFlag() const; private: std::shared_ptr m_networkConnection; @@ -81,12 +100,20 @@ private: QString m_clientId; QString m_clientName; QFuture m_connectingFuture; + bool m_isCleanSession = true; int m_transactionId = 0; QString m_createDeviceId; int m_createDeviceSubscriptionId = 0; QHash m_pairingRequests; + bool m_setupInProgress = false; + int m_reconnectCounter = 0; + QDateTime m_lastConnectionDrop; + std::shared_ptr m_subscriptionContextData; + std::shared_ptr m_disconnectContextData; + + static AWSConnector* s_instance; static QHash s_requestMap; }; diff --git a/libguh-core/cloudmanager.cpp b/libguh-core/cloudmanager.cpp index 241bdba1..e58150e7 100644 --- a/libguh-core/cloudmanager.cpp +++ b/libguh-core/cloudmanager.cpp @@ -36,6 +36,11 @@ CloudManager::CloudManager(NetworkManager *networkManager, QObject *parent) : QO connect(m_networkManager, &NetworkManager::stateChanged, this, &CloudManager::onlineStateChanged); } +CloudManager::~CloudManager() +{ + qCDebug(dcApplication) << "Shutting down \"CloudManager\""; +} + void CloudManager::setServerUrl(const QString &serverUrl) { m_serverUrl = serverUrl; diff --git a/libguh-core/cloudmanager.h b/libguh-core/cloudmanager.h index 04cea56e..a369e186 100644 --- a/libguh-core/cloudmanager.h +++ b/libguh-core/cloudmanager.h @@ -36,6 +36,7 @@ class CloudManager : public QObject Q_OBJECT public: explicit CloudManager(NetworkManager *networkManager, QObject *parent = nullptr); + ~CloudManager(); void 
setServerUrl(const QString &serverUrl); void setDeviceId(const QUuid &deviceId);