Drop Janus and WebRTC support

pull/135/head
Michael Zanetti 2019-01-07 21:33:10 +01:00
parent a44b82c497
commit 948eff8319
12 changed files with 6 additions and 667 deletions

4
debian/man/nymead.1 vendored
View File

@ -61,10 +61,6 @@ Print the debug messages from the TCP connections in nymea.
Print the debug messages from the device manager.
.IP \fIHardware\fR\ (default\ disabled)
Print the debug messages from hardware resources in nymea.
.IP \fIJanus\fR\ (default\ disabled)
Print the debug messages from the janus connection.
.IP \fIJanusTraffic\fR\ (default\ disabled)
Print the debug messages from the janus connection traffic.
.IP \fIJsonRpc\fR\ (default\ disabled)
Print the debug messages from JSON-RPC API.
.IP \fIJsonRpcTraffic\fR\ (default\ disabled)

View File

@ -266,11 +266,6 @@ void AWSConnector::pairDevice(const QString &idToken, const QString &userId)
m_pairingRequests.insert(m_transactionId, userId);
}
void AWSConnector::sendWebRtcHandshakeMessage(const QString &sessionId, const QVariantMap &map)
{
publish(sessionId + "/reply", map);
}
int AWSConnector::sendPushNotification(const QString &userId, const QString &endpointId, const QString &title, const QString &text)
{
QVariantMap params;
@ -468,24 +463,6 @@ void AWSConnector::onPublishReceived(const QString &topic, const QByteArray &pay
if (jsonDoc.toVariant().toMap().value("status").toInt() == 200) {
storeSyncedNameCache(m_clientName);
}
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(m_clientId)) && !topic.contains("reply") && !topic.contains("proxy")) {
static QHash<QString, QDateTime> dupes;
QString id = jsonDoc.toVariant().toMap().value("id").toString();
QString type = jsonDoc.toVariant().toMap().value("type").toString();
if (dupes.contains(id+type)) {
qCDebug(dcAWS()) << "Dropping duplicate packet";
return;
}
dupes.insert(id+type, QDateTime::currentDateTime());
foreach (const QString &dupe, dupes.keys()) {
if (dupes.value(dupe).addSecs(60) < QDateTime::currentDateTime()) {
dupes.remove(dupe);
}
}
qCDebug(dcAWS) << "received webrtc handshake message.";
webRtcHandshakeMessageReceived(topic, jsonDoc.toVariant().toMap());
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(m_clientId)) && topic.contains("reply")) {
// silently drop our own things (should not be subscribed to that in the first place)
} else if (topic.startsWith(QString("%1/eu-west-1:").arg(m_clientId)) && topic.contains("proxy")) {
QString token = jsonDoc.toVariant().toMap().value("token").toString();
QString timestamp = jsonDoc.toVariant().toMap().value("timestamp").toString();

View File

@ -49,8 +49,6 @@ public:
void setDeviceName(const QString &deviceName);
void pairDevice(const QString &idToken, const QString &userId);
void sendWebRtcHandshakeMessage(const QString &sessionId, const QVariantMap &map);
public slots:
int sendPushNotification(const QString &userId, const QString &endpointId, const QString &title, const QString &text);
void requestTURNCredentials();
@ -59,7 +57,6 @@ signals:
void connected();
void disconnected();
void devicePaired(const QString &cognritoUserId, int errorCode, const QString &message);
void webRtcHandshakeMessageReceived(const QString &transactionId, const QVariantMap &data);
void pushNotificationEndpointsUpdated(const QList<AWSConnector::PushNotificationsEndpoint> pushNotificationEndpoints);
void pushNotificationEndpointAdded(const AWSConnector::PushNotificationsEndpoint &pushNotificationEndpoint);
void pushNotificationSent(int id, int status);

View File

@ -20,7 +20,6 @@
#include "cloudmanager.h"
#include "awsconnector.h"
#include "janusconnector.h"
#include "loggingcategories.h"
#include "cloudnotifications.h"
#include "nymeaconfiguration.h"
@ -40,16 +39,9 @@ CloudManager::CloudManager(NymeaConfiguration *configuration, NetworkManager *ne
{
m_awsConnector = new AWSConnector(this);
connect(m_awsConnector, &AWSConnector::devicePaired, this, &CloudManager::onPairingFinished);
connect(m_awsConnector, &AWSConnector::webRtcHandshakeMessageReceived, this, &CloudManager::onAWSWebRtcHandshakeMessageReceived);
connect(m_awsConnector, &AWSConnector::connected, this, &CloudManager::awsConnected);
connect(m_awsConnector, &AWSConnector::disconnected, this, &CloudManager::awsDisconnected);
m_janusConnector = new JanusConnector(this);
connect(m_janusConnector, &JanusConnector::webRtcHandshakeMessageReceived, this, &CloudManager::onJanusWebRtcHandshakeMessageReceived);
connect(m_janusConnector, &JanusConnector::requestTURNCredentials, m_awsConnector, &AWSConnector::requestTURNCredentials);
connect(m_awsConnector, &AWSConnector::turnCredentialsReceived, m_janusConnector, &JanusConnector::setTurnCredentials);
connect(m_networkManager, &NetworkManager::stateChanged, this, &CloudManager::onlineStateChanged);
ServerConfiguration config;
@ -215,11 +207,6 @@ void CloudManager::pairDevice(const QString &idToken, const QString &userId)
m_awsConnector->pairDevice(idToken, userId);
}
bool CloudManager::keepAlive(const QString &sessionId)
{
return m_janusConnector->sendKeepAliveMessage(sessionId);
}
CloudNotifications *CloudManager::createNotificationsPlugin() const
{
CloudNotifications* notifications = new CloudNotifications(m_awsConnector);
@ -256,16 +243,6 @@ void CloudManager::onPairingFinished(const QString &cognitoUserId, int errorCode
emit pairingReply(cognitoUserId, errorCode, message);
}
void CloudManager::onAWSWebRtcHandshakeMessageReceived(const QString &transactionId, const QVariantMap &data)
{
m_janusConnector->sendWebRtcHandshakeMessage(transactionId, data);
}
void CloudManager::onJanusWebRtcHandshakeMessageReceived(const QString &transactionId, const QVariantMap &data)
{
m_awsConnector->sendWebRtcHandshakeMessage(transactionId, data);
}
void CloudManager::awsConnected()
{
emit connectionStateChanged();

View File

@ -28,7 +28,6 @@
#include "networkmanager/networkmanager.h"
class JanusConnector;
class AWSConnector;
class CloudNotifications;
namespace remoteproxyclient {
@ -63,8 +62,6 @@ public:
void pairDevice(const QString &idToken, const QString &userId);
bool keepAlive(const QString &sessionId);
CloudNotifications* createNotificationsPlugin() const;
CloudTransport* createTransportInterface() const;
@ -79,8 +76,6 @@ private:
private slots:
void onlineStateChanged();
void onPairingFinished(const QString &cognitoUserId, int errorCode, const QString &message);
void onAWSWebRtcHandshakeMessageReceived(const QString &transactionId, const QVariantMap &data);
void onJanusWebRtcHandshakeMessageReceived(const QString &transactionId, const QVariantMap &data);
void awsConnected();
void awsDisconnected();
void setDeviceName(const QString &name);
@ -89,7 +84,6 @@ private:
QTimer m_reconnectTimer;
bool m_enabled = false;
AWSConnector *m_awsConnector = nullptr;
JanusConnector *m_janusConnector = nullptr;
NymeaConfiguration *m_configuration = nullptr;
NetworkManager *m_networkManager = nullptr;
CloudTransport *m_transport = nullptr;

View File

@ -1,495 +0,0 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* *
* Copyright (C) 2017 Michael Zanetti <michael.zanetti@guh.io> *
* *
* This file is part of nymea. *
* *
* nymea is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, version 2 of the License. *
* *
* nymea is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with nymea. If not, see <http://www.gnu.org/licenses/>. *
* *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <sys/socket.h>
#include <sys/un.h>
#include "janusconnector.h"
#include "loggingcategories.h"
#include <QJsonDocument>
#include <QUuid>
#include <QTcpSocket>
JanusConnector::JanusConnector(QObject *parent) : QObject(parent)
{
m_socket = new QLocalSocket(this);
typedef void (QLocalSocket:: *errorSignal)(QLocalSocket::LocalSocketError);
connect(m_socket, static_cast<errorSignal>(&QLocalSocket::error), this, &JanusConnector::onError);
connect(m_socket, &QLocalSocket::disconnected, this, &JanusConnector::onDisconnected);
connect(m_socket, &QLocalSocket::readyRead, this, &JanusConnector::onReadyRead);
// When Janus crashes it will leave the socket in a very broken state which causes QLocalSocket to spin the CPU
// So let's use a rather short heartbeat to send ping messages and clean things up in case they are not acked.
m_pingTimer.setInterval(5000);
connect(&m_pingTimer, &QTimer::timeout, this, &JanusConnector::heartbeat);
m_turnCredentialsServer = new QTcpServer(this);
connect(m_turnCredentialsServer, &QTcpServer::newConnection, this, &JanusConnector::newTurnServerConnection);
if (m_turnCredentialsServer->listen(QHostAddress("127.0.0.1"), 8901)) {
qCDebug(dcJanus()) << "Dynamic TURN credential server opened.";
} else {
qCWarning(dcJanus()) << "Error opening TURN credential server. Dynamic TURN credentials won't work.";
}
}
bool JanusConnector::connectToJanus()
{
int sock = socket (PF_UNIX, SOCK_SEQPACKET, 0);
if (sock < 0) {
qCWarning(dcJanus) << "Failed to create socket";
return false;
}
struct sockaddr_un name;
name.sun_family = AF_UNIX;
strncpy (name.sun_path, "/tmp/janusapi", 13);
name.sun_path[13] = '\0';
int ret = ::connect(sock, (const sockaddr*)&name, sizeof(name));
if (ret < 0) {
qCWarning(dcJanus()) << "Error connecting to socket";
return false;
}
qCDebug(dcJanus()) << "Connected to Janus";
m_socket->setSocketDescriptor(sock);
m_pingTimer.start();
return true;
}
void JanusConnector::disconnectFromJanus()
{
m_socket->close();
m_pingTimer.stop();
m_lastUnconfirmedPing = QDateTime();
}
void JanusConnector::createSession(JanusConnector::WebRtcSession *session)
{
// Open the session
qCDebug(dcJanus()) << "Creating new janus session:" << session;
QString transactionId = QUuid::createUuid().toString();
QVariantMap map;
map.insert("transaction", transactionId);
map.insert("janus", "create");
QJsonDocument jsonDoc = QJsonDocument::fromVariant(map);
m_pendingRequests.insert(transactionId, session);
writeToJanus(jsonDoc.toJson());
}
void JanusConnector::sendWebRtcHandshakeMessage(const QString &sessionId, const QVariantMap &message)
{
if (!m_socket->isOpen()) {
if (!connectToJanus()) {
qCWarning(dcJanus()) << "Failed to establish a connection to Janus. Cannot send WebRtcHandshake.";
return;
}
}
QString messageType = message.value("type").toString();
QString transactionId = message.value("id").toString();
// Do we have a session for this transaction?
if (!m_sessions.contains(sessionId)) {
// create a new session and queue the actual offer call
WebRtcSession *session = new WebRtcSession();
session->sessionId = sessionId;
if (messageType == "offer") {
session->offer = message;
session->offerSent = false;
} else if (messageType == "trickle") {
session->trickles.append(message);
} else {
qCWarning(dcJanus()) << "Unhandled webrtc handshake message type" << messageType << message;
}
m_sessions.insert(sessionId, session);
if (messageType == "offer") {
createSession(session);
}
return;
}
WebRtcSession *session = m_sessions.value(sessionId);
if (messageType == "offer") {
session->offer = message;
session->offerSent = false;
createSession(session);
} else if (messageType == "trickle") {
m_sessions.value(sessionId)->trickles.append(message);
} else if (messageType == "webrtcup") {
// If we got the webrtc up from Janus already, directly reply with an ack
if (session->webRtcConnected) {
QVariantMap ack;
ack.insert("id", message.value("id").toString());
ack.insert("type", "ack");
emit webRtcHandshakeMessageReceived(session->sessionId, ack);
} else {
// otherwise store the request and reply when we get the webrtcup
session->webRtcUp = message;
}
} else if (messageType == "ack") {
// silence ack's we may get from the other end, janus doesn't need them...
} else {
qCWarning(dcJanus()) << "Unhandled message type:" << messageType << message;
}
processQueue();
}
bool JanusConnector::sendKeepAliveMessage(const QString &sessionId)
{
WebRtcSession *session = m_sessions.value(sessionId);
if (!session) {
qCWarning(dcJanus()) << "Received a keepalive message for a session we don't know.";
return false;
}
QVariantMap janusMessage;
janusMessage.insert("janus", "keepalive");
janusMessage.insert("session_id", session->janusSessionId);
janusMessage.insert("handle_id", session->janusChannelId);
janusMessage.insert("transaction", "keepalive");
writeToJanus(QJsonDocument::fromVariant(janusMessage).toJson(QJsonDocument::Compact));
return true;
}
void JanusConnector::setTurnCredentials(const QVariantMap &turnCredentials)
{
while (!m_pendingTurnCredentialRequests.isEmpty()) {
QJsonDocument jsonDoc = QJsonDocument::fromVariant(turnCredentials);
QByteArray content = jsonDoc.toJson(QJsonDocument::Compact);
qCDebug(dcJanus()) << "Providing TURN credentials to Janus." << qUtf8Printable(jsonDoc.toJson(QJsonDocument::Indented));
QTcpSocket* socket = m_pendingTurnCredentialRequests.takeFirst();
QByteArray reply = QByteArray("HTTP/1.1 200 Ok\r\n");
reply.append("Content-Type: application/json\r\n");
reply.append("Server: nymea\r\n");
reply.append("Content-Length: " + QString::number(content.length()) + "\r\n");
reply.append("\r\n");
reply.append(content);
reply.append("\r");
qCDebug(dcJanusTraffic()) << qUtf8Printable(reply);
socket->write(reply);
socket->flush();
socket->deleteLater();
}
}
void JanusConnector::processQueue()
{
if (!m_socket->isOpen()) {
qCWarning(dcJanus()) << "Janus socket not open. Cannot process queue";
return;
}
foreach (WebRtcSession* session, m_sessions) {
if (session->connectedToJanus) {
if (!session->offerSent) {
QVariantMap janusMessage;
janusMessage.insert("janus", "message");
janusMessage.insert("transaction", session->offer.value("id").toString());
janusMessage.insert("session_id", session->janusSessionId);
janusMessage.insert("handle_id", session->janusChannelId);
QVariantMap body;
body.insert("request", "setup");
janusMessage.insert("body", body);
janusMessage.insert("jsep", session->offer.value("jsep"));
m_pendingRequests.insert(session->offer.value("id").toString(), session);
QJsonDocument jsonDoc = QJsonDocument::fromVariant(janusMessage);
qCDebug(dcJanus()) << "Sending offer message to session" << session;
writeToJanus(jsonDoc.toJson());
session->offerSent = true;
return;
}
while (!session->trickles.isEmpty()) {
QVariantMap input = session->trickles.takeFirst().toMap();
QVariantMap janusMessage;
janusMessage.insert("janus", "trickle");
janusMessage.insert("transaction", input.value("id").toString());
janusMessage.insert("session_id", session->janusSessionId);
janusMessage.insert("handle_id", session->janusChannelId);
janusMessage.insert("candidate", input.value("candidate"));
m_pendingRequests.insert(input.value("id").toString(), session);
QJsonDocument jsonDoc = QJsonDocument::fromVariant(janusMessage);
qCDebug(dcJanus()) << "Sending trickle message";
writeToJanus(jsonDoc.toJson());
return;
}
}
}
}
void JanusConnector::newTurnServerConnection()
{
qCDebug(dcJanus) << "New TURN credentials server connection";
QTcpSocket* client = m_turnCredentialsServer->nextPendingConnection();
m_pendingTurnCredentialRequests.append(client);
connect(client, &QTcpSocket::readyRead, this, [client, this](){
QByteArray data = client->readAll();
qCDebug(dcJanusTraffic()) << "Request:" << data;
if (data.startsWith("GET /turn?service=turn")) {
emit requestTURNCredentials();
} else {
m_pendingTurnCredentialRequests.removeAll(client);
client->deleteLater();
}
});
}
void JanusConnector::onDisconnected()
{
qCDebug(dcJanus) << "Disconnected from Janus" << m_socket->isOpen();
}
void JanusConnector::onError(QLocalSocket::LocalSocketError socketError)
{
qCWarning(dcJanus) << "Error in janus connection" << socketError << m_socket->errorString();
}
void JanusConnector::onReadyRead()
{
QByteArray data = m_socket->readAll();
qCDebug(dcJanusTraffic()) << "Incoming data" << data;
QJsonParseError error;
QJsonDocument jsonDoc = QJsonDocument::fromJson(data, &error);
if (error.error != QJsonParseError::NoError) {
qWarning(dcJanus()) << "Cannot parse packet received by Janus:" << error.error << error.errorString();
return;
}
QVariantMap map = jsonDoc.toVariant().toMap();
if (map.value("janus").toString() == "error") {
qCWarning(dcJanus()) << "An error happened in the janus connection:" << map.value("error").toMap().value("reason").toString();
return;
}
if (map.value("janus").toString() == "timeout") {
quint64 sessionId = map.value("session_id").toLongLong();
foreach (WebRtcSession *session, m_sessions) {
if (session->matchJanusSessionId(sessionId)) {
qCDebug(dcJanus()) << "Session" << session << "timed out. Removing session";
m_sessions.remove(session->sessionId);
delete session;
if (m_sessions.isEmpty()) {
disconnectFromJanus();
}
return;
}
}
qCWarning(dcJanus()) << "Received a timeout event but don't have a session for it." << data << map.value("session_id").toLongLong();
return;
}
if (map.value("janus").toString() == "webrtcup") {
quint64 sessionId = map.value("session_id").toLongLong();
foreach (WebRtcSession *session, m_sessions) {
if (session->matchJanusSessionId(sessionId)) {
qCDebug(dcJanus()) << "Session" << session << "is up now!";
session->webRtcConnected = true;
if (!session->webRtcUp.isEmpty()) {
QVariantMap ack;
ack.insert("id", session->webRtcUp.value("id").toString());
ack.insert("type", "ack");
emit webRtcHandshakeMessageReceived(session->sessionId, ack);
}
return;
}
}
qCWarning(dcJanus()) << "Received a webrtcup event but don't have a session for it";
return;
}
if (map.value("janus").toString() == "hangup") {
quint64 sessionId = map.value("session_id").toLongLong();
foreach (WebRtcSession *session, m_sessions) {
if (session->matchJanusSessionId(sessionId)){
qCDebug(dcJanus()) << "Session" << session << "hangup received. Reason:" << map.value("reason").toString();
QVariantMap hangup;
hangup.insert("type", "hangup");
hangup.insert("reason", map.value("reason").toString());
emit webRtcHandshakeMessageReceived(session->sessionId, hangup);
m_sessions.remove(session->sessionId);
delete session;
if (m_sessions.isEmpty()) {
disconnectFromJanus();
}
return;
}
}
qCWarning(dcJanus()) << "Received a hangup message but don't have a session for it";
return;
}
// as of now, everything must be part of a transaction
if (!map.contains("transaction")) {
qCWarning(dcJanus) << "Unhandled message from Janus (missing transaction):" << data;
return;
}
QString transactionId = map.value("transaction").toString();
WebRtcSession *session = m_pendingRequests.value(transactionId);
if (!session) {
if (transactionId == "pingety") {
qCDebug(dcJanus()) << "Received PONG from Janus";
m_lastUnconfirmedPing = QDateTime();
return;
}
if (transactionId == "keepalive") {
qCDebug(dcJanus()) << "Keep alive acked by janus.";
return;
}
qCWarning(dcJanus()) << "received a janus message for a session we don't know...";
return;
}
if (session->janusSessionId == -1) {
// This should be a create session response
if (map.value("janus").toString() == "success") {
session->janusSessionId = map.value("data").toMap().value("id").toLongLong();
// oooohhhkaaay... now, this is gonna be dirty... So, Janus' session id is like a freakin huge number
// so that QVariant stores it in a double instead of a longlong, which could cause rounding errors when converting it
// back to to a long. Let's grep the raw data for the parsed session id and if not found, try to correct it one down.
if (!data.contains(QByteArray::number(session->janusSessionId)) && data.contains(QByteArray::number(session->janusSessionId-1))) {
session->janusSessionId--;
qCDebug(dcJanus()) << "corrected session id after rounding error";
}
qCDebug(dcJanus()) << "Session" << session << "established";
createChannel(session);
return;
}
qCWarning(dcJanus()) << "Error establishing session";
delete m_sessions.take(session->sessionId);
if (m_sessions.isEmpty()) {
disconnectFromJanus();
}
return;
}
qint64 janusSessionId = map.value("session_id").toLongLong();
if (!session->matchJanusSessionId(janusSessionId)) {
qCWarning(dcJanus) << "Transaction id and session id not matching!" << session->janusSessionId << "!=" << map.value("session_id").toLongLong();
return;
}
if (session->janusChannelId == -1) {
if (map.value("janus").toString() == "success") {
session->janusChannelId = map.value("data").toMap().value("id").toLongLong();
if (!data.contains(QByteArray::number(session->janusChannelId)) && data.contains(QByteArray::number(session->janusChannelId-1))) {
session->janusChannelId--;
qCDebug(dcJanus()) << "Corrected channel id after rounding error";
}
qCDebug(dcJanus()) << "Channel for session" << session << "established";
session->connectedToJanus = true;
processQueue();
return;
}
qCWarning(dcJanus()) << "Error establishing channel" << session << data;
return;
}
if (map.value("janus").toString() == "event" && map.value("jsep").toMap().value("type").toString() == "answer") {
qCDebug(dcJanus()) << "Emitting handshake event";
QVariantMap reply;
reply.insert("id", transactionId);
reply.insert("type", "answer");
reply.insert("jsep", map.value("jsep"));
emit webRtcHandshakeMessageReceived(session->sessionId, reply);
return;
}
if (map.value("janus").toString() == "ack") {
QVariantMap ackReply;
ackReply.insert("id", transactionId);
ackReply.insert("type", "ack");
emit webRtcHandshakeMessageReceived(session->sessionId, ackReply);
return;
}
qCWarning(dcJanus()) << "Unhandled Janus message:" << data;
}
void JanusConnector::heartbeat()
{
if (!m_lastUnconfirmedPing.isNull()) {
qCWarning(dcJanus()) << "Last ping not echoed by Janus. Seems the connection broke down. Cleaning up...";
while (!m_sessions.isEmpty()) {
delete m_sessions.take(m_sessions.keys().first());
}
disconnectFromJanus();
return;
}
QVariantMap map;
map.insert("janus", "ping");
map.insert("transaction", "pingety");
QJsonDocument jsonDoc = QJsonDocument::fromVariant(map);
qCDebug(dcJanus()) << "Sending PING to Janus";
m_lastUnconfirmedPing = QDateTime::currentDateTime();
writeToJanus(jsonDoc.toJson());
}
void JanusConnector::createChannel(WebRtcSession *session)
{
QVariantMap attachPluginMessage;
attachPluginMessage.insert("janus", "attach");
attachPluginMessage.insert("session_id", session->janusSessionId);
QString transactionId = QUuid::createUuid().toString();
m_pendingRequests.insert(transactionId, session);
attachPluginMessage.insert("transaction", transactionId);
attachPluginMessage.insert("plugin", "janus.plugin.guhio");
attachPluginMessage.insert("opaque_id", "nymea-" + QUuid::createUuid().toString());
QJsonDocument jsonDoc = QJsonDocument::fromVariant(attachPluginMessage);
qCDebug(dcJanus()) << "Establishing channel for session" << session->sessionId;
writeToJanus(jsonDoc.toJson());
}
void JanusConnector::writeToJanus(const QByteArray &data)
{
if (!m_socket->isOpen() && !connectToJanus()) {
qCWarning(dcJanus()) << "Error connecting to Janus. Cannot write data to it.";
return;
}
qCDebug(dcJanusTraffic()) << "Writing to janus" << data;
qint64 count = m_socket->write(data);
if (count != data.length()) {
qCWarning(dcJanus()) << "Error writing to Janus.";
disconnectFromJanus();
return;
}
m_socket->flush();
}
QDebug operator<<(QDebug debug, const JanusConnector::WebRtcSession &session)
{
debug.nospace() << session.sessionId << " (Janus session: " << session.janusSessionId << " channel: " << session.janusChannelId << " connected to Janus: " << session.connectedToJanus << " WebRTC connected: " << session.webRtcConnected << ") ";
return debug;
}
QDebug operator<<(QDebug debug, JanusConnector::WebRtcSession *session)
{
debug.nospace() << *session;
return debug;
}

View File

@ -1,102 +0,0 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* *
* Copyright (C) 2017 Michael Zanetti <michael.zanetti@guh.io> *
* *
* This file is part of nymea. *
* *
* nymea is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, version 2 of the License. *
* *
* nymea is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with nymea. If not, see <http://www.gnu.org/licenses/>. *
* *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#ifndef JANUSCONNECTOR_H
#define JANUSCONNECTOR_H
#include <QObject>
#include <QLocalSocket>
#include <QTimer>
#include <QDateTime>
#include <QTcpServer>
class JanusConnector : public QObject
{
Q_OBJECT
public:
class WebRtcSession {
public:
QString sessionId; // This should be unique but persistent for each remote device
qint64 janusSessionId = -1; // the session id for the janus session.
qint64 janusChannelId = -1;
bool connectedToJanus = false;
QVariantMap offer;
bool offerSent = false;
QVariantList trickles;
QVariantMap webRtcUp;
bool webRtcConnected = false;
bool matchJanusSessionId(qint64 id) {
return (janusSessionId == id) || (janusSessionId == id +1) || (janusSessionId == id -1);
}
};
explicit JanusConnector(QObject *parent = nullptr);
void sendWebRtcHandshakeMessage(const QString &sessionId, const QVariantMap &message);
bool sendKeepAliveMessage(const QString &sessionId);
void setTurnCredentials(const QVariantMap &turnCredentials);
signals:
void connected();
void webRtcHandshakeMessageReceived(const QString &sessionId, const QVariantMap &message);
void requestTURNCredentials();
private slots:
void onDisconnected();
void onError(QLocalSocket::LocalSocketError socketError);
void onReadyRead();
void heartbeat();
void processQueue();
void newTurnServerConnection();
private:
QHash<QString, WebRtcSession*> m_pendingRequests;
bool connectToJanus();
void disconnectFromJanus();
void createSession(WebRtcSession *session);
void createChannel(WebRtcSession *session);
void writeToJanus(const QByteArray &data);
private:
QLocalSocket *m_socket = nullptr;
QDateTime m_lastUnconfirmedPing;
QTimer m_pingTimer;
QHash<QString, WebRtcSession*> m_sessions;
QStringList m_wantedAcks;
QTcpServer *m_turnCredentialsServer = nullptr;
QList<QTcpSocket*> m_pendingTurnCredentialRequests;
};
QDebug operator<<(QDebug debug, const JanusConnector::WebRtcSession &session);
QDebug operator<<(QDebug debug, JanusConnector::WebRtcSession *session);
#endif // JANUSCONNECTOR_H

View File

@ -193,10 +193,11 @@ JsonRPCServer::JsonRPCServer(const QSslConfiguration &sslConfiguration, QObject
setReturns("IsCloudConnected", returns);
params.clear(); returns.clear();
setDescription("KeepAlive", "Keep alive a remote connection. The sessionId is the MQTT topic which has been used to establish the session. It will return false if no ongoing session with the given ID can be found.");
setDescription("KeepAlive", "This is basically a Ping/Pong mechanism a client app may use to check server connectivity. Currently, the server does not actually do anything with this information and will return the call providing the given sessionId back to the caller. It is up to the client whether to use this or not and not required by the server to keep the connection alive.");
params.insert("sessionId", JsonTypes::basicTypeToString(JsonTypes::String));
setParams("KeepAlive", params);
returns.insert("success", JsonTypes::basicTypeToString(JsonTypes::Bool));
returns.insert("sessionId", JsonTypes::basicTypeToString(JsonTypes::String));
setReturns("KeepAlive", returns);
// Notifications
@ -384,12 +385,14 @@ JsonReply *JsonRPCServer::IsCloudConnected(const QVariantMap &params)
return createReply(data);
}
/*! A client may use this as a ping/pong mechanism to check server connectivity. */
JsonReply *JsonRPCServer::KeepAlive(const QVariantMap &params)
{
QString sessionId = params.value("sessionId").toString();
bool result = NymeaCore::instance()->cloudManager()->keepAlive(sessionId);
qCDebug(dcJsonRpc()) << "KeepAlive received" << sessionId;
QVariantMap resultMap;
resultMap.insert("success", result);
resultMap.insert("success", true);
resultMap.insert("sessionId", sessionId);
return createReply(resultMap);
}

View File

@ -73,7 +73,6 @@ HEADERS += nymeacore.h \
cloud/awsconnector.h \
cloud/cloudmanager.h \
cloud/cloudnotifications.h \
cloud/janusconnector.h \
pushbuttondbusservice.h \
hardwaremanagerimplementation.h \
hardware/plugintimermanagerimplementation.h \
@ -158,7 +157,6 @@ SOURCES += nymeacore.cpp \
cloud/awsconnector.cpp \
cloud/cloudmanager.cpp \
cloud/cloudnotifications.cpp \
cloud/janusconnector.cpp \
pushbuttondbusservice.cpp \
hardwaremanagerimplementation.cpp \
hardware/plugintimermanagerimplementation.cpp \

View File

@ -50,8 +50,6 @@ Q_LOGGING_CATEGORY(dcNetworkManager, "NetworkManager")
Q_LOGGING_CATEGORY(dcUserManager, "UserManager")
Q_LOGGING_CATEGORY(dcAWS, "AWS")
Q_LOGGING_CATEGORY(dcAWSTraffic, "AWSTraffic")
Q_LOGGING_CATEGORY(dcJanus, "Janus")
Q_LOGGING_CATEGORY(dcJanusTraffic, "JanusTraffic")
Q_LOGGING_CATEGORY(dcBluetoothServer, "BluetoothServer")
Q_LOGGING_CATEGORY(dcBluetoothServerTraffic, "BluetoothServerTraffic")
Q_LOGGING_CATEGORY(dcMqtt, "Mqtt")

View File

@ -58,8 +58,6 @@ Q_DECLARE_LOGGING_CATEGORY(dcNetworkManager)
Q_DECLARE_LOGGING_CATEGORY(dcUserManager)
Q_DECLARE_LOGGING_CATEGORY(dcAWS)
Q_DECLARE_LOGGING_CATEGORY(dcAWSTraffic)
Q_DECLARE_LOGGING_CATEGORY(dcJanus)
Q_DECLARE_LOGGING_CATEGORY(dcJanusTraffic)
Q_DECLARE_LOGGING_CATEGORY(dcBluetoothServer)
Q_DECLARE_LOGGING_CATEGORY(dcBluetoothServerTraffic)
Q_DECLARE_LOGGING_CATEGORY(dcMqtt)

View File

@ -129,8 +129,6 @@ int main(int argc, char *argv[])
"UserManager",
"AWS",
"AWSTraffic",
"Janus",
"JanusTraffic",
"BluetoothServer",
"BluetoothServerTraffic",
"Mqtt"