155 lines
6.1 KiB
Python
155 lines
6.1 KiB
Python
import logging
|
|
import httpx
|
|
from urllib.parse import unquote
|
|
from xml.etree import ElementTree as ET
|
|
from config import NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD, NEXTCLOUD_DECK_URL
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
_auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
|
|
|
|
|
|
async def _ensure_dirs(client: httpx.AsyncClient, dir_path: str) -> None:
|
|
parts = dir_path.strip("/").split("/")
|
|
for i in range(1, len(parts) + 1):
|
|
current = "/".join(parts[:i])
|
|
r = await client.request("MKCOL", f"{NEXTCLOUD_URL}/{current}")
|
|
# 201 = créé, 405 = existe déjà — les deux sont OK
|
|
if r.status_code not in (200, 201, 405):
|
|
log.warning("MKCOL %s → HTTP %s", current, r.status_code)
|
|
|
|
|
|
async def upload_photo(file_bytes: bytes, remote_path: str) -> bool:
|
|
dir_path = remote_path.rsplit("/", 1)[0]
|
|
try:
|
|
async with httpx.AsyncClient(auth=_auth, timeout=60) as client:
|
|
await _ensure_dirs(client, dir_path)
|
|
r = await client.put(f"{NEXTCLOUD_URL}/{remote_path}", content=file_bytes)
|
|
ok = r.status_code in (200, 201, 204)
|
|
if not ok:
|
|
log.error("PUT %s → HTTP %s", remote_path, r.status_code)
|
|
return ok
|
|
except Exception as exc:
|
|
log.error("upload_photo %s : %s", remote_path, exc)
|
|
return False
|
|
|
|
|
|
async def get_chantiers_actifs(max_results: int = 10) -> list[str]:
|
|
url = f"{NEXTCLOUD_URL}/Chantiers/"
|
|
body = b'<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:displayname/><d:resourcetype/></d:prop></d:propfind>'
|
|
try:
|
|
async with httpx.AsyncClient(auth=_auth, timeout=30) as client:
|
|
r = await client.request("PROPFIND", url, headers={"Depth": "1"}, content=body)
|
|
root = ET.fromstring(r.text)
|
|
ns = {"d": "DAV:"}
|
|
dossiers = []
|
|
for resp in root.findall("d:response", ns):
|
|
href = resp.find("d:href", ns).text
|
|
name = unquote(href.rstrip("/").split("/")[-1])
|
|
if name.lower() in ("chantiers", "") or name.startswith("."):
|
|
continue
|
|
resourcetype = resp.find(".//d:resourcetype", ns)
|
|
if resourcetype is not None and resourcetype.find("d:collection", ns) is not None:
|
|
dossiers.append(name)
|
|
dossiers.sort(reverse=True)
|
|
return dossiers[:max_results]
|
|
except Exception as exc:
|
|
log.error("get_chantiers_actifs : %s", exc)
|
|
return []
|
|
|
|
|
|
async def upload_text(content: str, remote_path: str) -> bool:
|
|
"""Upload un fichier texte (markdown, etc.) sur Nextcloud."""
|
|
dir_path = remote_path.rsplit("/", 1)[0]
|
|
try:
|
|
async with httpx.AsyncClient(auth=_auth, timeout=30) as client:
|
|
await _ensure_dirs(client, dir_path)
|
|
r = await client.put(
|
|
f"{NEXTCLOUD_URL}/{remote_path}",
|
|
content=content.encode("utf-8"),
|
|
headers={"Content-Type": "text/markdown; charset=utf-8"},
|
|
)
|
|
ok = r.status_code in (200, 201, 204)
|
|
if not ok:
|
|
log.error("PUT text %s → HTTP %s", remote_path, r.status_code)
|
|
return ok
|
|
except Exception as exc:
|
|
log.error("upload_text %s : %s", remote_path, exc)
|
|
return False
|
|
|
|
|
|
async def list_pdfs(folder: str = "Doc") -> list[str]:
|
|
"""Retourne les chemins relatifs des PDFs dans un dossier Nextcloud (récursif)."""
|
|
body = b'<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:displayname/><d:resourcetype/></d:prop></d:propfind>'
|
|
|
|
async def _scan(client: httpx.AsyncClient, path: str, prefix: str) -> list[str]:
|
|
r = await client.request("PROPFIND", f"{NEXTCLOUD_URL}/{path}/",
|
|
headers={"Depth": "1"}, content=body)
|
|
if r.status_code == 404:
|
|
log.error("Dossier '%s' introuvable sur Nextcloud", path)
|
|
return []
|
|
root = ET.fromstring(r.text)
|
|
ns = {"d": "DAV:"}
|
|
files, subdirs = [], []
|
|
for resp in root.findall("d:response", ns):
|
|
href = resp.find("d:href", ns).text
|
|
name = unquote(href.rstrip("/").split("/")[-1])
|
|
if not name or name == path.split("/")[-1]:
|
|
continue
|
|
rt = resp.find(".//d:resourcetype", ns)
|
|
is_dir = rt is not None and rt.find("d:collection", ns) is not None
|
|
rel = f"{prefix}/{name}" if prefix else name
|
|
if is_dir:
|
|
subdirs.append((f"{path}/{name}", rel))
|
|
elif name.lower().endswith(".pdf"):
|
|
files.append(rel)
|
|
for subpath, subprefix in subdirs:
|
|
files.extend(await _scan(client, subpath, subprefix))
|
|
return files
|
|
|
|
try:
|
|
async with httpx.AsyncClient(auth=_auth, timeout=60) as client:
|
|
files = await _scan(client, folder, "")
|
|
return sorted(files)
|
|
except Exception as exc:
|
|
log.error("list_pdfs : %s", exc)
|
|
return []
|
|
|
|
|
|
async def download_file(folder: str, filename: str) -> bytes | None:
|
|
"""Télécharge un fichier depuis Nextcloud."""
|
|
from urllib.parse import quote
|
|
url = f"{NEXTCLOUD_URL}/{folder}/{quote(filename, safe='/')}"
|
|
try:
|
|
async with httpx.AsyncClient(auth=_auth, timeout=120) as client:
|
|
r = await client.get(url)
|
|
if r.status_code == 200:
|
|
return r.content
|
|
log.error("download_file %s → HTTP %s", filename, r.status_code)
|
|
except Exception as exc:
|
|
log.error("download_file %s : %s", filename, exc)
|
|
return None
|
|
|
|
|
|
async def create_deck_card(board_id: int, stack_id: int, title: str, description: str) -> bool:
|
|
if not board_id or not stack_id:
|
|
return False
|
|
url = f"{NEXTCLOUD_DECK_URL}/boards/{board_id}/stacks/{stack_id}/cards"
|
|
payload = {
|
|
"title": title[:255],
|
|
"description": description,
|
|
"type": "plain",
|
|
"order": 999,
|
|
}
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
auth=_auth,
|
|
timeout=30,
|
|
headers={"OCS-APIREQUEST": "true"},
|
|
) as client:
|
|
r = await client.post(url, json=payload)
|
|
return r.status_code in (200, 201)
|
|
except Exception as exc:
|
|
log.error("create_deck_card : %s", exc)
|
|
return False
|