powersync-docs/scripts/gen_device_reference.py
Patrick Schurig 0f2be2e000
All checks were successful
Build & Deploy docs / build-deploy (push) Successful in 1m47s
feat: badge origine + catalogue complet PORTING_STATUS (60 plugins)
Partie 1 — Dimension « origine » (3ème badge)
- gen_device_reference.py : ORIGIN_BADGES dict, resolve_origin() déduit
  l'origine depuis repo: (nymea-* → NYMEA, etm-* → ETM) ou champ origin:
  explicite ; colonne Origine ajoutée dans matrice et index de catégorie ;
  badge origine dans l'en-tête des fiches générées
- extra.css : styles .badge.origin-nymea (gris-bleu), .badge.origin-etm
  (vert), .badge.origin-tiers (orange)
- compatibilite.md : légende étendue aux 3 dimensions (canal, origine, stab.)
- Catégorie « generic » ajoutée (Types génériques nymea)

Partie 2 — Catalogue complet PORTING_STATUS.yaml
- 9 entrées validées conservées avec leurs canaux (stable/testing/nightly)
- 41 nouveaux plugins énergie activés (channel: nightly) : onduleurs PV
  (sma, solax, sungrow, huawei, kostal, mtec, wattsonic, sunspec, bosswerk,
  solarlog), IRVE (easee, goecharger, evbox, everest, v2xeamberelectric,
  amperfied, mennekes, webasto, pcelectric, phoenixconnect, inro),
  HVAC (tado, homeconnect, stiebeleltron, idm, drexelundweiss, alphainnotec,
  mypv), compteurs (powerfox, bgetech, inepro, schrack, vestel, senseair),
  smartdevices (shelly, tasmota, tuya, gpio, usbrelay, usbrly82, mqttclient,
  modbuscommander, unipi), tarifs (awattar, tempo, spothinta),
  generic (genericenergy, genericheatingcooling, genericcar)
- 60 plugins domotique/hors-périmètre commentés (réactivables via # → -)
- 50 fiches générées + 2 index créés (tarifs/, generic/)
- mkdocs build --strict OK, --check exit 0 (idempotent)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 15:02:25 +02:00

616 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
gen_device_reference.py — génère la matrice de compatibilité, les sections de
référence des appareils et le fichier de nav literate-nav (SUMMARY.md) depuis
PORTING_STATUS.yaml + integrationplugin*.json + meta.json.
Deux modes de traitement des fiches :
- Injection : remplace le contenu entre marqueurs dans les fiches existantes.
- Création : crée entièrement les fiches et index inexistants.
Marqueurs gérés :
<!-- BEGIN GENERATED: integrationplugin<plugin>.json --> fiche appareil
<!-- BEGIN GENERATED: __matrix__ --> matrice compatibilité
<!-- BEGIN GENERATED: __index_<cat>__ --> index de catégorie
Usage :
python3 scripts/gen_device_reference.py --src .plugins-src --docs docs --lang fr
python3 scripts/gen_device_reference.py --src .plugins-src --docs docs --lang fr --check
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
try:
import yaml
except ImportError:
sys.exit("PyYAML est requis : pip install PyYAML")
# Display label (French) for each functional nymea interface. roles() falls
# back to the raw interface name for anything not listed here.
INTERFACE_ROLE = {
    "energymeter": "Compteur d'énergie",
    "smartmeterconsumer": "Compteur de consommation",
    "smartmeterproducer": "Compteur de production",
    "evcharger": "Borne de recharge",
    "heatpump": "Pompe à chaleur",
    "smartmeter": "Compteur intelligent",
}
# Interfaces that roles() filters out of the role list.
TECH_INTERFACES = {"connectable", "networkdevice", "wirelessconnectable"}
# Display label for each value found in a thing class' "createMethods" list;
# add_method() falls back to the raw value for unknown methods.
CREATE_METHOD = {
    "discovery": "Découverte automatique",
    "user": "Ajout manuel",
    "auto": "Automatique",
}
# Heading used for each category key of PORTING_STATUS.yaml.
CATEGORY_LABELS = {
    "compteur": "Compteurs",
    "irve": "Bornes de recharge",
    "smartdevice": "SmartDevices",
    "hvac": "HVAC",
    "onduleur": "Onduleurs / PV",
    "batterie": "Batteries / ESS",
    "tarif": "Tarifs & prévisions",
    "generic": "Types génériques",
}
# Sub-folder of docs/appareils/ holding the pages of each category.
CATEGORY_FOLDER = {
    "compteur": "compteurs",
    "irve": "bornes",
    "smartdevice": "smart",
    "hvac": "hvac",
    "onduleur": "onduleurs",
    "batterie": "batteries",
    "tarif": "tarifs",
    "generic": "generic",
}
# Order in which known categories are emitted in the matrix and in SUMMARY.md;
# categories absent from this list are appended after these.
CATEGORY_ORDER = [
    "compteur", "irve", "smartdevice", "hvac", "onduleur", "batterie", "tarif", "generic",
]
# HTML badge for each release channel (the `channel` field of an entry).
CHANNEL_BADGES = {
    "stable": '<span class="badge stable">STABLE</span>',
    "testing": '<span class="badge testing">TESTING</span>',
    "nightly": '<span class="badge nightly">NIGHTLY</span>',
}
# HTML badge for each value of the `stability` field read from meta.json.
STABILITY_BADGES = {
    "consumer": '<span class="badge consumer">CONSUMER</span>',
    "community": '<span class="badge community">COMMUNITY</span>',
}
# HTML badge for each origin value returned by resolve_origin().
ORIGIN_BADGES = {
    "nymea": '<span class="badge origin-nymea">NYMEA</span>',
    "etm": '<span class="badge origin-etm">ETM</span>',
    "tiers": '<span class="badge origin-tiers">TIERS</span>',
}
def resolve_origin(e: dict) -> str:
    """Return the origin key of an entry.

    A truthy explicit ``origin`` field wins. Otherwise the origin is derived
    from the prefix of ``repo`` (``nymea-`` or ``etm-``), and any other
    repository name yields ``"tiers"``.
    """
    explicit = e.get("origin")
    if explicit:
        return explicit
    repo_name = e.get("repo", "")
    for prefix, origin in (("nymea-", "nymea"), ("etm-", "etm")):
        if repo_name.startswith(prefix):
            return origin
    return "tiers"
# Matches one generated region: group 1 is the BEGIN marker (with its
# whitespace-free `key`), `body` is the lazily matched content in between
# (DOTALL lets it span several lines) and group 4 is the END marker.
MARKER_RE = re.compile(
    r"(<!-- BEGIN GENERATED: (?P<key>[^\s]+) -->)(?P<body>.*?)(<!-- END GENERATED -->)",
    re.DOTALL,
)
# ── Chargement ────────────────────────────────────────────────────────────────
def load_porting_status(root: Path) -> list:
    """Load the device entries declared in ``PORTING_STATUS.yaml``.

    Args:
        root: Directory expected to contain ``PORTING_STATUS.yaml``.

    Returns:
        The parsed YAML document, or an empty list when the document is
        empty (for instance when every entry is commented out).

    Raises:
        SystemExit: If the file does not exist.
    """
    path = root / "PORTING_STATUS.yaml"
    if not path.exists():
        sys.exit(f"ERREUR : PORTING_STATUS.yaml introuvable à {path}")
    # Decode explicitly as UTF-8, like every other file read in this script:
    # the platform default encoding would garble accented names elsewhere.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    # safe_load() yields None for an empty document; callers iterate the result.
    return data if data is not None else []
def load_plugins(src: Path) -> dict:
    """Load every ``integrationplugin*.json`` found recursively under *src*.

    Files are visited in sorted path order and keyed by file name; once a
    name has been loaded, later files with the same name are skipped. Empty
    or unparsable files are reported on stderr and ignored instead of
    aborting the run.
    """
    loaded: dict = {}
    for json_path in sorted(src.rglob("integrationplugin*.json")):
        file_name = json_path.name
        if file_name in loaded:
            continue
        content = json_path.read_text(encoding="utf-8").strip()
        if content:
            try:
                loaded[file_name] = json.loads(content)
            except json.JSONDecodeError as exc:
                print(f" AVERTISSEMENT : {json_path} non parsable ({exc}) — ignoré", file=sys.stderr)
        else:
            print(f" AVERTISSEMENT : {json_path} est vide — ignoré", file=sys.stderr)
    return loaded
def load_meta(src: Path, plugin: str) -> dict:
    """Load the ``meta.json`` sitting next to ``integrationplugin<plugin>.json``.

    Only the first matching plugin file (in sorted path order) is considered.
    Returns an empty dict when no plugin file matches, when ``meta.json`` is
    missing, or when it is empty or unparsable; the last two cases are
    reported on stderr.
    """
    matches = sorted(src.rglob(f"integrationplugin{plugin}.json"))
    if not matches:
        return {}
    meta_path = matches[0].parent / "meta.json"
    if not meta_path.exists():
        return {}
    content = meta_path.read_text(encoding="utf-8").strip()
    if not content:
        print(f" AVERTISSEMENT : {meta_path} est vide — meta ignoré", file=sys.stderr)
        return {}
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError as exc:
        print(f" AVERTISSEMENT : {meta_path} non parsable ({exc}) — meta ignoré",
              file=sys.stderr)
        return {}
    return parsed
# ── Libellé d'un appareil ────────────────────────────────────────────────────
def entry_name(e: dict, meta: dict) -> str:
    """Return the display title of an entry.

    Precedence: the entry's ``name``, then ``title`` from meta.json, then the
    entry's ``plugin`` identifier (``"?"`` when even that key is absent).
    """
    for candidate in (e.get("name"), meta.get("title")):
        if candidate:
            return candidate
    return e.get("plugin", "?")
# ── Helpers JSON nymea ────────────────────────────────────────────────────────
def transport_of(tc: dict) -> str:
    """Guess the transport label of a thing class from its interfaces and params.

    Returns ``"Modbus TCP"`` when the class exposes the ``networkdevice``
    interface or a network-style parameter, ``"Modbus RTU"`` when it has an
    RTU master parameter, and an empty string otherwise.
    """
    param_names = set()
    for key in ("paramTypes", "discoveryParamTypes"):
        param_names.update(param["name"] for param in tc.get(key, []))
    interfaces = set(tc.get("interfaces", []))
    network_params = {"hostName", "address", "port", "macAddress"}
    if "networkdevice" in interfaces or not network_params.isdisjoint(param_names):
        return "Modbus TCP"
    rtu_params = {"modbusMasterUuid", "rtuMaster"}
    if not rtu_params.isdisjoint(param_names):
        return "Modbus RTU"
    return ""
def roles(tc: dict) -> str:
    """Return the comma-separated role labels of a thing class.

    Interfaces listed in TECH_INTERFACES are skipped; the others are mapped
    through INTERFACE_ROLE, keeping the raw interface name when no label is
    defined. Returns an empty string when nothing remains.
    """
    labels = []
    for iface in tc.get("interfaces", []):
        if iface in TECH_INTERFACES:
            continue
        labels.append(INTERFACE_ROLE.get(iface, iface))
    if not labels:
        return ""
    return ", ".join(labels)
def add_method(tc: dict) -> str:
    """Return the " / "-separated labels of the thing class' create methods.

    Each ``createMethods`` value is mapped through CREATE_METHOD, keeping the
    raw value when unknown. Returns an empty string when there is none.
    """
    methods = tc.get("createMethods", [])
    joined = " / ".join(CREATE_METHOD.get(method, method) for method in methods)
    return joined if joined else ""
def resolve_protocol(plugin_data: dict | None) -> str:
    """Return the first non-empty transport found among the plugin's thing classes.

    Vendors and their thing classes are scanned in declaration order; an
    empty string is returned when *plugin_data* is falsy or when no thing
    class yields a transport.
    """
    if not plugin_data:
        return ""
    thing_classes = (
        tc
        for vendor in plugin_data.get("vendors", [])
        for tc in vendor.get("thingClasses", [])
    )
    # Lazy pipeline: transport_of() stops being called after the first hit.
    transports = (transport_of(tc) for tc in thing_classes)
    return next((t for t in transports if t != ""), "")
# ── Rendu Markdown ────────────────────────────────────────────────────────────
def fmt_range(p: dict) -> str:
    """Format the value range of a parameter for the "Plage" table column.

    Args:
        p: Parameter description; ``minValue`` and ``maxValue`` are read.

    Returns:
        ``"<min> – <max>"`` when both bounds are defined, otherwise an empty
        string.
    """
    lo, hi = p.get("minValue"), p.get("maxValue")
    if lo is not None and hi is not None:
        # Separate the bounds with a spaced en dash (U+2013) so that 0 and 100
        # render as "0 – 100" instead of fusing into "0100". The spaces keep
        # negative bounds readable; the escape avoids an ambiguous literal.
        return f"{lo} \u2013 {hi}"
    return ""
def fmt_default(p: dict) -> str:
    """Return the parameter's default value as inline code.

    A missing, ``None`` or empty-string default gives an empty string; falsy
    values such as ``0`` or ``False`` are still rendered.
    """
    value = p.get("defaultValue", "")
    has_value = value != "" and value is not None
    return f"`{value}`" if has_value else ""
def md_table(headers: list, rows: list) -> str:
    """Render a Markdown pipe table.

    *headers* must contain strings; every cell of *rows* is converted with
    ``str()``. The result has no trailing newline.
    """
    def as_line(cells) -> str:
        return "| " + " | ".join(cells) + " |"

    table = [as_line(headers), as_line("---" for _ in headers)]
    table.extend(as_line(str(cell) for cell in row) for row in rows)
    return "\n".join(table)
def render_params(tc: dict) -> list:
    """Return the Markdown lines describing a thing class' parameters.

    Emits one captioned table for ``discoveryParamTypes`` and one for
    ``paramTypes`` (the latter with a read-only column), each followed by a
    blank line. A list that is absent or empty produces nothing.
    """
    lines: list = []

    discovery = tc.get("discoveryParamTypes", [])
    if discovery:
        discovery_rows = []
        for param in discovery:
            discovery_rows.append([
                f"`{param['name']}`",
                param.get("displayName", ""),
                param.get("type", ""),
                fmt_range(param),
                fmt_default(param),
            ])
        lines += [
            "_Paramètres de découverte :_",
            md_table(["Clé", "Libellé", "Type", "Plage", "Défaut"], discovery_rows),
            "",
        ]

    settings = tc.get("paramTypes", [])
    if settings:
        setting_rows = []
        for param in settings:
            read_only = "oui" if param.get("readOnly") else "non"
            setting_rows.append([
                f"`{param['name']}`",
                param.get("displayName", ""),
                param.get("type", ""),
                fmt_range(param),
                fmt_default(param),
                read_only,
            ])
        lines += [
            "_Réglages :_",
            md_table(["Clé", "Libellé", "Type", "Plage", "Défaut", "Lecture seule"], setting_rows),
            "",
        ]

    return lines
def render_states(tc: dict) -> list:
    """Return the Markdown lines of the state table of a thing class.

    The result is the table followed by a blank line, or an empty list when
    the thing class declares no ``stateTypes``.
    """
    states = tc.get("stateTypes", [])
    if not states:
        return []
    state_rows = []
    for state in states:
        state_rows.append([
            f"`{state['name']}`",
            state.get("displayName", ""),
            state.get("type", ""),
            state.get("unit") or "",
        ])
    table = md_table(["Clé", "Grandeur", "Type", "Unité"], state_rows)
    return [table, ""]
def render_plugin(plugin: dict) -> str:
    """Render the generated Markdown section of one plugin.

    The section contains the vendor and plugin names, a summary table of the
    supported thing classes, then one collapsible details block per thing
    class holding its parameter and state tables.

    Args:
        plugin: Parsed content of an ``integrationplugin*.json`` file.

    Returns:
        The Markdown text, ending with exactly one newline.
    """
    # Markdown only turns a line end into a hard break after two trailing
    # spaces, and the content of a "???" details block must be indented by
    # four spaces. Both are built explicitly so that whitespace-normalising
    # tools cannot silently shrink them.
    hard_break = " " * 2
    block_indent = " " * 4

    out = []
    vendors = plugin.get("vendors", [])
    vname = ", ".join(v.get("displayName", v.get("name", "")) for v in vendors)
    out.append(f"**Fabricant :** {vname}{hard_break}")
    out.append(f"**Plugin :** `{plugin.get('name', '')}`")
    out.append("")

    thing_classes = [tc for v in vendors for tc in v.get("thingClasses", [])]
    rows = [[f"**{tc.get('displayName', tc.get('name'))}**",
             roles(tc), transport_of(tc), add_method(tc),
             str(len(tc.get("stateTypes", [])))]
            for tc in thing_classes]
    out.append("#### Modèles pris en charge")
    out.append(md_table(["Modèle", "Rôle", "Transport", "Ajout", "Grandeurs"], rows))
    out.append("")

    out.append("#### Détail par modèle")
    for tc in thing_classes:
        title = tc.get("displayName", tc.get("name"))
        out.append(f'??? abstract "{title} — `{tc.get("name")}`"')
        body = []
        body += render_params(tc)
        if tc.get("stateTypes"):
            body.append("_Grandeurs mesurées :_")
            body += render_states(tc)
        # Re-split so multi-line tables are indented line by line; blank
        # lines stay empty to avoid trailing whitespace.
        for ln in "\n".join(body).splitlines():
            out.append(block_indent + ln if ln else "")
        out.append("")
    return "\n".join(out).rstrip() + "\n"
def render_nightly_stub(entry: dict, meta: dict) -> str:
    """Return the placeholder content used when a nightly entry has no plugin JSON."""
    name = entry_name(entry, meta)
    paragraphs = [
        f"_Support de **{name}** en cours d'intégration — canal nightly._",
        "",
        "_La documentation technique sera disponible dès que le plugin sera "
        "disponible sur le dépôt stable._",
    ]
    return "\n".join(paragraphs) + "\n"
def render_category_index(cat: str, entries: list, plugins: dict, src: Path) -> str:
    """Render the device table of one category for its generated index.md.

    Only entries of category *cat* that declare a ``plugin`` are listed. Each
    row links to the device page and shows protocol, channel, origin and
    stability. A placeholder sentence is returned when the category is empty.
    """
    selected = [
        entry for entry in entries
        if entry.get("category") == cat and entry.get("plugin")
    ]
    if not selected:
        return "_Aucun appareil dans cette catégorie._\n"

    table_rows = []
    for entry in selected:
        plugin_id = entry["plugin"]
        plugin_data = plugins.get(f"integrationplugin{plugin_id}.json")
        meta = load_meta(src, plugin_id)
        name = entry_name(entry, meta)
        slug = entry.get("slug") or plugin_id
        protocol = ""
        if plugin_data:
            protocol = resolve_protocol(plugin_data)
        channel = entry["channel"]
        # An unknown channel is shown as its raw value rather than as a badge.
        channel_badge = CHANNEL_BADGES.get(channel, channel)
        origin_badge = ORIGIN_BADGES.get(resolve_origin(entry), "")
        stability = meta.get("stability", "")
        stability_badge = ""
        if stability:
            stability_badge = STABILITY_BADGES.get(stability, "")
        table_rows.append([
            f"[{name}]({slug}.md)",
            protocol,
            channel_badge,
            origin_badge,
            stability_badge,
        ])
    headers = ["Appareil", "Protocole", "Canal", "Origine", "Stabilité"]
    return md_table(headers, table_rows) + "\n"
def render_matrix(entries: list, plugins: dict, src: Path) -> str:
    """Render the compatibility matrix: one titled table per category.

    Categories listed in CATEGORY_ORDER come first, in that order; any other
    category follows in order of first appearance. Entries without a
    ``category`` are grouped under ``"autre"``. Nightly entries whose plugin
    JSON is missing are still listed and reported on stderr.
    """
    grouped: dict = {}
    for entry in entries:
        grouped.setdefault(entry.get("category", "autre"), []).append(entry)

    known = [cat for cat in CATEGORY_ORDER if cat in grouped]
    extra = [cat for cat in grouped if cat not in CATEGORY_ORDER]

    out = []
    for cat in known + extra:
        label = CATEGORY_LABELS.get(cat, cat.capitalize())
        table_rows = []
        for entry in grouped[cat]:
            has_plugin = bool(entry.get("plugin"))
            fname = f"integrationplugin{entry['plugin']}.json" if has_plugin else None
            plugin_data = plugins.get(fname) if fname else None
            json_missing = not fname or fname not in plugins
            if entry["channel"] == "nightly" and json_missing:
                print(f" INFO matrice : {entry.get('plugin', '?')} nightly sans JSON "
                      f"→ ligne stub dans la matrice", file=sys.stderr)
            meta = load_meta(src, entry["plugin"]) if has_plugin else {}
            name = entry_name(entry, meta)
            protocol = resolve_protocol(plugin_data)
            channel_badge = CHANNEL_BADGES.get(entry["channel"], entry["channel"])
            origin_badge = ORIGIN_BADGES.get(resolve_origin(entry), "")
            stability = meta.get("stability", "")
            stability_badge = ""
            if stability:
                stability_badge = STABILITY_BADGES.get(stability, "")
            table_rows.append(
                f"| {name} | {protocol} | {channel_badge} | {origin_badge} | {stability_badge} |"
            )
        if not table_rows:
            continue
        out += [
            f"## {label}\n",
            "| Marque / Modèle | Protocole | Canal | Origine | Stabilité |",
            "|---|---|---|---|---|",
        ]
        out += table_rows
        out.append("")
    return "\n".join(out)
# ── SUMMARY.md pour mkdocs-literate-nav ──────────────────────────────────────
def generate_summary(entries: list, docs: Path, src: Path, plugins: dict,
                     check: bool) -> int:
    """Write (or verify) ``docs/appareils/SUMMARY.md`` for mkdocs-literate-nav.

    The file is a nested Markdown list: the compatibility page, then one
    top-level item per non-empty category with its device pages as children.

    Args:
        entries: Entries loaded from PORTING_STATUS.yaml.
        docs: Root of the documentation tree.
        src: Plugin source directory, used to look up meta.json titles.
        plugins: Loaded plugin JSON files (unused here, kept for the call
            signature).
        check: When true nothing is written; the existing file is compared
            with the expected content instead.

    Returns:
        1 in check mode when SUMMARY.md is missing or stale, 0 otherwise.
    """
    # A child item must be indented by four spaces to be nested under its
    # category; with less, the Markdown parser keeps it at the top level.
    child_indent = " " * 4

    by_cat: dict = {}
    for e in entries:
        by_cat.setdefault(e.get("category", "autre"), []).append(e)

    lines = ["* [Compatibilité](compatibilite.md)"]
    # Known categories first (fixed order), then unknown ones as encountered.
    ordered_cats = [c for c in CATEGORY_ORDER if c in by_cat]
    ordered_cats += [c for c in by_cat if c not in CATEGORY_ORDER]
    for cat in ordered_cats:
        label = CATEGORY_LABELS.get(cat, cat.capitalize())
        folder = CATEGORY_FOLDER.get(cat, cat)
        cat_entries = []
        for e in by_cat[cat]:
            if not e.get("plugin"):
                continue
            meta = load_meta(src, e["plugin"])
            name = entry_name(e, meta)
            slug = e.get("slug") or e["plugin"]
            cat_entries.append(f"{child_indent}* [{name}]({folder}/{slug}.md)")
        # A category without any page is left out of the navigation.
        if cat_entries:
            lines.append(f"* [{label}]({folder}/index.md)")
            lines.extend(cat_entries)

    new_content = "\n".join(lines) + "\n"
    summary_path = docs / "appareils" / "SUMMARY.md"
    if check:
        if not summary_path.exists() or summary_path.read_text(encoding="utf-8") != new_content:
            print(f" SUMMARY.md périmé : {summary_path.relative_to(docs.parent)}")
            return 1
        return 0
    summary_path.write_text(new_content, encoding="utf-8")
    print(f"SUMMARY.md → {summary_path.relative_to(docs.parent)}")
    return 0
# ── Validation ────────────────────────────────────────────────────────────────
def validate_entries(entries: list, plugins: dict, src: Path) -> None:
    """Abort the run when PORTING_STATUS entries cannot be documented.

    Entries without a ``plugin`` are skipped. For the others, the following
    problems are collected and reported together:

    * the ``channel`` field is missing or empty;
    * the channel is not ``nightly`` and the plugin JSON was not loaded;
    * no label is available (neither ``name`` in the entry nor ``title`` in
      meta.json).

    Args:
        entries: Entries loaded from PORTING_STATUS.yaml.
        plugins: Loaded plugin JSON files, keyed by file name.
        src: Plugin source directory, used to look up meta.json.

    Raises:
        SystemExit: If at least one blocking problem was found.
    """
    errors = []
    for e in entries:
        plugin = e.get("plugin")
        if not plugin:
            continue
        fname = f"integrationplugin{plugin}.json"
        channel = e.get("channel")
        if not channel:
            # The rendering code indexes e["channel"] directly: report the
            # omission here instead of letting a bare KeyError escape later.
            errors.append(
                f" BLOQUANT : champ channel manquant pour plugin={plugin}"
            )
        elif channel != "nightly" and fname not in plugins:
            errors.append(
                f" BLOQUANT : {fname} introuvable dans --src "
                f"(channel={channel}, plugin={plugin})"
            )
        # meta.json is only consulted (a recursive search) when the entry
        # itself carries no name.
        if not e.get("name") and not load_meta(src, plugin).get("title"):
            errors.append(
                f" BLOQUANT : libellé manquant pour plugin={plugin} "
                f"(ni PORTING_STATUS.name ni meta.json.title)"
            )
    if errors:
        sys.exit("ERREURS BLOQUANTES :\n" + "\n".join(errors))
# ── Construction d'une page complète ─────────────────────────────────────────
def build_device_page(e: dict, plugin_data: dict | None, meta: dict) -> str:
    """Build the full content of a newly created device page.

    The page is made of a title, a line of badges (channel, origin,
    stability), an optional tagline from meta.json and the generated block.
    Without plugin data the block holds the nightly placeholder, which is
    also reported on stderr.
    """
    title = entry_name(e, meta)
    fname = f"integrationplugin{e['plugin']}.json"

    channel = e["channel"]
    stability = meta.get("stability", "")
    candidate_badges = [
        CHANNEL_BADGES.get(channel, channel),
        ORIGIN_BADGES.get(resolve_origin(e), ""),
        STABILITY_BADGES.get(stability, "") if stability else "",
    ]
    badges_line = " ".join(badge for badge in candidate_badges if badge)
    tagline = meta.get("tagline", "")

    if not plugin_data:
        print(f" INFO : {fname} absent (nightly) → page stub pour {title}",
              file=sys.stderr)
        generated = render_nightly_stub(e, meta)
    else:
        generated = render_plugin(plugin_data)

    # Same BEGIN/END layout as the one process() writes back
    # (marker, newline, content ending with a newline, newline, marker), so a
    # later regeneration leaves a fresh page unchanged.
    block = f"<!-- BEGIN GENERATED: {fname} -->\n{generated}\n<!-- END GENERATED -->"

    header_lines = [f"# {title}", "", badges_line]
    if tagline:
        header_lines.extend(["", tagline])
    header_lines.append("")  # blank line before the BEGIN marker
    return "\n".join(header_lines) + "\n" + block + "\n"
def build_category_index_page(cat: str, entries: list, plugins: dict, src: Path) -> str:
    """Build the full content of a newly created category index.md."""
    heading = CATEGORY_LABELS.get(cat, cat.capitalize())
    table = render_category_index(cat, entries, plugins, src)
    # Same BEGIN/END layout as the one process() writes back.
    parts = [
        f"# {heading}\n\n",
        f"<!-- BEGIN GENERATED: __index_{cat}__ -->\n",
        f"{table}\n",
        "<!-- END GENERATED -->\n",
    ]
    return "".join(parts)
# ── Création des fichiers manquants ───────────────────────────────────────────
def ensure_device_pages(entries: list, plugins: dict, src: Path,
                        docs: Path, check: bool) -> int:
    """Create the device pages that do not exist yet.

    Existing pages are never touched here. In check mode nothing is written:
    each missing page is reported and the function returns 1; otherwise it
    returns 0.
    """
    status = 0
    for entry in entries:
        plugin_id = entry.get("plugin")
        if not plugin_id:
            continue
        slug = entry.get("slug") or plugin_id
        folder = CATEGORY_FOLDER.get(entry.get("category", ""),
                                     entry.get("category", "autre"))
        page_path = docs / "appareils" / folder / f"{slug}.md"
        if page_path.exists():
            continue
        plugin_data = plugins.get(f"integrationplugin{plugin_id}.json")
        meta = load_meta(src, plugin_id)
        content = build_device_page(entry, plugin_data, meta)
        if not check:
            page_path.parent.mkdir(parents=True, exist_ok=True)
            page_path.write_text(content, encoding="utf-8")
            print(f" CRÉÉ : {page_path.relative_to(docs.parent)}")
        else:
            print(f" MANQUANT : {page_path.relative_to(docs.parent)}")
            status = 1
    return status
def ensure_category_indexes(entries: list, plugins: dict, src: Path,
                            docs: Path, check: bool) -> int:
    """Create the category index.md files that do not exist yet.

    Args:
        entries: Entries loaded from PORTING_STATUS.yaml.
        plugins: Loaded plugin JSON files, keyed by file name.
        src: Plugin source directory, used to look up meta.json.
        docs: Root of the documentation tree.
        check: When true nothing is written; missing files are only reported.

    Returns:
        1 in check mode when at least one index is missing, 0 otherwise.
    """
    # Sorted so that files are created and reported in the same order on
    # every run; iterating the bare set would vary with string hashing.
    cats_present = sorted(
        {e.get("category") for e in entries if e.get("category")}, key=str
    )
    rc = 0
    for cat in cats_present:
        folder = CATEGORY_FOLDER.get(cat, cat)
        index_path = docs / "appareils" / folder / "index.md"
        if index_path.exists():
            continue
        if check:
            print(f" MANQUANT : {index_path.relative_to(docs.parent)}")
            rc = 1
            continue
        # The page is only rendered when it is actually going to be written.
        content = build_category_index_page(cat, entries, plugins, src)
        index_path.parent.mkdir(parents=True, exist_ok=True)
        index_path.write_text(content, encoding="utf-8")
        print(f" CRÉÉ : {index_path.relative_to(docs.parent)}")
    return rc
# ── Traitement des fichiers MD (injection dans les marqueurs) ─────────────────
def process(docs: Path, entries: list, plugins: dict, src: Path, check: bool) -> int:
    """Regenerate the content between BEGIN/END markers in every Markdown page.

    Each marker key selects what is injected: the compatibility matrix, a
    category index, or a plugin section. A plugin key without loaded JSON
    gets the nightly placeholder when its entry is on the nightly channel;
    otherwise a warning is printed and the region is left untouched.

    In check mode no file is written and 1 is returned when at least one
    page would change; otherwise the changed pages are rewritten and listed,
    and 0 is returned.
    """
    entry_by_plugin = {}
    for entry in entries:
        if entry.get("plugin"):
            entry_by_plugin[entry["plugin"]] = entry

    stale = []
    for page in sorted(docs.rglob("*.md")):
        original = page.read_text(encoding="utf-8")
        if "<!-- BEGIN GENERATED:" not in original:
            continue

        def regenerate(match: re.Match) -> str:
            key = match.group("key")
            if key == "__matrix__":
                generated = render_matrix(entries, plugins, src)
            elif key.startswith("__index_"):
                # "__index_<cat>__" -> "<cat>"
                category = key.removeprefix("__index_").removesuffix("__")
                generated = render_category_index(category, entries, plugins, src)
            elif key in plugins:
                generated = render_plugin(plugins[key])
            else:
                # No JSON loaded for this key: only nightly entries get a stub.
                plugin_name = key.removeprefix("integrationplugin").removesuffix(".json")
                candidate = entry_by_plugin.get(plugin_name)
                if not (candidate and candidate.get("channel") == "nightly"):
                    print(f" ! {page.name}: clé '{key}' absente de --src", file=sys.stderr)
                    return match.group(0)
                print(f" INFO : {key} absent (nightly) → stub dans {page.name}",
                      file=sys.stderr)
                generated = render_nightly_stub(candidate, load_meta(src, plugin_name))
            return f"{match.group(1)}\n{generated}\n{match.group(4)}"

        updated = MARKER_RE.sub(regenerate, original)
        if updated == original:
            continue
        stale.append(page)
        if not check:
            page.write_text(updated, encoding="utf-8")

    if not check:
        print(f"Régénéré : {len(stale)} page(s).")
        for page in stale:
            print(f" - {page.relative_to(docs)}")
        return 0
    if stale:
        print("Doc PAS à jour :", ", ".join(str(page) for page in stale))
        return 1
    print("Doc à jour.")
    return 0
# ── Point d'entrée ────────────────────────────────────────────────────────────
def main() -> int:
    """Parse the command line and run every generation step.

    Returns 2 when no plugin JSON is found under ``--src``; otherwise the
    highest status reported by the individual steps (1 when ``--check``
    detects missing or stale documentation, 0 when everything is fine).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--src", default=".plugins-src", type=Path,
                        help="Dossier source des JSON (plat ou avec sous-dossiers repos)")
    parser.add_argument("--docs", default="docs", type=Path)
    parser.add_argument("--lang", default="fr", help="Conservé pour compatibilité CI")
    parser.add_argument("--check", action="store_true",
                        help="CI : exit 1 si la doc n'est pas à jour")
    args = parser.parse_args()

    src = args.src
    # PORTING_STATUS.yaml is looked up in the current working directory.
    entries = load_porting_status(Path.cwd())
    plugins = load_plugins(src)
    if not plugins:
        print(f"Aucun integrationplugin*.json trouvé dans {src}", file=sys.stderr)
        return 2
    print(f"{len(plugins)} plugin(s) chargé(s) : {', '.join(plugins)}")

    validate_entries(entries, plugins, src)

    # Missing files are created first so that the injection step sees them.
    statuses = [
        ensure_device_pages(entries, plugins, src, args.docs, args.check),
        ensure_category_indexes(entries, plugins, src, args.docs, args.check),
        generate_summary(entries, args.docs, src, plugins, args.check),
        process(args.docs, entries, plugins, src, args.check),
    ]
    return max(statuses)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())