From 38d1b69e122c32515863dc713764185ecfb729ad Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Mon, 12 Mar 2018 21:14:35 +0100 Subject: [PATCH] implement dynamic TURN credential provisoning to Janus through AWS --- libnymea-core/awsconnector.cpp | 32 ++++++++++++++-- libnymea-core/awsconnector.h | 2 + libnymea-core/cloudmanager.cpp | 3 ++ libnymea-core/janusconnector.cpp | 63 ++++++++++++++++++++++++++++---- libnymea-core/janusconnector.h | 8 ++++ 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/libnymea-core/awsconnector.cpp b/libnymea-core/awsconnector.cpp index afd55da5..9dcaca9a 100644 --- a/libnymea-core/awsconnector.cpp +++ b/libnymea-core/awsconnector.cpp @@ -165,6 +165,7 @@ void AWSConnector::setupSubscriptions() subscriptions.append(QString("%1/pair/response").arg(m_clientId)); subscriptions.append(QString("%1/notify/response").arg(m_clientId)); subscriptions.append(QString("%1/notify/info/endpoint").arg(m_clientId)); + subscriptions.append(QString("%1/services/turn/response").arg(m_clientId)); subscribe(subscriptions); // fetch previous pairings @@ -276,6 +277,21 @@ int AWSConnector::sendPushNotification(const QString &userId, const QString &end return m_transactionId; } +void AWSConnector::requestTURNCredentials() +{ + if (!isConnected()) { + qCWarning(dcAWS()) << "Not connected. Cannot request TURN credentials."; + emit turnCredentialsReceived(QVariantMap()); + return; + } + qCDebug(dcAWS()) << "Requesting TURN credentials"; + QVariantMap params; + params.insert("id", QUuid::createUuid()); + params.insert("command", "getTurnCredentials"); + params.insert("timestamp", QDateTime::currentMSecsSinceEpoch()); + publish(QString("%1/services/turn").arg(m_clientId), params); +} + quint16 AWSConnector::publish(const QString &topic, const QVariantMap &message) { if (!m_setupInProgress && !isConnected()) { @@ -392,12 +408,12 @@ void AWSConnector::subscribeCallback(uint16_t actionId, ResponseCode rc) AWSConnector *connector = s_requestMap.take(actionId); if (!connector) { - qCWarning(dcAWS()) << "received a subscribe callback but don't have a request id for it."; + qCWarning(dcAWS()) << "Received a subscribe callback but don't have a request id for it."; return; } if (actionId == connector->m_createDeviceSubscriptionId) { - qCDebug(dcAWS()) << "subscribed to create/device/response"; + qCDebug(dcAWS()) << "Subscribed to create/device/response"; // We might get this callback even if we didn't explicitly ask for it as the // library automatically resubscribes to all the topics upon reconnect. if (!connector->readRegisteredFlag()) { @@ -473,7 +489,7 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam } dupes.append(id+type); - qCDebug(dcAWS) << "received webrtc handshake message" << topic << jsonDoc.toJson(); + qCDebug(dcAWS) << "received webrtc handshake message."; connector->webRtcHandshakeMessageReceived(topic, jsonDoc.toVariant().toMap()); } else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && topic.contains("reply")) { // silently drop our own things (should not be subscribed to that in the first place) @@ -491,8 +507,16 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam ep.endpointId = endpoint.value(cognitoId).toMap().value("endpointId").toString(); ep.displayName = endpoint.value(cognitoId).toMap().value("displayName").toString(); emit connector->pushNotificationEndpointAdded(ep); + } else if (topic == QString("%1/services/turn/response").arg(connector->m_clientId)) { + QVariantMap turnCreds = jsonDoc.toVariant().toMap(); + if (turnCreds.value("result").toMap().value("code").toInt() != 201) { + qCWarning(dcAWS()) << "Error retrieving TURN credentials:" << turnCreds.value("result").toMap().value("code").toInt() << turnCreds.value("result").toMap().value("message").toString(); + return ResponseCode::SUCCESS; + } + qCDebug(dcAWS()) << "Dynamic TURN credentials received"; + emit connector->turnCredentialsReceived(turnCreds.value("turnCredentials").toMap()); } else { - qCWarning(dcAWS) << "Unhandled subscription received!" << topic << QString::fromStdString(payload); + qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << QString::fromStdString(payload); } return ResponseCode::SUCCESS; } diff --git a/libnymea-core/awsconnector.h b/libnymea-core/awsconnector.h index cfe478e3..6bda1fcf 100644 --- a/libnymea-core/awsconnector.h +++ b/libnymea-core/awsconnector.h @@ -57,6 +57,7 @@ public: public slots: int sendPushNotification(const QString &userId, const QString &endpointId, const QString &title, const QString &text); + void requestTURNCredentials(); signals: void connected(); @@ -66,6 +67,7 @@ signals: void pushNotificationEndpointsUpdated(const QList pushNotificationEndpoints); void pushNotificationEndpointAdded(const AWSConnector::PushNotificationsEndpoint &pushNotificationEndpoint); void pushNotificationSent(int id, int status); + void turnCredentialsReceived(const QVariantMap &turnCredentials); private slots: void doConnect(); diff --git a/libnymea-core/cloudmanager.cpp b/libnymea-core/cloudmanager.cpp index 117739bd..a1e1fe2d 100644 --- a/libnymea-core/cloudmanager.cpp +++ b/libnymea-core/cloudmanager.cpp @@ -37,6 +37,9 @@ CloudManager::CloudManager(NetworkManager *networkManager, QObject *parent) : QO m_janusConnector = new JanusConnector(this); connect(m_janusConnector, &JanusConnector::webRtcHandshakeMessageReceived, this, &CloudManager::onJanusWebRtcHandshakeMessageReceived); + connect(m_janusConnector, &JanusConnector::requestTURNCredentials, m_awsConnector, &AWSConnector::requestTURNCredentials); + connect(m_awsConnector, &AWSConnector::turnCredentialsReceived, m_janusConnector, &JanusConnector::setTurnCredentials); + connect(m_networkManager, &NetworkManager::stateChanged, this, &CloudManager::onlineStateChanged); } diff --git a/libnymea-core/janusconnector.cpp b/libnymea-core/janusconnector.cpp index 108392ea..8fdecea5 100644 --- a/libnymea-core/janusconnector.cpp +++ b/libnymea-core/janusconnector.cpp @@ -26,6 +26,7 @@ #include #include +#include JanusConnector::JanusConnector(QObject *parent) : QObject(parent) { @@ -39,6 +40,14 @@ JanusConnector::JanusConnector(QObject *parent) : QObject(parent) // So let's use a rather short heartbeat to send ping messages and clean things up in case they are not acked. m_pingTimer.setInterval(1000); connect(&m_pingTimer, &QTimer::timeout, this, &JanusConnector::heartbeat); + + m_turnCredentialsServer = new QTcpServer(this); + connect(m_turnCredentialsServer, &QTcpServer::newConnection, this, &JanusConnector::newTurnServerConnection); + if (m_turnCredentialsServer->listen(QHostAddress("127.0.0.1"), 8901)) { + qCDebug(dcJanus()) << "Dynamic TURN credential server opened."; + } else { + qCWarning(dcJanus()) << "Error opening TURN credential server. Dynamic TURN credentials won't work."; + } } bool JanusConnector::connectToJanus() @@ -60,6 +69,7 @@ bool JanusConnector::connectToJanus() return false; } + qCDebug(dcJanus()) << "Connected to Janus"; m_socket->setSocketDescriptor(sock); m_pingTimer.start(); @@ -162,6 +172,27 @@ bool JanusConnector::sendKeepAliveMessage(const QString &sessionId) return true; } +void JanusConnector::setTurnCredentials(const QVariantMap &turnCredentials) +{ + while (!m_pendingTurnCredentialRequests.isEmpty()) { + QJsonDocument jsonDoc = QJsonDocument::fromVariant(turnCredentials); + QByteArray content = jsonDoc.toJson(QJsonDocument::Compact); + qCDebug(dcJanus()) << "Providing TURN credentials to Janus."; + QTcpSocket* socket = m_pendingTurnCredentialRequests.takeFirst(); + QByteArray reply = QByteArray("HTTP/1.1 200 Ok\r\n"); + reply.append("Content-Type: application/json\r\n"); + reply.append("Server: nymea\r\n"); + reply.append("Content-Length: " + QString::number(content.length()) + "\r\n"); + reply.append("\r\n"); + reply.append(content); + reply.append("\r"); + qCDebug(dcJanusTraffic()) << qUtf8Printable(reply); + socket->write(reply); + socket->flush(); + socket->deleteLater(); + } +} + void JanusConnector::processQueue() { if (!m_socket->isOpen()) { @@ -185,7 +216,7 @@ void JanusConnector::processQueue() m_pendingRequests.insert(session->offer.value("id").toString(), session); QJsonDocument jsonDoc = QJsonDocument::fromVariant(janusMessage); - qCDebug(dcJanus()) << "Sending offer message to session" << session << jsonDoc.toJson(); + qCDebug(dcJanus()) << "Sending offer message to session" << session; writeToJanus(jsonDoc.toJson()); session->offerSent = true; return; @@ -202,14 +233,32 @@ void JanusConnector::processQueue() m_pendingRequests.insert(input.value("id").toString(), session); QJsonDocument jsonDoc = QJsonDocument::fromVariant(janusMessage); - qCDebug(dcJanus()) << "sending trickle message" << jsonDoc.toJson(); - m_socket->write(jsonDoc.toJson()); + qCDebug(dcJanus()) << "Sending trickle message"; + writeToJanus(jsonDoc.toJson()); return; } } } } +void JanusConnector::newTurnServerConnection() +{ + qCDebug(dcJanus) << "New TURN credentials server connection"; + QTcpSocket* client = m_turnCredentialsServer->nextPendingConnection(); + m_pendingTurnCredentialRequests.append(client); + connect(client, &QTcpSocket::readyRead, this, [client, this](){ + QByteArray data = client->readAll(); + qCDebug(dcJanusTraffic()) << "Request:" << data; + if (data.startsWith("GET /turn?service=turn")) { + emit requestTURNCredentials(); + } else { + m_pendingTurnCredentialRequests.removeAll(client); + client->deleteLater(); + } + }); + +} + void JanusConnector::onDisconnected() { qCDebug(dcJanus) << "Disconnected from Janus" << m_socket->isOpen(); @@ -223,7 +272,7 @@ void JanusConnector::onError(QLocalSocket::LocalSocketError socketError) void JanusConnector::onReadyRead() { QByteArray data = m_socket->readAll(); - qCDebug(dcJanusTraffic()) << "incoming data" << data; + qCDebug(dcJanusTraffic()) << "Incoming data" << data; QJsonParseError error; QJsonDocument jsonDoc = QJsonDocument::fromJson(data, &error); if (error.error != QJsonParseError::NoError) { @@ -327,7 +376,7 @@ void JanusConnector::onReadyRead() session->janusSessionId--; qCDebug(dcJanus()) << "corrected session id after rounding error"; } - qCDebug(dcJanus()) << "Session" << session << "established";// << data << map; + qCDebug(dcJanus()) << "Session" << session << "established"; createChannel(session); return; @@ -353,7 +402,7 @@ void JanusConnector::onReadyRead() session->janusChannelId--; qCDebug(dcJanus()) << "Corrected channel id after rounding error"; } - qCDebug(dcJanus()) << "Channel for session" << session << "established";// << data << map; + qCDebug(dcJanus()) << "Channel for session" << session << "established"; session->connectedToJanus = true; processQueue(); return; @@ -363,7 +412,7 @@ void JanusConnector::onReadyRead() } if (map.value("janus").toString() == "event" && map.value("jsep").toMap().value("type").toString() == "answer") { - qCDebug(dcJanus()) << "Emitting handshake event" << data; + qCDebug(dcJanus()) << "Emitting handshake event"; QVariantMap reply; reply.insert("id", transactionId); reply.insert("type", "answer"); diff --git a/libnymea-core/janusconnector.h b/libnymea-core/janusconnector.h index 950198d5..26f65d44 100644 --- a/libnymea-core/janusconnector.h +++ b/libnymea-core/janusconnector.h @@ -25,6 +25,7 @@ #include #include #include +#include class JanusConnector : public QObject { @@ -53,10 +54,12 @@ public: void sendWebRtcHandshakeMessage(const QString &sessionId, const QVariantMap &message); bool sendKeepAliveMessage(const QString &sessionId); + void setTurnCredentials(const QVariantMap &turnCredentials); signals: void connected(); void webRtcHandshakeMessageReceived(const QString &sessionId, const QVariantMap &message); + void requestTURNCredentials(); private slots: void onDisconnected(); @@ -65,6 +68,8 @@ private slots: void heartbeat(); void processQueue(); + void newTurnServerConnection(); + private: QHash m_pendingRequests; @@ -86,6 +91,9 @@ private: QHash m_sessions; QStringList m_wantedAcks; + + QTcpServer *m_turnCredentialsServer = nullptr; + QList m_pendingTurnCredentialRequests; }; QDebug operator<<(QDebug debug, const JanusConnector::WebRtcSession &session);