diff --git a/libnymea-core/integrations/pythonintegrationplugin.cpp b/libnymea-core/integrations/pythonintegrationplugin.cpp index d4a2f33a..0ba9f215 100644 --- a/libnymea-core/integrations/pythonintegrationplugin.cpp +++ b/libnymea-core/integrations/pythonintegrationplugin.cpp @@ -19,10 +19,10 @@ #include #include #include +#include -PyThreadState* PythonIntegrationPlugin::s_mainThread = nullptr; -PyObject* PythonIntegrationPlugin::s_nymeaModule = nullptr; -PyObject* PythonIntegrationPlugin::s_asyncio = nullptr; +PyThreadState* PythonIntegrationPlugin::s_mainThreadState = nullptr; +QThreadPool* PythonIntegrationPlugin::s_threadPool = nullptr; QHash PythonIntegrationPlugin::s_plugins; @@ -34,7 +34,7 @@ PyObject* PythonIntegrationPlugin::task_done(PyObject* self, PyObject* args) PyObject *result = nullptr; if (!PyArg_ParseTuple(args, "O", &result)) { - qCWarning(dcThingManager()) << "Cannot fetch result from coroutine callback."; + qCWarning(dcPythonIntegrations()) << "Cannot fetch result from coroutine callback."; return nullptr; } @@ -50,7 +50,7 @@ PyObject* PythonIntegrationPlugin::task_done(PyObject* self, PyObject* args) // PyObject *traceback = PyObject_CallMethodObjArgs(exception, "__traceback__", nullptr); - qCWarning(dcThingManager()) << "Exception in plugin:" << bytes; + qCWarning(dcPythonIntegrations()) << "Exception in plugin:" << bytes; PyErr_Clear(); } @@ -274,39 +274,47 @@ PythonIntegrationPlugin::PythonIntegrationPlugin(QObject *parent) : IntegrationP PythonIntegrationPlugin::~PythonIntegrationPlugin() { - PyGILState_STATE s = PyGILState_Ensure(); + // Acquire GIL for this plugin's interpreter + PyEval_RestoreThread(m_threadState); - while (!m_runningThreads.isEmpty()) { - PyObject *loop = m_runningThreads.keys().first(); - PyObject *stop = PyObject_GetAttrString(loop, "stop"); - PyObject_CallFunctionObjArgs(stop, nullptr); + // Cancel all the thread in here + while (!m_runningTasks.isEmpty()) { + QFutureWatcher *watcher = m_runningTasks.values().first(); + watcher->cancel(); + watcher->waitForFinished(); } Py_XDECREF(s_plugins.take(this)); - PyGILState_Release(s); + + Py_EndInterpreter(m_threadState); + + PyThreadState_Swap(s_mainThreadState); + PyEval_ReleaseThread(s_mainThreadState); } void PythonIntegrationPlugin::initPython() { + Q_ASSERT_X(s_mainThreadState == nullptr, "PythonIntegrationPlugin::initPython()", "initPython() must be called exactly once."); + PyImport_AppendInittab("nymea", PyInit_nymea); Py_InitializeEx(0); PyEval_InitThreads(); + // Store the main thread state and release the GIL + s_mainThreadState = PyEval_SaveThread(); - // Import nymea module into this interpreter - s_nymeaModule = PyImport_ImportModule("nymea"); - - // We'll be using asyncio everywhere, so let's import it right away - s_asyncio = PyImport_ImportModule("asyncio"); - - // Need to release the lock from the main thread before spawning new threads - s_mainThread = PyEval_SaveThread(); + // Allocate a shared thread pool for the plugins + s_threadPool = new QThreadPool(); + qCDebug(dcPythonIntegrations()) << "Created a thread pool with a maximum of" << s_threadPool->maxThreadCount() << "threads for python plugins."; } void PythonIntegrationPlugin::deinitPython() { - PyEval_RestoreThread(s_mainThread); + PyEval_RestoreThread(s_mainThreadState); Py_FinalizeEx(); + + s_threadPool->deleteLater(); + s_threadPool = nullptr; } bool PythonIntegrationPlugin::loadScript(const QString &scriptFile) @@ -330,31 +338,52 @@ bool PythonIntegrationPlugin::loadScript(const QString &scriptFile) return false; } -// PyThreadState *m_thread = Py_NewInterpreter(); -// PyInterpreterState *m_interpreter = PyInterpreterState_New(); + // Grab the main thread context and GIL + PyEval_RestoreThread(s_mainThreadState); -// PyEval_RestoreThread(s_mainThread); - PyGILState_STATE s = PyGILState_Ensure(); + // Create a new interpreter + m_threadState = Py_NewInterpreter(); - // Finally, import the plugin + // Switch to the new interpreter thread state + PyThreadState_Swap(m_threadState); + + // Import nymea module into this interpreter + m_nymeaModule = PyImport_ImportModule("nymea"); + + // Set up import paths for the plugin PyObject* sysPath = PySys_GetObject("path"); - PyObject* importPath = PyUnicode_FromString(fi.absolutePath().toUtf8()); - PyList_Append(sysPath, importPath); - Py_DECREF(importPath); + PyObject* pluginImportPath = PyUnicode_FromString(fi.absolutePath().toUtf8()); + PyList_Append(sysPath, pluginImportPath); + Py_DECREF(pluginImportPath); + + PyObject* pluginModulesImportPath = PyUnicode_FromString(QString("%1/modules/").arg(fi.absolutePath()).toUtf8()); + PyList_Append(sysPath, pluginModulesImportPath); + Py_DECREF(pluginModulesImportPath); + m_module = PyImport_ImportModule(fi.baseName().toUtf8()); if (!m_module) { qCWarning(dcThingManager()) << "Error importing python plugin from:" << fi.absoluteFilePath(); PyErr_Print(); PyErr_Clear(); -// PyEval_SaveThread(); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); return false; } qCDebug(dcThingManager()) << "Imported python plugin from" << fi.absoluteFilePath(); s_plugins.insert(this, m_module); + // We'll be using asyncio everywhere, so let's import it right away + // IMPORTANT: The asyncio module is a bit special in a sense that it actually shares + // stuff between interpreters. See https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter + // for the listed bugs and caveats. + // If we destroy the first interpreter that imports asyncio, things will become crashy as the + // interpreter will tear down the module, leaving shallow copies of the modules dict in other + // interpreters. So let's import the module down here after we're sure this module compiles and + // won't be unloaded any more. + // Note: This becomes a problem when we'll be supporting to unload plugins at runtime. + m_asyncio = PyImport_ImportModule("asyncio"); + // Set up logger with appropriate logging category PyNymeaLoggingHandler *logger = reinterpret_cast(_PyObject_New(&PyNymeaLoggingHandlerType)); QString category = metadata().pluginName(); @@ -371,8 +400,7 @@ bool PythonIntegrationPlugin::loadScript(const QString &scriptFile) // Register config access methods PyModule_AddFunctions(m_module, plugin_methods); -// PyEval_SaveThread(); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); // Set up connections to be forwareded into the plugin connect(this, &PythonIntegrationPlugin::configValueChanged, this, [this](const ParamTypeId ¶mTypeId, const QVariant &value){ @@ -408,18 +436,18 @@ void PythonIntegrationPlugin::startMonitoringAutoThings() void PythonIntegrationPlugin::discoverThings(ThingDiscoveryInfo *info) { - PyGILState_STATE s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); PyThingDiscoveryInfo *pyInfo = (PyThingDiscoveryInfo*)PyObject_CallObject((PyObject*)&PyThingDiscoveryInfoType, NULL); PyThingDiscoveryInfo_setInfo(pyInfo, info); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); connect(info, &ThingDiscoveryInfo::destroyed, this, [=](){ - auto s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); pyInfo->info = nullptr; Py_DECREF(pyInfo); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); }); callPluginFunction("discoverThings", reinterpret_cast(pyInfo)); @@ -427,19 +455,18 @@ void PythonIntegrationPlugin::discoverThings(ThingDiscoveryInfo *info) void PythonIntegrationPlugin::startPairing(ThingPairingInfo *info) { - PyGILState_STATE s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); PyThingPairingInfo *pyInfo = (PyThingPairingInfo*)PyObject_CallObject((PyObject*)&PyThingPairingInfoType, nullptr); PyThingPairingInfo_setInfo(pyInfo, info); - PyGILState_Release(s); - + PyEval_ReleaseThread(m_threadState); connect(info, &ThingPairingInfo::destroyed, this, [=](){ - auto s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); pyInfo->info = nullptr; Py_DECREF(pyInfo); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); }); bool result = callPluginFunction("startPairing", reinterpret_cast(pyInfo)); @@ -450,19 +477,18 @@ void PythonIntegrationPlugin::startPairing(ThingPairingInfo *info) void PythonIntegrationPlugin::confirmPairing(ThingPairingInfo *info, const QString &username, const QString &secret) { - PyGILState_STATE s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); PyThingPairingInfo *pyInfo = (PyThingPairingInfo*)PyObject_CallObject((PyObject*)&PyThingPairingInfoType, nullptr); PyThingPairingInfo_setInfo(pyInfo, info); - PyGILState_Release(s); - + PyEval_ReleaseThread(m_threadState); connect(info, &ThingPairingInfo::destroyed, this, [=](){ - auto s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); pyInfo->info = nullptr; Py_DECREF(pyInfo); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); }); PyObject *pyUsername = PyUnicode_FromString(username.toUtf8().data()); @@ -478,7 +504,7 @@ void PythonIntegrationPlugin::confirmPairing(ThingPairingInfo *info, const QStri void PythonIntegrationPlugin::setupThing(ThingSetupInfo *info) { - PyGILState_STATE s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); PyThing *pyThing = nullptr; if (m_things.contains(info->thing())) { @@ -497,19 +523,19 @@ void PythonIntegrationPlugin::setupThing(ThingSetupInfo *info) pyInfo->info = info; - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); connect(info->thing(), &Thing::destroyed, this, [=](){ - auto s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); pyThing->thing = nullptr; Py_DECREF(pyThing); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); }); connect(info, &ThingSetupInfo::destroyed, this, [=](){ - auto s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); pyInfo->info = nullptr; Py_DECREF(pyInfo); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); }); @@ -529,25 +555,24 @@ void PythonIntegrationPlugin::postSetupThing(Thing *thing) if (!success) { Py_DECREF(pyThing); } - } void PythonIntegrationPlugin::executeAction(ThingActionInfo *info) { PyThing *pyThing = m_things.value(info->thing()); - PyGILState_STATE s = PyGILState_Ensure(); + PyEval_RestoreThread(m_threadState); PyThingActionInfo *pyInfo = (PyThingActionInfo*)PyObject_CallObject((PyObject*)&PyThingActionInfoType, NULL); PyThingActionInfo_setInfo(pyInfo, info, pyThing); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); - connect(info, &ThingActionInfo::destroyed, this, [=](){ - auto s = PyGILState_Ensure(); + connect(info, &ThingActionInfo::destroyed, this, [=](){ + PyEval_RestoreThread(m_threadState); pyInfo->info = nullptr; Py_DECREF(pyInfo); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); }); bool success = callPluginFunction("executeAction", reinterpret_cast(pyInfo)); @@ -682,84 +707,98 @@ void PythonIntegrationPlugin::exportBrowserItemActionTypes(const ActionTypes &ac bool PythonIntegrationPlugin::callPluginFunction(const QString &function, PyObject *param1, PyObject *param2, PyObject *param3) { - PyGILState_STATE s = PyGILState_Ensure(); -// PyEval_RestoreThread(s_mainThread); + PyEval_RestoreThread(m_threadState); qCDebug(dcThingManager()) << "Calling python plugin function" << function << "on plugin" << pluginName(); - PyObject *pFunc = PyObject_GetAttrString(m_module, function.toUtf8()); - if(!pFunc || !PyCallable_Check(pFunc)) { + PyObject *pluginFunction = PyObject_GetAttrString(m_module, function.toUtf8()); + if(!pluginFunction || !PyCallable_Check(pluginFunction)) { PyErr_Clear(); - Py_XDECREF(pFunc); - qCWarning(dcThingManager()) << "Python plugin" << pluginName() << "does not implement" << function; - PyGILState_Release(s); -// PyEval_SaveThread(); + Py_XDECREF(pluginFunction); + qCDebug(dcThingManager()) << "Python plugin" << pluginName() << "does not implement" << function; + PyEval_ReleaseThread(m_threadState); return false; } - PyObject *result = PyObject_CallFunctionObjArgs(pFunc, param1, param2, param3, nullptr); + PyObject *pluginFunctionResult = PyObject_CallFunctionObjArgs(pluginFunction, param1, param2, param3, nullptr); - Py_XDECREF(pFunc); + Py_XDECREF(pluginFunction); if (PyErr_Occurred()) { qCWarning(dcThingManager()) << "Error calling python method:" << function << "on plugin" << pluginName(); PyErr_Print(); PyErr_Clear(); -// PyEval_SaveThread(); - PyGILState_Release(s); + PyEval_ReleaseThread(m_threadState); return false; } - if (QByteArray(result->ob_type->tp_name) != "coroutine") { - Py_DECREF(result); -// PyEval_SaveThread(); - PyGILState_Release(s); + if (QByteArray(pluginFunctionResult->ob_type->tp_name) != "coroutine") { + Py_DECREF(pluginFunctionResult); + PyEval_ReleaseThread(m_threadState); return true; } - // Spawn a event loop for the thread - PyObject *new_event_loop = PyObject_GetAttrString(s_asyncio, "new_event_loop"); - PyObject *loop = PyObject_CallFunctionObjArgs(new_event_loop, nullptr); + // Spawn a new event loop for the thread + QFuture future = QtConcurrent::run([this, pluginFunctionResult, function](){ + qCDebug(dcPythonIntegrations()) << "Spawning thread for" << function << "in plugin" << metadata().pluginName(); - Py_DECREF(new_event_loop); + // Register this new thread in the interpreter + PyThreadState *thread = PyThreadState_New(m_threadState->interp); - PyObject *create_task = PyObject_GetAttrString(loop, "create_task"); - PyObject *task = PyObject_CallFunctionObjArgs(create_task, result, nullptr); - dumpError(); + // Acquire GIL and make the new thread state the current one + PyEval_RestoreThread(thread); - Py_DECREF(result); + PyObject *new_event_loop = PyObject_GetAttrString(m_asyncio, "new_event_loop"); + PyObject *loop = PyObject_CallFunctionObjArgs(new_event_loop, nullptr); - PyObject *add_done_callback = PyObject_GetAttrString(task, "add_done_callback"); - dumpError(); + Py_DECREF(new_event_loop); - PyObject *task_done = PyObject_GetAttrString(s_nymeaModule, "task_done"); - result = PyObject_CallFunctionObjArgs(add_done_callback, task_done, nullptr); - dumpError(); + PyObject *create_task = PyObject_GetAttrString(loop, "create_task"); + PyObject *task = PyObject_CallFunctionObjArgs(create_task, pluginFunctionResult, nullptr); + dumpError(); + + Py_DECREF(create_task); + Py_DECREF(pluginFunctionResult); + + PyObject *add_done_callback = PyObject_GetAttrString(task, "add_done_callback"); + dumpError(); + + PyObject *task_done = PyObject_GetAttrString(m_nymeaModule, "task_done"); + PyObject *coroutineResult = PyObject_CallFunctionObjArgs(add_done_callback, task_done, nullptr); + dumpError(); + + Py_DECREF(coroutineResult); + Py_DECREF(add_done_callback); + Py_DECREF(task_done); + + PyObject *run_until_complete = PyObject_GetAttrString(loop, "run_until_complete"); - PyObject *run_until_complete = PyObject_GetAttrString(loop, "run_until_complete"); - QFuture future = QtConcurrent::run([this, run_until_complete, task, loop, result](){ - PyGILState_STATE g = PyGILState_Ensure(); -// auto s = PyThreadState_New(PyInterpreterState_Main()); -// PyThreadState *previousThreadState = PyThreadState_Swap(s); - PyObject_CallFunctionObjArgs(run_until_complete, task, nullptr); Py_DECREF(loop); - Py_DECREF(task); + + PyObject *taskResult = PyObject_CallFunctionObjArgs(run_until_complete, task, nullptr); + dumpError(); + + Py_XDECREF(taskResult); Py_DECREF(run_until_complete); - Py_DECREF(result); - m_runningThreads.remove(loop); -// PyThreadState_Swap(previousThreadState); -// PyThreadState_Clear(s); -// PyThreadState_Delete(s); - PyGILState_Release(g); + Py_DECREF(task); + + // Destroy the thread and release the GIL + PyThreadState_Clear(thread); + PyThreadState_DeleteCurrent(); + qCDebug(dcPythonIntegrations()) << "Thread for" << function << "in plugin" << metadata().pluginName() << "ended"; }); - m_runningThreads.insert(loop, future); - Py_DECREF(create_task); - Py_DECREF(add_done_callback); + QFutureWatcher *watcher = new QFutureWatcher(this); + watcher->setFuture(future); + m_runningTasks.insert(watcher); - PyGILState_Release(s); -// PyEval_SaveThread(); + connect(watcher, &QFutureWatcher::finished, this, [this, watcher](){ + m_runningTasks.remove(watcher); + delete watcher; + }); + + PyEval_ReleaseThread(m_threadState); return true; } diff --git a/libnymea-core/integrations/pythonintegrationplugin.h b/libnymea-core/integrations/pythonintegrationplugin.h index e34835e9..927cd314 100644 --- a/libnymea-core/integrations/pythonintegrationplugin.h +++ b/libnymea-core/integrations/pythonintegrationplugin.h @@ -6,11 +6,13 @@ #include #include #include +#include extern "C" { typedef struct _object PyObject; typedef struct _ts PyThreadState; typedef struct _thing PyThing; +typedef struct _is PyInterpreterState; } @@ -62,11 +64,19 @@ private: bool callPluginFunction(const QString &function, PyObject *param1 = nullptr, PyObject *param2 = nullptr, PyObject *param3 = nullptr); private: - static PyThreadState* s_mainThread; + // The main thread state in which we create an interpreter per plugin + static PyThreadState* s_mainThreadState; + static QThreadPool *s_threadPool; - // Modules imported into the global context - static PyObject *s_nymeaModule; - static PyObject *s_asyncio; + // A per plugin thread state and interpreter + PyThreadState *m_threadState = nullptr; + + // Modules imported into the interpreter + PyObject *m_nymeaModule; + PyObject *m_asyncio; + + // The imported plugin module (the plugin.py) + PyObject *m_module = nullptr; // A map of plugin instances to plugin python scripts/modules // Make sure to hold the GIL when accessing this. @@ -75,16 +85,14 @@ private: // Used for guarding access from the python threads to the plugin instance QMutex m_mutex; - // The imported plugin module - PyObject *m_module = nullptr; - // Things held by this plugin instance QHash m_things; // Need to keep a copy of plugin params and sync that in a thread-safe manner ParamList m_pluginConfigCopy; - QHash> m_runningThreads; + // Running concurrent tasks in this plugin + QSet*> m_runningTasks; }; #endif // PYTHONINTEGRATIONPLUGIN_H diff --git a/plugins/pymock/integrationpluginpymock.py b/plugins/pymock/integrationpluginpymock.py index 285a0071..2107a51e 100644 --- a/plugins/pymock/integrationpluginpymock.py +++ b/plugins/pymock/integrationpluginpymock.py @@ -1,5 +1,6 @@ import nymea import asyncio +#from fastdotcom import fast_com watchingAutoThings = False @@ -56,7 +57,7 @@ def startMonitoringAutoThings(): async def discoverThings(info): logger.log("Discovery started for", info.thingClassId, "with result count:", info.params[0].value) - await asyncio.sleep(1) # Some delay for giving a feeling of a discovery + await asyncio.sleep(10) # Some delay for giving a feeling of a discovery # Add 2 new discovery results for i in range(0, info.params[0].value): info.addDescriptor(nymea.ThingDescriptor(pyMockDiscoveryPairingThingClassId, "Python mock thing %i" % i))