drop usage of asyncio and run everything regularly threaded
This commit is contained in:
parent
64e7d2784c
commit
3436e9b998
@ -7,6 +7,8 @@
|
||||
#include <QStringList>
|
||||
#include <QLoggingCategory>
|
||||
|
||||
Q_DECLARE_LOGGING_CATEGORY(dcPythonIntegrations)
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
@ -16,14 +18,23 @@ typedef struct {
|
||||
char *category;
|
||||
} PyNymeaLoggingHandler;
|
||||
|
||||
static int PyNymeaLoggingHandler_init(PyNymeaLoggingHandler */*self*/, PyObject */*args*/, PyObject */*kwds*/)
|
||||
static int PyNymeaLoggingHandler_init(PyNymeaLoggingHandler *self, PyObject *args, PyObject */*kwds*/)
|
||||
{
|
||||
char *category = nullptr;
|
||||
if (!PyArg_ParseTuple(args, "s", &category)) {
|
||||
qCWarning(dcPythonIntegrations()) << "PyNymeaLoggingHandler: Error parsing parameters";
|
||||
return -1;
|
||||
}
|
||||
|
||||
self->category = (char*)malloc(qstrlen(category));
|
||||
qstrcpy(self->category, category);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void PyNymeaLoggingHandler_dealloc(PyNymeaLoggingHandler * self)
|
||||
// destruct the object
|
||||
{
|
||||
free(self->category);
|
||||
Py_TYPE(self)->tp_free(self);
|
||||
}
|
||||
|
||||
@ -106,5 +117,6 @@ static void registerNymeaLoggingHandler(PyObject *module)
|
||||
}
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
#endif // PYNYMEALOGGINGHANDLER_H
|
||||
|
||||
62
libnymea-core/integrations/python/pynymeamodule.h
Normal file
62
libnymea-core/integrations/python/pynymeamodule.h
Normal file
@ -0,0 +1,62 @@
|
||||
#ifndef PYNYMEAMODULE_H
|
||||
#define PYNYMEAMODULE_H
|
||||
|
||||
#include <Python.h>
|
||||
|
||||
#include "pynymealogginghandler.h"
|
||||
#include "pything.h"
|
||||
#include "pythingdiscoveryinfo.h"
|
||||
#include "pythingsetupinfo.h"
|
||||
#include "pyparam.h"
|
||||
#include "pythingactioninfo.h"
|
||||
#include "pythingpairinginfo.h"
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
|
||||
static int nymea_exec(PyObject *m) {
|
||||
|
||||
// Override stdout/stderr to use qDebug instead
|
||||
PyObject* pyLog = PyModule_Create(&pyLog_module);
|
||||
PySys_SetObject("stdout", pyLog);
|
||||
PyObject* pyWarn = PyModule_Create(&pyWarn_module);
|
||||
PySys_SetObject("stderr", pyWarn);
|
||||
|
||||
registerNymeaLoggingHandler(m);
|
||||
registerParamType(m);
|
||||
registerThingType(m);
|
||||
registerThingDescriptorType(m);
|
||||
registerThingDiscoveryInfoType(m);
|
||||
registerThingPairingInfoType(m);
|
||||
registerThingSetupInfoType(m);
|
||||
registerThingActionInfoType(m);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct PyModuleDef_Slot nymea_slots[] = {
|
||||
{Py_mod_exec, (void*)nymea_exec},
|
||||
{0, NULL},
|
||||
};
|
||||
|
||||
static struct PyModuleDef nymea_module = {
|
||||
PyModuleDef_HEAD_INIT,
|
||||
"nymea",
|
||||
"The nymea module. Provdes types used in the nymea plugin API.",
|
||||
0,
|
||||
nullptr, // methods
|
||||
nymea_slots, // slots
|
||||
nullptr,
|
||||
nullptr,
|
||||
nullptr
|
||||
};
|
||||
|
||||
PyMODINIT_FUNC PyInit_nymea(void)
|
||||
{
|
||||
return PyModuleDef_Init(&nymea_module);
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
#endif // PYNYMEAMODULE_H
|
||||
@ -35,7 +35,7 @@ static PyMemberDef PyParam_members[] = {
|
||||
|
||||
static int PyParam_init(PyParam *self, PyObject *args, PyObject *kwds)
|
||||
{
|
||||
qWarning() << "++++ PyParam";
|
||||
qCDebug(dcPythonIntegrations()) << "+++ PyParam";
|
||||
static char *kwlist[] = {"paramTypeId", "value", nullptr};
|
||||
PyObject *paramTypeId = nullptr, *value = nullptr;
|
||||
|
||||
@ -54,7 +54,7 @@ static int PyParam_init(PyParam *self, PyObject *args, PyObject *kwds)
|
||||
}
|
||||
|
||||
static void PyParam_dealloc(PyParam * self) {
|
||||
qWarning() << "---- PyParam";
|
||||
qCDebug(dcPythonIntegrations()) << "--- PyParam";
|
||||
Py_XDECREF(self->pyParamTypeId);
|
||||
Py_XDECREF(self->pyValue);
|
||||
Py_TYPE(self)->tp_free(self);
|
||||
|
||||
@ -53,7 +53,7 @@ static PyObject* PyThing_new(PyTypeObject *type, PyObject */*args*/, PyObject */
|
||||
if (self == NULL) {
|
||||
return nullptr;
|
||||
}
|
||||
qWarning() << "*++++ PyThing" << self;
|
||||
qCDebug(dcPythonIntegrations()) << "+++ PyThing" << self;
|
||||
|
||||
return (PyObject*)self;
|
||||
}
|
||||
@ -127,7 +127,7 @@ static void PyThing_setThing(PyThing *self, Thing *thing)
|
||||
|
||||
|
||||
static void PyThing_dealloc(PyThing * self) {
|
||||
qWarning() << "----- PyThing" << self;
|
||||
qCDebug(dcPythonIntegrations()) << "--- PyThing" << self;
|
||||
Py_XDECREF(self->pyId);
|
||||
Py_XDECREF(self->pyThingClassId);
|
||||
Py_XDECREF(self->pyName);
|
||||
|
||||
@ -43,7 +43,7 @@ static PyObject* PyThingActionInfo_new(PyTypeObject *type, PyObject */*args*/, P
|
||||
if (self == NULL) {
|
||||
return nullptr;
|
||||
}
|
||||
qWarning() << "+++ PyThingActionInfo";
|
||||
qCDebug(dcPythonIntegrations()) << "+++ PyThingActionInfo";
|
||||
return (PyObject*)self;
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ void PyThingActionInfo_setInfo(PyThingActionInfo *self, ThingActionInfo *info, P
|
||||
|
||||
static void PyThingActionInfo_dealloc(PyThingActionInfo * self)
|
||||
{
|
||||
qWarning() << "---- PyThingActionInfo";
|
||||
qCDebug(dcPythonIntegrations()) << "--- PyThingActionInfo";
|
||||
Py_DECREF(self->pyThing);
|
||||
Py_DECREF(self->pyActionTypeId);
|
||||
Py_DECREF(self->pyParams);
|
||||
|
||||
@ -41,7 +41,7 @@ static PyObject* PyThingSetupInfo_new(PyTypeObject *type, PyObject *args, PyObje
|
||||
if (self == NULL) {
|
||||
return nullptr;
|
||||
}
|
||||
qWarning() << "++++ PyThingSetupInfo";
|
||||
qCDebug(dcPythonIntegrations()) << "+++ PyThingSetupInfo";
|
||||
|
||||
|
||||
static char *kwlist[] = {"thing", nullptr};
|
||||
@ -60,7 +60,7 @@ static PyObject* PyThingSetupInfo_new(PyTypeObject *type, PyObject *args, PyObje
|
||||
}
|
||||
|
||||
static void PyThingSetupInfo_dealloc(PyThingSetupInfo * self) {
|
||||
qWarning() << "--- PyThingSetupInfo";
|
||||
qCDebug(dcPythonIntegrations()) << "--- PyThingSetupInfo";
|
||||
Py_DECREF(self->pyThing);
|
||||
Py_TYPE(self)->tp_free(self);
|
||||
}
|
||||
|
||||
@ -65,7 +65,7 @@ QVariant PyObjectToQVariant(PyObject *pyObject)
|
||||
return QVariant(PyObject_IsTrue(pyObject));
|
||||
}
|
||||
|
||||
Q_ASSERT_X(false, "pyutils.h", "Unhandled data type in conversion PyObject to QVariant!");
|
||||
Q_ASSERT_X(false, "pyutils.h", QString("Unhandled data type in conversion PyObject to QVariant: %1").arg(pyObject->ob_type->tp_name).toUtf8());
|
||||
return QVariant();
|
||||
}
|
||||
|
||||
|
||||
@ -1,14 +1,7 @@
|
||||
#include <Python.h>
|
||||
|
||||
#include "python/pynymealogginghandler.h"
|
||||
#include "python/pything.h"
|
||||
#include "python/pythingdiscoveryinfo.h"
|
||||
#include "python/pythingsetupinfo.h"
|
||||
#include "python/pyparam.h"
|
||||
#include "python/pythingactioninfo.h"
|
||||
#include "python/pythingpairinginfo.h"
|
||||
|
||||
#include "pythonintegrationplugin.h"
|
||||
#include "python/pynymeamodule.h"
|
||||
|
||||
#include "loggingcategories.h"
|
||||
|
||||
@ -22,82 +15,8 @@
|
||||
#include <QFutureWatcher>
|
||||
|
||||
PyThreadState* PythonIntegrationPlugin::s_mainThreadState = nullptr;
|
||||
QThreadPool* PythonIntegrationPlugin::s_threadPool = nullptr;
|
||||
|
||||
QHash<PythonIntegrationPlugin*, PyObject*> PythonIntegrationPlugin::s_plugins;
|
||||
|
||||
|
||||
PyObject* PythonIntegrationPlugin::task_done(PyObject* self, PyObject* args)
|
||||
{
|
||||
Q_UNUSED(self)
|
||||
|
||||
PyObject *result = nullptr;
|
||||
|
||||
if (!PyArg_ParseTuple(args, "O", &result)) {
|
||||
qCWarning(dcPythonIntegrations()) << "Cannot fetch result from coroutine callback.";
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
PyObject *exceptionMethod = PyObject_GetAttrString(result, "exception");
|
||||
|
||||
PyObject *exception = PyObject_CallFunctionObjArgs(exceptionMethod, nullptr);
|
||||
if (exception != Py_None) {
|
||||
PyObject* repr = PyObject_Repr(exception);
|
||||
PyObject* str = PyUnicode_AsEncodedString(repr, "utf-8", "~E~");
|
||||
const char *bytes = PyBytes_AS_STRING(str);
|
||||
Py_XDECREF(repr);
|
||||
Py_XDECREF(str);
|
||||
|
||||
// PyObject *traceback = PyObject_CallMethodObjArgs(exception, "__traceback__", nullptr);
|
||||
|
||||
qCWarning(dcPythonIntegrations()) << "Exception in plugin:" << bytes;
|
||||
|
||||
PyErr_Clear();
|
||||
}
|
||||
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
|
||||
static PyMethodDef nymea_methods[] =
|
||||
{
|
||||
{"task_done", PythonIntegrationPlugin::task_done, METH_VARARGS, "callback to clean up after asyc coroutines"},
|
||||
{nullptr, nullptr, 0, nullptr} // sentinel
|
||||
};
|
||||
|
||||
static PyModuleDef nymea_module =
|
||||
{
|
||||
PyModuleDef_HEAD_INIT, // PyModuleDef_Base m_base;
|
||||
"nymea", // const char* m_name;
|
||||
"nymea module for python based integration plugins", // const char* m_doc;
|
||||
-1, // Py_ssize_t m_size;
|
||||
nymea_methods, // PyMethodDef *m_methods
|
||||
nullptr, nullptr, nullptr, nullptr
|
||||
};
|
||||
|
||||
PyMODINIT_FUNC PyInit_nymea(void)
|
||||
{
|
||||
// Overrride stdout/stderr to use qDebug instead
|
||||
PyObject* pyLog = PyModule_Create(&pyLog_module);
|
||||
PySys_SetObject("stdout", pyLog);
|
||||
PyObject* pyWarn = PyModule_Create(&pyWarn_module);
|
||||
PySys_SetObject("stderr", pyWarn);
|
||||
|
||||
|
||||
// Register nymea types
|
||||
PyObject* m = PyModule_Create(&nymea_module);
|
||||
registerNymeaLoggingHandler(m);
|
||||
registerParamType(m);
|
||||
registerThingType(m);
|
||||
registerThingDescriptorType(m);
|
||||
registerThingDiscoveryInfoType(m);
|
||||
registerThingPairingInfoType(m);
|
||||
registerThingSetupInfoType(m);
|
||||
registerThingActionInfoType(m);
|
||||
|
||||
return m;
|
||||
}
|
||||
|
||||
PyObject* PythonIntegrationPlugin::pyConfiguration(PyObject* self, PyObject* /*args*/)
|
||||
{
|
||||
PythonIntegrationPlugin *plugin = s_plugins.key(self);
|
||||
@ -274,17 +193,27 @@ PythonIntegrationPlugin::PythonIntegrationPlugin(QObject *parent) : IntegrationP
|
||||
|
||||
PythonIntegrationPlugin::~PythonIntegrationPlugin()
|
||||
{
|
||||
if (m_pluginModule) {
|
||||
callPluginFunction("deinit");
|
||||
}
|
||||
|
||||
// Acquire GIL for this plugin's interpreter
|
||||
PyEval_RestoreThread(m_threadState);
|
||||
|
||||
// Cancel all the thread in here
|
||||
while (!m_runningTasks.isEmpty()) {
|
||||
QFutureWatcher<void> *watcher = m_runningTasks.values().first();
|
||||
watcher->cancel();
|
||||
QFutureWatcher<void> *watcher = m_runningTasks.keys().first();
|
||||
QString function = m_runningTasks.value(watcher);
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
qCDebug(dcPythonIntegrations()) << "Waiting for" << metadata().pluginName() << "to finish" << function;
|
||||
watcher->waitForFinished();
|
||||
Py_END_ALLOW_THREADS
|
||||
}
|
||||
|
||||
Py_XDECREF(s_plugins.take(this));
|
||||
s_plugins.take(this);
|
||||
Py_XDECREF(m_pluginModule);
|
||||
Py_XDECREF(m_logger);
|
||||
Py_DECREF(m_nymeaModule);
|
||||
|
||||
Py_EndInterpreter(m_threadState);
|
||||
|
||||
@ -301,33 +230,25 @@ void PythonIntegrationPlugin::initPython()
|
||||
PyEval_InitThreads();
|
||||
// Store the main thread state and release the GIL
|
||||
s_mainThreadState = PyEval_SaveThread();
|
||||
|
||||
// Allocate a shared thread pool for the plugins
|
||||
s_threadPool = new QThreadPool();
|
||||
qCDebug(dcPythonIntegrations()) << "Created a thread pool with a maximum of" << s_threadPool->maxThreadCount() << "threads for python plugins.";
|
||||
}
|
||||
|
||||
void PythonIntegrationPlugin::deinitPython()
|
||||
{
|
||||
PyEval_RestoreThread(s_mainThreadState);
|
||||
|
||||
Py_FinalizeEx();
|
||||
|
||||
s_threadPool->deleteLater();
|
||||
s_threadPool = nullptr;
|
||||
}
|
||||
|
||||
bool PythonIntegrationPlugin::loadScript(const QString &scriptFile)
|
||||
{
|
||||
QFileInfo fi(scriptFile);
|
||||
|
||||
QFile metaData(fi.absolutePath() + "/" + fi.baseName() + ".json");
|
||||
if (!metaData.open(QFile::ReadOnly)) {
|
||||
qCWarning(dcThingManager()) << "Error opening metadata file:" << metaData.fileName();
|
||||
QFile metaDataFile(fi.absolutePath() + "/" + fi.baseName() + ".json");
|
||||
if (!metaDataFile.open(QFile::ReadOnly)) {
|
||||
qCWarning(dcThingManager()) << "Error opening metadata file:" << metaDataFile.fileName();
|
||||
return false;
|
||||
}
|
||||
QJsonParseError error;
|
||||
QJsonDocument jsonDoc = QJsonDocument::fromJson(metaData.readAll(), &error);
|
||||
QJsonDocument jsonDoc = QJsonDocument::fromJson(metaDataFile.readAll(), &error);
|
||||
if (error.error != QJsonParseError::NoError) {
|
||||
qCWarning(dcThingManager()) << "Error parsing metadata file:" << error.errorString();
|
||||
return false;
|
||||
@ -350,19 +271,21 @@ bool PythonIntegrationPlugin::loadScript(const QString &scriptFile)
|
||||
// Import nymea module into this interpreter
|
||||
m_nymeaModule = PyImport_ImportModule("nymea");
|
||||
|
||||
// Set up import paths for the plugin
|
||||
// Set up import path for the plugin directory
|
||||
PyObject* sysPath = PySys_GetObject("path");
|
||||
PyObject* pluginImportPath = PyUnicode_FromString(fi.absolutePath().toUtf8());
|
||||
PyList_Append(sysPath, pluginImportPath);
|
||||
Py_DECREF(pluginImportPath);
|
||||
|
||||
// Set up import path for the "modules" subdir in the plugin directory
|
||||
PyObject* pluginModulesImportPath = PyUnicode_FromString(QString("%1/modules/").arg(fi.absolutePath()).toUtf8());
|
||||
PyList_Append(sysPath, pluginModulesImportPath);
|
||||
Py_DECREF(pluginModulesImportPath);
|
||||
|
||||
m_module = PyImport_ImportModule(fi.baseName().toUtf8());
|
||||
// Import the plugin
|
||||
m_pluginModule = PyImport_ImportModule(fi.baseName().toUtf8());
|
||||
|
||||
if (!m_module) {
|
||||
if (!m_pluginModule) {
|
||||
qCWarning(dcThingManager()) << "Error importing python plugin from:" << fi.absoluteFilePath();
|
||||
PyErr_Print();
|
||||
PyErr_Clear();
|
||||
@ -371,34 +294,37 @@ bool PythonIntegrationPlugin::loadScript(const QString &scriptFile)
|
||||
}
|
||||
qCDebug(dcThingManager()) << "Imported python plugin from" << fi.absoluteFilePath();
|
||||
|
||||
s_plugins.insert(this, m_module);
|
||||
|
||||
// We'll be using asyncio everywhere, so let's import it right away
|
||||
// IMPORTANT: The asyncio module is a bit special in a sense that it actually shares
|
||||
// stuff between interpreters. See https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter
|
||||
// for the listed bugs and caveats.
|
||||
// If we destroy the first interpreter that imports asyncio, things will become crashy as the
|
||||
// interpreter will tear down the module, leaving shallow copies of the modules dict in other
|
||||
// interpreters. So let's import the module down here after we're sure this module compiles and
|
||||
// won't be unloaded any more.
|
||||
// Note: This becomes a problem when we'll be supporting to unload plugins at runtime.
|
||||
m_asyncio = PyImport_ImportModule("asyncio");
|
||||
s_plugins.insert(this, m_pluginModule);
|
||||
|
||||
// Set up logger with appropriate logging category
|
||||
PyNymeaLoggingHandler *logger = reinterpret_cast<PyNymeaLoggingHandler*>(_PyObject_New(&PyNymeaLoggingHandlerType));
|
||||
QString category = metadata().pluginName();
|
||||
category.replace(0, 1, category[0].toUpper());
|
||||
logger->category = static_cast<char*>(malloc(category.length() + 1));
|
||||
memset(logger->category, '0', category.length() +1);
|
||||
strcpy(logger->category, category.toUtf8().data());
|
||||
PyModule_AddObject(m_module, "logger", reinterpret_cast<PyObject*>(logger));
|
||||
|
||||
PyObject *args = Py_BuildValue("(s)", category.toUtf8().data());
|
||||
PyNymeaLoggingHandler *logger = reinterpret_cast<PyNymeaLoggingHandler*>(PyObject_CallObject((PyObject*)&PyNymeaLoggingHandlerType, args));
|
||||
Py_DECREF(args);
|
||||
PyModule_AddObject(m_pluginModule, "logger", reinterpret_cast<PyObject*>(logger));
|
||||
m_logger = (PyObject*)logger;
|
||||
|
||||
// Export metadata ids into module
|
||||
exportIds();
|
||||
|
||||
// Register config access methods
|
||||
PyModule_AddFunctions(m_module, plugin_methods);
|
||||
// Register plugin api methods (plugin params etc)
|
||||
PyModule_AddFunctions(m_pluginModule, plugin_methods);
|
||||
|
||||
// As python does not have an event loop by default and uses blocking code a lot, we'll
|
||||
// call every plugin method in a threaded way to prevent blocking the core while still not
|
||||
// forcing every plugin developer to deal with threading in the plugin.
|
||||
// In oder to not create and destroy a thread for each plugin api call, we'll be using a
|
||||
// thread pool.
|
||||
// The maximum number of threads in a plugin will be amount of things it manages + 2.
|
||||
// This would allow for e.g. running an event loop using init(), performing something on a thing
|
||||
// and still allow the user to perform a discovery at the same time. On the other hand, this is
|
||||
// strict enough to not encourage the plugin developer to block forever in ever api call but use
|
||||
// proper task processing means (timers, event loops etc) instead.
|
||||
// Plugins can still spawn more threads on their own if the need to but have to manage them on their own.
|
||||
m_threadPool = new QThreadPool(this);
|
||||
m_threadPool->setMaxThreadCount(2);
|
||||
qCDebug(dcPythonIntegrations()) << "Created a thread pool with a maximum of" << m_threadPool->maxThreadCount() << "threads for python plugin" << metadata().pluginName();
|
||||
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
|
||||
@ -517,18 +443,24 @@ void PythonIntegrationPlugin::setupThing(ThingSetupInfo *info)
|
||||
|
||||
PyObject *args = PyTuple_New(1);
|
||||
PyTuple_SetItem(args, 0, (PyObject*)pyThing);
|
||||
Py_INCREF(pyThing);
|
||||
|
||||
PyThingSetupInfo *pyInfo = (PyThingSetupInfo*)PyObject_CallObject((PyObject*)&PyThingSetupInfoType, args);
|
||||
Py_DECREF(args);
|
||||
|
||||
pyInfo->info = info;
|
||||
|
||||
m_threadPool->setMaxThreadCount(m_threadPool->maxThreadCount() + 1);
|
||||
qCDebug(dcPythonIntegrations()) << "Expanded thread pool for plugin" << metadata().pluginName() << "to" << m_threadPool->maxThreadCount();
|
||||
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
|
||||
connect(info->thing(), &Thing::destroyed, this, [=](){
|
||||
PyEval_RestoreThread(m_threadState);
|
||||
pyThing->thing = nullptr;
|
||||
Py_DECREF(pyThing);
|
||||
m_threadPool->setMaxThreadCount(m_threadPool->maxThreadCount() - 1);
|
||||
qCDebug(dcPythonIntegrations()) << "Shrunk thread pool for plugin" << metadata().pluginName() << "to" << m_threadPool->maxThreadCount();
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
});
|
||||
connect(info, &ThingSetupInfo::destroyed, this, [=](){
|
||||
@ -549,12 +481,7 @@ void PythonIntegrationPlugin::setupThing(ThingSetupInfo *info)
|
||||
void PythonIntegrationPlugin::postSetupThing(Thing *thing)
|
||||
{
|
||||
PyThing* pyThing = m_things.value(thing);
|
||||
Py_INCREF(pyThing);
|
||||
|
||||
bool success = callPluginFunction("postSetupThing", reinterpret_cast<PyObject*>(pyThing));
|
||||
if (!success) {
|
||||
Py_DECREF(pyThing);
|
||||
}
|
||||
callPluginFunction("postSetupThing", reinterpret_cast<PyObject*>(pyThing));
|
||||
}
|
||||
|
||||
void PythonIntegrationPlugin::executeAction(ThingActionInfo *info)
|
||||
@ -569,10 +496,13 @@ void PythonIntegrationPlugin::executeAction(ThingActionInfo *info)
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
|
||||
connect(info, &ThingActionInfo::destroyed, this, [=](){
|
||||
qCDebug(dcPythonIntegrations()) << "Info destroyed";
|
||||
PyEval_RestoreThread(m_threadState);
|
||||
qCDebug(dcPythonIntegrations()) << "Info destroyed2";
|
||||
pyInfo->info = nullptr;
|
||||
Py_DECREF(pyInfo);
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
qCDebug(dcPythonIntegrations()) << "Info destroyed3";
|
||||
});
|
||||
|
||||
bool success = callPluginFunction("executeAction", reinterpret_cast<PyObject*>(pyInfo));
|
||||
@ -584,12 +514,7 @@ void PythonIntegrationPlugin::executeAction(ThingActionInfo *info)
|
||||
void PythonIntegrationPlugin::thingRemoved(Thing *thing)
|
||||
{
|
||||
PyThing *pyThing = m_things.value(thing);
|
||||
Py_INCREF(pyThing);
|
||||
|
||||
bool success = callPluginFunction("thingRemoved", reinterpret_cast<PyObject*>(pyThing));
|
||||
if (!success) {
|
||||
Py_DECREF(pyThing);
|
||||
}
|
||||
callPluginFunction("thingRemoved", reinterpret_cast<PyObject*>(pyThing));
|
||||
|
||||
m_mutex.lock();
|
||||
m_things.remove(thing);
|
||||
@ -623,13 +548,13 @@ void PythonIntegrationPlugin::exportIds()
|
||||
QString pluginName = metadata().pluginName();
|
||||
QString pluginId = metadata().pluginId().toString();
|
||||
qCDebug(dcThingManager()) << "- Plugin:" << pluginName << pluginId;
|
||||
PyModule_AddStringConstant(m_module, "pluginId", pluginId.toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, "pluginId", pluginId.toUtf8());
|
||||
|
||||
exportParamTypes(configurationDescription(), pluginName, "", "plugin");
|
||||
|
||||
foreach (const Vendor &vendor, supportedVendors()) {
|
||||
qCDebug(dcThingManager()) << "|- Vendor:" << vendor.name() << vendor.id().toString();
|
||||
PyModule_AddStringConstant(m_module, QString("%1VendorId").arg(vendor.name()).toUtf8(), vendor.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, QString("%1VendorId").arg(vendor.name()).toUtf8(), vendor.id().toString().toUtf8());
|
||||
}
|
||||
|
||||
foreach (const ThingClass &thingClass, supportedThings()) {
|
||||
@ -642,7 +567,7 @@ void PythonIntegrationPlugin::exportThingClass(const ThingClass &thingClass)
|
||||
QString variableName = QString("%1ThingClassId").arg(thingClass.name());
|
||||
|
||||
qCDebug(dcThingManager()) << "|- ThingClass:" << variableName << thingClass.id().toString();
|
||||
PyModule_AddStringConstant(m_module, variableName.toUtf8(), thingClass.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), thingClass.id().toString().toUtf8());
|
||||
|
||||
exportParamTypes(thingClass.paramTypes(), thingClass.name(), "", "thing");
|
||||
exportParamTypes(thingClass.settingsTypes(), thingClass.name(), "", "settings");
|
||||
@ -659,7 +584,7 @@ void PythonIntegrationPlugin::exportParamTypes(const ParamTypes ¶mTypes, con
|
||||
foreach (const ParamType ¶mType, paramTypes) {
|
||||
QString variableName = QString("%1ParamTypeId").arg(thingClassName + typeName[0].toUpper() + typeName.right(typeName.length()-1) + typeClass + paramType.name()[0].toUpper() + paramType.name().right(paramType.name().length() -1 ));
|
||||
qCDebug(dcThingManager()) << " |- ParamType:" << variableName << paramType.id().toString();
|
||||
PyModule_AddStringConstant(m_module, variableName.toUtf8(), paramType.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), paramType.id().toString().toUtf8());
|
||||
}
|
||||
}
|
||||
|
||||
@ -668,7 +593,7 @@ void PythonIntegrationPlugin::exportStateTypes(const StateTypes &stateTypes, con
|
||||
foreach (const StateType &stateType, stateTypes) {
|
||||
QString variableName = QString("%1%2StateTypeId").arg(thingClassName, stateType.name()[0].toUpper() + stateType.name().right(stateType.name().length() - 1));
|
||||
qCDebug(dcThingManager()) << " |- StateType:" << variableName << stateType.id().toString();
|
||||
PyModule_AddStringConstant(m_module, variableName.toUtf8(), stateType.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), stateType.id().toString().toUtf8());
|
||||
}
|
||||
}
|
||||
|
||||
@ -677,10 +602,9 @@ void PythonIntegrationPlugin::exportEventTypes(const EventTypes &eventTypes, con
|
||||
foreach (const EventType &eventType, eventTypes) {
|
||||
QString variableName = QString("%1%2EventTypeId").arg(thingClassName, eventType.name()[0].toUpper() + eventType.name().right(eventType.name().length() - 1));
|
||||
qCDebug(dcThingManager()) << " |- EventType:" << variableName << eventType.id().toString();
|
||||
PyModule_AddStringConstant(m_module, variableName.toUtf8(), eventType.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), eventType.id().toString().toUtf8());
|
||||
exportParamTypes(eventType.paramTypes(), thingClassName, "Event", eventType.name());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void PythonIntegrationPlugin::exportActionTypes(const ActionTypes &actionTypes, const QString &thingClassName)
|
||||
@ -688,7 +612,7 @@ void PythonIntegrationPlugin::exportActionTypes(const ActionTypes &actionTypes,
|
||||
foreach (const ActionType &actionType, actionTypes) {
|
||||
QString variableName = QString("%1%2ActionTypeId").arg(thingClassName, actionType.name()[0].toUpper() + actionType.name().right(actionType.name().length() - 1));
|
||||
qCDebug(dcThingManager()) << " |- ActionType:" << variableName << actionType.id().toString();
|
||||
PyModule_AddStringConstant(m_module, variableName.toUtf8(), actionType.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), actionType.id().toString().toUtf8());
|
||||
exportParamTypes(actionType.paramTypes(), thingClassName, "Action", actionType.name());
|
||||
}
|
||||
}
|
||||
@ -698,19 +622,17 @@ void PythonIntegrationPlugin::exportBrowserItemActionTypes(const ActionTypes &ac
|
||||
foreach (const ActionType &actionType, actionTypes) {
|
||||
QString variableName = QString("%1%2BrowserItemActionTypeId").arg(thingClassName, actionType.name()[0].toUpper() + actionType.name().right(actionType.name().length() - 1));
|
||||
qCDebug(dcThingManager()) << " |- BrowserActionType:" << variableName << actionType.id().toString();
|
||||
PyModule_AddStringConstant(m_module, variableName.toUtf8(), actionType.id().toString().toUtf8());
|
||||
PyModule_AddStringConstant(m_pluginModule, variableName.toUtf8(), actionType.id().toString().toUtf8());
|
||||
exportParamTypes(actionType.paramTypes(), thingClassName, "BrowserItemAction", actionType.name());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
bool PythonIntegrationPlugin::callPluginFunction(const QString &function, PyObject *param1, PyObject *param2, PyObject *param3)
|
||||
{
|
||||
PyEval_RestoreThread(m_threadState);
|
||||
|
||||
qCDebug(dcThingManager()) << "Calling python plugin function" << function << "on plugin" << pluginName();
|
||||
PyObject *pluginFunction = PyObject_GetAttrString(m_module, function.toUtf8());
|
||||
PyObject *pluginFunction = PyObject_GetAttrString(m_pluginModule, function.toUtf8());
|
||||
if(!pluginFunction || !PyCallable_Check(pluginFunction)) {
|
||||
PyErr_Clear();
|
||||
Py_XDECREF(pluginFunction);
|
||||
@ -719,84 +641,45 @@ bool PythonIntegrationPlugin::callPluginFunction(const QString &function, PyObje
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
PyObject *pluginFunctionResult = PyObject_CallFunctionObjArgs(pluginFunction, param1, param2, param3, nullptr);
|
||||
|
||||
Py_XDECREF(pluginFunction);
|
||||
|
||||
if (PyErr_Occurred()) {
|
||||
qCWarning(dcThingManager()) << "Error calling python method:" << function << "on plugin" << pluginName();
|
||||
PyErr_Print();
|
||||
PyErr_Clear();
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (QByteArray(pluginFunctionResult->ob_type->tp_name) != "coroutine") {
|
||||
Py_DECREF(pluginFunctionResult);
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Spawn a new event loop for the thread
|
||||
QFuture<void> future = QtConcurrent::run([this, pluginFunctionResult, function](){
|
||||
qCDebug(dcPythonIntegrations()) << "Spawning thread for" << function << "in plugin" << metadata().pluginName();
|
||||
|
||||
// Register this new thread in the interpreter
|
||||
PyThreadState *thread = PyThreadState_New(m_threadState->interp);
|
||||
|
||||
// Acquire GIL and make the new thread state the current one
|
||||
PyEval_RestoreThread(thread);
|
||||
|
||||
PyObject *new_event_loop = PyObject_GetAttrString(m_asyncio, "new_event_loop");
|
||||
PyObject *loop = PyObject_CallFunctionObjArgs(new_event_loop, nullptr);
|
||||
|
||||
Py_DECREF(new_event_loop);
|
||||
|
||||
PyObject *create_task = PyObject_GetAttrString(loop, "create_task");
|
||||
PyObject *task = PyObject_CallFunctionObjArgs(create_task, pluginFunctionResult, nullptr);
|
||||
dumpError();
|
||||
|
||||
Py_DECREF(create_task);
|
||||
Py_DECREF(pluginFunctionResult);
|
||||
|
||||
PyObject *add_done_callback = PyObject_GetAttrString(task, "add_done_callback");
|
||||
dumpError();
|
||||
|
||||
PyObject *task_done = PyObject_GetAttrString(m_nymeaModule, "task_done");
|
||||
PyObject *coroutineResult = PyObject_CallFunctionObjArgs(add_done_callback, task_done, nullptr);
|
||||
dumpError();
|
||||
|
||||
Py_DECREF(coroutineResult);
|
||||
Py_DECREF(add_done_callback);
|
||||
Py_DECREF(task_done);
|
||||
|
||||
PyObject *run_until_complete = PyObject_GetAttrString(loop, "run_until_complete");
|
||||
|
||||
Py_DECREF(loop);
|
||||
|
||||
PyObject *taskResult = PyObject_CallFunctionObjArgs(run_until_complete, task, nullptr);
|
||||
dumpError();
|
||||
|
||||
Py_XDECREF(taskResult);
|
||||
Py_DECREF(run_until_complete);
|
||||
Py_DECREF(task);
|
||||
|
||||
// Destroy the thread and release the GIL
|
||||
PyThreadState_Clear(thread);
|
||||
PyThreadState_DeleteCurrent();
|
||||
qCDebug(dcPythonIntegrations()) << "Thread for" << function << "in plugin" << metadata().pluginName() << "ended";
|
||||
});
|
||||
Py_XINCREF(param1);
|
||||
Py_XINCREF(param2);
|
||||
Py_XINCREF(param3);
|
||||
|
||||
QFutureWatcher<void> *watcher = new QFutureWatcher<void>(this);
|
||||
watcher->setFuture(future);
|
||||
m_runningTasks.insert(watcher);
|
||||
|
||||
connect(watcher, &QFutureWatcher<void>::finished, this, [this, watcher](){
|
||||
// Run the plugin function in the thread pool
|
||||
QFuture<void> future = QtConcurrent::run(m_threadPool, [=](){
|
||||
qCDebug(dcPythonIntegrations()) << "+++ Thread for" << function << "in plugin" << metadata().pluginName();
|
||||
|
||||
// Register this new thread in the interpreter
|
||||
PyThreadState *threadState = PyThreadState_New(m_threadState->interp);
|
||||
|
||||
// Acquire GIL and make the new thread state the current one
|
||||
PyEval_RestoreThread(threadState);
|
||||
|
||||
PyObject *pluginFunctionResult = PyObject_CallFunctionObjArgs(pluginFunction, param1, param2, param3, nullptr);
|
||||
dumpError();
|
||||
|
||||
if (PyErr_Occurred()) {
|
||||
qCWarning(dcThingManager()) << "Error calling python method:" << function << "on plugin" << pluginName();
|
||||
}
|
||||
|
||||
Py_DECREF(pluginFunction);
|
||||
Py_XDECREF(pluginFunctionResult);
|
||||
Py_XDECREF(param1);
|
||||
Py_XDECREF(param2);
|
||||
Py_XDECREF(param3);
|
||||
|
||||
m_runningTasks.remove(watcher);
|
||||
delete watcher;
|
||||
});
|
||||
|
||||
// Destroy the thread and release the GIL
|
||||
PyThreadState_Clear(threadState);
|
||||
PyEval_ReleaseThread(threadState);
|
||||
PyThreadState_Delete(threadState);
|
||||
qCDebug(dcPythonIntegrations()) << "--- Thread for" << function << "in plugin" << metadata().pluginName();
|
||||
});
|
||||
watcher->setFuture(future);
|
||||
m_runningTasks.insert(watcher, function);
|
||||
|
||||
PyEval_ReleaseThread(m_threadState);
|
||||
return true;
|
||||
|
||||
@ -47,10 +47,6 @@ public:
|
||||
static PyObject* pyAutoThingsAppeared(PyObject *self, PyObject* args);
|
||||
static PyObject* pyAutoThingDisappeared(PyObject *self, PyObject* args);
|
||||
|
||||
public:
|
||||
// python callbacks
|
||||
static PyObject* task_done(PyObject* self, PyObject* args);
|
||||
|
||||
private:
|
||||
void exportIds();
|
||||
void exportThingClass(const ThingClass &thingClass);
|
||||
@ -66,17 +62,22 @@ private:
|
||||
private:
|
||||
// The main thread state in which we create an interpreter per plugin
|
||||
static PyThreadState* s_mainThreadState;
|
||||
static QThreadPool *s_threadPool;
|
||||
|
||||
// A per plugin thread state and interpreter
|
||||
PyThreadState *m_threadState = nullptr;
|
||||
|
||||
// Modules imported into the interpreter
|
||||
PyObject *m_nymeaModule;
|
||||
PyObject *m_asyncio;
|
||||
// A per plugin thread pool
|
||||
QThreadPool *m_threadPool = nullptr;
|
||||
|
||||
// Running concurrent tasks in this plugins thread pool
|
||||
QHash<QFutureWatcher<void>*, QString> m_runningTasks;
|
||||
|
||||
// The nymea module we import into the interpreter
|
||||
PyObject *m_nymeaModule = nullptr;
|
||||
// The imported plugin module (the plugin.py)
|
||||
PyObject *m_module = nullptr;
|
||||
PyObject *m_pluginModule = nullptr;
|
||||
|
||||
PyObject *m_logger = nullptr;
|
||||
|
||||
// A map of plugin instances to plugin python scripts/modules
|
||||
// Make sure to hold the GIL when accessing this.
|
||||
@ -91,8 +92,6 @@ private:
|
||||
// Need to keep a copy of plugin params and sync that in a thread-safe manner
|
||||
ParamList m_pluginConfigCopy;
|
||||
|
||||
// Running concurrent tasks in this plugin
|
||||
QSet<QFutureWatcher<void>*> m_runningTasks;
|
||||
};
|
||||
|
||||
#endif // PYTHONINTEGRATIONPLUGIN_H
|
||||
|
||||
@ -21,6 +21,7 @@ RESOURCES += $$top_srcdir/icons.qrc \
|
||||
HEADERS += nymeacore.h \
|
||||
integrations/plugininfocache.h \
|
||||
integrations/python/pynymealogginghandler.h \
|
||||
integrations/python/pynymeamodule.h \
|
||||
integrations/python/pyparam.h \
|
||||
integrations/python/pything.h \
|
||||
integrations/python/pythingactioninfo.h \
|
||||
|
||||
@ -1,14 +1,17 @@
|
||||
import nymea
|
||||
import asyncio
|
||||
import time
|
||||
#from fastdotcom import fast_com
|
||||
|
||||
watchingAutoThings = False
|
||||
loopRunning = False
|
||||
|
||||
async def init():
|
||||
def init():
|
||||
logger.log("Python mock plugin init")
|
||||
global loopRunning
|
||||
loopRunning = True
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(5);
|
||||
while loopRunning:
|
||||
time.sleep(5);
|
||||
for thing in myThings():
|
||||
if thing.thingClassId == pyMockThingClassId:
|
||||
logger.log("Emitting event 1 for", thing.name, "eventTypeId", pyMockEvent1EventTypeId)
|
||||
@ -20,6 +23,13 @@ async def init():
|
||||
thing.emitEvent(pyMockDiscoveryPairingEvent1EventTypeId, [nymea.Param(pyMockDiscoveryPairingEvent1EventParam1ParamTypeId, "Im an event")])
|
||||
logger.log("Setting state 1 for", thing.name, "Old value is:", thing.stateValue(pyMockDiscoveryPairingState1StateTypeId))
|
||||
thing.setStateValue(pyMockDiscoveryPairingState1StateTypeId, thing.stateValue(pyMockDiscoveryPairingState1StateTypeId) + 1)
|
||||
logger.log("Bye bye")
|
||||
|
||||
|
||||
def deinit():
|
||||
logger.log("shutting down")
|
||||
global loopRunning
|
||||
loopRunning = False
|
||||
|
||||
|
||||
def configValueChanged(paramTypeId, value):
|
||||
@ -55,9 +65,9 @@ def startMonitoringAutoThings():
|
||||
logger.log("Done start monitoring auto things")
|
||||
|
||||
|
||||
async def discoverThings(info):
|
||||
def discoverThings(info):
|
||||
logger.log("Discovery started for", info.thingClassId, "with result count:", info.params[0].value)
|
||||
await asyncio.sleep(10) # Some delay for giving a feeling of a discovery
|
||||
time.sleep(10) # Some delay for giving a feeling of a discovery
|
||||
# Add 2 new discovery results
|
||||
for i in range(0, info.params[0].value):
|
||||
info.addDescriptor(nymea.ThingDescriptor(pyMockDiscoveryPairingThingClassId, "Python mock thing %i" % i))
|
||||
@ -69,26 +79,26 @@ async def discoverThings(info):
|
||||
info.finish(nymea.ThingErrorNoError)
|
||||
|
||||
|
||||
async def startPairing(info):
|
||||
def startPairing(info):
|
||||
logger.log("startPairing for", info.thingName, info.thingId, info.params)
|
||||
info.finish(nymea.ThingErrorNoError, "Log in as user \"john\" with password \"smith\".")
|
||||
|
||||
|
||||
async def confirmPairing(info, username, secret):
|
||||
def confirmPairing(info, username, secret):
|
||||
logger.log("confirming pairing for", info.thingName, username, secret)
|
||||
await asyncio.sleep(1)
|
||||
time.sleep(1)
|
||||
if username == "john" and secret == "smith":
|
||||
info.finish(nymea.ThingErrorNoError)
|
||||
else:
|
||||
info.finish(nymea.ThingErrorAuthenticationFailure, "Error logging in here!")
|
||||
|
||||
|
||||
async def setupThing(info):
|
||||
def setupThing(info):
|
||||
logger.log("setupThing for", info.thing.name)
|
||||
info.finish(nymea.ThingErrorNoError)
|
||||
|
||||
|
||||
async def postSetupThing(thing):
|
||||
def postSetupThing(thing):
|
||||
logger.log("postSetupThing for", thing.name)
|
||||
thing.nameChangedHandler = lambda thing : logger.log("Thing name changed", thing.name)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user