From a1d0574e20f47cd446aab0f262981b375c8001bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20St=C3=BCrz?= Date: Thu, 22 Jan 2026 11:58:58 +0100 Subject: [PATCH] LogEngine: Improve behavior of disabled logengine --- libnymea-core/logging/logengineinfluxdb.cpp | 226 +++++++++++++------- 1 file changed, 149 insertions(+), 77 deletions(-) diff --git a/libnymea-core/logging/logengineinfluxdb.cpp b/libnymea-core/logging/logengineinfluxdb.cpp index 456a2d48..53aae0d2 100644 --- a/libnymea-core/logging/logengineinfluxdb.cpp +++ b/libnymea-core/logging/logengineinfluxdb.cpp @@ -24,18 +24,18 @@ #include "logengineinfluxdb.h" -#include -#include -#include #include +#include +#include #include +#include LogEngineInfluxDB::LogEngineInfluxDB(const QString &host, const QString &dbName, const QString &username, const QString &password, QObject *parent) - : LogEngine{parent}, - m_host(host), - m_dbName(dbName), - m_username(username), - m_password(password) + : LogEngine{parent} + , m_host(host) + , m_dbName(dbName) + , m_username(username) + , m_password(password) { m_nam = new QNetworkAccessManager(this); @@ -53,7 +53,7 @@ LogEngineInfluxDB::~LogEngineInfluxDB() qCInfo(dcLogEngine()) << "Waiting for" << (m_initQueryQueue.count() + m_queryQueue.count() + m_writeQueue.count()) << "jobs to finish... Init status:" << m_initStatus; } while (jobsRunning()) { -// qCDebug(dcLogEngine()) << "Waiting for logs to finish processing." << m_writeQueue.count() << "jobs pending..."; + // qCDebug(dcLogEngine()) << "Waiting for logs to finish processing." << m_writeQueue.count() << "jobs pending..."; processQueues(); qApp->processEvents(); } @@ -66,12 +66,12 @@ Logger *LogEngineInfluxDB::registerLogSource(const QString &name, const QStringL return nullptr; } -// qCDebug(dcLogEngine()) << "Registering log source" << name << "with tags" << tagNames; + // qCDebug(dcLogEngine()) << "Registering log source" << name << "with tags" << tagNames; Logger *logger = createLogger(name, tagNames, loggingType); m_loggers.insert(name, logger); - if (loggingType == Types::LoggingTypeSampled) { + if (m_initStatus != InitStatusDisabled && loggingType == Types::LoggingTypeSampled) { qCDebug(dcLogEngine()) << "Setting up log sampling on" << sampleColumn; if (sampleColumn.isEmpty()) { @@ -84,55 +84,45 @@ Logger *LogEngineInfluxDB::registerLogSource(const QString &name, const QStringL columns.append(QString("MEAN(\"%1\") AS %1").arg(sampleColumn)); QString target = columns.join(", "); - QueryJob *minutesJob = query(QString("CREATE CONTINUOUS QUERY \"minutes-%1\" " - "ON \"nymea\" " - "BEGIN " - "SELECT %2 " - "INTO minutes.\"%1\" " - "FROM live.\"%1\" " - "GROUP BY time(1m) " - "fill(previous) " - "END").arg(name).arg(target), true); - connect(minutesJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response){ + QueryJob *minutesJob = query( + QString( + "CREATE CONTINUOUS QUERY \"minutes-%1\" " "ON \"nymea\" " "BEGIN " "SELECT %2 " "INTO minutes.\"%1\" " "FROM live.\"%1\" " "GROUP BY time(1m) " "fill(previous)" " " "END") + .arg(name) + .arg(target), + true); + connect(minutesJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response) { if (status == QNetworkReply::NoError) { qCDebug(dcLogEngine()) << "Created minute based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); } else { qCWarning(dcLogEngine()) << "Unable to create minute based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); } }); - QueryJob *hoursJob = query(QString("CREATE CONTINUOUS QUERY \"hours-%1\" " - "ON \"nymea\" " - "BEGIN " - "SELECT %2 " - "INTO hours.\"%1\" " - "FROM minutes.\"%1\" " - "GROUP BY time(1h) " - "fill(previous) " - "END").arg(name).arg(target), true); - connect(hoursJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response){ + QueryJob *hoursJob = query( + QString( + "CREATE CONTINUOUS QUERY \"hours-%1\" " "ON \"nymea\" " "BEGIN " "SELECT %2 " "INTO hours.\"%1\" " "FROM minutes.\"%1\" " "GROUP BY time(1h) " "fill(previous) " "END") + .arg(name) + .arg(target), + true); + connect(hoursJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response) { if (status == QNetworkReply::NoError) { qCDebug(dcLogEngine()) << "Created hour based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); } else { qCWarning(dcLogEngine()) << "Unable to create hour based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); } }); - QueryJob *daysJob = query(QString("CREATE CONTINUOUS QUERY \"days-%1\" " - "ON \"nymea\" " - "BEGIN " - "SELECT %2 " - "INTO days.\"%1\" " - "FROM hours.\"%1\" " - "GROUP BY time(24h) " - "fill(previous) " - "END").arg(name).arg(target), true); - connect(daysJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response){ + QueryJob *daysJob = query( + QString( + "CREATE CONTINUOUS QUERY \"days-%1\" " "ON \"nymea\" " "BEGIN " "SELECT %2 " "INTO days.\"%1\" " "FROM hours.\"%1\" " "GROUP BY time(24h) " "fill(previous) " "END") + .arg(name) + .arg(target), + true); + connect(daysJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response) { if (status == QNetworkReply::NoError) { qCDebug(dcLogEngine()) << "Created day based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); } else { qCWarning(dcLogEngine()) << "Unable to create days based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); } }); - } } @@ -147,12 +137,15 @@ void LogEngineInfluxDB::unregisterLogSource(const QString &name) return; } + if (m_initStatus == InitStatusDisabled) + return; + QString queryString = QString("DROP MEASUREMENT \"%1\"").arg(name); if (m_initStatus == InitStatusOK) qCDebug(dcLogEngine()) << "Removing log entries:" << queryString; QueryJob *job = query(queryString); - connect(job, &QueryJob::finished, this, [name](bool success){ + connect(job, &QueryJob::finished, this, [name](bool success) { if (success) { qCDebug(dcLogEngine()) << "Removed log entries for source" << name; } else { @@ -163,6 +156,9 @@ void LogEngineInfluxDB::unregisterLogSource(const QString &name) void LogEngineInfluxDB::logEvent(Logger *logger, const QStringList &tags, const QVariantMap &values) { + if (m_initStatus == InitStatusDisabled) + return; + QString measurement = logger->name(); QStringList tagsList; QStringList fieldsList; @@ -233,7 +229,7 @@ void LogEngineInfluxDB::processQueues() return; } -// qCDebug(dcLogEngine()) << "Processing queue:" << m_initStatus << "init count:" << m_initQueryQueue.count() << "query count:" << m_queryQueue.count() << "write count:" << m_writeQueue.count(); + // qCDebug(dcLogEngine()) << "Processing queue:" << m_initStatus << "init count:" << m_initQueryQueue.count() << "query count:" << m_queryQueue.count() << "write count:" << m_writeQueue.count(); if (!m_currentInitQuery && !m_initQueryQueue.isEmpty()) { QueryJob *job = m_initQueryQueue.takeFirst(); @@ -248,10 +244,14 @@ void LogEngineInfluxDB::processQueues() m_currentInitQuery = job; - connect(reply, &QNetworkReply::finished, job, [=](){ + connect(reply, &QNetworkReply::finished, job, [=]() { m_currentInitQuery = nullptr; qCDebug(dcLogEngine()) << "Init query job finished"; reply->deleteLater(); + if (m_initStatus == InitStatusDisabled) { + job->finish(reply->error()); + return; + } if (reply->error() == QNetworkReply::ProtocolInvalidOperationError) { qCWarning(dcLogEngine()) << "Influx DB protocol error:" << reply->readAll(); job->finish(reply->error()); @@ -259,6 +259,10 @@ void LogEngineInfluxDB::processQueues() } if (reply->error() != QNetworkReply::NoError) { + if (m_initStatus == InitStatusDisabled) { + job->finish(reply->error()); + return; + } qCWarning(dcLogEngine()) << "Error in influxdb communication:" << reply->error() << reply->errorString() << "for query:" << job->m_request.url().toString(); job->finish(reply->error()); return; @@ -276,7 +280,7 @@ void LogEngineInfluxDB::processQueues() }); } - if (m_initStatus != InitStatusOK ) { + if (m_initStatus != InitStatusOK) { return; } @@ -294,10 +298,15 @@ void LogEngineInfluxDB::processQueues() m_currentQuery = job; - connect(reply, &QNetworkReply::finished, job, [=](){ + connect(reply, &QNetworkReply::finished, job, [=]() { qCDebug(dcLogEngine()) << "Query finished"; m_currentQuery = nullptr; reply->deleteLater(); + if (m_initStatus == InitStatusDisabled) { + job->finish(reply->error()); + processQueues(); + return; + } if (reply->error() == QNetworkReply::ProtocolInvalidOperationError) { qCWarning(dcLogEngine()) << "Influx DB protocol error:" << reply->readAll(); job->finish(reply->error()); @@ -306,6 +315,11 @@ void LogEngineInfluxDB::processQueues() } if (reply->error() != QNetworkReply::NoError) { + if (m_initStatus == InitStatusDisabled) { + job->finish(reply->error()); + processQueues(); + return; + } qCWarning(dcLogEngine()) << "Error in influxdb communication:" << reply->error() << reply->errorString(); job->finish(reply->error()); processQueues(); @@ -320,7 +334,7 @@ void LogEngineInfluxDB::processQueues() processQueues(); return; } - // qCDebug(dcLogEngine()) << "Reply" << qUtf8Printable(jsonDoc.toJson(QJsonDocument::Indented)); + // qCDebug(dcLogEngine()) << "Reply" << qUtf8Printable(jsonDoc.toJson(QJsonDocument::Indented)); job->finish(QNetworkReply::NoError, jsonDoc.toVariant().toMap().value("results").toList()); @@ -341,11 +355,19 @@ void LogEngineInfluxDB::processQueues() qCDebug(dcLogEngine()) << "Started:" << reply->isRunning() << reply->isFinished(); m_currentWriteReply = reply; - connect(reply, &QNetworkReply::finished, this, [=](){ + connect(reply, &QNetworkReply::finished, this, [=]() { m_currentWriteReply = nullptr; reply->deleteLater(); + if (m_initStatus == InitStatusDisabled) { + processQueues(); + return; + } if (reply->error() != QNetworkReply::NoError) { + if (m_initStatus == InitStatusDisabled) { + processQueues(); + return; + } qCWarning(dcLogEngine()) << "Unable to connect to influxdb. Cannot log events." << reply->error() << reply->readAll(); processQueues(); return; @@ -362,10 +384,23 @@ void LogEngineInfluxDB::processQueues() } } -LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, const QStringList &columns, const QDateTime &startTime, const QDateTime &endTime, const QVariantMap &filter, Types::SampleRate sampleRate, Qt::SortOrder sortOrder, int offset, int limit) +LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, + const QStringList &columns, + const QDateTime &startTime, + const QDateTime &endTime, + const QVariantMap &filter, + Types::SampleRate sampleRate, + Qt::SortOrder sortOrder, + int offset, + int limit) { LogFetchJob *job = new LogFetchJob(this); + if (m_initStatus == InitStatusDisabled) { + finishFetchJob(job, LogEntries()); + return job; + } + // FIXME: injection attacks possible? QString what = "*"; if (sampleRate == Types::SampleRateAny) { @@ -386,7 +421,6 @@ LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, cons QStringList escapedSourced; foreach (const QString &source, sources) { - QString retentionPolicy; switch (sampleRate) { case Types::SampleRate1Min: @@ -456,9 +490,17 @@ LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, cons QNetworkRequest request = createQueryRequest(query); qCDebug(dcLogEngine()) << "Request:" << request.url() << filter; QNetworkReply *reply = m_nam->get(request); - connect(reply, &QNetworkReply::finished, this, [=](){ + connect(reply, &QNetworkReply::finished, this, [=]() { reply->deleteLater(); + if (m_initStatus == InitStatusDisabled) { + finishFetchJob(job, QList()); + return; + } if (reply->error() != QNetworkReply::NoError) { + if (m_initStatus == InitStatusDisabled) { + finishFetchJob(job, QList()); + return; + } qCWarning(dcLogEngine()) << "Unable to obtain entries from influxdb" << reply->error() << reply->readAll(); finishFetchJob(job, QList()); return; @@ -507,20 +549,18 @@ LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, cons bool LogEngineInfluxDB::jobsRunning() const { -// qCDebug(dcLogEngine()) << "Jobs running:" << m_initStatus << m_writeQueue.count() << m_initQueryQueue.count() << m_queryQueue.count() << m_currentWriteReply; - return m_currentInitQuery - || !m_initQueryQueue.isEmpty() - || m_currentQuery - || !m_queryQueue.isEmpty() - || m_currentWriteReply - || !m_writeQueue.isEmpty(); + // qCDebug(dcLogEngine()) << "Jobs running:" << m_initStatus << m_writeQueue.count() << m_initQueryQueue.count() << m_queryQueue.count() << m_currentWriteReply; + return m_currentInitQuery || !m_initQueryQueue.isEmpty() || m_currentQuery || !m_queryQueue.isEmpty() || m_currentWriteReply || !m_writeQueue.isEmpty(); } void LogEngineInfluxDB::clear(const QString &source) { + if (m_initStatus == InitStatusDisabled) + return; + qCDebug(dcLogEngine()) << "Clearing entries for source:" << source; QueryJob *job = query(QString("DROP MEASUREMENT \"%1\"").arg(source)); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results) { if (status != QNetworkReply::NoError) { qCWarning(dcLogEngine()) << "Unable to clear log entries for" << source << ":" << qUtf8Printable(QJsonDocument::fromVariant(results).toJson()); } @@ -552,8 +592,14 @@ void LogEngineInfluxDB::initDB() void LogEngineInfluxDB::createDB() { + if (m_initStatus == InitStatusDisabled) + return; + QueryJob *job = query("SHOW DATABASES", false, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { if (status == QNetworkReply::ConnectionRefusedError) { // Influx not up yet? trying again in 5 secs... @@ -563,6 +609,10 @@ void LogEngineInfluxDB::createDB() } return; } + + if (m_initStatus == InitStatusDisabled) + return; + qCCritical(dcLogEngine()) << "Unable to connect to InfluxDB"; m_initStatus = InitStatusFailure; return; @@ -604,6 +654,9 @@ void LogEngineInfluxDB::createDB() QueryJob *job = query(QString("CREATE DATABASE %1").arg(m_dbName), true, true); connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &result) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { qCCritical(dcLogEngine()) << "Unable to create" << m_dbName << "database in influxdb:" << QJsonDocument::fromVariant(result).toJson(); m_initStatus = InitStatusFailure; @@ -617,8 +670,14 @@ void LogEngineInfluxDB::createDB() void LogEngineInfluxDB::createRetentionPolicies() { + if (m_initStatus == InitStatusDisabled) + return; + QueryJob *job = query("SHOW RETENTION POLICIES", false, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results) { + if (m_initStatus == InitStatusDisabled) { + return; + } if (status != QNetworkReply::NoError) { qCCritical(dcLogEngine()) << "Unable to query retention policies."; m_initStatus = InitStatusFailure; @@ -679,7 +738,10 @@ void LogEngineInfluxDB::createRetentionPolicies() if (!discreteRPFound) { qCInfo(dcLogEngine()) << "Creating discrete nymea retention policy in influxdb"; QueryJob *job = query(QString("CREATE RETENTION POLICY discrete ON %1 DURATION 8760h REPLICATION 1").arg(m_dbName), true, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { qCWarning(dcLogEngine()) << "Unable to create discrete retention policy in influxdb."; m_initStatus = InitStatusFailure; @@ -693,7 +755,10 @@ void LogEngineInfluxDB::createRetentionPolicies() if (!liveRPFound) { qCInfo(dcLogEngine()) << "Creating live nymea retention policy in influxdb"; QueryJob *job = query(QString("CREATE RETENTION POLICY live ON %1 DURATION 24h REPLICATION 1").arg(m_dbName), true, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { qCWarning(dcLogEngine()) << "Unable to create live retention policy in influxdb."; m_initStatus = InitStatusFailure; @@ -707,7 +772,10 @@ void LogEngineInfluxDB::createRetentionPolicies() if (!minutesRPFound) { qCInfo(dcLogEngine()) << "Creating minutes nymea retention policy in influxdb"; QueryJob *job = query(QString("CREATE RETENTION POLICY minutes ON %1 DURATION 168h REPLICATION 1").arg(m_dbName), true, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { qCWarning(dcLogEngine()) << "Unable to create minutes retention policy in influxdb."; m_initStatus = InitStatusFailure; @@ -721,7 +789,10 @@ void LogEngineInfluxDB::createRetentionPolicies() if (!hoursRPFound) { qCInfo(dcLogEngine()) << "Creating hours nymea retention policy in influxdb"; QueryJob *job = query(QString("CREATE RETENTION POLICY hours ON %1 DURATION 26280h REPLICATION 1").arg(m_dbName), true, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { qCWarning(dcLogEngine()) << "Unable to create hours retention policy in influxdb."; m_initStatus = InitStatusFailure; @@ -735,7 +806,10 @@ void LogEngineInfluxDB::createRetentionPolicies() if (!daysRPFound) { qCInfo(dcLogEngine()) << "Creating days nymea retention policy in influxdb"; QueryJob *job = query(QString("CREATE RETENTION POLICY days ON %1 DURATION 175200h REPLICATION 1").arg(m_dbName), true, true); - connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){ + connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) { + if (m_initStatus == InitStatusDisabled) + return; + if (status != QNetworkReply::NoError) { qCWarning(dcLogEngine()) << "Unable to create days retention policy in influxdb."; m_initStatus = InitStatusFailure; @@ -748,7 +822,8 @@ void LogEngineInfluxDB::createRetentionPolicies() m_initStatus = InitStatusOK; - qCDebug(dcLogEngine()) << "Influx initialized. Starting to process log entries (" << m_initQueryQueue.count() << m_queryQueue.count() << m_writeQueue.count() << "in queue)"; + qCDebug(dcLogEngine()) << "Influx initialized. Starting to process log entries (" << m_initQueryQueue.count() << m_queryQueue.count() << m_writeQueue.count() + << "in queue)"; processQueues(); }); } @@ -819,18 +894,15 @@ QueryJob *LogEngineInfluxDB::query(const QString &query, bool post, bool isInit) return job; } -QueryJob::QueryJob(const QNetworkRequest &request, bool post, bool isInit, QObject *parent): - QObject(parent), - m_request(request), - m_post(post), - m_isInit(isInit) -{ - -} +QueryJob::QueryJob(const QNetworkRequest &request, bool post, bool isInit, QObject *parent) + : QObject(parent) + , m_request(request) + , m_post(post) + , m_isInit(isInit) +{} void QueryJob::finish(QNetworkReply::NetworkError status, const QVariantList &results) { QMetaObject::invokeMethod(this, "finished", Qt::QueuedConnection, Q_ARG(QNetworkReply::NetworkError, status), Q_ARG(QVariantList, results)); QMetaObject::invokeMethod(this, "deleteLater", Qt::QueuedConnection); } -