- PORTING_STATUS.yaml : source de vérité canal APT + placement nav - scripts/gen_device_reference.py : génération matrice + fiches + SUMMARY.md depuis integrationplugin*.json + meta.json ; nightly sans JSON = invisible - mkdocs.yml : plugin literate-nav, nav 6 sections, Appareils via SUMMARY.md - .gitea/workflows/docs.yml : CI complet — fetch JSON (branche auto-détectée), génération, build --strict, check idempotence, rsync deploy - Badges HTML (stable/testing/nightly + consumer/community + ok/part/road) - Fiches appareils : Eastron, ABB B2x, ABB Terra, Keba, Waveshare - requirements.txt : mkdocs-material, mkdocs-literate-nav, PyYAML Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
410 lines
15 KiB
Python
410 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
gen_device_reference.py — génère la matrice de compatibilité, les sections de
|
||
référence des appareils et le fichier de nav literate-nav (SUMMARY.md) depuis
|
||
PORTING_STATUS.yaml + integrationplugin*.json + meta.json.
|
||
|
||
Principe docs-as-code : ne remplace que le contenu entre marqueurs existants.
|
||
|
||
<!-- BEGIN GENERATED: integrationplugineastron.json -->
|
||
... contenu régénéré ...
|
||
<!-- END GENERATED -->
|
||
|
||
Marqueur spécial pour la matrice de compatibilité :
|
||
<!-- BEGIN GENERATED: __matrix__ -->
|
||
|
||
Usage :
|
||
python3 scripts/gen_device_reference.py --src .plugins-src --docs docs --lang fr
|
||
python3 scripts/gen_device_reference.py --src .plugins-src --docs docs --lang fr --check
|
||
"""
|
||
from __future__ import annotations
|
||
import argparse
|
||
import json
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
try:
|
||
import yaml
|
||
except ImportError:
|
||
sys.exit("PyYAML est requis : pip install PyYAML")
|
||
|
||
# Display label (French) for each functional nymea interface name.
# roles() falls back to the raw interface name when it is not listed here.
INTERFACE_ROLE = {
    "energymeter": "Compteur d'énergie",
    "smartmeterconsumer": "Compteur de consommation",
    "smartmeterproducer": "Compteur de production",
    "evcharger": "Borne de recharge",
    "heatpump": "Pompe à chaleur",
    "smartmeter": "Compteur intelligent",
}
# Interfaces that roles() filters out instead of turning them into a role.
TECH_INTERFACES = {"connectable", "networkdevice"}

# Display label for each value found in a thing class's "createMethods".
CREATE_METHOD = {
    "discovery": "Découverte automatique",
    "user": "Ajout manuel",
    "auto": "Automatique",
}

# Section heading used for each PORTING_STATUS category.
CATEGORY_LABELS = {
    "compteur": "Compteurs",
    "irve": "Bornes de recharge",
    "smartdevice": "SmartDevices",
    "hvac": "HVAC",
    "onduleur": "Onduleurs / PV",
    "batterie": "Batteries / ESS",
    "tarif": "Tarifs & prévisions",
}

# Folder name used in SUMMARY.md links for each category.
CATEGORY_FOLDER = {
    "compteur": "compteurs",
    "irve": "bornes",
    "smartdevice": "smart",
    "hvac": "hvac",
    "onduleur": "onduleurs",
    "batterie": "batteries",
    "tarif": "tarifs",
}

# Order in which known categories are emitted; categories not listed here
# are appended afterwards by the rendering functions.
CATEGORY_ORDER = ["compteur", "irve", "smartdevice", "hvac", "onduleur", "batterie", "tarif"]

# HTML badge for each "channel" value of an entry.
CHANNEL_BADGES = {
    "stable": '<span class="badge stable">STABLE</span>',
    "testing": '<span class="badge testing">TESTING</span>',
    "nightly": '<span class="badge nightly">NIGHTLY</span>',
}

# HTML badge for each meta.json "stability" value.
STABILITY_BADGES = {
    "consumer": '<span class="badge consumer">CONSUMER</span>',
    "community": '<span class="badge community">COMMUNITY</span>',
}

# One generated region of a Markdown page:
#   group 1  -> the BEGIN marker (kept verbatim on rewrite)
#   "key"    -> what to render (a plugin JSON file name, or __matrix__)
#   "body"   -> the current content between the markers
#   group 4  -> the END marker (kept verbatim on rewrite)
# DOTALL lets the body span several lines; the non-greedy body stops at the
# first END marker.
MARKER_RE = re.compile(
    r"(<!-- BEGIN GENERATED: (?P<key>[^\s]+) -->)(?P<body>.*?)(<!-- END GENERATED -->)",
    re.DOTALL,
)
|
||
|
||
|
||
# ── Chargement ────────────────────────────────────────────────────────────────
|
||
|
||
def load_porting_status(root: Path) -> list:
    """Load the entries declared in ``PORTING_STATUS.yaml`` under *root*.

    Args:
        root: Directory expected to contain ``PORTING_STATUS.yaml``.

    Returns:
        The top-level YAML list. An empty file yields an empty list.

    Raises:
        SystemExit: If the file is missing, or if its top level is not a list.
    """
    path = root / "PORTING_STATUS.yaml"
    if not path.exists():
        sys.exit(f"ERREUR : PORTING_STATUS.yaml introuvable à {path}")
    # Explicit UTF-8: without it the locale's default encoding is used, which
    # breaks on accented labels when the locale is not UTF-8.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # safe_load() returns None for an empty document; callers iterate.
        return []
    if not isinstance(data, list):
        sys.exit(f"ERREUR : {path} doit contenir une liste d'entrées")
    return data
|
||
|
||
|
||
def load_plugins(src: Path) -> dict:
    """Index every ``integrationplugin*.json`` found below *src* by file name.

    The tree is walked recursively in sorted path order. When the same file
    name occurs more than once, the first occurrence wins and later ones are
    not read.

    Args:
        src: Root directory to search.

    Returns:
        Mapping of file name to the parsed JSON content.
    """
    loaded: dict = {}
    for json_path in sorted(src.rglob("integrationplugin*.json")):
        if json_path.name in loaded:
            continue
        content = json_path.read_text(encoding="utf-8")
        loaded[json_path.name] = json.loads(content)
    return loaded
|
||
|
||
|
||
def load_meta(src: Path, plugin: str) -> dict:
    """Return the ``meta.json`` stored next to ``integrationplugin<plugin>.json``.

    Only the first matching plugin file (sorted path order) is considered.

    Args:
        src: Root directory searched recursively.
        plugin: Plugin identifier used to build the JSON file name.

    Returns:
        The parsed ``meta.json``, or an empty dict when no plugin file matches
        or when the first match has no ``meta.json`` beside it.
    """
    matches = sorted(src.rglob(f"integrationplugin{plugin}.json"))
    if not matches:
        return {}
    sidecar = matches[0].parent / "meta.json"
    if not sidecar.exists():
        return {}
    return json.loads(sidecar.read_text(encoding="utf-8"))
|
||
|
||
|
||
# ── Libellé d'un appareil ────────────────────────────────────────────────────
|
||
|
||
def entry_name(e: dict, meta: dict) -> str:
    """Pick the display title of an entry.

    Precedence: the entry's ``name``, then ``meta``'s ``title``, then the
    entry's ``plugin`` value (``"?"`` when the key is absent).
    """
    label = e.get("name")
    if label:
        return label
    label = meta.get("title")
    if label:
        return label
    return e.get("plugin", "?")
|
||
|
||
|
||
# ── Helpers JSON nymea ────────────────────────────────────────────────────────
|
||
|
||
def transport_of(tc: dict) -> str:
    """Guess the transport label of a thing class from its parameters.

    Returns ``"Modbus TCP"`` when the thing class implements ``networkdevice``
    or has a parameter named ``hostName``, ``address``, ``port`` or
    ``macAddress``; otherwise ``"Modbus RTU"`` when it has a parameter named
    ``modbusMasterUuid`` or ``rtuMaster``; otherwise ``"—"``.
    """
    param_names = set()
    for section in ("paramTypes", "discoveryParamTypes"):
        param_names.update(p["name"] for p in tc.get(section, []))
    interfaces = set(tc.get("interfaces", []))

    tcp_hints = {"hostName", "address", "port", "macAddress"}
    if "networkdevice" in interfaces or not tcp_hints.isdisjoint(param_names):
        return "Modbus TCP"

    rtu_hints = {"modbusMasterUuid", "rtuMaster"}
    if not rtu_hints.isdisjoint(param_names):
        return "Modbus RTU"
    return "—"
|
||
|
||
|
||
def roles(tc: dict) -> str:
    """List the roles of a thing class as a comma-separated string.

    Interfaces in ``TECH_INTERFACES`` are skipped; the others are mapped
    through ``INTERFACE_ROLE``, keeping the raw name when unmapped. Returns
    ``"—"`` when nothing remains.
    """
    labels = []
    for iface in tc.get("interfaces", []):
        if iface in TECH_INTERFACES:
            continue
        labels.append(INTERFACE_ROLE.get(iface, iface))
    if not labels:
        return "—"
    return ", ".join(labels)
|
||
|
||
|
||
def add_method(tc: dict) -> str:
    """Describe how a thing class can be added, joined with ``" / "``.

    Each ``createMethods`` value is mapped through ``CREATE_METHOD`` (raw
    value kept when unmapped). Returns ``"—"`` when the result is empty.
    """
    labels = [CREATE_METHOD.get(method, method) for method in tc.get("createMethods", [])]
    joined = " / ".join(labels)
    return joined if joined else "—"
|
||
|
||
|
||
def resolve_protocol(plugin_data: dict | None) -> str:
    """Return the first known transport among a plugin's thing classes.

    Vendors and their thing classes are scanned in order; the first result of
    ``transport_of`` other than ``"—"`` is returned. Falls back to ``"—"``
    when *plugin_data* is empty/None or no thing class yields a transport.
    """
    if not plugin_data:
        return "—"
    thing_classes = (
        tc
        for vendor in plugin_data.get("vendors", [])
        for tc in vendor.get("thingClasses", [])
    )
    transports = (transport_of(tc) for tc in thing_classes)
    return next((label for label in transports if label != "—"), "—")
|
||
|
||
|
||
# ── Rendu Markdown ────────────────────────────────────────────────────────────
|
||
|
||
def fmt_range(p: dict) -> str:
    """Format a parameter's ``minValue``–``maxValue`` range.

    Returns ``"—"`` unless both bounds are present and not None.
    """
    lower = p.get("minValue")
    upper = p.get("maxValue")
    if lower is None or upper is None:
        return "—"
    return f"{lower}–{upper}"
|
||
|
||
|
||
def fmt_default(p: dict) -> str:
    """Format a parameter's ``defaultValue`` as inline code.

    Returns ``"—"`` when the value is absent, None or the empty string; other
    falsy values such as ``0`` or ``False`` are rendered.
    """
    value = p.get("defaultValue", "")
    if value is None or value == "":
        return "—"
    return f"`{value}`"
|
||
|
||
|
||
def md_table(headers: list, rows: list) -> str:
    """Build a pipe-delimited Markdown table.

    Args:
        headers: Column titles (strings).
        rows: Iterable of rows; each cell is converted with ``str()``.

    Returns:
        Header line, ``---`` separator line and one line per row, joined
        with newlines (no trailing newline).
    """
    def as_line(cells) -> str:
        return "| " + " | ".join(cells) + " |"

    table = [as_line(headers), as_line("---" for _ in headers)]
    table.extend(as_line(str(cell) for cell in row) for row in rows)
    return "\n".join(table)
|
||
|
||
|
||
def render_params(tc: dict) -> list:
    """Render the parameter tables of one thing class.

    Args:
        tc: Thing-class dict; ``discoveryParamTypes`` and ``paramTypes`` are read.

    Returns:
        Markdown chunks in order: for each non-empty parameter list, a caption,
        a blank line, the table (one multi-line string) and a blank line.
        Discovery parameters come first. Empty list when both are absent/empty.
    """
    lines = []
    disc = tc.get("discoveryParamTypes", [])
    if disc:
        rows = [[f"`{p['name']}`", p.get("displayName", ""), p.get("type", ""),
                 fmt_range(p), fmt_default(p)] for p in disc]
        lines.append("_Paramètres de découverte :_")
        # Blank line before the table: Python-Markdown only recognises a
        # table that starts its own block, otherwise it renders as plain text.
        lines.append("")
        lines.append(md_table(["Clé", "Libellé", "Type", "Plage", "Défaut"], rows))
        lines.append("")
    settings = tc.get("paramTypes", [])
    if settings:
        rows = [[f"`{p['name']}`", p.get("displayName", ""), p.get("type", ""),
                 fmt_range(p), fmt_default(p),
                 "oui" if p.get("readOnly") else "non"] for p in settings]
        lines.append("_Réglages :_")
        lines.append("")  # same reason as above
        lines.append(md_table(["Clé", "Libellé", "Type", "Plage", "Défaut", "Lecture seule"], rows))
        lines.append("")
    return lines
|
||
|
||
|
||
def render_states(tc: dict) -> list:
    """Render the state table of one thing class.

    Returns an empty list when ``stateTypes`` is absent or empty; otherwise a
    two-item list: the Markdown table and a trailing empty string. A missing
    or falsy ``unit`` is shown as ``"—"``.
    """
    states = tc.get("stateTypes", [])
    if not states:
        return []
    rows = []
    for state in states:
        rows.append([
            f"`{state['name']}`",
            state.get("displayName", ""),
            state.get("type", ""),
            state.get("unit") or "—",
        ])
    table = md_table(["Clé", "Grandeur", "Type", "Unité"], rows)
    return [table, ""]
|
||
|
||
|
||
def render_plugin(plugin: dict) -> str:
    """Render the generated reference section of one plugin.

    The output contains the vendor/plugin header, a summary table of every
    thing class, then one collapsible ``???`` block per thing class holding
    its parameter tables and, when present, its state table.

    Args:
        plugin: Parsed ``integrationplugin*.json`` content.

    Returns:
        Markdown text with trailing whitespace stripped and exactly one
        final newline.
    """
    out = []
    vendors = plugin.get("vendors", [])
    vname = ", ".join(v.get("displayName", v.get("name", "")) for v in vendors)
    # Two trailing spaces make a Markdown hard line break; with fewer, the
    # "Fabricant" and "Plugin" lines are merged into one rendered line.
    out.append(f"**Fabricant :** {vname}  ")
    out.append(f"**Plugin :** `{plugin.get('name', '')}`")
    out.append("")

    thing_classes = [tc for v in vendors for tc in v.get("thingClasses", [])]
    rows = [[f"**{tc.get('displayName', tc.get('name'))}**",
             roles(tc), transport_of(tc), add_method(tc),
             str(len(tc.get("stateTypes", [])))]
            for tc in thing_classes]
    out.append("#### Modèles pris en charge")
    out.append("")
    out.append(md_table(["Modèle", "Rôle", "Transport", "Ajout", "Grandeurs"], rows))
    out.append("")

    out.append("#### Détail par modèle")
    out.append("")
    for tc in thing_classes:
        title = tc.get("displayName", tc.get("name"))
        out.append(f'??? abstract "{title} — `{tc.get("name")}`"')
        body = []
        body += render_params(tc)
        if tc.get("stateTypes"):
            body.append("_Grandeurs mesurées :_")
            # A table must start its own block to be parsed as a table.
            body.append("")
            body += render_states(tc)
        for ln in ("\n".join(body)).splitlines():
            # The content of a "???" block must be indented by four spaces
            # to belong to it; blank lines stay empty.
            out.append("    " + ln if ln else "")
        out.append("")
    return "\n".join(out).rstrip() + "\n"
|
||
|
||
|
||
def render_matrix(entries: list, plugins: dict, src: Path) -> str:
    """Render the compatibility matrix, one Markdown table per category.

    Categories listed in ``CATEGORY_ORDER`` come first, then the others in
    first-seen order (entries without a category go to ``"autre"``). Nightly
    entries whose plugin JSON is not in *plugins* are left out, and a category
    with no remaining row is omitted entirely.

    Args:
        entries: Entries from PORTING_STATUS.yaml.
        plugins: Loaded plugin JSONs keyed by file name.
        src: Root directory used to look up ``meta.json`` files.

    Returns:
        The Markdown text of all category sections.
    """
    grouped: dict = {}
    for entry in entries:
        category = entry.get("category", "autre")
        if category not in grouped:
            grouped[category] = []
        grouped[category].append(entry)

    known = [c for c in CATEGORY_ORDER if c in grouped]
    extra = [c for c in grouped if c not in CATEGORY_ORDER]

    out = []
    for category in known + extra:
        heading = CATEGORY_LABELS.get(category, category.capitalize())
        table_rows = []
        for entry in grouped[category]:
            has_plugin = bool(entry.get("plugin"))
            json_name = f"integrationplugin{entry['plugin']}.json" if has_plugin else None
            plugin_data = plugins.get(json_name) if json_name else None
            # A nightly entry without its JSON is hidden from the matrix.
            if entry["channel"] == "nightly" and (json_name is None or json_name not in plugins):
                continue
            meta = load_meta(src, entry["plugin"]) if has_plugin else {}
            title = entry_name(entry, meta)
            protocol = resolve_protocol(plugin_data)
            channel_cell = CHANNEL_BADGES.get(entry["channel"], entry["channel"])
            stability = meta.get("stability", "")
            if stability:
                stability_cell = STABILITY_BADGES.get(stability, "—")
            else:
                stability_cell = "—"
            table_rows.append(f"| {title} | {protocol} | {channel_cell} | {stability_cell} |")
        if not table_rows:
            continue
        out += [
            f"## {heading}\n",
            "| Marque / Modèle | Protocole | Canal | Stabilité |",
            "|---|---|---|---|",
        ]
        out += table_rows
        out.append("")

    return "\n".join(out)
|
||
|
||
|
||
# ── SUMMARY.md pour mkdocs-literate-nav ──────────────────────────────────────
|
||
|
||
def generate_summary(entries: list, docs: Path, src: Path, plugins: dict) -> None:
    """Write ``<docs>/appareils/SUMMARY.md``, the nav file for literate-nav.

    The file starts with the compatibility page, followed by one top-level
    item per category that has at least one page, each with its device pages
    nested below it. Entries without a plugin, and nightly entries whose
    plugin JSON is not in *plugins*, get no nav item.

    Args:
        entries: Entries from PORTING_STATUS.yaml.
        docs: Documentation root directory.
        src: Root directory used to look up ``meta.json`` files.
        plugins: Loaded plugin JSONs keyed by file name.

    The file is always (re)written and its path is printed.
    """
    by_cat: dict = {}
    for e in entries:
        by_cat.setdefault(e.get("category", "autre"), []).append(e)

    lines = ["* [Compatibilité](compatibilite.md)"]

    ordered_cats = [c for c in CATEGORY_ORDER if c in by_cat]
    ordered_cats += [c for c in by_cat if c not in CATEGORY_ORDER]

    for cat in ordered_cats:
        label = CATEGORY_LABELS.get(cat, cat.capitalize())
        folder = CATEGORY_FOLDER.get(cat, cat)
        cat_entries = []
        for e in by_cat[cat]:
            if not e.get("plugin"):
                continue
            fname = f"integrationplugin{e['plugin']}.json"
            # A nightly entry without its JSON has no page, hence no nav item.
            if e["channel"] == "nightly" and fname not in plugins:
                continue
            meta = load_meta(src, e["plugin"])
            name = entry_name(e, meta)
            slug = e.get("slug") or e["plugin"]
            # Four spaces: a Markdown list item is only nested under the
            # category item when indented by a full indentation level.
            cat_entries.append(f"    * [{name}]({folder}/{slug}.md)")
        if cat_entries:
            lines.append(f"* [{label}]({folder}/index.md)")
            lines.extend(cat_entries)

    summary_path = docs / "appareils" / "SUMMARY.md"
    # Create the folder on a fresh checkout instead of failing on write.
    summary_path.parent.mkdir(parents=True, exist_ok=True)
    summary_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print(f"SUMMARY.md → {summary_path.relative_to(docs.parent)}")
|
||
|
||
|
||
# ── Validation ────────────────────────────────────────────────────────────────
|
||
|
||
def validate_entries(entries: list, plugins: dict, src: Path) -> None:
    """Abort the run when a non-nightly entry is missing required data.

    Entries without a ``plugin`` value are ignored. For the others, when the
    channel is not ``"nightly"``, two blocking errors are collected: the
    plugin JSON is absent from *plugins*, or the entry has neither a ``name``
    nor a ``title`` in its ``meta.json``.

    Args:
        entries: Entries from PORTING_STATUS.yaml.
        plugins: Loaded plugin JSONs keyed by file name.
        src: Root directory used to look up ``meta.json`` files.

    Raises:
        SystemExit: With the list of all collected errors, if there are any.
    """
    errors = []
    for e in entries:
        if not e.get("plugin"):
            continue
        fname = f"integrationplugin{e['plugin']}.json"

        if e["channel"] != "nightly":
            # The plugin JSON is mandatory for stable / testing.
            if fname not in plugins:
                errors.append(
                    f" BLOQUANT : {fname} introuvable dans --src "
                    f"(channel={e['channel']}, plugin={e['plugin']})"
                )
            # A label is mandatory.
            # NOTE(review): this check is assumed to sit in the same
            # non-nightly branch as the JSON check — confirm the nesting.
            meta = load_meta(src, e["plugin"])
            if not e.get("name") and not meta.get("title"):
                errors.append(
                    f" BLOQUANT : libellé manquant pour plugin={e['plugin']} "
                    f"(ni PORTING_STATUS.name ni meta.json.title)"
                )

    if errors:
        sys.exit("ERREURS BLOQUANTES :\n" + "\n".join(errors))
|
||
|
||
|
||
# ── Traitement des fichiers MD ────────────────────────────────────────────────
|
||
|
||
def process(docs: Path, entries: list, plugins: dict, src: Path, check: bool) -> int:
    """Regenerate every marked region in the Markdown pages under *docs*.

    Each ``*.md`` file containing a BEGIN GENERATED marker is scanned with
    ``MARKER_RE``. A region keyed ``__matrix__`` receives the compatibility
    matrix; a region keyed by a loaded plugin file name receives that
    plugin's reference section; any other key is reported on stderr and the
    region is left untouched.

    Args:
        docs: Documentation root, searched recursively.
        entries: Entries from PORTING_STATUS.yaml (used for the matrix).
        plugins: Loaded plugin JSONs keyed by file name.
        src: Root directory used to look up ``meta.json`` files.
        check: When true, nothing is written; stale pages are only reported.

    Returns:
        1 in check mode when at least one page is out of date, otherwise 0.
    """
    changed = []

    for md in sorted(docs.rglob("*.md")):
        text = md.read_text(encoding="utf-8")
        if "<!-- BEGIN GENERATED:" not in text:
            continue

        def repl(m: re.Match) -> str:
            key = m.group("key")
            if key == "__matrix__":
                gen = render_matrix(entries, plugins, src)
            elif key in plugins:
                gen = render_plugin(plugins[key])
            else:
                print(f" ! {md.name}: clé '{key}' absente de --src", file=sys.stderr)
                return m.group(0)
            # Groups 1 and 4 are the BEGIN / END markers, kept verbatim.
            return f"{m.group(1)}\n{gen}\n{m.group(4)}"

        new = MARKER_RE.sub(repl, text)
        if new != text:
            changed.append(md)
            if not check:
                md.write_text(new, encoding="utf-8")

    if check:
        if changed:
            print("Doc PAS à jour :", ", ".join(str(c) for c in changed))
            return 1
        print("Doc à jour.")
        return 0
    print(f"Régénéré : {len(changed)} page(s).")
    for c in changed:
        print(f" - {c.relative_to(docs)}")
    return 0
|
||
|
||
|
||
# ── Point d'entrée ────────────────────────────────────────────────────────────
|
||
|
||
def main() -> int:
    """Command-line entry point.

    Loads PORTING_STATUS.yaml from the current working directory and the
    plugin JSONs from ``--src``, validates the entries, writes SUMMARY.md
    (also when ``--check`` is given), then regenerates or checks the pages.

    Returns:
        2 when no plugin JSON is found under ``--src``; otherwise the result
        of ``process`` (1 in check mode when a page is stale, else 0).
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks: the default formatter
        # reflows it and mangles the marker and usage examples.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--src", default=".plugins-src", type=Path,
                    help="Dossier source des JSON (plat ou avec sous-dossiers repos)")
    ap.add_argument("--docs", default="docs", type=Path)
    # --lang is parsed but not used anywhere in this function.
    ap.add_argument("--lang", default="fr", help="Conservé pour compatibilité CI")
    ap.add_argument("--check", action="store_true",
                    help="CI : exit 1 si la doc n'est pas à jour")
    a = ap.parse_args()

    src = a.src
    entries = load_porting_status(Path.cwd())
    plugins = load_plugins(src)

    if not plugins:
        print(f"Aucun integrationplugin*.json trouvé dans {src}", file=sys.stderr)
        return 2
    print(f"{len(plugins)} plugin(s) chargé(s) : {', '.join(plugins)}")

    validate_entries(entries, plugins, src)
    generate_summary(entries, a.docs, src, plugins)
    return process(a.docs, entries, plugins, src, a.check)
|
||
|
||
|
||
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|