Threaded log database

This
a) makes the log db threaded by using QtConcurrent to run queries in a
different thread but still keeps ordering of the queries and always
only allows a single query at a time (QSql is not threadsafe). This fixes
removeDevice calls failing if we take more than $jsonprc_timeout to clean
a deleted device from the DB and keeps nymead responsive during that too.

b) generally improces performance of the system by not requiring operations
(emitting events, changing states) to wait for the sync log db entry to be
made.

c) drops some of the houskeeping code on nymea startup. While this will still
do log db housekeeping if the DB exceeds maxDbSize, it will not run
housekeeping on the DB any more at application startup. Reasoning for this
is that there have been reports of rules/log entries beimg destroyed if a
plugin can't be found at application startup. Given our general direction
of working towards more dynamic plugin loading, this might happen more often
in the future and we sure don't want to destroy rules etc when we just
temporarily miss a plugin.

d) tries to fix issue #226 by rotating the DB not only when it fails to open
initially, but also when it fails to insert new entries.
This commit is contained in:
Michael Zanetti 2019-11-15 18:01:18 +01:00
parent ec15e664e1
commit 3eac06de6e
6 changed files with 277 additions and 130 deletions

View File

@ -140,16 +140,27 @@ JsonReply* LoggingHandler::GetLogEntries(const QVariantMap &params) const
{
LogFilter filter = unpackLogFilter(params);
QVariantList entries;
foreach (const LogEntry &entry, NymeaCore::instance()->logEngine()->logEntries(filter)) {
entries.append(packLogEntry(entry));
}
QVariantMap returns;
returns.insert("loggingError", enumValueName<Logging::LoggingError>(Logging::LoggingErrorNoError));
returns.insert("logEntries", entries);
returns.insert("offset", filter.offset());
returns.insert("count", entries.count());
return createReply(returns);
LogEngineFetchJob *job = NymeaCore::instance()->logEngine()->logEntries(filter);
JsonReply *reply = createAsyncReply("GetLogEntries");
connect(job, &LogEngineFetchJob::finished, reply, [this, reply, job, filter](){
QVariantList entries;
foreach (const LogEntry &entry, job->results()) {
entries.append(packLogEntry(entry));
}
QVariantMap returns;
returns.insert("loggingError", enumValueName<Logging::LoggingError>(Logging::LoggingErrorNoError));
returns.insert("logEntries", entries);
returns.insert("offset", filter.offset());
returns.insert("count", entries.count());
reply->setData(returns);
reply->finished();
});
return reply;
}
QVariantMap LoggingHandler::packLogEntry(const LogEntry &logEntry)

View File

@ -128,6 +128,7 @@
#include <QDateTime>
#include <QFileInfo>
#include <QTime>
#include <QtConcurrent/QtConcurrent>
#define DB_SCHEMA_VERSION 3
@ -140,6 +141,8 @@ namespace nymeaserver {
*/
LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString &hostname, const QString &username, const QString &password, int maxDBSize, QObject *parent):
QObject(parent),
m_username(username),
m_password(password),
m_dbMaxSize(maxDBSize)
{
m_db = QSqlDatabase::addDatabase(driver, "logs");
@ -169,11 +172,10 @@ LogEngine::LogEngine(const QString &driver, const QString &dbName, const QString
}
}
connect(&m_housekeepingTimer, &QTimer::timeout, this, &LogEngine::checkDBSize);
m_housekeepingTimer.setInterval(1); // Trigger on next idle event loop run
m_housekeepingTimer.setSingleShot(true);
checkDBSize();
connect(&m_jobWatcher, SIGNAL(finished()), this, SLOT(handleJobFinished()));
}
/*! Destructs the \l{LogEngine}. */
@ -186,7 +188,7 @@ LogEngine::~LogEngine()
\sa LogEntry, LogFilter
*/
QList<LogEntry> LogEngine::logEntries(const LogFilter &filter) const
LogEngineFetchJob* LogEngine::logEntries(const LogFilter &filter)
{
QList<LogEntry> results;
QSqlQuery query(m_db);
@ -205,7 +207,7 @@ QList<LogEntry> LogEngine::logEntries(const LogFilter &filter) const
} else {
queryString = QString("SELECT * FROM entries WHERE %1 ORDER BY timestamp DESC %2;").arg(filter.queryString()).arg(limitString);
}
qCDebug(dcLogEngine()) << "Preparing query:" << queryString;
// qCDebug(dcLogEngine()) << "Preparing query:" << queryString;
query.prepare(queryString);
foreach (const QString &value, filter.values()) {
@ -213,29 +215,38 @@ QList<LogEntry> LogEngine::logEntries(const LogFilter &filter) const
qCDebug(dcLogEngine()) << "Binding value to query:" << LogValueTool::serializeValue(value);
}
query.exec();
DatabaseJob *job = new DatabaseJob(query, this);
LogEngineFetchJob *fetchJob = new LogEngineFetchJob(this);
if (m_db.lastError().isValid()) {
qCWarning(dcLogEngine) << "Error fetching log entries. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
return QList<LogEntry>();
}
connect(job, &DatabaseJob::finished, this, [this, job, fetchJob](){
fetchJob->deleteLater();
if (job->query().lastError().isValid()) {
qCWarning(dcLogEngine) << "Error fetching log entries. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
fetchJob->finished();
return;
}
while (query.next()) {
LogEntry entry(
QDateTime::fromTime_t(query.value("timestamp").toLongLong()),
(Logging::LoggingLevel)query.value("loggingLevel").toInt(),
(Logging::LoggingSource)query.value("sourceType").toInt(),
query.value("errorCode").toInt());
entry.setTypeId(query.value("typeId").toUuid());
entry.setDeviceId(DeviceId(query.value("deviceId").toString()));
entry.setValue(LogValueTool::convertVariantToString(LogValueTool::deserializeValue(query.value("value").toString())));
entry.setEventType((Logging::LoggingEventType)query.value("loggingEventType").toInt());
entry.setActive(query.value("active").toBool());
results.append(entry);
}
// qCDebug(dcLogEngine) << "Fetched" << results.count() << "entries for db query:" << query.executedQuery();
while (job->query().next()) {
LogEntry entry(
QDateTime::fromTime_t(job->query().value("timestamp").toLongLong()),
(Logging::LoggingLevel)job->query().value("loggingLevel").toInt(),
(Logging::LoggingSource)job->query().value("sourceType").toInt(),
job->query().value("errorCode").toInt());
entry.setTypeId(job->query().value("typeId").toUuid());
entry.setDeviceId(DeviceId(job->query().value("deviceId").toString()));
entry.setValue(LogValueTool::convertVariantToString(LogValueTool::deserializeValue(job->query().value("value").toString())));
entry.setEventType((Logging::LoggingEventType)job->query().value("loggingEventType").toInt());
entry.setActive(job->query().value("active").toBool());
return results;
fetchJob->addResult(entry);
}
qCDebug(dcLogEngine) << "Fetched" << fetchJob->results().count() << "entries for db query:" << job->query().executedQuery();
fetchJob->finished();
});
enqueJob(job);
return fetchJob;
}
void LogEngine::setMaxLogEntries(int maxLogEntries, int overflow)
@ -251,11 +262,18 @@ void LogEngine::clearDatabase()
qCWarning(dcLogEngine) << "Clear logging database.";
QString queryDeleteString = QString("DELETE FROM entries;");
if (m_db.exec(queryDeleteString).lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Could not clear logging database. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
}
QSqlQuery query(queryDeleteString, m_db);
emit logDatabaseUpdated();
DatabaseJob *job = new DatabaseJob(query, this);
connect(job, &DatabaseJob::finished, this, [this, job](){
if (job->query().lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Could not clear logging database. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
}
emit logDatabaseUpdated();
});
enqueJob(job);
}
void LogEngine::logSystemEvent(const QDateTime &dateTime, bool active, Logging::LoggingLevel level)
@ -378,11 +396,18 @@ void LogEngine::removeDeviceLogs(const DeviceId &deviceId)
qCDebug(dcLogEngine) << "Deleting log entries from device" << deviceId.toString();
QString queryDeleteString = QString("DELETE FROM entries WHERE deviceId = '%1';").arg(deviceId.toString());
if (m_db.exec(queryDeleteString).lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from device" << deviceId.toString() << ". Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
} else {
emit logDatabaseUpdated();
}
QSqlQuery query(queryDeleteString, m_db);
DatabaseJob *job = new DatabaseJob(query, this);
connect(job, &DatabaseJob::finished, this, [this, job, deviceId](){
if (job->query().lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from device" << deviceId.toString() << ". Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
} else {
emit logDatabaseUpdated();
}
});
enqueJob(job);
}
void LogEngine::removeRuleLogs(const RuleId &ruleId)
@ -390,29 +415,20 @@ void LogEngine::removeRuleLogs(const RuleId &ruleId)
qCDebug(dcLogEngine) << "Deleting log entries from rule" << ruleId.toString();
QString queryDeleteString = QString("DELETE FROM entries WHERE typeId = '%1';").arg(ruleId.toString());
if (m_db.exec(queryDeleteString).lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from rule" << ruleId.toString() << ". Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
} else {
emit logDatabaseUpdated();
}
}
QSqlQuery query(queryDeleteString, m_db);
QList<DeviceId> LogEngine::devicesInLogs() const
{
QString queryString = QString("SELECT deviceId FROM entries WHERE deviceId != \"%1\" GROUP BY deviceId;").arg(QUuid().toString());
QSqlQuery result = m_db.exec(queryString);
QList<DeviceId> ret;
if (result.lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine()) << "Error fetching device entries from log database:" << m_db.lastError().driverText() << m_db.lastError().databaseText();
return ret;
}
if (!result.first()) {
return ret;
}
do {
ret.append(DeviceId::fromUuid(result.value("deviceId").toUuid()));
} while (result.next());
return ret;
DatabaseJob *job = new DatabaseJob(query, this);
connect(job, &DatabaseJob::finished, this, [this, job, ruleId](){
if (job->query().lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting log entries from rule" << ruleId.toString() << ". Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
} else {
emit logDatabaseUpdated();
}
});
enqueJob(job);
}
void LogEngine::appendLogEntry(const LogEntry &entry)
@ -428,17 +444,28 @@ void LogEngine::appendLogEntry(const LogEntry &entry)
.arg(entry.active())
.arg(entry.errorCode());
if (m_db.exec(queryString).lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error writing log entry. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
qCWarning(dcLogEngine) << entry;
return;
}
QSqlQuery query(queryString, m_db);
emit logEntryAdded(entry);
DatabaseJob *job = new DatabaseJob(query, this);
if (++m_entryCount > m_dbMaxSize + m_overflow) {
m_housekeepingTimer.start();
}
connect(job, &DatabaseJob::finished, this, [this, job, entry](){
if (job->query().lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error writing log entry. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
qCWarning(dcLogEngine) << entry;
m_dbMalformed = true;
processQueue();
return;
}
emit logEntryAdded(entry);
if (++m_entryCount > m_dbMaxSize + m_overflow) {
checkDBSize();
}
});
enqueJob(job);
}
void LogEngine::checkDBSize()
@ -449,31 +476,92 @@ void LogEngine::checkDBSize()
}
QDateTime startTime = QDateTime::currentDateTime();
QString queryString = "SELECT COUNT(*) FROM entries;";
QSqlQuery result = m_db.exec(queryString);
if (m_db.lastError().type() != QSqlError::NoError) {
qWarning(dcLogEngine()) << "Failed to query entry count in db:" << m_db.lastError().databaseText();
return;
}
if (!result.first()) {
qWarning(dcLogEngine()) << "Failed retrieving entry count.";
return;
}
m_entryCount = result.value(0).toInt();
if (m_entryCount >= m_dbMaxSize) {
QSqlQuery query(queryString, m_db);
DatabaseJob *job = new DatabaseJob(query, this);
connect(job, &DatabaseJob::finished, this, [this, job, startTime](){
QSqlQuery result = job->query();
if (m_db.lastError().type() != QSqlError::NoError) {
qWarning(dcLogEngine()) << "Failed to query entry count in db:" << m_db.lastError().databaseText();
return;
}
if (!result.first()) {
qWarning(dcLogEngine()) << "Failed retrieving entry count.";
return;
}
m_entryCount = result.value(0).toInt();
if (m_entryCount <= m_dbMaxSize) {
return;
}
// keep only the latest m_dbMaxSize entries
if (!m_trimWarningPrinted) {
qCDebug(dcLogEngine) << "Deleting oldest entries" << (m_entryCount - m_dbMaxSize) << "and keep only the latest" << m_dbMaxSize << "entries.";
m_trimWarningPrinted = true;
}
QString queryDeleteString = QString("DELETE FROM entries WHERE ROWID IN (SELECT ROWID FROM entries ORDER BY timestamp DESC LIMIT -1 OFFSET %1);").arg(QString::number(m_dbMaxSize));
if (m_db.exec(queryDeleteString).lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
}
m_entryCount = m_dbMaxSize;
emit logDatabaseUpdated();
QSqlQuery query(queryDeleteString);
DatabaseJob *deleteJob = new DatabaseJob(query, this);
connect(deleteJob, &DatabaseJob::finished, this, [this, deleteJob,startTime](){
if (deleteJob->query().lastError().type() != QSqlError::NoError) {
qCWarning(dcLogEngine) << "Error deleting oldest log entries to keep size. Driver error:" << m_db.lastError().driverText() << "Database error:" << m_db.lastError().databaseText();
}
m_entryCount = m_dbMaxSize;
qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms.";
emit logDatabaseUpdated();
});
enqueJob(deleteJob);
});
enqueJob(job);
}
void LogEngine::enqueJob(DatabaseJob *job)
{
m_jobQueue.append(job);
processQueue();
}
void LogEngine::processQueue()
{
if (m_jobQueue.isEmpty()) {
return;
}
qCDebug(dcLogEngine()) << "Ran housekeeping on log database in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms.";
if (!m_jobWatcher.isFinished()) {
return;
}
if (m_dbMalformed) {
qCWarning(dcLogEngine()) << "Database is malformed. Trying to recover...";
m_db.close();
rotate(m_db.databaseName());
initDB(m_username, m_password);
m_dbMalformed = false;
}
DatabaseJob *job = m_jobQueue.takeFirst();
QFuture<DatabaseJob*> future = QtConcurrent::run([job](){
job->query().exec();
return job;
});
m_jobWatcher.setFuture(future);
}
void LogEngine::handleJobFinished()
{
DatabaseJob *job = m_jobWatcher.result();
job->finished();
job->deleteLater();
processQueue();
}
void LogEngine::rotate(const QString &dbName)

View File

@ -32,18 +32,23 @@
#include <QObject>
#include <QSqlDatabase>
#include <QSqlQuery>
#include <QTimer>
#include <QFutureWatcher>
namespace nymeaserver {
class DatabaseJob;
class LogEngineFetchJob;
class LogEngine: public QObject
{
Q_OBJECT
public:
LogEngine(const QString &driver, const QString &dbName, const QString &hostname = QString("127.0.0.1"), const QString &username = QString(), const QString &password = QString(), int maxDBSize = 50000, QObject *parent = 0);
LogEngine(const QString &driver, const QString &dbName, const QString &hostname = QString("127.0.0.1"), const QString &username = QString(), const QString &password = QString(), int maxDBSize = 50000, QObject *parent = nullptr);
~LogEngine();
QList<LogEntry> logEntries(const LogFilter &filter = LogFilter()) const;
LogEngineFetchJob *logEntries(const LogFilter &filter = LogFilter());
void setMaxLogEntries(int maxLogEntries, int overflow);
void clearDatabase();
@ -60,7 +65,6 @@ public:
void logRuleExitActionsExecuted(const Rule &rule);
void removeDeviceLogs(const DeviceId &deviceId);
void removeRuleLogs(const RuleId &ruleId);
QList<DeviceId> devicesInLogs() const;
signals:
void logEntryAdded(const LogEntry &logEntry);
@ -77,15 +81,57 @@ private:
private slots:
void checkDBSize();
void enqueJob(DatabaseJob *job);
void processQueue();
void handleJobFinished();
private:
QSqlDatabase m_db;
QString m_username;
QString m_password;
int m_dbMaxSize;
int m_overflow;
bool m_trimWarningPrinted = false;
int m_entryCount = 0;
QTimer m_housekeepingTimer;
bool m_dbMalformed = false;
QList<DatabaseJob*> m_jobQueue;
QFutureWatcher<DatabaseJob*> m_jobWatcher;
};
class DatabaseJob: public QObject
{
Q_OBJECT
public:
DatabaseJob(const QSqlQuery &query, QObject *parent): QObject(parent), m_query(query) {}
QSqlQuery query() const { return m_query; }
signals:
void finished();
private:
QSqlQuery m_query;
};
class LogEngineFetchJob: public QObject
{
Q_OBJECT
public:
LogEngineFetchJob(QObject *parent): QObject(parent) {}
QList<LogEntry> results() { return m_results; }
signals:
void finished();
private:
void addResult(const LogEntry &entry) { m_results.append(entry); }
QList<LogEntry> m_results;
friend class LogEngine;
};
}
#endif

View File

@ -920,28 +920,6 @@ void NymeaCore::deviceManagerLoaded()
onDateTimeChanged(m_timeManager->currentDateTime());
emit initialized();
// Do some houskeeping...
qCDebug(dcApplication()) << "Starting housekeeping...";
QDateTime startTime = QDateTime::currentDateTime();
foreach (const DeviceId &deviceId, m_logger->devicesInLogs()) {
if (!m_deviceManager->findConfiguredDevice(deviceId)) {
qCDebug(dcApplication()) << "Cleaning stale device entries from log DB for device id" << deviceId;
m_logger->removeDeviceLogs(deviceId);
}
}
foreach (const DeviceId &deviceId, m_ruleEngine->devicesInRules()) {
if (!m_deviceManager->findConfiguredDevice(deviceId)) {
qCDebug(dcApplication()) << "Cleaning stale rule entries for device id" << deviceId;
foreach (const RuleId &ruleId, m_ruleEngine->findRules(deviceId)) {
m_ruleEngine->removeDeviceFromRule(ruleId, deviceId);
}
}
}
qCDebug(dcApplication()) << "Housekeeping done in" << startTime.msecsTo(QDateTime::currentDateTime()) << "ms.";
}
}

View File

@ -81,25 +81,43 @@ void TestLoggingDirect::benchmarkDB()
qDebug() << "Flushing DB for test";
engine->setMaxLogEntries(prefill, overflow);
engine->setMaxLogEntries(maxSize, overflow);
qDebug() << "DB has" << engine->logEntries().count() << "entries";
LogEngineFetchJob *job = engine->logEntries();
QSignalSpy fetchSpy(job, &LogEngineFetchJob::finished);
fetchSpy.wait();
QList<LogEntry> entries = job->results();
qDebug() << "DB has" << entries.count() << "entries";
qDebug() << "Prefilling DB for test";
for (int i = engine->logEntries().count(); i < prefill; i++) {
for (int i = entries.count(); i < prefill; i++) {
engine->logSystemEvent(QDateTime::currentDateTime(), true);
}
qDebug() << "DB has" << engine->logEntries().count() << "entries";
qDebug() << "Starting benchmark with" << engine->logEntries().count() << "entries in the db";
job = engine->logEntries();
QSignalSpy fetchSpy2(job, &LogEngineFetchJob::finished);
fetchSpy2.wait();
entries = job->results();
qDebug() << "DB has" << entries.count() << "entries";
qDebug() << "Starting benchmark with" << entries.count() << "entries in the db";
QBENCHMARK {
engine->logSystemEvent(QDateTime::currentDateTime(), true);
}
QDateTime now = QDateTime::currentDateTime();
while (engine->logEntries().count() > maxSize + overflow) {
job = engine->logEntries();
QSignalSpy fetchSpy3(job, &LogEngineFetchJob::finished);
fetchSpy3.wait();
entries = job->results();
while (entries.count() > maxSize + overflow) {
qApp->processEvents();
if (now.addSecs(5) < QDateTime::currentDateTime()) {
QVERIFY2(false, QString("Housekeeping didn't work. Have %1 entries but expected to have max %2").arg(engine->logEntries().count()).arg(QString::number(maxSize)).toLocal8Bit());
QVERIFY2(false, QString("Housekeeping didn't work. Have %1 entries but expected to have max %2").arg(entries.count()).arg(QString::number(maxSize)).toLocal8Bit());
}
}
qDebug() << "Ended benchmark with" << engine->logEntries().count() << "entries in the db";
qDebug() << "Ended benchmark with" << entries.count() << "entries in the db";
}
#include "testloggingdirect.moc"

View File

@ -2312,7 +2312,11 @@ void TestRules::testStateBasedAction()
LogFilter filter;
filter.addDeviceId(m_mockDeviceId);
filter.addTypeId(mockWithParamsActionTypeId);
QList<LogEntry> entries = NymeaCore::instance()->logEngine()->logEntries(filter);
LogEngineFetchJob *job = NymeaCore::instance()->logEngine()->logEntries(filter);
QSignalSpy fetchSpy(job, &LogEngineFetchJob::finished);
fetchSpy.wait();
QList<LogEntry> entries = job->results();
qCDebug(dcTests()) << "Log entries:" << entries;
// set bool state to false
@ -2331,10 +2335,12 @@ void TestRules::testStateBasedAction()
QCOMPARE(spy.count(), 1);
reply->deleteLater();
entries = NymeaCore::instance()->logEngine()->logEntries(filter);
job = NymeaCore::instance()->logEngine()->logEntries(filter);
QSignalSpy fetchSpy2(job, &LogEngineFetchJob::finished);
fetchSpy2.wait();
entries = job->results();
qCDebug(dcTests()) << "Log entries:" << entries;
}
void TestRules::removePolicyUpdate()