From 03c8e8d1144674b6015be555ff692f9426b92ff4 Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Wed, 1 Jan 2020 23:42:02 +0100 Subject: [PATCH] Improve log DB housekeeping performance Getting row count on a db with approx 200000 entries on an RPi takes about 500ms. To avoid this, this branch keeps better track of entries in the DB and only queries DB count if we can't calculate it ourselves. Trimming itself takes some 150ms. To reduce those calls it changes the threshold of when to trim the DB from a fixed value of 100 to 1% of maxDBSize. Last but not least, getLogEntry() calls are now prioritized over appendLogEntry() calls in order to stay responsive to client apps even if the DB is overloaded with a huge job queue. If the job queue grows to over 1000 jobs, logs of the same device/type will be discared to avoid log flooding. --- libnymea-core/logging/logengine.cpp | 150 ++++++++++++++++------------ libnymea-core/logging/logengine.h | 12 ++- libnymea-core/nymeacore.cpp | 2 +- tests/auto/logging/testlogging.cpp | 27 +++-- 4 files changed, 114 insertions(+), 77 deletions(-) diff --git a/libnymea-core/logging/logengine.cpp b/libnymea-core/logging/logengine.cpp index c205ab1e..6a98d2eb 100644 --- a/libnymea-core/logging/logengine.cpp +++ b/libnymea-core/logging/logengine.cpp @@ -155,14 +155,10 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString m_db = QSqlDatabase::addDatabase(driver, "logs"); m_db.setDatabaseName(dbName); m_db.setHostName(hostname); - m_overflow = 100; + m_trimSize = qRound(0.01 * m_dbMaxSize); + m_maxQueueLength = 1000; - if (QCoreApplication::instance()->organizationName() == "nymea-test") { - m_dbMaxSize = 20; - qCDebug(dcLogEngine) << "Set logging dab max size to" << m_dbMaxSize << "for testing."; - } - - qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName(); + qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName() << "(Max size:" << m_dbMaxSize << "trim size:" << m_trimSize << ")"; if (!m_db.isValid()) { qCWarning(dcLogEngine) << "Database not valid:" << m_db.lastError().driverText() << m_db.lastError().databaseText(); @@ -175,12 +171,12 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString rotate(m_db.databaseName()); if (!initDB(username, password)) { qCWarning(dcLogEngine()) << "Error fixing log database. Giving up. Logs can't be stored."; + return; } } } connect(&m_jobWatcher, SIGNAL(finished()), this, SLOT(handleJobFinished())); - checkDBSize(); } @@ -221,7 +217,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter) DatabaseJob *job = new DatabaseJob(m_db, queryString, filter.values()); LogEntriesFetchJob *fetchJob = new LogEntriesFetchJob(this); - connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){ + connect(job, &DatabaseJob::finished, this, [job, fetchJob](){ fetchJob->deleteLater(); if (job->error().isValid()) { qCWarning(dcLogEngine) << "Error fetching log entries. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); @@ -247,7 +243,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter) fetchJob->finished(); }); - enqueJob(job); + enqueJob(job, true); return fetchJob; } @@ -258,7 +254,7 @@ DevicesFetchJob *LogEngine::fetchDevices() DatabaseJob *job = new DatabaseJob(m_db, queryString); DevicesFetchJob *fetchJob = new DevicesFetchJob(this); - connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){ + connect(job, &DatabaseJob::finished, this, [job, fetchJob](){ fetchJob->deleteLater(); if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine()) << "Error fetching device entries from log database:" << job->error().driverText() << job->error().databaseText(); @@ -271,6 +267,7 @@ DevicesFetchJob *LogEngine::fetchDevices() } fetchJob->finished(); }); + enqueJob(job, true); return fetchJob; } @@ -279,11 +276,11 @@ bool LogEngine::jobsRunning() const return !m_jobQueue.isEmpty() || m_currentJob; } -void LogEngine::setMaxLogEntries(int maxLogEntries, int overflow) +void LogEngine::setMaxLogEntries(int maxLogEntries, int trimSize) { m_dbMaxSize = maxLogEntries; - m_overflow = overflow; - checkDBSize(); + m_trimSize = trimSize; + trim(); } /*! Removes all entries from the database. This method will be used for the tests. */ @@ -298,7 +295,9 @@ void LogEngine::clearDatabase() connect(job, &DatabaseJob::finished, this, [this, job](){ if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Could not clear logging database. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); + return; } + m_entryCount = 0; emit logDatabaseUpdated(); }); @@ -319,6 +318,7 @@ void LogEngine::logEvent(const Event &event) Logging::LoggingSource sourceType; if (event.isStateChangeEvent()) { sourceType = Logging::LoggingSourceStates; + // There should only be one param if (!event.params().isEmpty()) valueList << event.params().first().value(); @@ -430,9 +430,11 @@ void LogEngine::removeDeviceLogs(const DeviceId &deviceId) connect(job, &DatabaseJob::finished, this, [this, job, deviceId](){ if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Error deleting log entries from device" << deviceId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); - } else { - emit logDatabaseUpdated(); + return; } + + emit logDatabaseUpdated(); + checkDBSize(); }); enqueJob(job); @@ -450,9 +452,11 @@ void LogEngine::removeRuleLogs(const RuleId &ruleId) if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Error deleting log entries from rule" << ruleId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); - } else { - emit logDatabaseUpdated(); + return; } + + emit logDatabaseUpdated(); + checkDBSize(); }); enqueJob(job); @@ -473,8 +477,27 @@ void LogEngine::appendLogEntry(const LogEntry &entry) DatabaseJob *job = new DatabaseJob(m_db, queryString); + // Check for log flooding. If we are exceeding the queue we'll start flagging log events of a certain type. + // If we'll get more log events of the same type while the queue is still exceededd, we'll discard the old + // ones and queue up the new one instead. The most recent one is more important (i.e. we don't want to lose + // the last event in a series). + if (m_jobQueue.count() > m_maxQueueLength) { + qCDebug(dcLogEngine()) << "An excessive amount of data is being logged. (" << m_jobQueue.length() << "jobs in the queue)"; + if (m_flaggedJobs.contains(entry.typeId().toString() + entry.deviceId().toString())) { + if (m_flaggedJobs.value(entry.typeId().toString() + entry.deviceId().toString()).count() > 10) { + qCWarning(dcLogEngine()) << "Discarding log entry because of excessive log flooding."; + DatabaseJob *job = m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].takeFirst(); + int jobIdx = m_jobQueue.indexOf(job); + m_jobQueue.takeAt(jobIdx)->deleteLater(); + } + } + m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].append(job); + } + connect(job, &DatabaseJob::finished, this, [this, job, entry](){ + m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].removeAll(job); + if (job->error().type() != QSqlError::NoError) { qCWarning(dcLogEngine) << "Error writing log entry. Driver error:" << job->error().driverText() << "Database error:" << job->error().number() << job->error().databaseText(); qCWarning(dcLogEngine) << entry; @@ -482,11 +505,11 @@ void LogEngine::appendLogEntry(const LogEntry &entry) return; } + emit logEntryAdded(entry); - if (++m_entryCount > m_dbMaxSize + m_overflow) { - checkDBSize(); - } + m_entryCount++; + trim(); }); enqueJob(job); @@ -494,57 +517,52 @@ void LogEngine::appendLogEntry(const LogEntry &entry) void LogEngine::checkDBSize() { - if (m_dbMaxSize == -1) { - // No tripping required - return; - } - QDateTime startTime = QDateTime::currentDateTime(); - QString queryString = "SELECT COUNT(*) FROM entries;"; - - DatabaseJob *job = new DatabaseJob(m_db, queryString); - connect(job, &DatabaseJob::finished, this, [this, job, startTime](){ - + DatabaseJob *job = new DatabaseJob(m_db, "SELECT COUNT(*) FROM entries;"); + connect(job, &DatabaseJob::finished, this, [this, job](){ if (job->error().type() != QSqlError::NoError) { - qCWarning(dcLogEngine()) << "Failed to query entry count in db:" << job->error().databaseText(); - return; - } - if (job->results().isEmpty()) { - qCWarning(dcLogEngine()) << "Failed retrieving entry count."; + qCWarning(dcLogEngine()) << "Error fetching log DB size. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText(); + m_entryCount = 0; return; } m_entryCount = job->results().first().value(0).toInt(); - - if (m_entryCount <= m_dbMaxSize) { - return; - } - - // keep only the latest m_dbMaxSize entries - if (!m_trimWarningPrinted) { - qCDebug(dcLogEngine) << "Deleting oldest entries" << (m_entryCount - m_dbMaxSize) << "and keep only the latest" << m_dbMaxSize << "entries."; - m_trimWarningPrinted = true; - } - QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize)); - - DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString); - - connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob,startTime](){ - if (deleteJob->error().type() != QSqlError::NoError) { - qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText(); - } - m_entryCount = m_dbMaxSize; - qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms."; - - emit logDatabaseUpdated(); - }); - enqueJob(deleteJob); }); - enqueJob(job); - + enqueJob(job, true); } -void LogEngine::enqueJob(DatabaseJob *job) +void LogEngine::trim() { - m_jobQueue.append(job); + if (m_dbMaxSize == -1 || m_entryCount < m_dbMaxSize) { + // No trimming required + return; + } + QDateTime startTime = QDateTime::currentDateTime(); + + QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize - m_trimSize)); + + DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString); + + connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob, startTime](){ + if (deleteJob->error().type() != QSqlError::NoError) { + qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText(); + } + qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms. (Deleted" << m_entryCount - (m_dbMaxSize - m_trimSize) << "entries)"; + m_entryCount = m_dbMaxSize - m_trimSize; + + emit logDatabaseUpdated(); + }); + + qCDebug(dcLogEngine()) << "Scheduling housekeeping job."; + enqueJob(deleteJob, true); +} + +void LogEngine::enqueJob(DatabaseJob *job, bool priority) +{ + if (priority) { + m_jobQueue.prepend(job); + } else { + m_jobQueue.append(job); + } + qCDebug(dcLogEngine()) << "Scheduled job at position" << (priority ? 0 : m_jobQueue.count() - 1) << "(" << m_jobQueue.count() << "jobs in the queue)"; processQueue(); } @@ -569,7 +587,9 @@ void LogEngine::processQueue() m_dbMalformed = false; } + DatabaseJob *job = m_jobQueue.takeFirst(); + qCDebug(dcLogEngine()) << "Processing DB queue. (" << m_jobQueue.count() << "jobs left in queue," << m_entryCount << "entries in DB)"; m_currentJob = job; QFuture future = QtConcurrent::run([job](){ @@ -603,6 +623,8 @@ void LogEngine::handleJobFinished() job->finished(); job->deleteLater(); m_currentJob = nullptr; + + qCDebug(dcLogEngine()) << "DB job finished. (" << m_entryCount << "entries in DB)"; processQueue(); } diff --git a/libnymea-core/logging/logengine.h b/libnymea-core/logging/logengine.h index b15e7ddd..58d3ab2a 100644 --- a/libnymea-core/logging/logengine.h +++ b/libnymea-core/logging/logengine.h @@ -56,7 +56,7 @@ public: bool jobsRunning() const; - void setMaxLogEntries(int maxLogEntries, int overflow); + void setMaxLogEntries(int maxLogEntries, int trimSize); void clearDatabase(); void logSystemEvent(const QDateTime &dateTime, bool active, Logging::LoggingLevel level = Logging::LoggingLevelInfo); @@ -88,8 +88,9 @@ private: private slots: void checkDBSize(); + void trim(); - void enqueJob(DatabaseJob *job); + void enqueJob(DatabaseJob *job, bool priority = false); void processQueue(); void handleJobFinished(); @@ -98,11 +99,14 @@ private: QString m_username; QString m_password; int m_dbMaxSize; - int m_overflow; - bool m_trimWarningPrinted = false; + int m_trimSize; int m_entryCount = 0; bool m_dbMalformed = false; + // When maxQueueLength is exceeded, jobs will be flagged and discarded if this source logs more events + int m_maxQueueLength; + QHash> m_flaggedJobs; + QList m_jobQueue; DatabaseJob *m_currentJob = nullptr; QFutureWatcher m_jobWatcher; diff --git a/libnymea-core/nymeacore.cpp b/libnymea-core/nymeacore.cpp index 6c98bf9c..d4413b8d 100644 --- a/libnymea-core/nymeacore.cpp +++ b/libnymea-core/nymeacore.cpp @@ -925,7 +925,7 @@ void NymeaCore::deviceManagerLoaded() qCDebug(dcApplication()) << "Starting housekeeping..."; QDateTime startTime = QDateTime::currentDateTime(); DevicesFetchJob *job = m_logger->fetchDevices(); - connect(job, &DevicesFetchJob::finished, this, [this, job, startTime](){ + connect(job, &DevicesFetchJob::finished, m_deviceManager, [this, job, startTime](){ foreach (const DeviceId &deviceId, job->results()) { if (!m_deviceManager->findConfiguredDevice(deviceId)) { qCDebug(dcApplication()) << "Cleaning stale device entries from log DB for device id" << deviceId; diff --git a/tests/auto/logging/testlogging.cpp b/tests/auto/logging/testlogging.cpp index 98e40a75..ebeb158d 100644 --- a/tests/auto/logging/testlogging.cpp +++ b/tests/auto/logging/testlogging.cpp @@ -41,6 +41,11 @@ private: inline void verifyDeviceError(const QVariant &response, Device::DeviceError error = Device::DeviceErrorNoError) { verifyError(response, "deviceError", enumValueName(error)); } + inline void waitForDBSync() { + while (NymeaCore::instance()->logEngine()->jobsRunning()) { + qApp->processEvents(); + } + } private slots: void initTestCase(); @@ -80,7 +85,8 @@ void TestLogging::initTestCase() "LogEngine.debug=true\n" "Tests.debug=true\n" "MockDevice.debug=true\n" - "DeviceManager.debug=true\n"); + "DeviceManager.debug=true\n" + "Application.debug=true\n"); } void TestLogging::initLogs() @@ -101,6 +107,7 @@ void TestLogging::initLogs() QVERIFY(logEntries.count() == 0); restartServer(); + NymeaCore::instance()->logEngine()->setMaxLogEntries(1000, 10); } void TestLogging::databaseSerializationTest_data() @@ -158,6 +165,8 @@ void TestLogging::systemLogs() qWarning() << "Clearing logging DB"; clearLoggingDatabase(); + waitForDBSync(); + QVariantMap params; params.insert("loggingSources", QVariantList() << enumValueName(Logging::LoggingSourceSystem)); params.insert("eventTypes", QVariantList() << enumValueName(Logging::LoggingEventTypeActiveChange)); @@ -610,6 +619,8 @@ void TestLogging::testHouseKeeping() connect(reply, SIGNAL(finished()), reply, SLOT(deleteLater())); spy.wait(); + waitForDBSync(); + params.clear(); params.insert("deviceIds", QVariantList() << deviceId); response = injectAndWait("Logging.GetLogEntries", params); @@ -623,15 +634,13 @@ void TestLogging::testHouseKeeping() restartServer(); - // Wait for the mock device to complete the setup - do { - qApp->processEvents(); - params.clear(); - params.insert("deviceId", m_mockDeviceId); - response = injectAndWait("Devices.GetConfiguredDevices", params); - } while (!response.toMap().value("params").toMap().value("devices").toList().first().toMap().value("setupComplete").toBool()); + waitForDBSync(); + params.clear(); + params.insert("deviceIds", QVariantList() << deviceId); response = injectAndWait("Logging.GetLogEntries", params); + qCDebug(dcTests()) << qUtf8Printable(QJsonDocument::fromVariant(response).toJson()); + QVERIFY2(response.toMap().value("status").toString() == QString("success"), "GetLogEntries failed"); QVERIFY2(response.toMap().value("params").toMap().value("logEntries").toList().count() == 0, "Device state change event still in log. Should've been cleaned by housekeeping."); } @@ -660,6 +669,8 @@ void TestLogging::testLimits() verifyDeviceError(response); } + waitForDBSync(); + QVariantMap params; QVariantMap response;