#!/usr/bin/env python3
"""
gen_device_reference.py — generate the compatibility matrix, the device
reference sections and the literate-nav navigation file (SUMMARY.md) from
PORTING_STATUS.yaml + integrationplugin*.json + meta.json.

Device pages are handled in two modes:
  - Injection: replace the content between markers in existing pages.
  - Creation: write pages and indexes that do not exist yet from scratch.

Managed marker kinds: device page, compatibility matrix, category index.

Usage:
  python3 scripts/gen_device_reference.py --src .plugins-src --docs docs --lang fr
  python3 scripts/gen_device_reference.py --src .plugins-src --docs docs --lang fr --check
"""
from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path

# PyYAML is a hard requirement: the script exits at import time without it.
try:
    import yaml
except ImportError:
    sys.exit("PyYAML est requis : pip install PyYAML")

# Display label for each functional interface name found in a thing class
# (looked up by roles(); unknown interface names are shown unchanged).
INTERFACE_ROLE = {
    "energymeter": "Compteur d'énergie",
    "smartmeterconsumer": "Compteur de consommation",
    "smartmeterproducer": "Compteur de production",
    "evcharger": "Borne de recharge",
    "heatpump": "Pompe à chaleur",
    "smartmeter": "Compteur intelligent",
}

# Interface names that roles() leaves out of the "role" column.
TECH_INTERFACES = {"connectable", "networkdevice", "wirelessconnectable"}

# Display label for each `createMethods` value (see add_method()).
CREATE_METHOD = {
    "discovery": "Découverte automatique",
    "user": "Ajout manuel",
    "auto": "Automatique",
}

# Category key -> heading used in the matrix, the indexes and SUMMARY.md.
CATEGORY_LABELS = {
    "compteur": "Compteurs",
    "irve": "Bornes de recharge",
    "smartdevice": "SmartDevices",
    "hvac": "HVAC",
    "onduleur": "Onduleurs / PV",
    "batterie": "Batteries / ESS",
    "tarif": "Tarifs & prévisions",
    "generic": "Types génériques",
}

# Category key -> sub-folder of docs/appareils holding that category's pages.
CATEGORY_FOLDER = {
    "compteur": "compteurs",
    "irve": "bornes",
    "smartdevice": "smart",
    "hvac": "hvac",
    "onduleur": "onduleurs",
    "batterie": "batteries",
    "tarif": "tarifs",
    "generic": "generic",
}

# Order in which known categories are emitted; categories not listed here
# are appended afterwards in first-seen order.
CATEGORY_ORDER = [
    "compteur",
    "irve",
    "smartdevice",
    "hvac",
    "onduleur",
    "batterie",
    "tarif",
    "generic",
]

# NOTE(review): the three badge tables below hold bare upper-case words in
# single-quoted literals; presumably the values were HTML/markup snippets
# whose tags are missing from this copy — confirm against the canonical file.
CHANNEL_BADGES = {
    "stable": 'STABLE',
    "testing": 'TESTING',
    "nightly": 'NIGHTLY',
}

STABILITY_BADGES = {
    "consumer": 'CONSUMER',
    "community": 'COMMUNITY',
    "experimental": 'EXPERIMENTAL',
}

ORIGIN_BADGES = {
    "nymea": 'NYMEA',
    "etm": 'ETM',
    "tiers": 'TIERS',
}


def resolve_origin(e: dict) -> str:
    """Return the origin of an entry.

    A truthy explicit ``origin`` field wins. Otherwise the origin is derived
    from the ``repo`` prefix: ``nymea-`` -> "nymea", ``etm-`` -> "etm", and
    anything else (including a missing ``repo``) -> "tiers".
    """
    if e.get("origin"):
        return e["origin"]
    repo = e.get("repo", "")
    if repo.startswith("nymea-"):
        return "nymea"
    if repo.startswith("etm-"):
        return "etm"
    return "tiers"


def render_header_badges(e: dict, meta: dict) -> str:
    """Render the header badge line: channel, origin, stability.

    Badges that resolve to an empty string are dropped; the rest are joined
    with a single space and terminated by a newline.

    Raises:
        KeyError: if the entry has no ``channel`` key.
    """
    # An unknown channel is displayed as its raw value.
    channel_badge = CHANNEL_BADGES.get(e["channel"], e["channel"])
    origin_badge = ORIGIN_BADGES.get(resolve_origin(e), "")
    stability = meta.get("stability", "")
    stability_badge = STABILITY_BADGES.get(stability, "") if stability else ""
    parts = [b for b in [channel_badge, origin_badge, stability_badge] if b]
    return " ".join(parts) + "\n"


# NOTE(review): as it appears here this pattern is not a valid regular
# expression (`(?P` is not followed by `<name>`), so re.compile would raise.
# The marker text and the group name look like they were lost from this copy.
# A comment in build_category_index_page refers to m.group(1) and m.group(4),
# so the real pattern presumably has at least four groups — restore it from
# the canonical file.
MARKER_RE = re.compile(
    r"()(?P.*?)()",
    re.DOTALL,
)


# ── Loading ───────────────────────────────────────────────────────────────────

def load_porting_status(root: Path) -> list:
    """Load and return the parsed content of ``root/PORTING_STATUS.yaml``.

    Exits the process with an error message when the file does not exist.
    """
    path = root / "PORTING_STATUS.yaml"
    if not path.exists():
        sys.exit(f"ERREUR : PORTING_STATUS.yaml introuvable à {path}")
    # NOTE(review): opened with the platform default encoding, whereas every
    # other read in this file passes encoding="utf-8" — confirm this is
    # intended. The result of yaml.safe_load is returned as-is; presumably an
    # empty document yields None rather than a list — verify against callers.
    with open(path) as f:
        return yaml.safe_load(f)


def load_plugins(src: Path) -> dict:
    """Load every integrationplugin*.json found recursively under ``src``.

    Returns a dict keyed by file name (not full path). When several files
    share a name, only the first one in sorted path order is considered.
    Empty or unparsable files are reported on stderr and skipped instead of
    aborting the run.
    """
    plugins: dict = {}
    for f in sorted(src.rglob("integrationplugin*.json")):
        if f.name in plugins:
            continue
        raw = f.read_text(encoding="utf-8").strip()
        if not raw:
            print(f" AVERTISSEMENT : {f} est vide — ignoré", file=sys.stderr)
            continue
        try:
            plugins[f.name] = json.loads(raw)
        except json.JSONDecodeError as exc:
            print(f" AVERTISSEMENT : {f} non parsable ({exc}) — ignoré", file=sys.stderr)
    return plugins


def load_meta(src: Path, plugin: str) -> dict:
    """Load the meta.json sitting next to the plugin's integrationplugin JSON.

    Only the first matching plugin file (sorted path order) is considered.
    Returns ``{}`` when no plugin file matches, when meta.json is absent, or
    when it is empty or unparsable (the last two are reported on stderr).
    """
    # NOTE(review): each call walks the whole source tree again (rglob), and
    # callers invoke this once per entry in several passes.
    for f in sorted(src.rglob(f"integrationplugin{plugin}.json")):
        meta_path = f.parent / "meta.json"
        if not meta_path.exists():
            return {}
        raw = meta_path.read_text(encoding="utf-8").strip()
        if not raw:
            print(f" AVERTISSEMENT : {meta_path} est vide — meta ignoré", file=sys.stderr)
            return {}
        try:
            return json.loads(raw)
        except json.JSONDecodeError as exc:
            print(f" AVERTISSEMENT : {meta_path} non parsable ({exc}) — meta ignoré", file=sys.stderr)
            return {}
    return {}


# ── Device label ──────────────────────────────────────────────────────────────

def entry_name(e: dict, meta: dict) -> str:
    """Return the display title of an entry.

    Precedence: PORTING_STATUS ``name`` > meta.json ``title`` > ``plugin``
    (last resort), and "?" when the entry has no ``plugin`` key either.
    """
    return e.get("name") or meta.get("title") or e.get("plugin", "?")


# ── nymea JSON helpers ────────────────────────────────────────────────────────

def transport_of(tc: dict) -> str:
    """Guess the transport label of a thing class.

    The decision uses the names of its ``paramTypes`` and
    ``discoveryParamTypes`` plus its ``interfaces``: a ``networkdevice``
    interface or any network-style parameter gives "Modbus TCP"; a Modbus
    master parameter gives "Modbus RTU"; otherwise "—".
    """
    params = {p["name"] for p in tc.get("paramTypes", [])} | \
             {p["name"] for p in tc.get("discoveryParamTypes", [])}
    ifaces = set(tc.get("interfaces", []))
    # The TCP test comes first, so it wins when both kinds of hints exist.
    if "networkdevice" in ifaces or {"hostName", "address", "port", "macAddress"} & params:
        return "Modbus TCP"
    if {"modbusMasterUuid", "rtuMaster"} & params:
        return "Modbus RTU"
    return "—"


def roles(tc: dict) -> str:
    """Return the comma-separated role labels of a thing class.

    Interfaces listed in TECH_INTERFACES are ignored; interfaces without a
    label in INTERFACE_ROLE are shown by their raw name. "—" when none remain.
    """
    r = [INTERFACE_ROLE.get(i, i) for i in tc.get("interfaces", []) if i not in TECH_INTERFACES]
    return ", ".join(r) if r else "—"


def add_method(tc: dict) -> str:
    """Return the labels of the thing class ``createMethods`` joined by " / ".

    Unknown methods are shown by their raw value; "—" when the list is empty
    or missing.
    """
    return " / ".join(CREATE_METHOD.get(m, m) for m in tc.get("createMethods", [])) or "—"


def resolve_protocol(plugin_data: dict | None) -> str:
    """Return the first transport other than "—" found across the thing classes.

    Vendors and their thing classes are scanned in document order. Returns
    "—" when ``plugin_data`` is falsy or no thing class yields a transport.
    """
    if not plugin_data:
        return "—"
    for v in plugin_data.get("vendors", []):
        for tc in v.get("thingClasses", []):
            t = transport_of(tc)
            if t != "—":
                return t
    return "—"


# ── Markdown rendering ────────────────────────────────────────────────────────

def fmt_range(p: dict) -> str:
    """Format ``minValue``–``maxValue`` of a parameter.

    Returns "—" unless both bounds are present and not None.
    """
    lo, hi = p.get("minValue"), p.get("maxValue")
    if lo is not None and hi is not None:
        return f"{lo}–{hi}"
    return "—"


def fmt_default(p: dict) -> str:
    """Format ``defaultValue`` as inline code.

    Returns "—" when the value is missing, an empty string or None. Other
    falsy values such as 0 or False are rendered.
    """
    d = p.get("defaultValue", "")
    if d == "" or d is None:
        return "—"
    return f"`{d}`"


def md_table(headers: list, rows: list) -> str:
    """Build a Markdown pipe table (no trailing newline).

    Cells are converted with ``str()`` and inserted verbatim; nothing is
    escaped.
    """
    out = ["| " + " | ".join(headers) + " |",
           "| " + " | ".join("---" for _ in headers) + " |"]
    for r in rows:
        out.append("| " + " | ".join(str(c) for c in r) + " |")
    return "\n".join(out)


def render_params(tc: dict) -> list:
    """Return the Markdown lines describing the parameters of a thing class.

    Emits one caption + table + blank line for ``discoveryParamTypes`` and
    one for ``paramTypes``; a section is omitted when its list is empty.
    Every parameter must have a ``name`` key.
    """
    lines = []
    disc = tc.get("discoveryParamTypes", [])
    if disc:
        rows = [[f"`{p['name']}`", p.get("displayName", ""), p.get("type", ""),
                 fmt_range(p), fmt_default(p)] for p in disc]
        lines.append("_Paramètres de découverte :_")
        lines.append(md_table(["Clé", "Libellé", "Type", "Plage", "Défaut"], rows))
        lines.append("")
    settings = tc.get("paramTypes", [])
    if settings:
        # Same columns as above plus a read-only flag.
        rows = [[f"`{p['name']}`", p.get("displayName", ""), p.get("type", ""),
                 fmt_range(p), fmt_default(p),
                 "oui" if p.get("readOnly") else "non"] for p in settings]
        lines.append("_Réglages :_")
        lines.append(md_table(["Clé", "Libellé", "Type", "Plage", "Défaut", "Lecture seule"], rows))
        lines.append("")
    return lines


def render_states(tc: dict) -> list:
    """Return ``[table, ""]`` for the ``stateTypes`` of a thing class.

    Returns an empty list when there are no states.
    """
    states = tc.get("stateTypes", [])
    if not states:
        return []
    rows = [[f"`{s['name']}`", s.get("displayName", ""), s.get("type", ""),
             s.get("unit") or "—"] for s in states]
    return [md_table(["Clé", "Grandeur", "Type", "Unité"], rows), ""]


def render_plugin(plugin: dict) -> str:
    """Render the generated Markdown section for one plugin JSON.

    Output: vendor and plugin name lines, a summary table with one row per
    thing class, then one collapsible ``??? abstract`` block per thing class
    holding its parameter and state tables. Trailing whitespace is stripped
    and a single newline appended.
    """
    out = []
    vendors = plugin.get("vendors", [])
    vname = ", ".join(v.get("displayName", v.get("name", "")) for v in vendors)
    out.append(f"**Fabricant :** {vname} ")
    out.append(f"**Plugin :** `{plugin.get('name', '')}`")
    out.append("")
    # Flatten to (vendor, thing class) pairs; only the thing class is used.
    tcs = [(v, tc) for v in vendors for tc in v.get("thingClasses", [])]
    rows = [[f"**{tc.get('displayName', tc.get('name'))}**", roles(tc), transport_of(tc),
             add_method(tc), str(len(tc.get("stateTypes", [])))] for _, tc in tcs]
    out.append("#### Modèles pris en charge")
    out.append(md_table(["Modèle", "Rôle", "Transport", "Ajout", "Grandeurs"], rows))
    out.append("")
    out.append("#### Détail par modèle")
    for _, tc in tcs:
        title = tc.get("displayName", tc.get("name"))
        out.append(f'??? abstract "{title} — `{tc.get("name")}`"')
        body = []
        body += render_params(tc)
        if tc.get("stateTypes"):
            body.append("_Grandeurs mesurées :_")
            body += render_states(tc)
        # Prefix each non-empty body line so it belongs to the block above.
        # NOTE(review): the prefix is a single space as shown here; nested
        # admonition content presumably needs a deeper indent — confirm.
        for ln in ("\n".join(body)).splitlines():
            out.append(" " + ln if ln else "")
        out.append("")
    return "\n".join(out).rstrip() + "\n"


def render_nightly_stub(entry: dict, meta: dict) -> str:
    """Generated content for a nightly entry whose plugin JSON is absent."""
    name = entry_name(entry, meta)
    return (
        f"_Support de **{name}** en cours d'intégration — canal nightly._\n"
        "\n"
        "_La documentation technique sera disponible dès que le plugin sera "
        "disponible sur le dépôt stable._\n"
    )


def render_category_index(cat: str, entries: list, plugins: dict, src: Path) -> str:
    """Table listing the devices of a category, for the generated index.md.

    Only entries with a truthy ``plugin`` are listed. Each row links to
    ``<slug>.md`` (slug defaults to the plugin name). Returns a placeholder
    sentence when the category has no such entry.

    Raises:
        KeyError: if a listed entry has no ``channel`` key.
    """
    cat_entries = [e for e in entries if e.get("category") == cat and e.get("plugin")]
    if not cat_entries:
        return "_Aucun appareil dans cette catégorie._\n"
    rows = []
    for e in cat_entries:
        fname = f"integrationplugin{e['plugin']}.json"
        plugin_data = plugins.get(fname)
        meta = load_meta(src, e["plugin"])
        name = entry_name(e, meta)
        slug = e.get("slug") or e["plugin"]
        protocol = resolve_protocol(plugin_data) if plugin_data else "—"
        channel_badge = CHANNEL_BADGES.get(e["channel"], e["channel"])
        origin_badge = ORIGIN_BADGES.get(resolve_origin(e), "—")
        stability = meta.get("stability", "")
        stability_badge = STABILITY_BADGES.get(stability, "—") if stability else "—"
        rows.append([f"[{name}]({slug}.md)", protocol, channel_badge, origin_badge, stability_badge])
    return md_table(["Appareil", "Protocole", "Canal", "Origine", "Stabilité"], rows) + "\n"


def render_matrix(entries: list, plugins: dict, src: Path) -> str:
    """Render the compatibility matrix: one heading + table per category.

    Entries are grouped by ``category`` ("autre" when missing). Categories
    follow CATEGORY_ORDER, then any remaining ones in first-seen order.
    Unlike the category index, entries without a ``plugin`` are still listed.

    Raises:
        KeyError: if an entry has no ``channel`` key.
    """
    by_cat: dict = {}
    for e in entries:
        by_cat.setdefault(e.get("category", "autre"), []).append(e)
    lines = []
    ordered_cats = [c for c in CATEGORY_ORDER if c in by_cat]
    ordered_cats += [c for c in by_cat if c not in CATEGORY_ORDER]
    for cat in ordered_cats:
        label = CATEGORY_LABELS.get(cat, cat.capitalize())
        cat_rows = []
        for e in by_cat[cat]:
            fname = f"integrationplugin{e['plugin']}.json" if e.get("plugin") else None
            plugin_data = plugins.get(fname) if fname else None
            # A nightly entry without JSON is informational only; the row is
            # still emitted below.
            if e["channel"] == "nightly" and (not fname or fname not in plugins):
                print(f" INFO matrice : {e.get('plugin', '?')} nightly sans JSON "
                      f"→ ligne stub dans la matrice", file=sys.stderr)
            meta = load_meta(src, e["plugin"]) if e.get("plugin") else {}
            name = entry_name(e, meta)
            protocol = resolve_protocol(plugin_data)
            channel_badge = CHANNEL_BADGES.get(e["channel"], e["channel"])
            origin_badge = ORIGIN_BADGES.get(resolve_origin(e), "—")
            stability = meta.get("stability", "")
            stability_badge = STABILITY_BADGES.get(stability, "—") if stability else "—"
            cat_rows.append(
                f"| {name} | {protocol} | {channel_badge} | {origin_badge} | {stability_badge} |"
            )
        if not cat_rows:
            continue
        lines.append(f"## {label}\n")
        lines.append("| Marque / Modèle | Protocole | Canal | Origine | Stabilité |")
        lines.append("|---|---|---|---|---|")
        lines.extend(cat_rows)
        lines.append("")
    return "\n".join(lines)


# ── SUMMARY.md for mkdocs-literate-nav ────────────────────────────────────────

def generate_summary(entries: list, docs: Path, src: Path, plugins: dict, check: bool) -> int:
    """Write docs/appareils/SUMMARY.md.

    In ``check`` mode nothing is written: returns 1 when the file is missing
    or differs from the freshly generated content, 0 otherwise. In write
    mode always returns 0. ``plugins`` is accepted but not used here.
    """
    by_cat: dict = {}
    for e in entries:
        by_cat.setdefault(e.get("category", "autre"), []).append(e)
    lines = ["* [Compatibilité](compatibilite.md)"]
    ordered_cats = [c for c in CATEGORY_ORDER if c in by_cat]
    ordered_cats += [c for c in by_cat if c not in CATEGORY_ORDER]
    for cat in ordered_cats:
        label = CATEGORY_LABELS.get(cat, cat.capitalize())
        folder = CATEGORY_FOLDER.get(cat, cat)
        cat_entries = []
        for e in by_cat[cat]:
            if not e.get("plugin"):
                continue
            meta = load_meta(src, e["plugin"])
            name = entry_name(e, meta)
            slug = e.get("slug") or e["plugin"]
            # NOTE(review): the nested item shows a single leading space here;
            # a Markdown sub-list presumably needs a deeper indent — confirm.
            cat_entries.append(f" * [{name}]({folder}/{slug}.md)")
        # A category only appears in the nav when it has at least one page.
        if cat_entries:
            lines.append(f"* [{label}]({folder}/index.md)")
            lines.extend(cat_entries)
    new_content = "\n".join(lines) + "\n"
    summary_path = docs / "appareils" / "SUMMARY.md"
    if check:
        if not summary_path.exists() or summary_path.read_text(encoding="utf-8") != new_content:
            print(f" SUMMARY.md périmé : {summary_path.relative_to(docs.parent)}")
            return 1
        return 0
    # NOTE(review): unlike the ensure_* functions, the parent directory is not
    # created before writing; write_text fails if docs/appareils is missing.
    summary_path.write_text(new_content, encoding="utf-8")
    print(f"SUMMARY.md → {summary_path.relative_to(docs.parent)}")
    return 0


# ── Validation ────────────────────────────────────────────────────────────────

def validate_entries(entries: list, plugins: dict, src: Path) -> None:
    """Collect blocking errors and exit the process if there are any.

    Entries without a ``plugin`` are skipped. Checks performed: the plugin
    JSON must have been loaded, and a label must exist in either
    PORTING_STATUS ``name`` or meta.json ``title``.

    Raises:
        KeyError: if a checked entry has no ``channel`` key.
    """
    errors = []
    for e in entries:
        if not e.get("plugin"):
            continue
        fname = f"integrationplugin{e['plugin']}.json"
        # NOTE(review): block nesting was reconstructed from a flattened
        # source. Whether the label check below applies to nightly entries
        # too (i.e. sits outside this `if`) must be confirmed against the
        # canonical file.
        if e["channel"] != "nightly":
            if fname not in plugins:
                errors.append(
                    f" BLOQUANT : {fname} introuvable dans --src "
                    f"(channel={e['channel']}, plugin={e['plugin']})"
                )
            meta = load_meta(src, e["plugin"])
            if not e.get("name") and not meta.get("title"):
                errors.append(
                    f" BLOQUANT : libellé manquant pour plugin={e['plugin']} "
                    f"(ni PORTING_STATUS.name ni meta.json.title)"
                )
    if errors:
        sys.exit("ERREURS BLOQUANTES :\n" + "\n".join(errors))


# ── Building a complete page ──────────────────────────────────────────────────

def build_device_page(e: dict, plugin_data: dict | None, meta: dict) -> str:
    """Build the full content of a generated device page.

    Layout: title heading, badge block, optional tagline from meta.json,
    then the generated block (plugin reference, or the nightly stub when
    ``plugin_data`` is falsy).
    """
    title = entry_name(e, meta)
    plugin = e["plugin"]
    fname = f"integrationplugin{plugin}.json"
    badges_gen = render_header_badges(e, meta)
    # NOTE(review): here and for `block` below, the generated text is wrapped
    # only in newlines; presumably begin/end marker text belongs around it
    # (see the "BEGIN" comment further down) and is missing from this copy.
    badges_block = (
        f"\n{badges_gen}\n"
    )
    tagline = meta.get("tagline", "")
    if plugin_data:
        gen = render_plugin(plugin_data)
    else:
        print(f" INFO : {fname} absent (nightly) → page stub pour {title}", file=sys.stderr)
        gen = render_nightly_stub(e, meta)
    block = f"\n{gen}\n"
    header_parts = [f"# {title}", "", badges_block]
    if tagline:
        header_parts += ["", tagline]
    header_parts.append("")  # blank line before the BEGIN block
    header = "\n".join(header_parts)
    return header + "\n" + block + "\n"


def build_category_index_page(cat: str, entries: list, plugins: dict, src: Path) -> str:
    """Build the full content of a category index.md.

    The page starts with front matter that hides the table of contents,
    followed by the category heading and the generated device table.
    """
    label = CATEGORY_LABELS.get(cat, cat.capitalize())
    gen = render_category_index(cat, entries, plugins, src)
    # Same format as process(): f"{m.group(1)}\n{gen}\n{m.group(4)}"
    block = f"\n{gen}\n"
    return f"---\nhide:\n - toc\n---\n\n# {label}\n\n{block}\n"


# ── Creating missing files ────────────────────────────────────────────────────

def ensure_device_pages(entries: list, plugins: dict, src: Path, docs: Path,
                        check: bool) -> int:
    """Create the device pages that do not exist yet.

    Existing files are never touched. In ``check`` mode nothing is written;
    each missing page is reported and the function returns 1 if any is
    missing. Returns 0 otherwise.
    """
    rc = 0
    for e in entries:
        if not e.get("plugin"):
            continue
        slug = e.get("slug") or e["plugin"]
        # An unmapped category is used as the folder name itself.
        folder = CATEGORY_FOLDER.get(e.get("category", ""), e.get("category", "autre"))
        md_path = docs / "appareils" / folder / f"{slug}.md"
        if md_path.exists():
            continue
        fname = f"integrationplugin{e['plugin']}.json"
        plugin_data = plugins.get(fname)
        meta = load_meta(src, e["plugin"])
        # Built even in check mode, where the content itself is not used.
        content = build_device_page(e, plugin_data, meta)
        if check:
            print(f" MANQUANT : {md_path.relative_to(docs.parent)}")
            rc = 1
        else:
            md_path.parent.mkdir(parents=True, exist_ok=True)
            md_path.write_text(content, encoding="utf-8")
            print(f" CRÉÉ : {md_path.relative_to(docs.parent)}")
    return rc


def ensure_category_indexes(entries: list, plugins: dict, src: Path, docs: Path,
                            check: bool) -> int:
    """Create the category index.md files that do not exist yet.

    Categories are taken from the entries that have a truthy ``category``.
    In ``check`` mode nothing is written and 1 is returned if any index is
    missing. Returns 0 otherwise.
    """
    # A set: the order in which categories are visited is not defined.
    cats_present = {e.get("category") for e in entries if e.get("category")}
    rc = 0
    for cat in cats_present:
        folder = CATEGORY_FOLDER.get(cat, cat)
        index_path = docs / "appareils" / folder / "index.md"
        if index_path.exists():
            continue
        content = build_category_index_page(cat, entries, plugins, src)
        if check:
            print(f" MANQUANT : {index_path.relative_to(docs.parent)}")
            rc = 1
        else:
            index_path.parent.mkdir(parents=True, exist_ok=True)
            index_path.write_text(content, encoding="utf-8")
            print(f" CRÉÉ : {index_path.relative_to(docs.parent)}")
    return rc


# ── Processing Markdown files (injection between markers) ─────────────────────

# NOTE(review): the visible source ends in the middle of the statement below;
# the remainder of this function is not available here and is left untouched.
def process(docs: Path, entries: list, plugins: dict, src: Path, check: bool) -> int:
    entry_by_plugin = {e["plugin"]: e for e in entries if e.get("plugin")}
    changed = []
    for md in sorted(docs.rglob("*.md")):
        text = md.read_text(encoding="utf-8")
        if "