diff --git a/libnymea-core/logging/logengine.cpp b/libnymea-core/logging/logengine.cpp index 0313154d..249dbf3b 100644 --- a/libnymea-core/logging/logengine.cpp +++ b/libnymea-core/logging/logengine.cpp @@ -155,14 +155,10 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString m_db = QSqlDatabase::addDatabase(driver, "logs"); m_db.setDatabaseName(dbName); m_db.setHostName(hostname); - m_overflow = 100; + m_trimSize = qRound(0.01 * m_dbMaxSize); + m_maxQueueLength = 1000; - if (QCoreApplication::instance()->organizationName() == "nymea-test") { - m_dbMaxSize = 20; - qCDebug(dcLogEngine) << "Set logging dab max size to" << m_dbMaxSize << "for testing."; - } - - qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName(); + qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName() << "(Max size:" << m_dbMaxSize << "trim size:" << m_trimSize << ")"; if (!m_db.isValid()) { qCWarning(dcLogEngine) << "Database not valid:" << m_db.lastError().driverText() << m_db.lastError().databaseText(); @@ -175,12 +171,12 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString rotate(m_db.databaseName()); if (!initDB(username, password)) { qCWarning(dcLogEngine()) << "Error fixing log database. Giving up. Logs can't be stored."; + return; } } } connect(&m_jobWatcher, SIGNAL(finished()), this, SLOT(handleJobFinished())); - checkDBSize(); } @@ -221,7 +217,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter) DatabaseJob *job = new DatabaseJob(m_db, queryString, filter.values()); LogEntriesFetchJob *fetchJob = new LogEntriesFetchJob(this); - connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){ + connect(job, &DatabaseJob::finished, this, [job, fetchJob](){ fetchJob->deleteLater(); if (job->error().isValid()) { qCWarning(dcLogEngine) << "Error fetching log entries. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); @@ -247,7 +243,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter) fetchJob->finished(); }); - enqueJob(job); + enqueJob(job, true); return fetchJob; } @@ -258,7 +254,7 @@ DevicesFetchJob *LogEngine::fetchDevices() DatabaseJob *job = new DatabaseJob(m_db, queryString); DevicesFetchJob *fetchJob = new DevicesFetchJob(this); - connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){ + connect(job, &DatabaseJob::finished, this, [job, fetchJob](){ fetchJob->deleteLater(); if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine()) << "Error fetching device entries from log database:" << job->error().driverText() << job->error().databaseText(); @@ -271,6 +267,7 @@ DevicesFetchJob *LogEngine::fetchDevices() } fetchJob->finished(); }); + enqueJob(job, true); return fetchJob; } @@ -279,11 +276,11 @@ bool LogEngine::jobsRunning() const return !m_jobQueue.isEmpty() || m_currentJob; } -void LogEngine::setMaxLogEntries(int maxLogEntries, int overflow) +void LogEngine::setMaxLogEntries(int maxLogEntries, int trimSize) { m_dbMaxSize = maxLogEntries; - m_overflow = overflow; - checkDBSize(); + m_trimSize = trimSize; + trim(); } /*! Removes all entries from the database. This method will be used for the tests. */ @@ -298,7 +295,9 @@ void LogEngine::clearDatabase() connect(job, &DatabaseJob::finished, this, [this, job](){ if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Could not clear logging database. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); + return; } + m_entryCount = 0; emit logDatabaseUpdated(); }); @@ -319,6 +318,7 @@ void LogEngine::logEvent(const Event &event) Logging::LoggingSource sourceType; if (event.isStateChangeEvent()) { sourceType = Logging::LoggingSourceStates; + // There should only be one param if (!event.params().isEmpty()) valueList << event.params().first().value(); @@ -430,9 +430,11 @@ void LogEngine::removeDeviceLogs(const DeviceId &deviceId) connect(job, &DatabaseJob::finished, this, [this, job, deviceId](){ if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Error deleting log entries from device" << deviceId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); - } else { - emit logDatabaseUpdated(); + return; } + + emit logDatabaseUpdated(); + checkDBSize(); }); enqueJob(job); @@ -450,9 +452,11 @@ void LogEngine::removeRuleLogs(const RuleId &ruleId) if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Error deleting log entries from rule" << ruleId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); - } else { - emit logDatabaseUpdated(); + return; } + + emit logDatabaseUpdated(); + checkDBSize(); }); enqueJob(job); @@ -473,8 +477,27 @@ void LogEngine::appendLogEntry(const LogEntry &entry) DatabaseJob *job = new DatabaseJob(m_db, queryString); + // Check for log flooding. If we are exceeding the queue we'll start flagging log events of a certain type. + // If we'll get more log events of the same type while the queue is still exceededd, we'll discard the old + // ones and queue up the new one instead. The most recent one is more important (i.e. we don't want to lose + // the last event in a series). + if (m_jobQueue.count() > m_maxQueueLength) { + qCDebug(dcLogEngine()) << "An excessive amount of data is being logged. (" << m_jobQueue.length() << "jobs in the queue)"; + if (m_flaggedJobs.contains(entry.typeId().toString() + entry.deviceId().toString())) { + if (m_flaggedJobs.value(entry.typeId().toString() + entry.deviceId().toString()).count() > 10) { + qCWarning(dcLogEngine()) << "Discarding log entry because of excessive log flooding."; + DatabaseJob *job = m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].takeFirst(); + int jobIdx = m_jobQueue.indexOf(job); + m_jobQueue.takeAt(jobIdx)->deleteLater(); + } + } + m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].append(job); + } + connect(job, &DatabaseJob::finished, this, [this, job, entry](){ + m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].removeAll(job); + if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Error writing log entry. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); qCWarning(dcLogEngine) << entry; @@ -482,11 +505,11 @@ void LogEngine::appendLogEntry(const LogEntry &entry) return; } + emit logEntryAdded(entry); - if (++m_entryCount > m_dbMaxSize + m_overflow) { - checkDBSize(); - } + m_entryCount++; + trim(); }); enqueJob(job); @@ -494,57 +517,52 @@ void LogEngine::appendLogEntry(const LogEntry &entry) void LogEngine::checkDBSize() { - if (m_dbMaxSize == -1) { - // No tripping required - return; - } - QDateTime startTime = QDateTime::currentDateTime(); - QString queryString = "SELECT COUNT(*) FROM entries;"; - - DatabaseJob *job = new DatabaseJob(m_db, queryString); - connect(job, &DatabaseJob::finished, this, [this, job, startTime](){ - + DatabaseJob *job = new DatabaseJob(m_db, "SELECT COUNT(*) FROM entries;"); + connect(job, &DatabaseJob::finished, this, [this, job](){ if (job->error().type() != QSqlError::NoError) { - qCWarning(dcLogEngine()) << "Failed to query entry count in db:" << job->error().databaseText(); - return; - } - if (job->results().isEmpty()) { - qCWarning(dcLogEngine()) << "Failed retrieving entry count."; + qCWarning(dcLogEngine()) << "Error fetching log DB size. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); + m_entryCount = 0; return; } m_entryCount = job->results().first().value(0).toInt(); - - if (m_entryCount <= m_dbMaxSize) { - return; - } - - // keep only the latest m_dbMaxSize entries - if (!m_trimWarningPrinted) { - qCDebug(dcLogEngine) << "Deleting oldest entries" << (m_entryCount - m_dbMaxSize) << "and keep only the latest" << m_dbMaxSize << "entries."; - m_trimWarningPrinted = true; - } - QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize)); - - DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString); - - connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob,startTime](){ - if (deleteJob->error().type() != QSqlError::NoError) { - qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText(); - } - m_entryCount = m_dbMaxSize; - qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms."; - - emit logDatabaseUpdated(); - }); - enqueJob(deleteJob); }); - enqueJob(job); - + enqueJob(job, true); } -void LogEngine::enqueJob(DatabaseJob *job) +void LogEngine::trim() { - m_jobQueue.append(job); + if (m_dbMaxSize == -1 || m_entryCount < m_dbMaxSize) { + // No trimming required + return; + } + QDateTime startTime = QDateTime::currentDateTime(); + + QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize - m_trimSize)); + + DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString); + + connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob, startTime](){ + if (deleteJob->error().type() != QSqlError::NoError) { + qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText(); + } + qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms. (Deleted" << m_entryCount - (m_dbMaxSize - m_trimSize) << "entries)"; + m_entryCount = m_dbMaxSize - m_trimSize; + + emit logDatabaseUpdated(); + }); + + qCDebug(dcLogEngine()) << "Scheduling housekeeping job."; + enqueJob(deleteJob, true); +} + +void LogEngine::enqueJob(DatabaseJob *job, bool priority) +{ + if (priority) { + m_jobQueue.prepend(job); + } else { + m_jobQueue.append(job); + } + qCDebug(dcLogEngine()) << "Scheduled job at position" << (priority ? 0 : m_jobQueue.count() - 1) << "(" << m_jobQueue.count() << "jobs in the queue)"; processQueue(); } @@ -569,7 +587,9 @@ void LogEngine::processQueue() m_dbMalformed = false; } + DatabaseJob *job = m_jobQueue.takeFirst(); + qCDebug(dcLogEngine()) << "Processing DB queue. (" << m_jobQueue.count() << "jobs left in queue," << m_entryCount << "entries in DB)"; m_currentJob = job; QFuture future = QtConcurrent::run([job](){ @@ -603,6 +623,8 @@ void LogEngine::handleJobFinished() job->finished(); job->deleteLater(); m_currentJob = nullptr; + + qCDebug(dcLogEngine()) << "DB job finished. (" << m_entryCount << "entries in DB)"; processQueue(); } diff --git a/libnymea-core/logging/logengine.h b/libnymea-core/logging/logengine.h index b15e7ddd..58d3ab2a 100644 --- a/libnymea-core/logging/logengine.h +++ b/libnymea-core/logging/logengine.h @@ -56,7 +56,7 @@ public: bool jobsRunning() const; - void setMaxLogEntries(int maxLogEntries, int overflow); + void setMaxLogEntries(int maxLogEntries, int trimSize); void clearDatabase(); void logSystemEvent(const QDateTime &dateTime, bool active, Logging::LoggingLevel level = Logging::LoggingLevelInfo); @@ -88,8 +88,9 @@ private: private slots: void checkDBSize(); + void trim(); - void enqueJob(DatabaseJob *job); + void enqueJob(DatabaseJob *job, bool priority = false); void processQueue(); void handleJobFinished(); @@ -98,11 +99,14 @@ private: QString m_username; QString m_password; int m_dbMaxSize; - int m_overflow; - bool m_trimWarningPrinted = false; + int m_trimSize; int m_entryCount = 0; bool m_dbMalformed = false; + // When maxQueueLength is exceeded, jobs will be flagged and discarded if this source logs more events + int m_maxQueueLength; + QHash> m_flaggedJobs; + QList m_jobQueue; DatabaseJob *m_currentJob = nullptr; QFutureWatcher m_jobWatcher; diff --git a/libnymea-core/nymeacore.cpp b/libnymea-core/nymeacore.cpp index 2df324a2..0dc9cc6b 100644 --- a/libnymea-core/nymeacore.cpp +++ b/libnymea-core/nymeacore.cpp @@ -946,7 +946,7 @@ void NymeaCore::deviceManagerLoaded() qCDebug(dcApplication()) << "Starting housekeeping..."; QDateTime startTime = QDateTime::currentDateTime(); DevicesFetchJob *job = m_logger->fetchDevices(); - connect(job, &DevicesFetchJob::finished, this, [this, job, startTime](){ + connect(job, &DevicesFetchJob::finished, m_deviceManager, [this, job, startTime](){ foreach (const DeviceId &deviceId, job->results()) { if (!m_deviceManager->findConfiguredDevice(deviceId)) { qCDebug(dcApplication()) << "Cleaning stale device entries from log DB for device id" << deviceId; diff --git a/tests/auto/logging/testlogging.cpp b/tests/auto/logging/testlogging.cpp index ba6fda7f..f2f91b40 100644 --- a/tests/auto/logging/testlogging.cpp +++ b/tests/auto/logging/testlogging.cpp @@ -41,6 +41,11 @@ private: inline void verifyDeviceError(const QVariant &response, Device::DeviceError error = Device::DeviceErrorNoError) { verifyError(response, "deviceError", enumValueName(error)); } + inline void waitForDBSync() { + while (NymeaCore::instance()->logEngine()->jobsRunning()) { + qApp->processEvents(); + } + } private slots: void initTestCase(); @@ -80,7 +85,8 @@ void TestLogging::initTestCase() "LogEngine.debug=true\n" "Tests.debug=true\n" "MockDevice.debug=true\n" - "DeviceManager.debug=true\n"); + "DeviceManager.debug=true\n" + "Application.debug=true\n"); } void TestLogging::initLogs() @@ -101,6 +107,7 @@ void TestLogging::initLogs() QVERIFY(logEntries.count() == 0); restartServer(); + NymeaCore::instance()->logEngine()->setMaxLogEntries(1000, 10); } void TestLogging::databaseSerializationTest_data() @@ -158,6 +165,8 @@ void TestLogging::systemLogs() qWarning() << "Clearing logging DB"; clearLoggingDatabase(); + waitForDBSync(); + QVariantMap params; params.insert("loggingSources", QVariantList() << enumValueName(Logging::LoggingSourceSystem)); params.insert("eventTypes", QVariantList() << enumValueName(Logging::LoggingEventTypeActiveChange)); @@ -610,6 +619,8 @@ void TestLogging::testHouseKeeping() connect(reply, SIGNAL(finished()), reply, SLOT(deleteLater())); spy.wait(); + waitForDBSync(); + params.clear(); params.insert("deviceIds", QVariantList() << deviceId); response = injectAndWait("Logging.GetLogEntries", params); @@ -623,15 +634,13 @@ void TestLogging::testHouseKeeping() restartServer(); - // Wait for the mock device to complete the setup - do { - qApp->processEvents(); - params.clear(); - params.insert("deviceId", m_mockDeviceId); - response = injectAndWait("Devices.GetConfiguredDevices", params); - } while (!response.toMap().value("params").toMap().value("devices").toList().first().toMap().value("setupComplete").toBool()); + waitForDBSync(); + params.clear(); + params.insert("deviceIds", QVariantList() << deviceId); response = injectAndWait("Logging.GetLogEntries", params); + qCDebug(dcTests()) << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); + QVERIFY2(response.toMap().value("status").toString() == QString("success"), "GetLogEntries failed"); QVERIFY2(response.toMap().value("params").toMap().value("logEntries").toList().count() == 0, "Device state change event still in log. Should've been cleaned by housekeeping."); } @@ -660,6 +669,8 @@ void TestLogging::testLimits() verifyDeviceError(response); } + waitForDBSync(); + QVariantMap params; QVariantMap response;