Merge PR #241: Improve log DB housekeeping performance

pull/263/head
Jenkins nymea 2020-01-29 21:44:34 +01:00
commit 56517ce491
4 changed files with 114 additions and 77 deletions

View File

@ -155,14 +155,10 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString
m_db = QSqlDatabase::addDatabase(driver, "logs");
m_db.setDatabaseName(dbName);
m_db.setHostName(hostname);
m_overflow = 100;
m_trimSize = qRound(0.01 * m_dbMaxSize);
m_maxQueueLength = 1000;
if (QCoreApplication::instance()->organizationName() == "nymea-test") {
m_dbMaxSize = 20;
qCDebug(dcLogEngine) << "Set logging dab max size to" << m_dbMaxSize << "for testing.";
}
qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName();
qCDebug(dcLogEngine) << "Opening logging database" << m_db.databaseName() << "(Max size:" << m_dbMaxSize << "trim size:" << m_trimSize << ")";
if (!m_db.isValid()) {
qCWarning(dcLogEngine) << "Database not valid:" << m_db.lastError().driverText() << m_db.lastError().databaseText();
@ -175,12 +171,12 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString
rotate(m_db.databaseName());
if (!initDB(username, password)) {
qCWarning(dcLogEngine()) << "Error fixing log database. Giving up. Logs can't be stored.";
return;
}
}
}
connect(&m_jobWatcher, SIGNAL(finished()), this, SLOT(handleJobFinished()));
checkDBSize();
}
@ -221,7 +217,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter)
DatabaseJob *job = new DatabaseJob(m_db, queryString, filter.values());
LogEntriesFetchJob *fetchJob = new LogEntriesFetchJob(this);
connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){
connect(job, &DatabaseJob::finished, this, [job, fetchJob](){
fetchJob->deleteLater();
if (job->error().isValid()) {
qCWarning(dcLogEngine) << "Error fetching log entries. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
@ -247,7 +243,7 @@ LogEntriesFetchJob *LogEngine::fetchLogEntries(const LogFilter &filter)
fetchJob->finished();
});
enqueJob(job);
enqueJob(job, true);
return fetchJob;
}
@ -258,7 +254,7 @@ DevicesFetchJob *LogEngine::fetchDevices()
DatabaseJob *job = new DatabaseJob(m_db, queryString);
DevicesFetchJob *fetchJob = new DevicesFetchJob(this);
connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){
connect(job, &DatabaseJob::finished, this, [job, fetchJob](){
fetchJob->deleteLater();
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine()) << "Error fetching device entries from log database:" << job->error().driverText() << job->error().databaseText();
@ -271,6 +267,7 @@ DevicesFetchJob *LogEngine::fetchDevices()
}
fetchJob->finished();
});
enqueJob(job, true);
return fetchJob;
}
@ -279,11 +276,11 @@ bool LogEngine::jobsRunning() const
return !m_jobQueue.isEmpty() || m_currentJob;
}
void LogEngine::setMaxLogEntries(int maxLogEntries, int overflow)
void LogEngine::setMaxLogEntries(int maxLogEntries, int trimSize)
{
m_dbMaxSize = maxLogEntries;
m_overflow = overflow;
checkDBSize();
m_trimSize = trimSize;
trim();
}
/*! Removes all entries from the database. This method will be used for the tests. */
@ -298,7 +295,9 @@ void LogEngine::clearDatabase()
connect(job, &DatabaseJob::finished, this, [this, job](){
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Could not clear logging database. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
return;
}
m_entryCount = 0;
emit logDatabaseUpdated();
});
@ -319,6 +318,7 @@ void LogEngine::logEvent(const Event &event)
Logging::LoggingSource sourceType;
if (event.isStateChangeEvent()) {
sourceType = Logging::LoggingSourceStates;
// There should only be one param
if (!event.params().isEmpty())
valueList << event.params().first().value();
@ -430,9 +430,11 @@ void LogEngine::removeDeviceLogs(const DeviceId &deviceId)
connect(job, &DatabaseJob::finished, this, [this, job, deviceId](){
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from device" << deviceId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
} else {
emit logDatabaseUpdated();
return;
}
emit logDatabaseUpdated();
checkDBSize();
});
enqueJob(job);
@ -450,9 +452,11 @@ void LogEngine::removeRuleLogs(const RuleId &ruleId)
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from rule" << ruleId.toString() << ". Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
} else {
emit logDatabaseUpdated();
return;
}
emit logDatabaseUpdated();
checkDBSize();
});
enqueJob(job);
@ -473,8 +477,27 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
DatabaseJob *job = new DatabaseJob(m_db, queryString);
// Check for log flooding. If we are exceeding the queue we'll start flagging log events of a certain type.
// If we'll get more log events of the same type while the queue is still exceededd, we'll discard the old
// ones and queue up the new one instead. The most recent one is more important (i.e. we don't want to lose
// the last event in a series).
if (m_jobQueue.count() > m_maxQueueLength) {
qCDebug(dcLogEngine()) << "An excessive amount of data is being logged. (" << m_jobQueue.length() << "jobs in the queue)";
if (m_flaggedJobs.contains(entry.typeId().toString() + entry.deviceId().toString())) {
if (m_flaggedJobs.value(entry.typeId().toString() + entry.deviceId().toString()).count() > 10) {
qCWarning(dcLogEngine()) << "Discarding log entry because of excessive log flooding.";
DatabaseJob *job = m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].takeFirst();
int jobIdx = m_jobQueue.indexOf(job);
m_jobQueue.takeAt(jobIdx)->deleteLater();
}
}
m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].append(job);
}
connect(job, &DatabaseJob::finished, this, [this, job, entry](){
m_flaggedJobs[entry.typeId().toString() + entry.deviceId().toString()].removeAll(job);
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error writing log entry. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
qCWarning(dcLogEngine) << entry;
@ -482,11 +505,11 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
return;
}
emit logEntryAdded(entry);
if (++m_entryCount > m_dbMaxSize + m_overflow) {
checkDBSize();
}
m_entryCount++;
trim();
});
enqueJob(job);
@ -494,57 +517,52 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
void LogEngine::checkDBSize()
{
if (m_dbMaxSize == -1) {
// No tripping required
return;
}
QDateTime startTime = QDateTime::currentDateTime();
QString queryString = "SELECT COUNT(*) FROM entries;";
DatabaseJob *job = new DatabaseJob(m_db, queryString);
connect(job, &DatabaseJob::finished, this, [this, job, startTime](){
DatabaseJob *job = new DatabaseJob(m_db, "SELECT COUNT(*) FROM entries;");
connect(job, &DatabaseJob::finished, this, [this, job](){
if (job->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine()) << "Failed to query entry count in db:" << job->error().databaseText();
return;
}
if (job->results().isEmpty()) {
qCWarning(dcLogEngine()) << "Failed retrieving entry count.";
qCWarning(dcLogEngine()) << "Error fetching log DB size. Driver error:" << job->error().driverText() << "Database error:" << job->error().databaseText();
m_entryCount = 0;
return;
}
m_entryCount = job->results().first().value(0).toInt();
if (m_entryCount <= m_dbMaxSize) {
return;
}
// keep only the latest m_dbMaxSize entries
if (!m_trimWarningPrinted) {
qCDebug(dcLogEngine) << "Deleting oldest entries" << (m_entryCount - m_dbMaxSize) << "and keep only the latest" << m_dbMaxSize << "entries.";
m_trimWarningPrinted = true;
}
QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize));
DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString);
connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob,startTime](){
if (deleteJob->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText();
}
m_entryCount = m_dbMaxSize;
qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms.";
emit logDatabaseUpdated();
});
enqueJob(deleteJob);
});
enqueJob(job);
enqueJob(job, true);
}
void LogEngine::enqueJob(DatabaseJob *job)
void LogEngine::trim()
{
m_jobQueue.append(job);
if (m_dbMaxSize == -1 || m_entryCount < m_dbMaxSize) {
// No trimming required
return;
}
QDateTime startTime = QDateTime::currentDateTime();
QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize - m_trimSize));
DatabaseJob *deleteJob = new DatabaseJob(m_db, queryDeleteString);
connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob, startTime](){
if (deleteJob->error().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << deleteJob->error().driverText() << "Database error:" << deleteJob->error().databaseText();
}
qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms. (Deleted" << m_entryCount - (m_dbMaxSize - m_trimSize) << "entries)";
m_entryCount = m_dbMaxSize - m_trimSize;
emit logDatabaseUpdated();
});
qCDebug(dcLogEngine()) << "Scheduling housekeeping job.";
enqueJob(deleteJob, true);
}
void LogEngine::enqueJob(DatabaseJob *job, bool priority)
{
if (priority) {
m_jobQueue.prepend(job);
} else {
m_jobQueue.append(job);
}
qCDebug(dcLogEngine()) << "Scheduled job at position" << (priority ? 0 : m_jobQueue.count() - 1) << "(" << m_jobQueue.count() << "jobs in the queue)";
processQueue();
}
@ -569,7 +587,9 @@ void LogEngine::processQueue()
m_dbMalformed = false;
}
DatabaseJob *job = m_jobQueue.takeFirst();
qCDebug(dcLogEngine()) << "Processing DB queue. (" << m_jobQueue.count() << "jobs left in queue," << m_entryCount << "entries in DB)";
m_currentJob = job;
QFuture<DatabaseJob*> future = QtConcurrent::run([job](){
@ -603,6 +623,8 @@ void LogEngine::handleJobFinished()
job->finished();
job->deleteLater();
m_currentJob = nullptr;
qCDebug(dcLogEngine()) << "DB job finished. (" << m_entryCount << "entries in DB)";
processQueue();
}

View File

@ -56,7 +56,7 @@ public:
bool jobsRunning() const;
void setMaxLogEntries(int maxLogEntries, int overflow);
void setMaxLogEntries(int maxLogEntries, int trimSize);
void clearDatabase();
void logSystemEvent(const QDateTime &dateTime, bool active, Logging::LoggingLevel level = Logging::LoggingLevelInfo);
@ -88,8 +88,9 @@ private:
private slots:
void checkDBSize();
void trim();
void enqueJob(DatabaseJob *job);
void enqueJob(DatabaseJob *job, bool priority = false);
void processQueue();
void handleJobFinished();
@ -98,11 +99,14 @@ private:
QString m_username;
QString m_password;
int m_dbMaxSize;
int m_overflow;
bool m_trimWarningPrinted = false;
int m_trimSize;
int m_entryCount = 0;
bool m_dbMalformed = false;
// When maxQueueLength is exceeded, jobs will be flagged and discarded if this source logs more events
int m_maxQueueLength;
QHash<QString, QList<DatabaseJob*>> m_flaggedJobs;
QList<DatabaseJob*> m_jobQueue;
DatabaseJob *m_currentJob = nullptr;
QFutureWatcher<DatabaseJob*> m_jobWatcher;

View File

@ -946,7 +946,7 @@ void NymeaCore::deviceManagerLoaded()
qCDebug(dcApplication()) << "Starting housekeeping...";
QDateTime startTime = QDateTime::currentDateTime();
DevicesFetchJob *job = m_logger->fetchDevices();
connect(job, &DevicesFetchJob::finished, this, [this, job, startTime](){
connect(job, &DevicesFetchJob::finished, m_deviceManager, [this, job, startTime](){
foreach (const DeviceId &deviceId, job->results()) {
if (!m_deviceManager->findConfiguredDevice(deviceId)) {
qCDebug(dcApplication()) << "Cleaning stale device entries from log DB for device id" << deviceId;

View File

@ -41,6 +41,11 @@ private:
inline void verifyDeviceError(const QVariant &response, Device::DeviceError error = Device::DeviceErrorNoError) {
verifyError(response, "deviceError", enumValueName(error));
}
inline void waitForDBSync() {
while (NymeaCore::instance()->logEngine()->jobsRunning()) {
qApp->processEvents();
}
}
private slots:
void initTestCase();
@ -80,7 +85,8 @@ void TestLogging::initTestCase()
"LogEngine.debug=true\n"
"Tests.debug=true\n"
"MockDevice.debug=true\n"
"DeviceManager.debug=true\n");
"DeviceManager.debug=true\n"
"Application.debug=true\n");
}
void TestLogging::initLogs()
@ -101,6 +107,7 @@ void TestLogging::initLogs()
QVERIFY(logEntries.count() == 0);
restartServer();
NymeaCore::instance()->logEngine()->setMaxLogEntries(1000, 10);
}
void TestLogging::databaseSerializationTest_data()
@ -158,6 +165,8 @@ void TestLogging::systemLogs()
qWarning() << "Clearing logging DB";
clearLoggingDatabase();
waitForDBSync();
QVariantMap params;
params.insert("loggingSources", QVariantList() << enumValueName(Logging::LoggingSourceSystem));
params.insert("eventTypes", QVariantList() << enumValueName(Logging::LoggingEventTypeActiveChange));
@ -610,6 +619,8 @@ void TestLogging::testHouseKeeping()
connect(reply, SIGNAL(finished()), reply, SLOT(deleteLater()));
spy.wait();
waitForDBSync();
params.clear();
params.insert("deviceIds", QVariantList() << deviceId);
response = injectAndWait("Logging.GetLogEntries", params);
@ -623,15 +634,13 @@ void TestLogging::testHouseKeeping()
restartServer();
// Wait for the mock device to complete the setup
do {
qApp->processEvents();
params.clear();
params.insert("deviceId", m_mockDeviceId);
response = injectAndWait("Devices.GetConfiguredDevices", params);
} while (!response.toMap().value("params").toMap().value("devices").toList().first().toMap().value("setupComplete").toBool());
waitForDBSync();
params.clear();
params.insert("deviceIds", QVariantList() << deviceId);
response = injectAndWait("Logging.GetLogEntries", params);
qCDebug(dcTests()) << qUtf8Printable(QJsonDocument::fromVariant(response).toJson());
QVERIFY2(response.toMap().value("status").toString() == QString("success"), "GetLogEntries failed");
QVERIFY2(response.toMap().value("params").toMap().value("logEntries").toList().count() == 0, "Device state change event still in log. Should've been cleaned by housekeeping.");
}
@ -660,6 +669,8 @@ void TestLogging::testLimits()
verifyDeviceError(response);
}
waitForDBSync();
QVariantMap params;
QVariantMap response;