better error handling if something goes wrong in the janus session

This commit is contained in:
Michael Zanetti 2017-12-14 12:42:42 +01:00
parent bec536af7d
commit 57c02f1e4d
2 changed files with 49 additions and 6 deletions

View File

@ -35,7 +35,10 @@ JanusConnector::JanusConnector(QObject *parent) : QObject(parent)
connect(m_socket, &QLocalSocket::disconnected, this, &JanusConnector::onDisconnected);
connect(m_socket, &QLocalSocket::readyRead, this, &JanusConnector::onReadyRead);
connectToJanus();
// When Janus crashes it will leave the socket in a very broken state which causes QLocalSocket to spin the CPU
// So let's use a rather short heartbeat to send ping messages and clean things up in case they are not acked.
m_pingTimer.setInterval(1000);
connect(&m_pingTimer, &QTimer::timeout, this, &JanusConnector::heartbeat);
}
bool JanusConnector::connectToJanus()
@ -58,10 +61,18 @@ bool JanusConnector::connectToJanus()
}
m_socket->setSocketDescriptor(sock);
m_pingTimer.start();
return true;
}
void JanusConnector::disconnectFromJanus()
{
m_socket->close();
m_pingTimer.stop();
m_lastUnconfirmedPing = QDateTime();
}
void JanusConnector::createSession(JanusConnector::WebRtcSession *session)
{
// Open the session
@ -231,7 +242,11 @@ void JanusConnector::onReadyRead()
foreach (WebRtcSession *session, m_sessions) {
if (session->matchJanusSessionId(sessionId)) {
qCDebug(dcJanus()) << "Session" << session << "timed out. Removing session";
delete m_sessions.take(session->sessionId);
m_sessions.remove(session->sessionId);
delete session;
if (m_sessions.isEmpty()) {
disconnectFromJanus();
}
return;
}
}
@ -267,7 +282,11 @@ void JanusConnector::onReadyRead()
hangup.insert("type", "hangup");
hangup.insert("reason", map.value("reason").toString());
emit webRtcHandshakeMessageReceived(session->sessionId, hangup);
delete m_sessions.take(session->sessionId);
m_sessions.remove(session->sessionId);
delete session;
if (m_sessions.isEmpty()) {
disconnectFromJanus();
}
return;
}
}
@ -286,6 +305,7 @@ void JanusConnector::onReadyRead()
if (!session) {
if (transactionId == "pingety") {
qCDebug(dcJanus()) << "Received PONG from Janus";
m_lastUnconfirmedPing = QDateTime();
return;
}
if (transactionId == "keepalive") {
@ -314,6 +334,9 @@ void JanusConnector::onReadyRead()
}
qCWarning(dcJanus()) << "Error establishing session";
delete m_sessions.take(session->sessionId);
if (m_sessions.isEmpty()) {
disconnectFromJanus();
}
return;
}
@ -362,13 +385,21 @@ void JanusConnector::onReadyRead()
void JanusConnector::heartbeat()
{
if (!m_lastUnconfirmedPing.isNull()) {
qCWarning(dcJanus()) << "Last ping not echoed by Janus. Seems the connection broke down. Cleaning up...";
while (!m_sessions.isEmpty()) {
delete m_sessions.take(m_sessions.keys().first());
}
disconnectFromJanus();
return;
}
QVariantMap map;
map.insert("janus", "ping");
map.insert("transaction", "pingety");
QJsonDocument jsonDoc = QJsonDocument::fromVariant(map);
// qCDebug(dcJanus()) << "Sending PING to Janus";
m_socket->write(jsonDoc.toJson());
m_socket->flush();
qCDebug(dcJanus()) << "Sending PING to Janus";
m_lastUnconfirmedPing = QDateTime::currentDateTime();
writeToJanus(jsonDoc.toJson());
}
void JanusConnector::createChannel(WebRtcSession *session)
@ -388,10 +419,16 @@ void JanusConnector::createChannel(WebRtcSession *session)
void JanusConnector::writeToJanus(const QByteArray &data)
{
if (!m_socket->isOpen() && !connectToJanus()) {
qCWarning(dcJanus()) << "Error connecting to Janus. Cannot write data to it.";
return;
}
qCDebug(dcJanusTraffic()) << "Writing to janus" << data;
qint64 count = m_socket->write(data);
if (count != data.length()) {
qCWarning(dcJanus()) << "Error writing to Janus.";
disconnectFromJanus();
return;
}
m_socket->flush();
}

View File

@ -24,6 +24,7 @@
#include <QObject>
#include <QLocalSocket>
#include <QTimer>
#include <QDateTime>
class JanusConnector : public QObject
{
@ -68,6 +69,8 @@ private:
QHash<QString, WebRtcSession*> m_pendingRequests;
bool connectToJanus();
void disconnectFromJanus();
void createSession(WebRtcSession *session);
void createChannel(WebRtcSession *session);
void writeToJanus(const QByteArray &data);
@ -77,6 +80,9 @@ private:
QLocalSocket *m_socket = nullptr;
QDateTime m_lastUnconfirmedPing;
QTimer m_pingTimer;
QHash<QString, WebRtcSession*> m_sessions;
QStringList m_wantedAcks;