implement dynamic TURN credential provisoning to Janus through AWS

This commit is contained in:
Michael Zanetti 2018-03-12 21:14:35 +01:00
parent e712d7bb01
commit 38d1b69e12
5 changed files with 97 additions and 11 deletions

View File

@ -165,6 +165,7 @@ void AWSConnector::setupSubscriptions()
subscriptions.append(QString("%1/pair/response").arg(m_clientId));
subscriptions.append(QString("%1/notify/response").arg(m_clientId));
subscriptions.append(QString("%1/notify/info/endpoint").arg(m_clientId));
subscriptions.append(QString("%1/services/turn/response").arg(m_clientId));
subscribe(subscriptions);
// fetch previous pairings
@ -276,6 +277,21 @@ int AWSConnector::sendPushNotification(const QString &userId, const QString &end
return m_transactionId;
}
void AWSConnector::requestTURNCredentials()
{
if (!isConnected()) {
qCWarning(dcAWS()) << "Not connected. Cannot request TURN credentials.";
emit turnCredentialsReceived(QVariantMap());
return;
}
qCDebug(dcAWS()) << "Requesting TURN credentials";
QVariantMap params;
params.insert("id", QUuid::createUuid());
params.insert("command", "getTurnCredentials");
params.insert("timestamp", QDateTime::currentMSecsSinceEpoch());
publish(QString("%1/services/turn").arg(m_clientId), params);
}
quint16 AWSConnector::publish(const QString &topic, const QVariantMap &message)
{
if (!m_setupInProgress && !isConnected()) {
@ -392,12 +408,12 @@ void AWSConnector::subscribeCallback(uint16_t actionId, ResponseCode rc)
AWSConnector *connector = s_requestMap.take(actionId);
if (!connector) {
qCWarning(dcAWS()) << "received a subscribe callback but don't have a request id for it.";
qCWarning(dcAWS()) << "Received a subscribe callback but don't have a request id for it.";
return;
}
if (actionId == connector->m_createDeviceSubscriptionId) {
qCDebug(dcAWS()) << "subscribed to create/device/response";
qCDebug(dcAWS()) << "Subscribed to create/device/response";
// We might get this callback even if we didn't explicitly ask for it as the
// library automatically resubscribes to all the topics upon reconnect.
if (!connector->readRegisteredFlag()) {
@ -473,7 +489,7 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam
}
dupes.append(id+type);
qCDebug(dcAWS) << "received webrtc handshake message" << topic << jsonDoc.toJson();
qCDebug(dcAWS) << "received webrtc handshake message.";
connector->webRtcHandshakeMessageReceived(topic, jsonDoc.toVariant().toMap());
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(connector->m_clientId)) && topic.contains("reply")) {
// silently drop our own things (should not be subscribed to that in the first place)
@ -491,8 +507,16 @@ ResponseCode AWSConnector::onSubscriptionReceivedCallback(util::String topic_nam
ep.endpointId = endpoint.value(cognitoId).toMap().value("endpointId").toString();
ep.displayName = endpoint.value(cognitoId).toMap().value("displayName").toString();
emit connector->pushNotificationEndpointAdded(ep);
} else if (topic == QString("%1/services/turn/response").arg(connector->m_clientId)) {
QVariantMap turnCreds = jsonDoc.toVariant().toMap();
if (turnCreds.value("result").toMap().value("code").toInt() != 201) {
qCWarning(dcAWS()) << "Error retrieving TURN credentials:" << turnCreds.value("result").toMap().value("code").toInt() << turnCreds.value("result").toMap().value("message").toString();
return ResponseCode::SUCCESS;
}
qCDebug(dcAWS()) << "Dynamic TURN credentials received";
emit connector->turnCredentialsReceived(turnCreds.value("turnCredentials").toMap());
} else {
qCWarning(dcAWS) << "Unhandled subscription received!" << topic << QString::fromStdString(payload);
qCWarning(dcAWS()) << "Unhandled subscription received!" << topic << QString::fromStdString(payload);
}
return ResponseCode::SUCCESS;
}

View File

@ -57,6 +57,7 @@ public:
public slots:
int sendPushNotification(const QString &userId, const QString &endpointId, const QString &title, const QString &text);
void requestTURNCredentials();
signals:
void connected();
@ -66,6 +67,7 @@ signals:
void pushNotificationEndpointsUpdated(const QList<AWSConnector::PushNotificationsEndpoint> pushNotificationEndpoints);
void pushNotificationEndpointAdded(const AWSConnector::PushNotificationsEndpoint &pushNotificationEndpoint);
void pushNotificationSent(int id, int status);
void turnCredentialsReceived(const QVariantMap &turnCredentials);
private slots:
void doConnect();

View File

@ -37,6 +37,9 @@ CloudManager::CloudManager(NetworkManager *networkManager, QObject *parent) : QO
m_janusConnector = new JanusConnector(this);
connect(m_janusConnector, &JanusConnector::webRtcHandshakeMessageReceived, this, &CloudManager::onJanusWebRtcHandshakeMessageReceived);
connect(m_janusConnector, &JanusConnector::requestTURNCredentials, m_awsConnector, &AWSConnector::requestTURNCredentials);
connect(m_awsConnector, &AWSConnector::turnCredentialsReceived, m_janusConnector, &JanusConnector::setTurnCredentials);
connect(m_networkManager, &NetworkManager::stateChanged, this, &CloudManager::onlineStateChanged);
}

View File

@ -26,6 +26,7 @@
#include <QJsonDocument>
#include <QUuid>
#include <QTcpSocket>
JanusConnector::JanusConnector(QObject *parent) : QObject(parent)
{
@ -39,6 +40,14 @@ JanusConnector::JanusConnector(QObject *parent) : QObject(parent)
// So let's use a rather short heartbeat to send ping messages and clean things up in case they are not acked.
m_pingTimer.setInterval(1000);
connect(&m_pingTimer, &QTimer::timeout, this, &JanusConnector::heartbeat);
m_turnCredentialsServer = new QTcpServer(this);
connect(m_turnCredentialsServer, &QTcpServer::newConnection, this, &JanusConnector::newTurnServerConnection);
if (m_turnCredentialsServer->listen(QHostAddress("127.0.0.1"), 8901)) {
qCDebug(dcJanus()) << "Dynamic TURN credential server opened.";
} else {
qCWarning(dcJanus()) << "Error opening TURN credential server. Dynamic TURN credentials won't work.";
}
}
bool JanusConnector::connectToJanus()
@ -60,6 +69,7 @@ bool JanusConnector::connectToJanus()
return false;
}
qCDebug(dcJanus()) << "Connected to Janus";
m_socket->setSocketDescriptor(sock);
m_pingTimer.start();
@ -162,6 +172,27 @@ bool JanusConnector::sendKeepAliveMessage(const QString &sessionId)
return true;
}
void JanusConnector::setTurnCredentials(const QVariantMap &turnCredentials)
{
while (!m_pendingTurnCredentialRequests.isEmpty()) {
QJsonDocument jsonDoc = QJsonDocument::fromVariant(turnCredentials);
QByteArray content = jsonDoc.toJson(QJsonDocument::Compact);
qCDebug(dcJanus()) << "Providing TURN credentials to Janus.";
QTcpSocket* socket = m_pendingTurnCredentialRequests.takeFirst();
QByteArray reply = QByteArray("HTTP/1.1 200 Ok\r\n");
reply.append("Content-Type: application/json\r\n");
reply.append("Server: nymea\r\n");
reply.append("Content-Length: " + QString::number(content.length()) + "\r\n");
reply.append("\r\n");
reply.append(content);
reply.append("\r");
qCDebug(dcJanusTraffic()) << qUtf8Printable(reply);
socket->write(reply);
socket->flush();
socket->deleteLater();
}
}
void JanusConnector::processQueue()
{
if (!m_socket->isOpen()) {
@ -185,7 +216,7 @@ void JanusConnector::processQueue()
m_pendingRequests.insert(session->offer.value("id").toString(), session);
QJsonDocument jsonDoc = QJsonDocument::fromVariant(janusMessage);
qCDebug(dcJanus()) << "Sending offer message to session" << session << jsonDoc.toJson();
qCDebug(dcJanus()) << "Sending offer message to session" << session;
writeToJanus(jsonDoc.toJson());
session->offerSent = true;
return;
@ -202,14 +233,32 @@ void JanusConnector::processQueue()
m_pendingRequests.insert(input.value("id").toString(), session);
QJsonDocument jsonDoc = QJsonDocument::fromVariant(janusMessage);
qCDebug(dcJanus()) << "sending trickle message" << jsonDoc.toJson();
m_socket->write(jsonDoc.toJson());
qCDebug(dcJanus()) << "Sending trickle message";
writeToJanus(jsonDoc.toJson());
return;
}
}
}
}
void JanusConnector::newTurnServerConnection()
{
qCDebug(dcJanus) << "New TURN credentials server connection";
QTcpSocket* client = m_turnCredentialsServer->nextPendingConnection();
m_pendingTurnCredentialRequests.append(client);
connect(client, &QTcpSocket::readyRead, this, [client, this](){
QByteArray data = client->readAll();
qCDebug(dcJanusTraffic()) << "Request:" << data;
if (data.startsWith("GET /turn?service=turn")) {
emit requestTURNCredentials();
} else {
m_pendingTurnCredentialRequests.removeAll(client);
client->deleteLater();
}
});
}
void JanusConnector::onDisconnected()
{
qCDebug(dcJanus) << "Disconnected from Janus" << m_socket->isOpen();
@ -223,7 +272,7 @@ void JanusConnector::onError(QLocalSocket::LocalSocketError socketError)
void JanusConnector::onReadyRead()
{
QByteArray data = m_socket->readAll();
qCDebug(dcJanusTraffic()) << "incoming data" << data;
qCDebug(dcJanusTraffic()) << "Incoming data" << data;
QJsonParseError error;
QJsonDocument jsonDoc = QJsonDocument::fromJson(data, &error);
if (error.error != QJsonParseError::NoError) {
@ -327,7 +376,7 @@ void JanusConnector::onReadyRead()
session->janusSessionId--;
qCDebug(dcJanus()) << "corrected session id after rounding error";
}
qCDebug(dcJanus()) << "Session" << session << "established";// << data << map;
qCDebug(dcJanus()) << "Session" << session << "established";
createChannel(session);
return;
@ -353,7 +402,7 @@ void JanusConnector::onReadyRead()
session->janusChannelId--;
qCDebug(dcJanus()) << "Corrected channel id after rounding error";
}
qCDebug(dcJanus()) << "Channel for session" << session << "established";// << data << map;
qCDebug(dcJanus()) << "Channel for session" << session << "established";
session->connectedToJanus = true;
processQueue();
return;
@ -363,7 +412,7 @@ void JanusConnector::onReadyRead()
}
if (map.value("janus").toString() == "event" && map.value("jsep").toMap().value("type").toString() == "answer") {
qCDebug(dcJanus()) << "Emitting handshake event" << data;
qCDebug(dcJanus()) << "Emitting handshake event";
QVariantMap reply;
reply.insert("id", transactionId);
reply.insert("type", "answer");

View File

@ -25,6 +25,7 @@
#include <QLocalSocket>
#include <QTimer>
#include <QDateTime>
#include <QTcpServer>
class JanusConnector : public QObject
{
@ -53,10 +54,12 @@ public:
void sendWebRtcHandshakeMessage(const QString &sessionId, const QVariantMap &message);
bool sendKeepAliveMessage(const QString &sessionId);
void setTurnCredentials(const QVariantMap &turnCredentials);
signals:
void connected();
void webRtcHandshakeMessageReceived(const QString &sessionId, const QVariantMap &message);
void requestTURNCredentials();
private slots:
void onDisconnected();
@ -65,6 +68,8 @@ private slots:
void heartbeat();
void processQueue();
void newTurnServerConnection();
private:
QHash<QString, WebRtcSession*> m_pendingRequests;
@ -86,6 +91,9 @@ private:
QHash<QString, WebRtcSession*> m_sessions;
QStringList m_wantedAcks;
QTcpServer *m_turnCredentialsServer = nullptr;
QList<QTcpSocket*> m_pendingTurnCredentialRequests;
};
QDebug operator<<(QDebug debug, const JanusConnector::WebRtcSession &session);