Improve log DB housekeeping performance

Getting row count on a db with approx 200000 entries on an RPi takes
about 500ms. To avoid this, this branch keeps better track of entries
in the DB and only queries DB count if we can't calculate it ourselves.
Trimming itself takes some 150ms. To reduce those calls it changes the
threshold of when to trim the DB from a fixed value of 100 to 1% of
maxDBSize.

Last but not least, getLogEntry() calls are now prioritized over
appendLogEntry() calls in order to stay responsive to client apps even
if the DB is overloaded with a huge job queue.

If the job queue grows to over 1000 jobs, logs of the same device/type
will be discared to avoid log flooding.
This commit is contained in:
Michael Zanetti 2020-01-01 23:42:02 +01:00
parent 6cdcd47f9b
commit 03c8e8d114
4 changed files with 114 additions and 77 deletions

View File

@ -155,14 +155,10 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString
m_db = QSqlDatabase::addDatabase(driver, "logs");
m_db.setDatabaseName(dbName);
m_db.setHostName(hostname);
m_overflow = 100;
m_trimSize = qRound(0.01 * m_dbMaxSize);
m_maxQueueLength = 1000;
if (QCoreApplication::instance()->organizationName() == "nymea-test") {
m_dbMaxSize = 20;
qCDebug(dcLogEngine) << "Set logging dab max size to" << m_dbMaxSize << "for testing.";
}
qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName();
qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName() << "(Max size:" << m_dbMaxSize << "trim size:" << m_trimSize << ")";
if (!m_db.isValid()) {
qCWarning(dcLogEngine) << "Database not valid:" << m_db.lastError().driverText() << m_db.lastError().databaseText();
@ -175,12 +171,12 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString
rotate(m_db.databaseName());
if (!initDB(username, password)) {
qCWarning(dcLogEngine()) << "Error fixing log database. Giving up. Logs can't be stored.";
return;
}
}
}
connect(&m_jobWatcher, SIGNAL(finished()), this, SLOT(handleJobFinished()));
checkDBSize();
}
@ -221,7 +217,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter)
DatabaseJob *job = new DatabaseJob(m_db, queryString, filter.values());
LogEntriesFetchJob *fetchJob = new LogEntriesFetchJob(this);
connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){
connect(job, &DatabaseJob::finished, this, [job, fetchJob](){
fetchJob->deleteLater();
if (job->error().isValid()) {
qCWarning(dcLogEngine) << "Error fetching log entries. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
@ -247,7 +243,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter)
fetchJob->finished();
});
enqueJob(job);
enqueJob(job, true);
return fetchJob;
}
@ -258,7 +254,7 @@ DevicesFetchJob *LogEngine::fetchDevices()
DatabaseJob *job = new DatabaseJob(m_db, queryString);
DevicesFetchJob *fetchJob = new DevicesFetchJob(this);
connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){
connect(job, &DatabaseJob::finished, this, [job, fetchJob](){
fetchJob->deleteLater();
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine()) << "Error fetching device entries from log database:" << job->error().driverText() << job->error().databaseText();
@ -271,6 +267,7 @@ DevicesFetchJob *LogEngine::fetchDevices()
}
fetchJob->finished();
});
enqueJob(job, true);
return fetchJob;
}
@ -279,11 +276,11 @@ bool LogEngine::jobsRunning() const
return !m_jobQueue.isEmpty() || m_currentJob;
}
void LogEngine::setMaxLogEntries(int maxLogEntries, int overflow)
void LogEngine::setMaxLogEntries(int maxLogEntries, int trimSize)
{
m_dbMaxSize = maxLogEntries;
m_overflow = overflow;
checkDBSize();
m_trimSize = trimSize;
trim();
}
/*! Removes all entries from the database. This method will be used for the tests. */
@ -298,7 +295,9 @@ void LogEngine::clearDatabase()
connect(job, &DatabaseJob::finished, this, [this, job](){
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Could not clear logging database. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
return;
}
m_entryCount = 0;
emit logDatabaseUpdated();
});
@ -319,6 +318,7 @@ void LogEngine::logEvent(const Event &event)
Logging::LoggingSource sourceType;
if (event.isStateChangeEvent()) {
sourceType = Logging::LoggingSourceStates;
// There should only be one param
if (!event.params().isEmpty())
valueList << event.params().first().value();
@ -430,9 +430,11 @@ void LogEngine::removeDeviceLogs(const DeviceId &deviceId)
connect(job, &DatabaseJob::finished, this, [this, job, deviceId](){
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from device" << deviceId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
} else {
emit logDatabaseUpdated();
return;
}
emit logDatabaseUpdated();
checkDBSize();
});
enqueJob(job);
@ -450,9 +452,11 @@ void LogEngine::removeRuleLogs(const RuleId &ruleId)
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from rule" << ruleId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
} else {
emit logDatabaseUpdated();
return;
}
emit logDatabaseUpdated();
checkDBSize();
});
enqueJob(job);
@ -473,8 +477,27 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
DatabaseJob *job = new DatabaseJob(m_db, queryString);
// Check for log flooding. If we are exceeding the queue we'll start flagging log events of a certain type.
// If we'll get more log events of the same type while the queue is still exceededd, we'll discard the old
// ones and queue up the new one instead. The most recent one is more important (i.e. we don't want to lose
// the last event in a series).
if (m_jobQueue.count() > m_maxQueueLength) {
qCDebug(dcLogEngine()) << "An excessive amount of data is being logged. (" << m_jobQueue.length() << "jobs in the queue)";
if (m_flaggedJobs.contains(entry.typeId().toString() + entry.deviceId().toString())) {
if (m_flaggedJobs.value(entry.typeId().toString() + entry.deviceId().toString()).count() > 10) {
qCWarning(dcLogEngine()) << "Discarding log entry because of excessive log flooding.";
DatabaseJob *job = m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].takeFirst();
int jobIdx = m_jobQueue.indexOf(job);
m_jobQueue.takeAt(jobIdx)->deleteLater();
}
}
m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].append(job);
}
connect(job, &DatabaseJob::finished, this, [this, job, entry](){
m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].removeAll(job);
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error writing log entry. Driver error:" << job->error().driverText() << "Database error:" << job->error().number() << job->error().databaseText();
qCWarning(dcLogEngine) << entry;
@ -482,11 +505,11 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
return;
}
emit logEntryAdded(entry);
if (++m_entryCount > m_dbMaxSize + m_overflow) {
checkDBSize();
}
m_entryCount++;
trim();
});
enqueJob(job);
@ -494,57 +517,52 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
void LogEngine::checkDBSize()
{
if (m_dbMaxSize == -1) {
// No tripping required
return;
}
QDateTime startTime = QDateTime::currentDateTime();
QString queryString = "SELECT COUNT(*) FROM entries;";
DatabaseJob *job = new DatabaseJob(m_db, queryString);
connect(job, &DatabaseJob::finished, this, [this, job, startTime](){
DatabaseJob *job = new DatabaseJob(m_db, "SELECT COUNT(*) FROM entries;");
connect(job, &DatabaseJob::finished, this, [this, job](){
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine()) << "Failed to query entry count in db:" << job->error().databaseText();
return;
}
if (job->results().isEmpty()) {
qCWarning(dcLogEngine()) << "Failed retrieving entry count.";
qCWarning(dcLogEngine()) << "Error fetching log DB size. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
m_entryCount = 0;
return;
}
m_entryCount = job->results().first().value(0).toInt();
if (m_entryCount <= m_dbMaxSize) {
return;
}
// keep only the latest m_dbMaxSize entries
if (!m_trimWarningPrinted) {
qCDebug(dcLogEngine) << "Deleting oldest entries" << (m_entryCount - m_dbMaxSize) << "and keep only the latest" << m_dbMaxSize << "entries.";
m_trimWarningPrinted = true;
}
QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize));
DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString);
connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob,startTime](){
if (deleteJob->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText();
}
m_entryCount = m_dbMaxSize;
qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms.";
emit logDatabaseUpdated();
});
enqueJob(deleteJob);
});
enqueJob(job);
enqueJob(job, true);
}
void LogEngine::enqueJob(DatabaseJob *job)
void LogEngine::trim()
{
m_jobQueue.append(job);
if (m_dbMaxSize == -1 || m_entryCount < m_dbMaxSize) {
// No trimming required
return;
}
QDateTime startTime = QDateTime::currentDateTime();
QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize - m_trimSize));
DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString);
connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob, startTime](){
if (deleteJob->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText();
}
qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms. (Deleted" << m_entryCount - (m_dbMaxSize - m_trimSize) << "entries)";
m_entryCount = m_dbMaxSize - m_trimSize;
emit logDatabaseUpdated();
});
qCDebug(dcLogEngine()) << "Scheduling housekeeping job.";
enqueJob(deleteJob, true);
}
void LogEngine::enqueJob(DatabaseJob *job, bool priority)
{
if (priority) {
m_jobQueue.prepend(job);
} else {
m_jobQueue.append(job);
}
qCDebug(dcLogEngine()) << "Scheduled job at position" << (priority ? 0 : m_jobQueue.count() - 1) << "(" << m_jobQueue.count() << "jobs in the queue)";
processQueue();
}
@ -569,7 +587,9 @@ void LogEngine::processQueue()
m_dbMalformed = false;
}
DatabaseJob *job = m_jobQueue.takeFirst();
qCDebug(dcLogEngine()) << "Processing DB queue. (" << m_jobQueue.count() << "jobs left in queue," << m_entryCount << "entries in DB)";
m_currentJob = job;
QFuture<DatabaseJob*> future = QtConcurrent::run([job](){
@ -603,6 +623,8 @@ void LogEngine::handleJobFinished()
job->finished();
job->deleteLater();
m_currentJob = nullptr;
qCDebug(dcLogEngine()) << "DB job finished. (" << m_entryCount << "entries in DB)";
processQueue();
}

View File

@ -56,7 +56,7 @@ public:
bool jobsRunning() const;
void setMaxLogEntries(int maxLogEntries, int overflow);
void setMaxLogEntries(int maxLogEntries, int trimSize);
void clearDatabase();
void logSystemEvent(const QDateTime &dateTime, bool active, Logging::LoggingLevel level = Logging::LoggingLevelInfo);
@ -88,8 +88,9 @@ private:
private slots:
void checkDBSize();
void trim();
void enqueJob(DatabaseJob *job);
void enqueJob(DatabaseJob *job, bool priority = false);
void processQueue();
void handleJobFinished();
@ -98,11 +99,14 @@ private:
QString m_username;
QString m_password;
int m_dbMaxSize;
int m_overflow;
bool m_trimWarningPrinted = false;
int m_trimSize;
int m_entryCount = 0;
bool m_dbMalformed = false;
// When maxQueueLength is exceeded, jobs will be flagged and discarded if this source logs more events
int m_maxQueueLength;
QHash<QString, QList<DatabaseJob*>> m_flaggedJobs;
QList<DatabaseJob*> m_jobQueue;
DatabaseJob *m_currentJob = nullptr;
QFutureWatcher<DatabaseJob*> m_jobWatcher;

View File

@ -925,7 +925,7 @@ void NymeaCore::deviceManagerLoaded()
qCDebug(dcApplication()) << "Starting housekeeping...";
QDateTime startTime = QDateTime::currentDateTime();
DevicesFetchJob *job = m_logger->fetchDevices();
connect(job, &DevicesFetchJob::finished, this, [this, job, startTime](){
connect(job, &DevicesFetchJob::finished, m_deviceManager, [this, job, startTime](){
foreach (const DeviceId &deviceId, job->results()) {
if (!m_deviceManager->findConfiguredDevice(deviceId)) {
qCDebug(dcApplication()) << "Cleaning stale device entries from log DB for device id" << deviceId;

View File

@ -41,6 +41,11 @@ private:
inline void verifyDeviceError(const QVariant &response, Device::DeviceError error = Device::DeviceErrorNoError) {
verifyError(response, "deviceError", enumValueName(error));
}
inline void waitForDBSync() {
while (NymeaCore::instance()->logEngine()->jobsRunning()) {
qApp->processEvents();
}
}
private slots:
void initTestCase();
@ -80,7 +85,8 @@ void TestLogging::initTestCase()
"LogEngine.debug=true\n"
"Tests.debug=true\n"
"MockDevice.debug=true\n"
"DeviceManager.debug=true\n");
"DeviceManager.debug=true\n"
"Application.debug=true\n");
}
void TestLogging::initLogs()
@ -101,6 +107,7 @@ void TestLogging::initLogs()
QVERIFY(logEntries.count() == 0);
restartServer();
NymeaCore::instance()->logEngine()->setMaxLogEntries(1000, 10);
}
void TestLogging::databaseSerializationTest_data()
@ -158,6 +165,8 @@ void TestLogging::systemLogs()
qWarning() << "Clearing logging DB";
clearLoggingDatabase();
waitForDBSync();
QVariantMap params;
params.insert("loggingSources", QVariantList() << enumValueName(Logging::LoggingSourceSystem));
params.insert("eventTypes", QVariantList() << enumValueName(Logging::LoggingEventTypeActiveChange));
@ -610,6 +619,8 @@ void TestLogging::testHouseKeeping()
connect(reply, SIGNAL(finished()), reply, SLOT(deleteLater()));
spy.wait();
waitForDBSync();
params.clear();
params.insert("deviceIds", QVariantList() << deviceId);
response = injectAndWait("Logging.GetLogEntries", params);
@ -623,15 +634,13 @@ void TestLogging::testHouseKeeping()
restartServer();
// Wait for the mock device to complete the setup
do {
qApp->processEvents();
params.clear();
params.insert("deviceId", m_mockDeviceId);
response = injectAndWait("Devices.GetConfiguredDevices", params);
} while (!response.toMap().value("params").toMap().value("devices").toList().first().toMap().value("setupComplete").toBool());
waitForDBSync();
params.clear();
params.insert("deviceIds", QVariantList() << deviceId);
response = injectAndWait("Logging.GetLogEntries", params);
qCDebug(dcTests()) << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
QVERIFY2(response.toMap().value("status").toString() == QString("success"), "GetLogEntries failed");
QVERIFY2(response.toMap().value("params").toMap().value("logEntries").toList().count() == 0, "Device state change event still in log. Should've been cleaned by housekeeping.");
}
@ -660,6 +669,8 @@ void TestLogging::testLimits()
verifyDeviceError(response);
}
waitForDBSync();
QVariantMap params;
QVariantMap response;