From 57c02f1e4d9ceb8403660888708e5a4f7770226b Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Thu, 14 Dec 2017 12:42:42 +0100 Subject: [PATCH] better error handling if something goes wrong in the janus session --- libnymea-core/janusconnector.cpp | 49 ++++++++++++++++++++++++++++---- libnymea-core/janusconnector.h | 6 ++++ 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/libnymea-core/janusconnector.cpp b/libnymea-core/janusconnector.cpp index 23349af9..108392ea 100644 --- a/libnymea-core/janusconnector.cpp +++ b/libnymea-core/janusconnector.cpp @@ -35,7 +35,10 @@ JanusConnector::JanusConnector(QObject *parent) : QObject(parent) connect(m_socket, &QLocalSocket::disconnected, this, &JanusConnector::onDisconnected); connect(m_socket, &QLocalSocket::readyRead, this, &JanusConnector::onReadyRead); - connectToJanus(); + // When Janus crashes it will leave the socket in a very broken state which causes QLocalSocket to spin the CPU + // So let's use a rather short heartbeat to send ping messages and clean things up in case they are not acked. + m_pingTimer.setInterval(1000); + connect(&m_pingTimer, &QTimer::timeout, this, &JanusConnector::heartbeat); } bool JanusConnector::connectToJanus() @@ -58,10 +61,18 @@ bool JanusConnector::connectToJanus() } m_socket->setSocketDescriptor(sock); + m_pingTimer.start(); return true; } +void JanusConnector::disconnectFromJanus() +{ + m_socket->close(); + m_pingTimer.stop(); + m_lastUnconfirmedPing = QDateTime(); +} + void JanusConnector::createSession(JanusConnector::WebRtcSession *session) { // Open the session @@ -231,7 +242,11 @@ void JanusConnector::onReadyRead() foreach (WebRtcSession *session, m_sessions) { if (session->matchJanusSessionId(sessionId)) { qCDebug(dcJanus()) << "Session" << session << "timed out. Removing session"; - delete m_sessions.take(session->sessionId); + m_sessions.remove(session->sessionId); + delete session; + if (m_sessions.isEmpty()) { + disconnectFromJanus(); + } return; } } @@ -267,7 +282,11 @@ void JanusConnector::onReadyRead() hangup.insert("type", "hangup"); hangup.insert("reason", map.value("reason").toString()); emit webRtcHandshakeMessageReceived(session->sessionId, hangup); - delete m_sessions.take(session->sessionId); + m_sessions.remove(session->sessionId); + delete session; + if (m_sessions.isEmpty()) { + disconnectFromJanus(); + } return; } } @@ -286,6 +305,7 @@ void JanusConnector::onReadyRead() if (!session) { if (transactionId == "pingety") { qCDebug(dcJanus()) << "Received PONG from Janus"; + m_lastUnconfirmedPing = QDateTime(); return; } if (transactionId == "keepalive") { @@ -314,6 +334,9 @@ void JanusConnector::onReadyRead() } qCWarning(dcJanus()) << "Error establishing session"; delete m_sessions.take(session->sessionId); + if (m_sessions.isEmpty()) { + disconnectFromJanus(); + } return; } @@ -362,13 +385,21 @@ void JanusConnector::onReadyRead() void JanusConnector::heartbeat() { + if (!m_lastUnconfirmedPing.isNull()) { + qCWarning(dcJanus()) << "Last ping not echoed by Janus. Seems the connection broke down. Cleaning up..."; + while (!m_sessions.isEmpty()) { + delete m_sessions.take(m_sessions.keys().first()); + } + disconnectFromJanus(); + return; + } QVariantMap map; map.insert("janus", "ping"); map.insert("transaction", "pingety"); QJsonDocument jsonDoc = QJsonDocument::fromVariant(map); -// qCDebug(dcJanus()) << "Sending PING to Janus"; - m_socket->write(jsonDoc.toJson()); - m_socket->flush(); + qCDebug(dcJanus()) << "Sending PING to Janus"; + m_lastUnconfirmedPing = QDateTime::currentDateTime(); + writeToJanus(jsonDoc.toJson()); } void JanusConnector::createChannel(WebRtcSession *session) @@ -388,10 +419,16 @@ void JanusConnector::createChannel(WebRtcSession *session) void JanusConnector::writeToJanus(const QByteArray &data) { + if (!m_socket->isOpen() && !connectToJanus()) { + qCWarning(dcJanus()) << "Error connecting to Janus. Cannot write data to it."; + return; + } qCDebug(dcJanusTraffic()) << "Writing to janus" << data; qint64 count = m_socket->write(data); if (count != data.length()) { qCWarning(dcJanus()) << "Error writing to Janus."; + disconnectFromJanus(); + return; } m_socket->flush(); } diff --git a/libnymea-core/janusconnector.h b/libnymea-core/janusconnector.h index 17ee9260..950198d5 100644 --- a/libnymea-core/janusconnector.h +++ b/libnymea-core/janusconnector.h @@ -24,6 +24,7 @@ #include #include #include +#include class JanusConnector : public QObject { @@ -68,6 +69,8 @@ private: QHash m_pendingRequests; bool connectToJanus(); + void disconnectFromJanus(); + void createSession(WebRtcSession *session); void createChannel(WebRtcSession *session); void writeToJanus(const QByteArray &data); @@ -77,6 +80,9 @@ private: QLocalSocket *m_socket = nullptr; + QDateTime m_lastUnconfirmedPing; + QTimer m_pingTimer; + QHash m_sessions; QStringList m_wantedAcks;