LogEngine: Improve behavior of disabled logengine

This commit is contained in:
Simon Stürz 2026-01-22 11:58:58 +01:00
parent abd8dd2d97
commit a1d0574e20

View File

@ -24,18 +24,18 @@
#include "logengineinfluxdb.h"
#include <QNetworkReply>
#include <QUrlQuery>
#include <QJsonDocument>
#include <QCoreApplication>
#include <QJsonDocument>
#include <QNetworkReply>
#include <QRegularExpression>
#include <QUrlQuery>
LogEngineInfluxDB::LogEngineInfluxDB(const QString &host, const QString &dbName, const QString &username, const QString &password, QObject *parent)
: LogEngine{parent},
m_host(host),
m_dbName(dbName),
m_username(username),
m_password(password)
: LogEngine{parent}
, m_host(host)
, m_dbName(dbName)
, m_username(username)
, m_password(password)
{
m_nam = new QNetworkAccessManager(this);
@ -53,7 +53,7 @@ LogEngineInfluxDB::~LogEngineInfluxDB()
qCInfo(dcLogEngine()) << "Waiting for" << (m_initQueryQueue.count() + m_queryQueue.count() + m_writeQueue.count()) << "jobs to finish... Init status:" << m_initStatus;
}
while (jobsRunning()) {
// qCDebug(dcLogEngine()) << "Waiting for logs to finish processing." << m_writeQueue.count() << "jobs pending...";
// qCDebug(dcLogEngine()) << "Waiting for logs to finish processing." << m_writeQueue.count() << "jobs pending...";
processQueues();
qApp->processEvents();
}
@ -66,12 +66,12 @@ Logger *LogEngineInfluxDB::registerLogSource(const QString &name, const QStringL
return nullptr;
}
// qCDebug(dcLogEngine()) << "Registering log source" << name << "with tags" << tagNames;
// qCDebug(dcLogEngine()) << "Registering log source" << name << "with tags" << tagNames;
Logger *logger = createLogger(name, tagNames, loggingType);
m_loggers.insert(name, logger);
if (loggingType == Types::LoggingTypeSampled) {
if (m_initStatus != InitStatusDisabled && loggingType == Types::LoggingTypeSampled) {
qCDebug(dcLogEngine()) << "Setting up log sampling on" << sampleColumn;
if (sampleColumn.isEmpty()) {
@ -84,55 +84,45 @@ Logger *LogEngineInfluxDB::registerLogSource(const QString &name, const QStringL
columns.append(QString("MEAN(\"%1\") AS %1").arg(sampleColumn));
QString target = columns.join(", ");
QueryJob *minutesJob = query(QString("CREATE CONTINUOUS QUERY \"minutes-%1\" "
"ON \"nymea\" "
"BEGIN "
"SELECT %2 "
"INTO minutes.\"%1\" "
"FROM live.\"%1\" "
"GROUP BY time(1m) "
"fill(previous) "
"END").arg(name).arg(target), true);
connect(minutesJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response){
QueryJob *minutesJob = query(
QString(
"CREATE CONTINUOUS QUERY \"minutes-%1\" " "ON \"nymea\" " "BEGIN " "SELECT %2 " "INTO minutes.\"%1\" " "FROM live.\"%1\" " "GROUP BY time(1m) " "fill(previous)" " " "END")
.arg(name)
.arg(target),
true);
connect(minutesJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response) {
if (status == QNetworkReply::NoError) {
qCDebug(dcLogEngine()) << "Created minute based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
} else {
qCWarning(dcLogEngine()) << "Unable to create minute based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
}
});
QueryJob *hoursJob = query(QString("CREATE CONTINUOUS QUERY \"hours-%1\" "
"ON \"nymea\" "
"BEGIN "
"SELECT %2 "
"INTO hours.\"%1\" "
"FROM minutes.\"%1\" "
"GROUP BY time(1h) "
"fill(previous) "
"END").arg(name).arg(target), true);
connect(hoursJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response){
QueryJob *hoursJob = query(
QString(
"CREATE CONTINUOUS QUERY \"hours-%1\" " "ON \"nymea\" " "BEGIN " "SELECT %2 " "INTO hours.\"%1\" " "FROM minutes.\"%1\" " "GROUP BY time(1h) " "fill(previous) " "END")
.arg(name)
.arg(target),
true);
connect(hoursJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response) {
if (status == QNetworkReply::NoError) {
qCDebug(dcLogEngine()) << "Created hour based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
} else {
qCWarning(dcLogEngine()) << "Unable to create hour based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
}
});
QueryJob *daysJob = query(QString("CREATE CONTINUOUS QUERY \"days-%1\" "
"ON \"nymea\" "
"BEGIN "
"SELECT %2 "
"INTO days.\"%1\" "
"FROM hours.\"%1\" "
"GROUP BY time(24h) "
"fill(previous) "
"END").arg(name).arg(target), true);
connect(daysJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response){
QueryJob *daysJob = query(
QString(
"CREATE CONTINUOUS QUERY \"days-%1\" " "ON \"nymea\" " "BEGIN " "SELECT %2 " "INTO days.\"%1\" " "FROM hours.\"%1\" " "GROUP BY time(24h) " "fill(previous) " "END")
.arg(name)
.arg(target),
true);
connect(daysJob, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &response) {
if (status == QNetworkReply::NoError) {
qCDebug(dcLogEngine()) << "Created day based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
} else {
qCWarning(dcLogEngine()) << "Unable to create days based continuous query for" << name << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
}
});
}
}
@ -147,12 +137,15 @@ void LogEngineInfluxDB::unregisterLogSource(const QString &name)
return;
}
if (m_initStatus == InitStatusDisabled)
return;
QString queryString = QString("DROP MEASUREMENT \"%1\"").arg(name);
if (m_initStatus == InitStatusOK)
qCDebug(dcLogEngine()) << "Removing log entries:" << queryString;
QueryJob *job = query(queryString);
connect(job, &QueryJob::finished, this, [name](bool success){
connect(job, &QueryJob::finished, this, [name](bool success) {
if (success) {
qCDebug(dcLogEngine()) << "Removed log entries for source" << name;
} else {
@ -163,6 +156,9 @@ void LogEngineInfluxDB::unregisterLogSource(const QString &name)
void LogEngineInfluxDB::logEvent(Logger *logger, const QStringList &tags, const QVariantMap &values)
{
if (m_initStatus == InitStatusDisabled)
return;
QString measurement = logger->name();
QStringList tagsList;
QStringList fieldsList;
@ -233,7 +229,7 @@ void LogEngineInfluxDB::processQueues()
return;
}
// qCDebug(dcLogEngine()) << "Processing queue:" << m_initStatus << "init count:" << m_initQueryQueue.count() << "query count:" << m_queryQueue.count() << "write count:" << m_writeQueue.count();
// qCDebug(dcLogEngine()) << "Processing queue:" << m_initStatus << "init count:" << m_initQueryQueue.count() << "query count:" << m_queryQueue.count() << "write count:" << m_writeQueue.count();
if (!m_currentInitQuery && !m_initQueryQueue.isEmpty()) {
QueryJob *job = m_initQueryQueue.takeFirst();
@ -248,10 +244,14 @@ void LogEngineInfluxDB::processQueues()
m_currentInitQuery = job;
connect(reply, &QNetworkReply::finished, job, [=](){
connect(reply, &QNetworkReply::finished, job, [=]() {
m_currentInitQuery = nullptr;
qCDebug(dcLogEngine()) << "Init query job finished";
reply->deleteLater();
if (m_initStatus == InitStatusDisabled) {
job->finish(reply->error());
return;
}
if (reply->error() == QNetworkReply::ProtocolInvalidOperationError) {
qCWarning(dcLogEngine()) << "Influx DB protocol error:" << reply->readAll();
job->finish(reply->error());
@ -259,6 +259,10 @@ void LogEngineInfluxDB::processQueues()
}
if (reply->error() != QNetworkReply::NoError) {
if (m_initStatus == InitStatusDisabled) {
job->finish(reply->error());
return;
}
qCWarning(dcLogEngine()) << "Error in influxdb communication:" << reply->error() << reply->errorString() << "for query:" << job->m_request.url().toString();
job->finish(reply->error());
return;
@ -276,7 +280,7 @@ void LogEngineInfluxDB::processQueues()
});
}
if (m_initStatus != InitStatusOK ) {
if (m_initStatus != InitStatusOK) {
return;
}
@ -294,10 +298,15 @@ void LogEngineInfluxDB::processQueues()
m_currentQuery = job;
connect(reply, &QNetworkReply::finished, job, [=](){
connect(reply, &QNetworkReply::finished, job, [=]() {
qCDebug(dcLogEngine()) << "Query finished";
m_currentQuery = nullptr;
reply->deleteLater();
if (m_initStatus == InitStatusDisabled) {
job->finish(reply->error());
processQueues();
return;
}
if (reply->error() == QNetworkReply::ProtocolInvalidOperationError) {
qCWarning(dcLogEngine()) << "Influx DB protocol error:" << reply->readAll();
job->finish(reply->error());
@ -306,6 +315,11 @@ void LogEngineInfluxDB::processQueues()
}
if (reply->error() != QNetworkReply::NoError) {
if (m_initStatus == InitStatusDisabled) {
job->finish(reply->error());
processQueues();
return;
}
qCWarning(dcLogEngine()) << "Error in influxdb communication:" << reply->error() << reply->errorString();
job->finish(reply->error());
processQueues();
@ -320,7 +334,7 @@ void LogEngineInfluxDB::processQueues()
processQueues();
return;
}
// qCDebug(dcLogEngine()) << "Reply" << qUtf8Printable(jsonDoc.toJson(QJsonDocument::Indented));
// qCDebug(dcLogEngine()) << "Reply" << qUtf8Printable(jsonDoc.toJson(QJsonDocument::Indented));
job->finish(QNetworkReply::NoError, jsonDoc.toVariant().toMap().value("results").toList());
@ -341,11 +355,19 @@ void LogEngineInfluxDB::processQueues()
qCDebug(dcLogEngine()) << "Started:" << reply->isRunning() << reply->isFinished();
m_currentWriteReply = reply;
connect(reply, &QNetworkReply::finished, this, [=](){
connect(reply, &QNetworkReply::finished, this, [=]() {
m_currentWriteReply = nullptr;
reply->deleteLater();
if (m_initStatus == InitStatusDisabled) {
processQueues();
return;
}
if (reply->error() != QNetworkReply::NoError) {
if (m_initStatus == InitStatusDisabled) {
processQueues();
return;
}
qCWarning(dcLogEngine()) << "Unable to connect to influxdb. Cannot log events." << reply->error() << reply->readAll();
processQueues();
return;
@ -362,10 +384,23 @@ void LogEngineInfluxDB::processQueues()
}
}
LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, const QStringList &columns, const QDateTime &startTime, const QDateTime &endTime, const QVariantMap &filter, Types::SampleRate sampleRate, Qt::SortOrder sortOrder, int offset, int limit)
LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources,
const QStringList &columns,
const QDateTime &startTime,
const QDateTime &endTime,
const QVariantMap &filter,
Types::SampleRate sampleRate,
Qt::SortOrder sortOrder,
int offset,
int limit)
{
LogFetchJob *job = new LogFetchJob(this);
if (m_initStatus == InitStatusDisabled) {
finishFetchJob(job, LogEntries());
return job;
}
// FIXME: injection attacks possible?
QString what = "*";
if (sampleRate == Types::SampleRateAny) {
@ -386,7 +421,6 @@ LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, cons
QStringList escapedSourced;
foreach (const QString &source, sources) {
QString retentionPolicy;
switch (sampleRate) {
case Types::SampleRate1Min:
@ -456,9 +490,17 @@ LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, cons
QNetworkRequest request = createQueryRequest(query);
qCDebug(dcLogEngine()) << "Request:" << request.url() << filter;
QNetworkReply *reply = m_nam->get(request);
connect(reply, &QNetworkReply::finished, this, [=](){
connect(reply, &QNetworkReply::finished, this, [=]() {
reply->deleteLater();
if (m_initStatus == InitStatusDisabled) {
finishFetchJob(job, QList<LogEntry>());
return;
}
if (reply->error() != QNetworkReply::NoError) {
if (m_initStatus == InitStatusDisabled) {
finishFetchJob(job, QList<LogEntry>());
return;
}
qCWarning(dcLogEngine()) << "Unable to obtain entries from influxdb" << reply->error() << reply->readAll();
finishFetchJob(job, QList<LogEntry>());
return;
@ -507,20 +549,18 @@ LogFetchJob *LogEngineInfluxDB::fetchLogEntries(const QStringList &sources, cons
bool LogEngineInfluxDB::jobsRunning() const
{
// qCDebug(dcLogEngine()) << "Jobs running:" << m_initStatus << m_writeQueue.count() << m_initQueryQueue.count() << m_queryQueue.count() << m_currentWriteReply;
return m_currentInitQuery
|| !m_initQueryQueue.isEmpty()
|| m_currentQuery
|| !m_queryQueue.isEmpty()
|| m_currentWriteReply
|| !m_writeQueue.isEmpty();
// qCDebug(dcLogEngine()) << "Jobs running:" << m_initStatus << m_writeQueue.count() << m_initQueryQueue.count() << m_queryQueue.count() << m_currentWriteReply;
return m_currentInitQuery || !m_initQueryQueue.isEmpty() || m_currentQuery || !m_queryQueue.isEmpty() || m_currentWriteReply || !m_writeQueue.isEmpty();
}
void LogEngineInfluxDB::clear(const QString &source)
{
if (m_initStatus == InitStatusDisabled)
return;
qCDebug(dcLogEngine()) << "Clearing entries for source:" << source;
QueryJob *job = query(QString("DROP MEASUREMENT \"%1\"").arg(source));
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results) {
if (status != QNetworkReply::NoError) {
qCWarning(dcLogEngine()) << "Unable to clear log entries for" << source << ":" << qUtf8Printable(QJsonDocument::fromVariant(results).toJson());
}
@ -552,8 +592,14 @@ void LogEngineInfluxDB::initDB()
void LogEngineInfluxDB::createDB()
{
if (m_initStatus == InitStatusDisabled)
return;
QueryJob *job = query("SHOW DATABASES", false, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
if (status == QNetworkReply::ConnectionRefusedError) {
// Influx not up yet? trying again in 5 secs...
@ -563,6 +609,10 @@ void LogEngineInfluxDB::createDB()
}
return;
}
if (m_initStatus == InitStatusDisabled)
return;
qCCritical(dcLogEngine()) << "Unable to connect to InfluxDB";
m_initStatus = InitStatusFailure;
return;
@ -604,6 +654,9 @@ void LogEngineInfluxDB::createDB()
QueryJob *job = query(QString("CREATE DATABASE %1").arg(m_dbName), true, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &result) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
qCCritical(dcLogEngine()) << "Unable to create" << m_dbName << "database in influxdb:" << QJsonDocument::fromVariant(result).toJson();
m_initStatus = InitStatusFailure;
@ -617,8 +670,14 @@ void LogEngineInfluxDB::createDB()
void LogEngineInfluxDB::createRetentionPolicies()
{
if (m_initStatus == InitStatusDisabled)
return;
QueryJob *job = query("SHOW RETENTION POLICIES", false, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status, const QVariantList &results) {
if (m_initStatus == InitStatusDisabled) {
return;
}
if (status != QNetworkReply::NoError) {
qCCritical(dcLogEngine()) << "Unable to query retention policies.";
m_initStatus = InitStatusFailure;
@ -679,7 +738,10 @@ void LogEngineInfluxDB::createRetentionPolicies()
if (!discreteRPFound) {
qCInfo(dcLogEngine()) << "Creating discrete nymea retention policy in influxdb";
QueryJob *job = query(QString("CREATE RETENTION POLICY discrete ON %1 DURATION 8760h REPLICATION 1").arg(m_dbName), true, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
qCWarning(dcLogEngine()) << "Unable to create discrete retention policy in influxdb.";
m_initStatus = InitStatusFailure;
@ -693,7 +755,10 @@ void LogEngineInfluxDB::createRetentionPolicies()
if (!liveRPFound) {
qCInfo(dcLogEngine()) << "Creating live nymea retention policy in influxdb";
QueryJob *job = query(QString("CREATE RETENTION POLICY live ON %1 DURATION 24h REPLICATION 1").arg(m_dbName), true, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
qCWarning(dcLogEngine()) << "Unable to create live retention policy in influxdb.";
m_initStatus = InitStatusFailure;
@ -707,7 +772,10 @@ void LogEngineInfluxDB::createRetentionPolicies()
if (!minutesRPFound) {
qCInfo(dcLogEngine()) << "Creating minutes nymea retention policy in influxdb";
QueryJob *job = query(QString("CREATE RETENTION POLICY minutes ON %1 DURATION 168h REPLICATION 1").arg(m_dbName), true, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
qCWarning(dcLogEngine()) << "Unable to create minutes retention policy in influxdb.";
m_initStatus = InitStatusFailure;
@ -721,7 +789,10 @@ void LogEngineInfluxDB::createRetentionPolicies()
if (!hoursRPFound) {
qCInfo(dcLogEngine()) << "Creating hours nymea retention policy in influxdb";
QueryJob *job = query(QString("CREATE RETENTION POLICY hours ON %1 DURATION 26280h REPLICATION 1").arg(m_dbName), true, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
qCWarning(dcLogEngine()) << "Unable to create hours retention policy in influxdb.";
m_initStatus = InitStatusFailure;
@ -735,7 +806,10 @@ void LogEngineInfluxDB::createRetentionPolicies()
if (!daysRPFound) {
qCInfo(dcLogEngine()) << "Creating days nymea retention policy in influxdb";
QueryJob *job = query(QString("CREATE RETENTION POLICY days ON %1 DURATION 175200h REPLICATION 1").arg(m_dbName), true, true);
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status){
connect(job, &QueryJob::finished, this, [=](QNetworkReply::NetworkError status) {
if (m_initStatus == InitStatusDisabled)
return;
if (status != QNetworkReply::NoError) {
qCWarning(dcLogEngine()) << "Unable to create days retention policy in influxdb.";
m_initStatus = InitStatusFailure;
@ -748,7 +822,8 @@ void LogEngineInfluxDB::createRetentionPolicies()
m_initStatus = InitStatusOK;
qCDebug(dcLogEngine()) << "Influx initialized. Starting to process log entries (" << m_initQueryQueue.count() << m_queryQueue.count() << m_writeQueue.count() << "in queue)";
qCDebug(dcLogEngine()) << "Influx initialized. Starting to process log entries (" << m_initQueryQueue.count() << m_queryQueue.count() << m_writeQueue.count()
<< "in queue)";
processQueues();
});
}
@ -819,18 +894,15 @@ QueryJob *LogEngineInfluxDB::query(const QString &query, bool post, bool isInit)
return job;
}
QueryJob::QueryJob(const QNetworkRequest &request, bool post, bool isInit, QObject *parent):
QObject(parent),
m_request(request),
m_post(post),
m_isInit(isInit)
{
}
QueryJob::QueryJob(const QNetworkRequest &request, bool post, bool isInit, QObject *parent)
: QObject(parent)
, m_request(request)
, m_post(post)
, m_isInit(isInit)
{}
void QueryJob::finish(QNetworkReply::NetworkError status, const QVariantList &results)
{
QMetaObject::invokeMethod(this, "finished", Qt::QueuedConnection, Q_ARG(QNetworkReply::NetworkError, status), Q_ARG(QVariantList, results));
QMetaObject::invokeMethod(this, "deleteLater", Qt::QueuedConnection);
}