powersync-docs/scripts/gen_api_reference.py
Patrick Schurig b26274595c
Some checks failed
Build & Deploy docs / build-deploy (push) Failing after 10m17s
feat: référence API JsonRPC générée depuis introspect.json
- Nouveau générateur scripts/gen_api_reference.py : 19 namespaces →
  docs/api/metier/ (10) + docs/api/systeme/ (9) + notifications.md +
  types.md (96 types · 55 enums · 4 flags) + SUMMARY.md literate-nav
- Badges permissionScope (perm-none/control/configure/admin) dans extra.css
- Guide docs/integrations/jsonrpc-api.md (connexion TCP/WS, auth, conventions énergie)
- mkdocs.yml : Référence API dans la nav, REST→JsonRPC renommé
- mkdocs build --strict : 0 warnings · --check idempotent

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-04 08:15:37 +02:00

635 lines
23 KiB
Python

#!/usr/bin/env python3
"""
gen_api_reference.py — génère la référence API JsonRPC depuis introspect.json.
Lit le dump d'introspection nymea (introspect.json) et produit :
- Une page par namespace dans docs/api/metier/ ou docs/api/systeme/
- docs/api/notifications.md
- docs/api/types.md
- docs/api/index.md (page d'accueil statique — créée si absente, jamais écrasée)
- docs/api/SUMMARY.md (nav literate-nav)
Marqueurs gérés :
<!-- BEGIN GENERATED: __api_<Namespace>__ -->
<!-- BEGIN GENERATED: __api_notifications__ -->
<!-- BEGIN GENERATED: __api_types__ -->
Usage :
python3 scripts/gen_api_reference.py --src introspect.json --docs docs
python3 scripts/gen_api_reference.py --src introspect.json --docs docs --check
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from collections import defaultdict
from pathlib import Path
try:
pass # stdlib only
except ImportError:
pass
# ── Namespaces ────────────────────────────────────────────────────────────────
NS_METIER_ORDER = [
"JSONRPC", "Energy", "Integrations", "AirConditioning", "EvDash",
"Rules", "Scripts", "Tags", "Logging", "AppData",
]
NS_SYSTEM_ORDER = [
"Configuration", "System", "Users", "NetworkManager", "ModbusRtu",
"Debug", "Transfers", "ZWave", "Zigbee",
]
NS_METIER = set(NS_METIER_ORDER)
NS_SYSTEM = set(NS_SYSTEM_ORDER)
# ── Badges permission scope ───────────────────────────────────────────────────
PERM_BADGES: dict[str, str] = {
"PermissionScopeNone":
'<span class="badge perm-none">PUBLIC</span>',
"PermissionScopeControlThings":
'<span class="badge perm-control">CONTROL</span>',
"PermissionScopeExecuteRules":
'<span class="badge perm-control">EXECUTE</span>',
"PermissionScopeConfigureThings":
'<span class="badge perm-configure">CONFIGURE</span>',
"PermissionScopeConfigureRules":
'<span class="badge perm-configure">CONFIGURE-RULES</span>',
"PermissionScopeAdmin":
'<span class="badge perm-admin">ADMIN</span>',
}
MARKER_RE = re.compile(
r"(<!-- BEGIN GENERATED: (?P<key>[^\s]+) -->)(?P<body>.*?)(<!-- END GENERATED -->)",
re.DOTALL,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def ns_slug(ns: str) -> str:
return ns.lower()
def ns_folder(ns: str) -> str:
return "metier" if ns in NS_METIER else "systeme"
def parse_field_name(raw: str) -> tuple[str, bool, bool]:
"""Strip `o:` / `r:` prefixes. Returns (name, optional, readonly)."""
name, optional, readonly = raw, False, False
while True:
if name.startswith("o:"):
optional = True
name = name[2:]
elif name.startswith("r:"):
readonly = True
name = name[2:]
else:
break
return name, optional, readonly
def render_type(value: object, types_path: str = "../types.md") -> str:
"""Render a type value as Markdown, resolving $ref to a clickable link."""
if isinstance(value, list):
inner = render_type(value[0], types_path) if value else "`?`"
return f"{inner}[]"
if isinstance(value, str):
if value.startswith("$ref:"):
type_name = value[5:]
anchor = type_name.lower()
return f"[{type_name}]({types_path}#{anchor})"
return f"`{value}`"
return f"`{value!s}`"
def render_type_self(value: object) -> str:
"""Like render_type but for self-referencing anchors (types page)."""
if isinstance(value, list):
inner = render_type_self(value[0]) if value else "`?`"
return f"{inner}[]"
if isinstance(value, str):
if value.startswith("$ref:"):
type_name = value[5:]
anchor = type_name.lower()
return f"[{type_name}](#{anchor})"
return f"`{value}`"
return f"`{value!s}`"
def render_fields_table(fields: dict, types_path: str = "../types.md") -> str:
"""Render a params/returns dict as a Markdown table. Returns trailing newline."""
if not fields:
return "_Aucun paramètre._\n"
rows = []
for raw_name in sorted(fields):
value = fields[raw_name]
name, optional, readonly = parse_field_name(raw_name)
type_str = render_type(value, types_path)
notes = []
if optional:
notes.append("optionnel")
if readonly:
notes.append("lecture seule")
rows.append([f"`{name}`", type_str, " · ".join(notes)])
lines = [
"| Champ | Type | Notes |",
"| --- | --- | --- |",
]
for r in rows:
lines.append("| " + " | ".join(r) + " |")
return "\n".join(lines) + "\n"
# ── Rendu d'une méthode ───────────────────────────────────────────────────────
def render_method(name: str, method: dict, types_path: str) -> str:
perm = method.get("permissionScope", "")
badge = PERM_BADGES.get(perm)
if not badge:
badge = f'<span class="badge perm-unknown">{perm or "inconnu"}</span>'
print(f" INFO : permissionScope inconnu '{perm}' pour {name}", file=sys.stderr)
desc = method.get("description", "").strip()
params = method.get("params") or {}
returns = method.get("returns") or {}
parts: list[str] = []
parts.append(f"### {name}")
parts.append("")
parts.append(badge)
parts.append("")
if desc:
parts.append(desc)
parts.append("")
parts.append("**Paramètres :**")
parts.append("")
parts.append(render_fields_table(params, types_path).rstrip())
parts.append("")
parts.append("**Retour :**")
parts.append("")
parts.append(render_fields_table(returns, types_path).rstrip())
parts.append("")
parts.append("---")
parts.append("")
return "\n".join(parts)
# ── Contenu des blocs générés ─────────────────────────────────────────────────
def render_namespace_content(
ns: str,
methods: dict,
notifications: dict,
types_path: str,
) -> str:
"""Contenu du bloc BEGIN/END d'une page namespace."""
ns_methods = {k: v for k, v in methods.items() if k.startswith(ns + ".")}
ns_notifs = {k: v for k, v in notifications.items() if k.startswith(ns + ".")}
parts: list[str] = []
if ns_methods:
parts.append("## Méthodes")
parts.append("")
for name in sorted(ns_methods):
parts.append(render_method(name, ns_methods[name], types_path))
if ns_notifs:
parts.append("## Notifications")
parts.append("")
for name in sorted(ns_notifs):
notif = ns_notifs[name]
desc = notif.get("description", "").strip()
notif_params = notif.get("params") or {}
parts.append(f"### {name}")
parts.append("")
if desc:
parts.append(desc)
parts.append("")
parts.append("**Paramètres :**")
parts.append("")
parts.append(render_fields_table(notif_params, types_path).rstrip())
parts.append("")
parts.append("---")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
def render_notifications_content(notifications: dict, types_path: str = "types.md") -> str:
"""Contenu du bloc BEGIN/END de la page Notifications."""
by_ns: dict[str, dict] = defaultdict(dict)
for k, v in notifications.items():
ns = k.split(".")[0]
by_ns[ns][k] = v
all_ns_sorted = sorted(
by_ns.keys(),
key=lambda x: (0 if x in NS_METIER else 1, x),
)
parts: list[str] = []
for ns in all_ns_sorted:
parts.append(f"## {ns}")
parts.append("")
for name in sorted(by_ns[ns]):
notif = by_ns[ns][name]
desc = notif.get("description", "").strip()
notif_params = notif.get("params") or {}
parts.append(f"### {name}")
parts.append("")
if desc:
parts.append(desc)
parts.append("")
parts.append("**Paramètres :**")
parts.append("")
parts.append(render_fields_table(notif_params, types_path).rstrip())
parts.append("")
parts.append("---")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
def render_types_content(types: dict, enums: dict, flags: dict) -> str:
"""Contenu du bloc BEGIN/END de la page Types & Enums."""
parts: list[str] = []
# Types (objets et listes)
parts.append("## Types")
parts.append("")
for name in sorted(types):
anchor = name.lower()
defn = types[name]
parts.append(f'### {name} {{#{anchor}}}')
parts.append("")
if isinstance(defn, list):
inner = render_type_self(defn[0]) if defn else "`?`"
parts.append(f"Liste de {inner}.")
parts.append("")
elif isinstance(defn, dict):
rows = []
for raw_name in sorted(defn):
value = defn[raw_name]
field_name, optional, readonly = parse_field_name(raw_name)
type_str = render_type_self(value)
notes = []
if optional:
notes.append("optionnel")
if readonly:
notes.append("lecture seule")
rows.append([f"`{field_name}`", type_str, " · ".join(notes)])
lines = ["| Champ | Type | Notes |", "| --- | --- | --- |"]
for r in rows:
lines.append("| " + " | ".join(r) + " |")
parts.append("\n".join(lines))
parts.append("")
else:
parts.append(f"`{defn}`")
parts.append("")
parts.append("---")
parts.append("")
# Enums
parts.append("## Enums")
parts.append("")
for name in sorted(enums):
anchor = name.lower()
values = enums[name]
parts.append(f'### {name} {{#{anchor}}}')
parts.append("")
if isinstance(values, list):
for v in values:
if isinstance(v, str) and v.startswith("$ref:"):
parts.append(f"- {render_type_self(v)}")
else:
parts.append(f"- `{v}`")
parts.append("")
parts.append("---")
parts.append("")
# Flags
if flags:
parts.append("## Flags")
parts.append("")
for name in sorted(flags):
anchor = name.lower()
values = flags[name]
parts.append(f'### {name} {{#{anchor}}}')
parts.append("")
if isinstance(values, list):
for v in values:
if isinstance(v, str) and v.startswith("$ref:"):
parts.append(f"- {render_type_self(v)}")
else:
parts.append(f"- `{v}`")
parts.append("")
parts.append("---")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
# ── Construction des pages complètes ─────────────────────────────────────────
def build_namespace_page(ns: str, methods: dict, notifications: dict) -> str:
"""Page namespace complète (pour ensure_pages — doit correspondre à process())."""
gen = render_namespace_content(ns, methods, notifications, types_path="../types.md")
block = f"<!-- BEGIN GENERATED: __api_{ns}__ -->\n{gen}\n<!-- END GENERATED -->"
return f"# {ns}\n\n{block}\n"
def build_notifications_page(notifications: dict) -> str:
gen = render_notifications_content(notifications, types_path="types.md")
block = f"<!-- BEGIN GENERATED: __api_notifications__ -->\n{gen}\n<!-- END GENERATED -->"
return f"# Notifications\n\n{block}\n"
def build_types_page(types: dict, enums: dict, flags: dict) -> str:
gen = render_types_content(types, enums, flags)
block = f"<!-- BEGIN GENERATED: __api_types__ -->\n{gen}\n<!-- END GENERATED -->"
return f"---\nhide:\n - toc\n---\n\n# Types & Enums\n\n{block}\n"
INDEX_PAGE_CONTENT = """\
---
hide:
- toc
---
# Référence API JsonRPC
ETM PowerSync expose l'**API JSON-RPC de nymea** en local, y compris les extensions ETM
(`Energy`, `AirConditioning`, `EvDash`).
## Connexion
| Mode | Adresse | Notes |
| --- | --- | --- |
| **TCP brut** | `<ip>:2222` | nymea parle en premier (welcome JSON `\\n`-delimited) |
| **WebSocket** | `ws://<ip>:4444` | le client envoie `JSONRPC.Hello` en premier |
## Authentification
1. Appeler [`JSONRPC.Authenticate`](metier/jsonrpc.md#jsonrpcauthenticate) avec
`username`, `password` et `deviceName` → reçoit un `token`.
2. Inclure le `token` dans chaque requête suivante.
3. Activer les push : `JSONRPC.SetNotificationStatus` `{ "enabled": true }`.
## Conventions énergie
| Grandeur | Convention |
| --- | --- |
| Production PV | **valeur négative** (convention producteur) |
| `currentPowerAcquisition` | réseau — positif = import, 0 si export |
| `currentPowerStorage` | batterie — positif = charge, négatif = décharge |
| Totaux d'énergie | en **kWh** |
| Timestamps | en **secondes** (Unix epoch) |
## Badges de permission
| Badge | Scope | Description |
| --- | --- | --- |
| <span class="badge perm-none">PUBLIC</span> | `PermissionScopeNone` | Lecture publique, aucun token requis |
| <span class="badge perm-control">CONTROL</span> | `PermissionScopeControlThings` / `ExecuteRules` | Contrôle des appareils et règles |
| <span class="badge perm-configure">CONFIGURE</span> | `PermissionScopeConfigureThings` / `ConfigureRules` | Configuration |
| <span class="badge perm-admin">ADMIN</span> | `PermissionScopeAdmin` | Administration système |
"""
# ── SUMMARY.md (literate-nav) ─────────────────────────────────────────────────
def generate_summary(all_ns: list[str], docs: Path, check: bool) -> int:
"""Génère docs/api/SUMMARY.md pour literate-nav. Retourne 1 si périmé en --check."""
metier_ns = [ns for ns in NS_METIER_ORDER if ns in set(all_ns)]
system_ns = [ns for ns in NS_SYSTEM_ORDER if ns in set(all_ns)]
unknown_ns = sorted(ns for ns in all_ns if ns not in NS_METIER and ns not in NS_SYSTEM)
if unknown_ns:
for ns in unknown_ns:
print(f" INFO : namespace '{ns}' non classé → groupe systeme par défaut",
file=sys.stderr)
lines = ["* [Référence API](index.md)"]
if metier_ns:
lines.append("* API — Métier")
for ns in metier_ns:
lines.append(f" * [{ns}](metier/{ns_slug(ns)}.md)")
if system_ns or unknown_ns:
lines.append("* API — Système")
for ns in system_ns + unknown_ns:
lines.append(f" * [{ns}](systeme/{ns_slug(ns)}.md)")
lines.append("* [Notifications](notifications.md)")
lines.append("* [Types & Enums](types.md)")
new_content = "\n".join(lines) + "\n"
path = docs / "api" / "SUMMARY.md"
if check:
if not path.exists() or path.read_text(encoding="utf-8") != new_content:
print(f" SUMMARY.md périmé : {path.relative_to(docs.parent)}")
return 1
return 0
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(new_content, encoding="utf-8")
print(f"SUMMARY.md → {path.relative_to(docs.parent)}")
return 0
# ── Création des fichiers manquants ───────────────────────────────────────────
def ensure_pages(data: dict, docs: Path, check: bool) -> int:
"""Crée les pages API inexistantes. Retourne 1 si manquant en mode --check."""
methods = data["methods"]
notifications = data["notifications"]
types_d = data["types"]
enums = data["enums"]
flags = data["flags"]
all_ns = data["_all_ns"]
rc = 0
# Pages namespace
for ns in all_ns:
folder = ns_folder(ns)
path = docs / "api" / folder / f"{ns_slug(ns)}.md"
if path.exists():
continue
content = build_namespace_page(ns, methods, notifications)
if check:
print(f" MANQUANT : {path.relative_to(docs.parent)}")
rc = 1
else:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
print(f" CRÉÉ : {path.relative_to(docs.parent)}")
# Notifications
notif_path = docs / "api" / "notifications.md"
if not notif_path.exists():
content = build_notifications_page(notifications)
if check:
print(f" MANQUANT : {notif_path.relative_to(docs.parent)}")
rc = 1
else:
notif_path.parent.mkdir(parents=True, exist_ok=True)
notif_path.write_text(content, encoding="utf-8")
print(f" CRÉÉ : {notif_path.relative_to(docs.parent)}")
# Types
types_path = docs / "api" / "types.md"
if not types_path.exists():
content = build_types_page(types_d, enums, flags)
if check:
print(f" MANQUANT : {types_path.relative_to(docs.parent)}")
rc = 1
else:
types_path.parent.mkdir(parents=True, exist_ok=True)
types_path.write_text(content, encoding="utf-8")
print(f" CRÉÉ : {types_path.relative_to(docs.parent)}")
# Index (statique — créé une fois, jamais écrasé)
index_path = docs / "api" / "index.md"
if not index_path.exists():
if check:
print(f" MANQUANT : {index_path.relative_to(docs.parent)}")
rc = 1
else:
index_path.parent.mkdir(parents=True, exist_ok=True)
index_path.write_text(INDEX_PAGE_CONTENT, encoding="utf-8")
print(f" CRÉÉ : {index_path.relative_to(docs.parent)}")
return rc
# ── Injection dans les marqueurs ──────────────────────────────────────────────
def process(docs: Path, data: dict, check: bool) -> int:
"""Injecte le contenu généré dans les marqueurs BEGIN/END des pages API."""
methods = data["methods"]
notifications = data["notifications"]
types_d = data["types"]
enums = data["enums"]
flags = data["flags"]
all_ns_set = set(data["_all_ns"])
changed: list[Path] = []
api_dir = docs / "api"
for md in sorted(api_dir.rglob("*.md")):
if md.name == "SUMMARY.md":
continue
text = md.read_text(encoding="utf-8")
if "<!-- BEGIN GENERATED:" not in text:
continue
def repl(m: re.Match, _md: Path = md) -> str:
key = m.group("key")
if key == "__api_notifications__":
gen = render_notifications_content(notifications, types_path="types.md")
elif key == "__api_types__":
gen = render_types_content(types_d, enums, flags)
elif key.startswith("__api_"):
ns = key.removeprefix("__api_").removesuffix("__")
if ns not in all_ns_set:
print(f" ! {_md.name}: namespace '{ns}' absent d'introspect.json",
file=sys.stderr)
return m.group(0)
# Détermine le chemin relatif vers types.md selon la profondeur du fichier
t = "../types.md" if _md.parent.name in ("metier", "systeme") else "types.md"
gen = render_namespace_content(ns, methods, notifications, types_path=t)
else:
print(f" ! {_md.name}: clé inconnue '{key}'", file=sys.stderr)
return m.group(0)
return f"{m.group(1)}\n{gen}\n{m.group(4)}"
new = MARKER_RE.sub(repl, text)
if new != text:
changed.append(md)
if not check:
md.write_text(new, encoding="utf-8")
if check:
if changed:
print("Doc PAS à jour :", ", ".join(str(c) for c in changed))
return 1
print("Doc à jour.")
return 0
print(f"Régénéré : {len(changed)} page(s).")
for c in changed:
print(f" - {c.relative_to(docs)}")
return 0
# ── Chargement du dump ────────────────────────────────────────────────────────
def load_introspect(src: Path) -> dict:
if not src.exists():
sys.exit(f"ERREUR : {src} introuvable")
raw = src.read_text(encoding="utf-8").strip()
if not raw:
sys.exit(f"ERREUR : {src} est vide")
try:
d = json.loads(raw)
except json.JSONDecodeError as exc:
sys.exit(f"ERREUR : {src} non parsable : {exc}")
try:
params = d["params"]
methods = params["methods"]
notifications = params["notifications"]
types_d = params["types"]
enums = params.get("enums", {})
flags = params.get("flags", {})
except KeyError as exc:
sys.exit(f"ERREUR : clé manquante dans introspect.json : {exc}")
for key, val in [("methods", methods), ("notifications", notifications),
("types", types_d)]:
if not isinstance(val, dict):
sys.exit(f"ERREUR : introspect.json params.{key} n'est pas un objet")
all_ns = sorted(
{k.split(".")[0] for k in methods} | {k.split(".")[0] for k in notifications}
)
print(f"{len(methods)} méthodes · {len(notifications)} notifications · "
f"{len(types_d)} types · {len(enums)} enums · {len(flags)} flags · "
f"{len(all_ns)} namespaces")
return {
"methods": methods,
"notifications": notifications,
"types": types_d,
"enums": enums,
"flags": flags,
"_all_ns": all_ns,
}
# ── Point d'entrée ────────────────────────────────────────────────────────────
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--src", default="introspect.json", type=Path,
help="Chemin vers introspect.json (défaut : introspect.json)")
ap.add_argument("--docs", default="docs", type=Path,
help="Dossier docs/ (défaut : docs)")
ap.add_argument("--check", action="store_true",
help="CI : exit 1 si la doc n'est pas à jour")
a = ap.parse_args()
data = load_introspect(a.src)
rc_pages = ensure_pages(data, a.docs, a.check)
rc_summary = generate_summary(data["_all_ns"], a.docs, a.check)
rc_process = process(a.docs, data, a.check)
return max(rc_pages, rc_summary, rc_process)
if __name__ == "__main__":
raise SystemExit(main())