From 3436e9b998380519e39fa238d728e378e8474207 Mon Sep 17 00:00:00 2001 From: Michael Zanetti Date: Sun, 19 Jul 2020 23:18:18 +0200 Subject: [PATCH] drop usage of asyncio and run everything regularly threaded --- .../python/pynymealogginghandler.h | 16 +- .../integrations/python/pynymeamodule.h | 62 ++++ libnymea-core/integrations/python/pyparam.h | 4 +- libnymea-core/integrations/python/pything.h | 4 +- .../integrations/python/pythingactioninfo.h | 4 +- .../integrations/python/pythingsetupinfo.h | 4 +- libnymea-core/integrations/python/pyutils.h | 2 +- .../integrations/pythonintegrationplugin.cpp | 319 ++++++------------ .../integrations/pythonintegrationplugin.h | 21 +- libnymea-core/libnymea-core.pro | 1 + plugins/pymock/integrationpluginpymock.py | 32 +- 11 files changed, 218 insertions(+), 251 deletions(-) create mode 100644 libnymea-core/integrations/python/pynymeamodule.h diff --git a/libnymea-core/integrations/python/pynymealogginghandler.h b/libnymea-core/integrations/python/pynymealogginghandler.h index 2fa5d3c9..1a5b3f5f 100644 --- a/libnymea-core/integrations/python/pynymealogginghandler.h +++ b/libnymea-core/integrations/python/pynymealogginghandler.h @@ -7,6 +7,8 @@ #include #include +Q_DECLARE_LOGGING_CATEGORY(dcPythonIntegrations) + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Winvalid-offsetof" #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -16,14 +18,23 @@ typedef struct { char *category; } PyNymeaLoggingHandler; -static int PyNymeaLoggingHandler_init(PyNymeaLoggingHandler */*self*/, PyObject */*args*/, PyObject */*kwds*/) +static int PyNymeaLoggingHandler_init(PyNymeaLoggingHandler *self, PyObject *args, PyObject */*kwds*/) { + char *category = nullptr; + if (!PyArg_ParseTuple(args, "s", &category)) { + qCWarning(dcPythonIntegrations()) << "PyNymeaLoggingHandler: Error parsing parameters"; + return -1; + } + + self->category = (char*)malloc(qstrlen(category)); + qstrcpy(self->category, category); + return 0; } static void PyNymeaLoggingHandler_dealloc(PyNymeaLoggingHandler * self) -// destruct the object { + free(self->category); Py_TYPE(self)->tp_free(self); } @@ -106,5 +117,6 @@ static void registerNymeaLoggingHandler(PyObject *module) } } +#pragma GCC diagnostic pop #endif // PYNYMEALOGGINGHANDLER_H diff --git a/libnymea-core/integrations/python/pynymeamodule.h b/libnymea-core/integrations/python/pynymeamodule.h new file mode 100644 index 00000000..e750e650 --- /dev/null +++ b/libnymea-core/integrations/python/pynymeamodule.h @@ -0,0 +1,62 @@ +#ifndef PYNYMEAMODULE_H +#define PYNYMEAMODULE_H + +#include + +#include "pynymealogginghandler.h" +#include "pything.h" +#include "pythingdiscoveryinfo.h" +#include "pythingsetupinfo.h" +#include "pyparam.h" +#include "pythingactioninfo.h" +#include "pythingpairinginfo.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Winvalid-offsetof" +#pragma GCC diagnostic ignored "-Wwrite-strings" + +static int nymea_exec(PyObject *m) { + + // Override stdout/stderr to use qDebug instead + PyObject* pyLog = PyModule_Create(&pyLog_module); + PySys_SetObject("stdout", pyLog); + PyObject* pyWarn = PyModule_Create(&pyWarn_module); + PySys_SetObject("stderr", pyWarn); + + registerNymeaLoggingHandler(m); + registerParamType(m); + registerThingType(m); + registerThingDescriptorType(m); + registerThingDiscoveryInfoType(m); + registerThingPairingInfoType(m); + registerThingSetupInfoType(m); + registerThingActionInfoType(m); + + return 0; +} + +static struct PyModuleDef_Slot nymea_slots[] = { + {Py_mod_exec, (void*)nymea_exec}, + {0, NULL}, +}; + +static struct PyModuleDef nymea_module = { + PyModuleDef_HEAD_INIT, + "nymea", + "The nymea module. Provdes types used in the nymea plugin API.", + 0, + nullptr, // methods + nymea_slots, // slots + nullptr, + nullptr, + nullptr +}; + +PyMODINIT_FUNC PyInit_nymea(void) +{ + return PyModuleDef_Init(&nymea_module); +} + +#pragma GCC diagnostic pop + +#endif // PYNYMEAMODULE_H diff --git a/libnymea-core/integrations/python/pyparam.h b/libnymea-core/integrations/python/pyparam.h index 387b446f..87ae8063 100644 --- a/libnymea-core/integrations/python/pyparam.h +++ b/libnymea-core/integrations/python/pyparam.h @@ -35,7 +35,7 @@ static PyMemberDef PyParam_members[] = { static int PyParam_init(PyParam *self, PyObject *args, PyObject *kwds) { - qWarning() << "++++ PyParam"; + qCDebug(dcPythonIntegrations()) << "+++ PyParam"; static char *kwlist[] = {"paramTypeId", "value", nullptr}; PyObject *paramTypeId = nullptr, *value = nullptr; @@ -54,7 +54,7 @@ static int PyParam_init(PyParam *self, PyObject *args, PyObject *kwds) } static void PyParam_dealloc(PyParam * self) { - qWarning() << "---- PyParam"; + qCDebug(dcPythonIntegrations()) << "--- PyParam"; Py_XDECREF(self->pyParamTypeId); Py_XDECREF(self->pyValue); Py_TYPE(self)->tp_free(self); diff --git a/libnymea-core/integrations/python/pything.h b/libnymea-core/integrations/python/pything.h index 5ab70716..28e9092b 100644 --- a/libnymea-core/integrations/python/pything.h +++ b/libnymea-core/integrations/python/pything.h @@ -53,7 +53,7 @@ static PyObject* PyThing_new(PyTypeObject *type, PyObject */*args*/, PyObject */ if (self == NULL) { return nullptr; } - qWarning() << "*++++ PyThing" << self; + qCDebug(dcPythonIntegrations()) << "+++ PyThing" << self; return (PyObject*)self; } @@ -127,7 +127,7 @@ static void PyThing_setThing(PyThing *self, Thing *thing) static void PyThing_dealloc(PyThing * self) { - qWarning() << "----- PyThing" << self; + qCDebug(dcPythonIntegrations()) << "--- PyThing" << self; Py_XDECREF(self->pyId); Py_XDECREF(self->pyThingClassId); Py_XDECREF(self->pyName); diff --git a/libnymea-core/integrations/python/pythingactioninfo.h b/libnymea-core/integrations/python/pythingactioninfo.h index 05a1a35e..d29c4e4e 100644 --- a/libnymea-core/integrations/python/pythingactioninfo.h +++ b/libnymea-core/integrations/python/pythingactioninfo.h @@ -43,7 +43,7 @@ static PyObject* PyThingActionInfo_new(PyTypeObject *type, PyObject */*args*/, P if (self == NULL) { return nullptr; } - qWarning() << "+++ PyThingActionInfo"; + qCDebug(dcPythonIntegrations()) << "+++ PyThingActionInfo"; return (PyObject*)self; } @@ -59,7 +59,7 @@ void PyThingActionInfo_setInfo(PyThingActionInfo *self, ThingActionInfo *info, P static void PyThingActionInfo_dealloc(PyThingActionInfo * self) { - qWarning() << "---- PyThingActionInfo"; + qCDebug(dcPythonIntegrations()) << "--- PyThingActionInfo"; Py_DECREF(self->pyThing); Py_DECREF(self->pyActionTypeId); Py_DECREF(self->pyParams); diff --git a/libnymea-core/integrations/python/pythingsetupinfo.h b/libnymea-core/integrations/python/pythingsetupinfo.h index 19010c09..83199546 100644 --- a/libnymea-core/integrations/python/pythingsetupinfo.h +++ b/libnymea-core/integrations/python/pythingsetupinfo.h @@ -41,7 +41,7 @@ static PyObject* PyThingSetupInfo_new(PyTypeObject *type, PyObject *args, PyObje if (self == NULL) { return nullptr; } - qWarning() << "++++ PyThingSetupInfo"; + qCDebug(dcPythonIntegrations()) << "+++ PyThingSetupInfo"; static char *kwlist[] = {"thing", nullptr}; @@ -60,7 +60,7 @@ static PyObject* PyThingSetupInfo_new(PyTypeObject *type, PyObject *args, PyObje } static void PyThingSetupInfo_dealloc(PyThingSetupInfo * self) { - qWarning() << "--- PyThingSetupInfo"; + qCDebug(dcPythonIntegrations()) << "--- PyThingSetupInfo"; Py_DECREF(self->pyThing); Py_TYPE(self)->tp_free(self); } diff --git a/libnymea-core/integrations/python/pyutils.h b/libnymea-core/integrations/python/pyutils.h index 62b3cc7b..2e00da05 100644 --- a/libnymea-core/integrations/python/pyutils.h +++ b/libnymea-core/integrations/python/pyutils.h @@ -65,7 +65,7 @@ QVariant PyObjectToQVariant(PyObject *pyObject) return QVariant(PyObject_IsTrue(pyObject)); } - Q_ASSERT_X(false, "pyutils.h", "Unhandled data type in conversion PyObject to QVariant!"); + Q_ASSERT_X(false, "pyutils.h", QString("Unhandled data type in conversion PyObject to QVariant: %1").arg(pyObject->ob_type->tp_name).toUtf8()); return QVariant(); } diff --git a/libnymea-core/integrations/pythonintegrationplugin.cpp b/libnymea-core/integrations/pythonintegrationplugin.cpp index 0ba9f215..8cabf7b3 100644 --- a/libnymea-core/integrations/pythonintegrationplugin.cpp +++ b/libnymea-core/integrations/pythonintegrationplugin.cpp @@ -1,14 +1,7 @@ #include -#include "python/pynymealogginghandler.h" -#include "python/pything.h" -#include "python/pythingdiscoveryinfo.h" -#include "python/pythingsetupinfo.h" -#include "python/pyparam.h" -#include "python/pythingactioninfo.h" -#include "python/pythingpairinginfo.h" - #include "pythonintegrationplugin.h" +#include "python/pynymeamodule.h" #include "loggingcategories.h" @@ -22,82 +15,8 @@ #include PyThreadState* PythonIntegrationPlugin::s_mainThreadState = nullptr; -QThreadPool* PythonIntegrationPlugin::s_threadPool = nullptr; - QHash PythonIntegrationPlugin::s_plugins; - -PyObject* PythonIntegrationPlugin::task_done(PyObject* self, PyObject* args) -{ - Q_UNUSED(self) - - PyObject *result = nullptr; - - if (!PyArg_ParseTuple(args, "O", &result)) { - qCWarning(dcPythonIntegrations()) << "Cannot fetch result from coroutine callback."; - return nullptr; - } - - PyObject *exceptionMethod = PyObject_GetAttrString(result, "exception"); - - PyObject *exception = PyObject_CallFunctionObjArgs(exceptionMethod, nullptr); - if (exception != Py_None) { - PyObject* repr = PyObject_Repr(exception); - PyObject* str = PyUnicode_AsEncodedString(repr, "utf-8", "~E~"); - const char *bytes = PyBytes_AS_STRING(str); - Py_XDECREF(repr); - Py_XDECREF(str); - -// PyObject *traceback = PyObject_CallMethodObjArgs(exception, "__traceback__", nullptr); - - qCWarning(dcPythonIntegrations()) << "Exception in plugin:" << bytes; - - PyErr_Clear(); - } - - Py_RETURN_NONE; -} - - -static PyMethodDef nymea_methods[] = -{ - {"task_done", PythonIntegrationPlugin::task_done, METH_VARARGS, "callback to clean up after asyc coroutines"}, - {nullptr, nullptr, 0, nullptr} // sentinel -}; - -static PyModuleDef nymea_module = -{ - PyModuleDef_HEAD_INIT, // PyModuleDef_Base m_base; - "nymea", // const char* m_name; - "nymea module for python based integration plugins", // const char* m_doc; - -1, // Py_ssize_t m_size; - nymea_methods, // PyMethodDef *m_methods - nullptr, nullptr, nullptr, nullptr -}; - -PyMODINIT_FUNC PyInit_nymea(void) -{ - // Overrride stdout/stderr to use qDebug instead - PyObject* pyLog = PyModule_Create(&pyLog_module); - PySys_SetObject("stdout", pyLog); - PyObject* pyWarn = PyModule_Create(&pyWarn_module); - PySys_SetObject("stderr", pyWarn); - - - // Register nymea types - PyObject* m = PyModule_Create(&nymea_module); - registerNymeaLoggingHandler(m); - registerParamType(m); - registerThingType(m); - registerThingDescriptorType(m); - registerThingDiscoveryInfoType(m); - registerThingPairingInfoType(m); - registerThingSetupInfoType(m); - registerThingActionInfoType(m); - - return m; -} - PyObject* PythonIntegrationPlugin::pyConfiguration(PyObject* self, PyObject* /*args*/) { PythonIntegrationPlugin *plugin = s_plugins.key(self); @@ -274,17 +193,27 @@ PythonIntegrationPlugin::PythonIntegrationPlugin(QObject *parent) : IntegrationP PythonIntegrationPlugin::~PythonIntegrationPlugin() { + if (m_pluginModule) { + callPluginFunction("deinit"); + } + // Acquire GIL for this plugin's interpreter PyEval_RestoreThread(m_threadState); - // Cancel all the thread in here while (!m_runningTasks.isEmpty()) { - QFutureWatcher *watcher = m_runningTasks.values().first(); - watcher->cancel(); + QFutureWatcher *watcher = m_runningTasks.keys().first(); + QString function = m_runningTasks.value(watcher); + + Py_BEGIN_ALLOW_THREADS + qCDebug(dcPythonIntegrations()) << "Waiting for" << metadata().pluginName() << "to finish" << function; watcher->waitForFinished(); + Py_END_ALLOW_THREADS } - Py_XDECREF(s_plugins.take(this)); + s_plugins.take(this); + Py_XDECREF(m_pluginModule); + Py_XDECREF(m_logger); + Py_DECREF(m_nymeaModule); Py_EndInterpreter(m_threadState); @@ -301,33 +230,25 @@ void PythonIntegrationPlugin::initPython() PyEval_InitThreads(); // Store the main thread state and release the GIL s_mainThreadState = PyEval_SaveThread(); - - // Allocate a shared thread pool for the plugins - s_threadPool = new QThreadPool(); - qCDebug(dcPythonIntegrations()) << "Created a thread pool with a maximum of" << s_threadPool->maxThreadCount() << "threads for python plugins."; } void PythonIntegrationPlugin::deinitPython() { PyEval_RestoreThread(s_mainThreadState); - Py_FinalizeEx(); - - s_threadPool->deleteLater(); - s_threadPool = nullptr; } bool PythonIntegrationPlugin::loadScript(const QString &scriptFile) { QFileInfo fi(scriptFile); - QFile metaData(fi.absolutePath() + "/" + fi.baseName() + ".json"); - if (!metaData.open(QFile::ReadOnly)) { - qCWarning(dcThingManager()) << "Error opening metadata file:" << metaData.fileName(); + QFile metaDataFile(fi.absolutePath() + "/" + fi.baseName() + ".json"); + if (!metaDataFile.open(QFile::ReadOnly)) { + qCWarning(dcThingManager()) << "Error opening metadata file:" << metaDataFile.fileName(); return false; } QJsonParseError error; - QJsonDocument jsonDoc = QJsonDocument::fromJson(metaData.readAll(), &error); + QJsonDocument jsonDoc = QJsonDocument::fromJson(metaDataFile.readAll(), &error); if (error.error != QJsonParseError::NoError) { qCWarning(dcThingManager()) << "Error parsing metadata file:" << error.errorString(); return false; @@ -350,19 +271,21 @@ bool PythonIntegrationPlugin::loadScript(const QString &scriptFile) // Import nymea module into this interpreter m_nymeaModule = PyImport_ImportModule("nymea"); - // Set up import paths for the plugin + // Set up import path for the plugin directory PyObject* sysPath = PySys_GetObject("path"); PyObject* pluginImportPath = PyUnicode_FromString(fi.absolutePath().toUtf8()); PyList_Append(sysPath, pluginImportPath); Py_DECREF(pluginImportPath); + // Set up import path for the "modules" subdir in the plugin directory PyObject* pluginModulesImportPath = PyUnicode_FromString(QString("%1/modules/").arg(fi.absolutePath()).toUtf8()); PyList_Append(sysPath, pluginModulesImportPath); Py_DECREF(pluginModulesImportPath); - m_module = PyImport_ImportModule(fi.baseName().toUtf8()); + // Import the plugin + m_pluginModule = PyImport_ImportModule(fi.baseName().toUtf8()); - if (!m_module) { + if (!m_pluginModule) { qCWarning(dcThingManager()) << "Error importing python plugin from:" << fi.absoluteFilePath(); PyErr_Print(); PyErr_Clear(); @@ -371,34 +294,37 @@ bool PythonIntegrationPlugin::loadScript(const QString &scriptFile) } qCDebug(dcThingManager()) << "Imported python plugin from" << fi.absoluteFilePath(); - s_plugins.insert(this, m_module); - - // We'll be using asyncio everywhere, so let's import it right away - // IMPORTANT: The asyncio module is a bit special in a sense that it actually shares - // stuff between interpreters. See https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter - // for the listed bugs and caveats. - // If we destroy the first interpreter that imports asyncio, things will become crashy as the - // interpreter will tear down the module, leaving shallow copies of the modules dict in other - // interpreters. So let's import the module down here after we're sure this module compiles and - // won't be unloaded any more. - // Note: This becomes a problem when we'll be supporting to unload plugins at runtime. - m_asyncio = PyImport_ImportModule("asyncio"); + s_plugins.insert(this, m_pluginModule); // Set up logger with appropriate logging category - PyNymeaLoggingHandler *logger = reinterpret_cast(_PyObject_New(&PyNymeaLoggingHandlerType)); QString category = metadata().pluginName(); category.replace(0, 1, category[0].toUpper()); - logger->category = static_cast(malloc(category.length() + 1)); - memset(logger->category, '0', category.length() +1); - strcpy(logger->category, category.toUtf8().data()); - PyModule_AddObject(m_module, "logger", reinterpret_cast(logger)); - + PyObject *args = Py_BuildValue("(s)", category.toUtf8().data()); + PyNymeaLoggingHandler *logger = reinterpret_cast(PyObject_CallObject((PyObject*)&PyNymeaLoggingHandlerType, args)); + Py_DECREF(args); + PyModule_AddObject(m_pluginModule, "logger", reinterpret_cast(logger)); + m_logger = (PyObject*)logger; // Export metadata ids into module exportIds(); - // Register config access methods - PyModule_AddFunctions(m_module, plugin_methods); + // Register plugin api methods (plugin params etc) + PyModule_AddFunctions(m_pluginModule, plugin_methods); + + // As python does not have an event loop by default and uses blocking code a lot, we'll + // call every plugin method in a threaded way to prevent blocking the core while still not + // forcing every plugin developer to deal with threading in the plugin. + // In oder to not create and destroy a thread for each plugin api call, we'll be using a + // thread pool. + // The maximum number of threads in a plugin will be amount of things it manages + 2. + // This would allow for e.g. running an event loop using init(), performing something on a thing + // and still allow the user to perform a discovery at the same time. On the other hand, this is + // strict enough to not encourage the plugin developer to block forever in ever api call but use + // proper task processing means (timers, event loops etc) instead. + // Plugins can still spawn more threads on their own if the need to but have to manage them on their own. + m_threadPool = new QThreadPool(this); + m_threadPool->setMaxThreadCount(2); + qCDebug(dcPythonIntegrations()) << "Created a thread pool with a maximum of" << m_threadPool->maxThreadCount() << "threads for python plugin" << metadata().pluginName(); PyEval_ReleaseThread(m_threadState); @@ -517,18 +443,24 @@ void PythonIntegrationPlugin::setupThing(ThingSetupInfo *info) PyObject *args = PyTuple_New(1); PyTuple_SetItem(args, 0, (PyObject*)pyThing); + Py_INCREF(pyThing); PyThingSetupInfo *pyInfo = (PyThingSetupInfo*)PyObject_CallObject((PyObject*)&PyThingSetupInfoType, args); Py_DECREF(args); pyInfo->info = info; + m_threadPool->setMaxThreadCount(m_threadPool->maxThreadCount() + 1); + qCDebug(dcPythonIntegrations()) << "Expanded thread pool for plugin" << metadata().pluginName() << "to" << m_threadPool->maxThreadCount(); + PyEval_ReleaseThread(m_threadState); connect(info->thing(), &Thing::destroyed, this, [=](){ PyEval_RestoreThread(m_threadState); pyThing->thing = nullptr; Py_DECREF(pyThing); + m_threadPool->setMaxThreadCount(m_threadPool->maxThreadCount() - 1); + qCDebug(dcPythonIntegrations()) << "Shrunk thread pool for plugin" << metadata().pluginName() << "to" << m_threadPool->maxThreadCount(); PyEval_ReleaseThread(m_threadState); }); connect(info, &ThingSetupInfo::destroyed, this, [=](){ @@ -549,12 +481,7 @@ void PythonIntegrationPlugin::setupThing(ThingSetupInfo *info) void PythonIntegrationPlugin::postSetupThing(Thing *thing) { PyThing* pyThing = m_things.value(thing); - Py_INCREF(pyThing); - - bool success = callPluginFunction("postSetupThing", reinterpret_cast(pyThing)); - if (!success) { - Py_DECREF(pyThing); - } + callPluginFunction("postSetupThing", reinterpret_cast(pyThing)); } void PythonIntegrationPlugin::executeAction(ThingActionInfo *info) @@ -569,10 +496,13 @@ void PythonIntegrationPlugin::executeAction(ThingActionInfo *info) PyEval_ReleaseThread(m_threadState); connect(info, &ThingActionInfo::destroyed, this, [=](){ + qCDebug(dcPythonIntegrations()) << "Info destroyed"; PyEval_RestoreThread(m_threadState); + qCDebug(dcPythonIntegrations()) << "Info destroyed2"; pyInfo->info = nullptr; Py_DECREF(pyInfo); PyEval_ReleaseThread(m_threadState); + qCDebug(dcPythonIntegrations()) << "Info destroyed3"; }); bool success = callPluginFunction("executeAction", reinterpret_cast(pyInfo)); @@ -584,12 +514,7 @@ void PythonIntegrationPlugin::executeAction(ThingActionInfo *info) void PythonIntegrationPlugin::thingRemoved(Thing *thing) { PyThing *pyThing = m_things.value(thing); - Py_INCREF(pyThing); - - bool success = callPluginFunction("thingRemoved", reinterpret_cast(pyThing)); - if (!success) { - Py_DECREF(pyThing); - } + callPluginFunction("thingRemoved", reinterpret_cast(pyThing)); m_mutex.lock(); m_things.remove(thing); @@ -623,13 +548,13 @@ void PythonIntegrationPlugin::exportIds() QString pluginName = metadata().pluginName(); QString pluginId = metadata().pluginId().toString(); qCDebug(dcThingManager()) << "- Plugin:" << pluginName << pluginId; - PyModule_AddStringConstant(m_module, "pluginId", pluginId.toUtf8()); + PyModule_AddStringConstant(m_pluginModule, "pluginId", pluginId.toUtf8()); exportParamTypes(configurationDescription(), pluginName, "", "plugin"); foreach (const Vendor &vendor, supportedVendors()) { qCDebug(dcThingManager()) << "|- Vendor:" << vendor.name() << vendor.id().toString(); - PyModule_AddStringConstant(m_module, QString("%1VendorId").arg(vendor.name()).toUtf8(), vendor.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, QString("%1VendorId").arg(vendor.name()).toUtf8(), vendor.id().toString().toUtf8()); } foreach (const ThingClass &thingClass, supportedThings()) { @@ -642,7 +567,7 @@ void PythonIntegrationPlugin::exportThingClass(const ThingClass &thingClass) QString variableName = QString("%1ThingClassId").arg(thingClass.name()); qCDebug(dcThingManager()) << "|- ThingClass:" << variableName << thingClass.id().toString(); - PyModule_AddStringConstant(m_module, variableName.toUtf8(), thingClass.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), thingClass.id().toString().toUtf8()); exportParamTypes(thingClass.paramTypes(), thingClass.name(), "", "thing"); exportParamTypes(thingClass.settingsTypes(), thingClass.name(), "", "settings"); @@ -659,7 +584,7 @@ void PythonIntegrationPlugin::exportParamTypes(const ParamTypes ¶mTypes, con foreach (const ParamType ¶mType, paramTypes) { QString variableName = QString("%1ParamTypeId").arg(thingClassName + typeName[0].toUpper() + typeName.right(typeName.length()-1) + typeClass + paramType.name()[0].toUpper() + paramType.name().right(paramType.name().length() -1 )); qCDebug(dcThingManager()) << " |- ParamType:" << variableName << paramType.id().toString(); - PyModule_AddStringConstant(m_module, variableName.toUtf8(), paramType.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), paramType.id().toString().toUtf8()); } } @@ -668,7 +593,7 @@ void PythonIntegrationPlugin::exportStateTypes(const StateTypes &stateTypes, con foreach (const StateType &stateType, stateTypes) { QString variableName = QString("%1%2StateTypeId").arg(thingClassName, stateType.name()[0].toUpper() + stateType.name().right(stateType.name().length() - 1)); qCDebug(dcThingManager()) << " |- StateType:" << variableName << stateType.id().toString(); - PyModule_AddStringConstant(m_module, variableName.toUtf8(), stateType.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), stateType.id().toString().toUtf8()); } } @@ -677,10 +602,9 @@ void PythonIntegrationPlugin::exportEventTypes(const EventTypes &eventTypes, con foreach (const EventType &eventType, eventTypes) { QString variableName = QString("%1%2EventTypeId").arg(thingClassName, eventType.name()[0].toUpper() + eventType.name().right(eventType.name().length() - 1)); qCDebug(dcThingManager()) << " |- EventType:" << variableName << eventType.id().toString(); - PyModule_AddStringConstant(m_module, variableName.toUtf8(), eventType.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), eventType.id().toString().toUtf8()); exportParamTypes(eventType.paramTypes(), thingClassName, "Event", eventType.name()); } - } void PythonIntegrationPlugin::exportActionTypes(const ActionTypes &actionTypes, const QString &thingClassName) @@ -688,7 +612,7 @@ void PythonIntegrationPlugin::exportActionTypes(const ActionTypes &actionTypes, foreach (const ActionType &actionType, actionTypes) { QString variableName = QString("%1%2ActionTypeId").arg(thingClassName, actionType.name()[0].toUpper() + actionType.name().right(actionType.name().length() - 1)); qCDebug(dcThingManager()) << " |- ActionType:" << variableName << actionType.id().toString(); - PyModule_AddStringConstant(m_module, variableName.toUtf8(), actionType.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), actionType.id().toString().toUtf8()); exportParamTypes(actionType.paramTypes(), thingClassName, "Action", actionType.name()); } } @@ -698,19 +622,17 @@ void PythonIntegrationPlugin::exportBrowserItemActionTypes(const ActionTypes &ac foreach (const ActionType &actionType, actionTypes) { QString variableName = QString("%1%2BrowserItemActionTypeId").arg(thingClassName, actionType.name()[0].toUpper() + actionType.name().right(actionType.name().length() - 1)); qCDebug(dcThingManager()) << " |- BrowserActionType:" << variableName << actionType.id().toString(); - PyModule_AddStringConstant(m_module, variableName.toUtf8(), actionType.id().toString().toUtf8()); + PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), actionType.id().toString().toUtf8()); exportParamTypes(actionType.paramTypes(), thingClassName, "BrowserItemAction", actionType.name()); } - } - bool PythonIntegrationPlugin::callPluginFunction(const QString &function, PyObject *param1, PyObject *param2, PyObject *param3) { PyEval_RestoreThread(m_threadState); qCDebug(dcThingManager()) << "Calling python plugin function" << function << "on plugin" << pluginName(); - PyObject *pluginFunction = PyObject_GetAttrString(m_module, function.toUtf8()); + PyObject *pluginFunction = PyObject_GetAttrString(m_pluginModule, function.toUtf8()); if(!pluginFunction || !PyCallable_Check(pluginFunction)) { PyErr_Clear(); Py_XDECREF(pluginFunction); @@ -719,84 +641,45 @@ bool PythonIntegrationPlugin::callPluginFunction(const QString &function, PyObje return false; } - - PyObject *pluginFunctionResult = PyObject_CallFunctionObjArgs(pluginFunction, param1, param2, param3, nullptr); - - Py_XDECREF(pluginFunction); - - if (PyErr_Occurred()) { - qCWarning(dcThingManager()) << "Error calling python method:" << function << "on plugin" << pluginName(); - PyErr_Print(); - PyErr_Clear(); - PyEval_ReleaseThread(m_threadState); - return false; - } - - if (QByteArray(pluginFunctionResult->ob_type->tp_name) != "coroutine") { - Py_DECREF(pluginFunctionResult); - PyEval_ReleaseThread(m_threadState); - return true; - } - - // Spawn a new event loop for the thread - QFuture future = QtConcurrent::run([this, pluginFunctionResult, function](){ - qCDebug(dcPythonIntegrations()) << "Spawning thread for" << function << "in plugin" << metadata().pluginName(); - - // Register this new thread in the interpreter - PyThreadState *thread = PyThreadState_New(m_threadState->interp); - - // Acquire GIL and make the new thread state the current one - PyEval_RestoreThread(thread); - - PyObject *new_event_loop = PyObject_GetAttrString(m_asyncio, "new_event_loop"); - PyObject *loop = PyObject_CallFunctionObjArgs(new_event_loop, nullptr); - - Py_DECREF(new_event_loop); - - PyObject *create_task = PyObject_GetAttrString(loop, "create_task"); - PyObject *task = PyObject_CallFunctionObjArgs(create_task, pluginFunctionResult, nullptr); - dumpError(); - - Py_DECREF(create_task); - Py_DECREF(pluginFunctionResult); - - PyObject *add_done_callback = PyObject_GetAttrString(task, "add_done_callback"); - dumpError(); - - PyObject *task_done = PyObject_GetAttrString(m_nymeaModule, "task_done"); - PyObject *coroutineResult = PyObject_CallFunctionObjArgs(add_done_callback, task_done, nullptr); - dumpError(); - - Py_DECREF(coroutineResult); - Py_DECREF(add_done_callback); - Py_DECREF(task_done); - - PyObject *run_until_complete = PyObject_GetAttrString(loop, "run_until_complete"); - - Py_DECREF(loop); - - PyObject *taskResult = PyObject_CallFunctionObjArgs(run_until_complete, task, nullptr); - dumpError(); - - Py_XDECREF(taskResult); - Py_DECREF(run_until_complete); - Py_DECREF(task); - - // Destroy the thread and release the GIL - PyThreadState_Clear(thread); - PyThreadState_DeleteCurrent(); - qCDebug(dcPythonIntegrations()) << "Thread for" << function << "in plugin" << metadata().pluginName() << "ended"; - }); + Py_XINCREF(param1); + Py_XINCREF(param2); + Py_XINCREF(param3); QFutureWatcher *watcher = new QFutureWatcher(this); - watcher->setFuture(future); - m_runningTasks.insert(watcher); - connect(watcher, &QFutureWatcher::finished, this, [this, watcher](){ + // Run the plugin function in the thread pool + QFuture future = QtConcurrent::run(m_threadPool, [=](){ + qCDebug(dcPythonIntegrations()) << "+++ Thread for" << function << "in plugin" << metadata().pluginName(); + + // Register this new thread in the interpreter + PyThreadState *threadState = PyThreadState_New(m_threadState->interp); + + // Acquire GIL and make the new thread state the current one + PyEval_RestoreThread(threadState); + + PyObject *pluginFunctionResult = PyObject_CallFunctionObjArgs(pluginFunction, param1, param2, param3, nullptr); + dumpError(); + + if (PyErr_Occurred()) { + qCWarning(dcThingManager()) << "Error calling python method:" << function << "on plugin" << pluginName(); + } + + Py_DECREF(pluginFunction); + Py_XDECREF(pluginFunctionResult); + Py_XDECREF(param1); + Py_XDECREF(param2); + Py_XDECREF(param3); + m_runningTasks.remove(watcher); - delete watcher; - }); + // Destroy the thread and release the GIL + PyThreadState_Clear(threadState); + PyEval_ReleaseThread(threadState); + PyThreadState_Delete(threadState); + qCDebug(dcPythonIntegrations()) << "--- Thread for" << function << "in plugin" << metadata().pluginName(); + }); + watcher->setFuture(future); + m_runningTasks.insert(watcher, function); PyEval_ReleaseThread(m_threadState); return true; diff --git a/libnymea-core/integrations/pythonintegrationplugin.h b/libnymea-core/integrations/pythonintegrationplugin.h index 927cd314..733777da 100644 --- a/libnymea-core/integrations/pythonintegrationplugin.h +++ b/libnymea-core/integrations/pythonintegrationplugin.h @@ -47,10 +47,6 @@ public: static PyObject* pyAutoThingsAppeared(PyObject *self, PyObject* args); static PyObject* pyAutoThingDisappeared(PyObject *self, PyObject* args); -public: - // python callbacks - static PyObject* task_done(PyObject* self, PyObject* args); - private: void exportIds(); void exportThingClass(const ThingClass &thingClass); @@ -66,17 +62,22 @@ private: private: // The main thread state in which we create an interpreter per plugin static PyThreadState* s_mainThreadState; - static QThreadPool *s_threadPool; // A per plugin thread state and interpreter PyThreadState *m_threadState = nullptr; - // Modules imported into the interpreter - PyObject *m_nymeaModule; - PyObject *m_asyncio; + // A per plugin thread pool + QThreadPool *m_threadPool = nullptr; + // Running concurrent tasks in this plugins thread pool + QHash*, QString> m_runningTasks; + + // The nymea module we import into the interpreter + PyObject *m_nymeaModule = nullptr; // The imported plugin module (the plugin.py) - PyObject *m_module = nullptr; + PyObject *m_pluginModule = nullptr; + + PyObject *m_logger = nullptr; // A map of plugin instances to plugin python scripts/modules // Make sure to hold the GIL when accessing this. @@ -91,8 +92,6 @@ private: // Need to keep a copy of plugin params and sync that in a thread-safe manner ParamList m_pluginConfigCopy; - // Running concurrent tasks in this plugin - QSet*> m_runningTasks; }; #endif // PYTHONINTEGRATIONPLUGIN_H diff --git a/libnymea-core/libnymea-core.pro b/libnymea-core/libnymea-core.pro index a1afcd38..c7f488cc 100644 --- a/libnymea-core/libnymea-core.pro +++ b/libnymea-core/libnymea-core.pro @@ -21,6 +21,7 @@ RESOURCES += $$top_srcdir/icons.qrc \ HEADERS += nymeacore.h \ integrations/plugininfocache.h \ integrations/python/pynymealogginghandler.h \ + integrations/python/pynymeamodule.h \ integrations/python/pyparam.h \ integrations/python/pything.h \ integrations/python/pythingactioninfo.h \ diff --git a/plugins/pymock/integrationpluginpymock.py b/plugins/pymock/integrationpluginpymock.py index 2107a51e..42eaa846 100644 --- a/plugins/pymock/integrationpluginpymock.py +++ b/plugins/pymock/integrationpluginpymock.py @@ -1,14 +1,17 @@ import nymea -import asyncio +import time #from fastdotcom import fast_com watchingAutoThings = False +loopRunning = False -async def init(): +def init(): logger.log("Python mock plugin init") + global loopRunning + loopRunning = True - while True: - await asyncio.sleep(5); + while loopRunning: + time.sleep(5); for thing in myThings(): if thing.thingClassId == pyMockThingClassId: logger.log("Emitting event 1 for", thing.name, "eventTypeId", pyMockEvent1EventTypeId) @@ -20,6 +23,13 @@ async def init(): thing.emitEvent(pyMockDiscoveryPairingEvent1EventTypeId, [nymea.Param(pyMockDiscoveryPairingEvent1EventParam1ParamTypeId, "Im an event")]) logger.log("Setting state 1 for", thing.name, "Old value is:", thing.stateValue(pyMockDiscoveryPairingState1StateTypeId)) thing.setStateValue(pyMockDiscoveryPairingState1StateTypeId, thing.stateValue(pyMockDiscoveryPairingState1StateTypeId) + 1) + logger.log("Bye bye") + + +def deinit(): + logger.log("shutting down") + global loopRunning + loopRunning = False def configValueChanged(paramTypeId, value): @@ -55,9 +65,9 @@ def startMonitoringAutoThings(): logger.log("Done start monitoring auto things") -async def discoverThings(info): +def discoverThings(info): logger.log("Discovery started for", info.thingClassId, "with result count:", info.params[0].value) - await asyncio.sleep(10) # Some delay for giving a feeling of a discovery + time.sleep(10) # Some delay for giving a feeling of a discovery # Add 2 new discovery results for i in range(0, info.params[0].value): info.addDescriptor(nymea.ThingDescriptor(pyMockDiscoveryPairingThingClassId, "Python mock thing %i" % i)) @@ -69,26 +79,26 @@ async def discoverThings(info): info.finish(nymea.ThingErrorNoError) -async def startPairing(info): +def startPairing(info): logger.log("startPairing for", info.thingName, info.thingId, info.params) info.finish(nymea.ThingErrorNoError, "Log in as user \"john\" with password \"smith\".") -async def confirmPairing(info, username, secret): +def confirmPairing(info, username, secret): logger.log("confirming pairing for", info.thingName, username, secret) - await asyncio.sleep(1) + time.sleep(1) if username == "john" and secret == "smith": info.finish(nymea.ThingErrorNoError) else: info.finish(nymea.ThingErrorAuthenticationFailure, "Error logging in here!") -async def setupThing(info): +def setupThing(info): logger.log("setupThing for", info.thing.name) info.finish(nymea.ThingErrorNoError) -async def postSetupThing(thing): +def postSetupThing(thing): logger.log("postSetupThing for", thing.name) thing.nameChangedHandler = lambda thing : logger.log("Thing name changed", thing.name)