cleanups and dedupe mqtt packets
This commit is contained in:
parent
a69b32a572
commit
637a30700b
@ -499,22 +499,39 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam
|
||||
connector->storeSyncedNameCache(connector->m_clientName);
|
||||
}
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && !topic.contains("reply") && !topic.contains("proxy")) {
|
||||
static QStringList dupes;
|
||||
static QHash<QString, QDateTime> dupes;
|
||||
QString id = jsonDoc.toVariant().toMap().value("id").toString();
|
||||
QString type = jsonDoc.toVariant().toMap().value("type").toString();
|
||||
if (dupes.contains(id+type)) {
|
||||
qCDebug(dcAWS()) << "Dropping duplicate packet";
|
||||
return ResponseCode::SUCCESS;
|
||||
}
|
||||
dupes.append(id+type);
|
||||
|
||||
dupes.insert(id+type, QDateTime::currentDateTime());
|
||||
foreach (const QString &dupe, dupes.keys()) {
|
||||
if (dupes.value(dupe).addSecs(60) < QDateTime::currentDateTime()) {
|
||||
dupes.remove(dupe);
|
||||
}
|
||||
}
|
||||
qCDebug(dcAWS) << "received webrtc handshake message.";
|
||||
connector->webRtcHandshakeMessageReceived(topic, jsonDoc.toVariant().toMap());
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && topic.contains("reply")) {
|
||||
// silently drop our own things (should not be subscribed to that in the first place)
|
||||
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && topic.contains("proxy")) {
|
||||
qCDebug(dcAWS) << "Proxy remote connection request received";
|
||||
QString token = jsonDoc.toVariant().toMap().value("token").toString();
|
||||
qlonglong timestamp = jsonDoc.toVariant().toMap().value("timestamp").toLongLong();
|
||||
static QHash<QString, QDateTime> dupes;
|
||||
QString packetId = topic + token + QString::number(timestamp);
|
||||
if (dupes.contains(packetId)) {
|
||||
qCDebug(dcAWS()) << "Dropping duplicate packet";
|
||||
return ResponseCode::SUCCESS;
|
||||
}
|
||||
dupes.insert(packetId, QDateTime::currentDateTime());
|
||||
foreach (const QString &dupe, dupes.keys()) {
|
||||
if (dupes.value(dupe).addSecs(60) < QDateTime::currentDateTime()) {
|
||||
dupes.remove(dupe);
|
||||
}
|
||||
}
|
||||
qCDebug(dcAWS) << "Proxy remote connection request received";
|
||||
connector->staticMetaObject.invokeMethod(connector, "proxyConnectionRequestReceived", Qt::QueuedConnection, Q_ARG(QString, token));
|
||||
} else if (topic == QString("%1/notify/response").arg(connector->m_clientId)) {
|
||||
int transactionId = jsonDoc.toVariant().toMap().value("id").toInt();
|
||||
|
||||
@ -52,7 +52,11 @@ CloudManager::CloudManager(NymeaConfiguration *configuration, NetworkManager *ne
|
||||
|
||||
connect(m_networkManager, &NetworkManager::stateChanged, this, &CloudManager::onlineStateChanged);
|
||||
|
||||
m_transport = new CloudTransport(ServerConfiguration());
|
||||
ServerConfiguration config;
|
||||
config.id = "remote";
|
||||
config.authenticationEnabled = false;
|
||||
config.sslEnabled = true;
|
||||
m_transport = new CloudTransport(config);
|
||||
connect(m_awsConnector, &AWSConnector::proxyConnectionRequestReceived, m_transport, &CloudTransport::connectToCloud);
|
||||
|
||||
m_deviceId = m_configuration->serverUuid();
|
||||
|
||||
@ -55,31 +55,25 @@ void CloudTransport::sendData(const QList<QUuid> &clientIds, const QByteArray &d
|
||||
|
||||
bool CloudTransport::startServer()
|
||||
{
|
||||
qCDebug(dcCloud()) << "Start cloud server";
|
||||
qCDebug(dcCloud()) << "Started cloud transport";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CloudTransport::stopServer()
|
||||
{
|
||||
qCDebug(dcCloud()) << "Stop cloud server";
|
||||
qCDebug(dcCloud()) << "Stopped cloud transport";
|
||||
return true;
|
||||
}
|
||||
|
||||
void CloudTransport::connectToCloud(const QString &token)
|
||||
{
|
||||
qCDebug(dcCloud()) << "Start connecting to remote proxy server" << m_proxyUrl.toString();
|
||||
|
||||
foreach (const ConnectionContext &connectionContext, m_connections.values()) {
|
||||
if (connectionContext.token == token) {
|
||||
qCWarning(dcCloud()) << "There is already a remote connection for this token. This is not allowed.";
|
||||
return;
|
||||
}
|
||||
}
|
||||
qCDebug(dcCloud()) << "Connecting to remote proxy server" << m_proxyUrl.toString();
|
||||
|
||||
ConnectionContext context;
|
||||
context.clientId = QUuid::createUuid();
|
||||
context.token = token;
|
||||
context.proxyConnection = new RemoteProxyConnection(NymeaCore::instance()->configuration()->serverUuid().toString(), NymeaCore::instance()->configuration()->serverName(), this);
|
||||
QString identifier = QString("nymea:core (%1)").arg(NymeaCore::instance()->configuration()->serverName());
|
||||
context.proxyConnection = new RemoteProxyConnection(NymeaCore::instance()->configuration()->serverUuid().toString(), identifier, this);
|
||||
m_connections.insert(context.proxyConnection, context);
|
||||
|
||||
connect(context.proxyConnection, &RemoteProxyConnection::ready, this, &CloudTransport::transportReady);
|
||||
|
||||
@ -472,7 +472,7 @@ QVariantMap JsonRPCServer::createWelcomeMessage(TransportInterface *interface) c
|
||||
handshake.insert("language", NymeaCore::instance()->configuration()->locale().name());
|
||||
handshake.insert("protocol version", JSON_PROTOCOL_VERSION);
|
||||
handshake.insert("initialSetupRequired", (interface->configuration().authenticationEnabled ? NymeaCore::instance()->userManager()->initRequired() : false));
|
||||
handshake.insert("authenticationRequired", m_interfaces.value(interface));
|
||||
handshake.insert("authenticationRequired", interface->configuration().authenticationEnabled);
|
||||
handshake.insert("pushButtonAuthAvailable", NymeaCore::instance()->userManager()->pushButtonAuthAvailable());
|
||||
return handshake;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user