"""ORCH-112 (ADR-001 / adr-0044): deploy-base checkout-hygiene leaf — pure policy. Leaf module mirroring ``src/serial_gate.py`` / ``src/cancel.py`` / ``src/self_deploy.py``: pure, unit-testable, never-raise functions over ``config`` + the deploy-state sentinels. Module-level imports are limited to ``config`` (and stdlib); ``self_deploy``, ``qg.checks.is_self_hosting_repo`` and ``notifications`` are imported LAZILY so this stays a leaf and an import cycle can never form. What it answers / does (the MECHANISM — git fetch/reset/clean — lives in the host deploy hook ``scripts/orchestrator-deploy-hook.sh`` block "2a. Resilient pull"; this leaf only decides conditionality, builds the env gate, reads the report and alerts): * ``applies(repo)`` — is resilient-pull hygiene REAL here? * ``hook_env(repo, work_item_id)`` — the ``CHECKOUT_HYGIENE=1 HYGIENE_REPORT=…`` env prefix injected into the detached deploy-hook command ("" when not applies). * ``read_report(repo, work_item_id)`` — read the ``hygiene`` sentinel the hook wrote. * ``alert_dirty(repo, work_item_id, report)``— best-effort Telegram + structured log. * ``snapshot()`` — read-only block for ``GET /queue``. never-raise contract (self-hosting safety): every public function degrades conservatively. ``applies`` -> False on error (hygiene inert == kill-switch off, the safe default that keeps the bare ``git pull`` 1:1 as before ORCH-112). ``hook_env`` -> "" on error (no env -> the hook's ``${CHECKOUT_HYGIENE:-0}`` guard stays 0). The report reader / alert swallow every error so a deploy is NEVER crashed by an observability hiccup (D5 / AC-8). """ from __future__ import annotations import logging import os import re import shlex from .config import settings logger = logging.getLogger("orchestrator.checkout_hygiene") # Sentinel filename the hook writes (HYGIENE_REPORT points at it) and read_report # reads back. Lives in the SAME deploy-state dir as self_deploy's ``result`` (shared # mount visible to both host and container). REPORT_NAME = "hygiene" # Repo tokens in the CSV scope must match this (mirrors serial_gate._REPO_TOKEN). The # CSV is operator config, not user input, but the guard is mandatory; an invalid token # is dropped. _REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$") # --------------------------------------------------------------------------- # Conditionality (mirrors self_deploy_applies / serial_gate_applies) # --------------------------------------------------------------------------- def _scope_repos() -> set[str]: """Sanitised set of in-scope repo tokens from ``checkout_hygiene_repos`` (CSV). Empty/blank CSV -> empty set, meaning "self-hosting only" (resolved in ``applies``). Invalid tokens (regex miss) are dropped. Never raises. """ try: raw = (settings.checkout_hygiene_repos or "").strip() except Exception: # noqa: BLE001 return set() if not raw: return set() out: set[str] = set() for tok in raw.split(","): t = tok.strip() if t and _REPO_TOKEN.match(t): out.add(t) elif t: logger.warning("checkout_hygiene: dropping invalid repo token %r from CSV", t) return out def applies(repo: str) -> bool: """Whether resilient-pull hygiene is REAL for this repo (D3 / AC-6). * ``checkout_hygiene_enabled=False`` -> always False (kill-switch; the hook sees no CHECKOUT_HYGIENE env -> bare ``git pull origin main`` 1:1 as before ORCH-112). * ``checkout_hygiene_repos`` (CSV) non-empty -> real only for listed repos. * empty CSV -> real ONLY for the self-hosting repo (``orchestrator``), mirroring ``self_deploy_repos`` — this is a self-hosting prod-deploy-path feature, so it must NOT touch enduro / other repos' synchronous deploy. Local-only (no network), meant to be checked FIRST. Never raises -> False on error. """ try: if not getattr(settings, "checkout_hygiene_enabled", False): return False scope = _scope_repos() if scope: return (repo or "").strip() in scope # Lazy import keeps this module a leaf (no qg import at module load). from .qg.checks import is_self_hosting_repo return is_self_hosting_repo(repo) except Exception as e: # noqa: BLE001 - never-raise logger.warning("checkout_hygiene.applies error for %s: %s", repo, e) return False # --------------------------------------------------------------------------- # Env gate injected into the detached deploy-hook command (Phase B wiring) # --------------------------------------------------------------------------- def report_path_host(repo: str, work_item_id: str | None) -> str: """HOST view of the ``hygiene`` sentinel path (the wrapper writes it there).""" from . import self_deploy return os.path.join(self_deploy.host_state_dir(repo, work_item_id), REPORT_NAME) def hook_env(repo: str, work_item_id: str | None) -> str: """Build the env-assignment prefix injected into the detached deploy-hook command. Returns ``CHECKOUT_HYGIENE=1 HYGIENE_REPORT=`` (shlex-quoted) ONLY when ``applies(repo)`` is True; otherwise ``""`` so the hook's ``${CHECKOUT_HYGIENE:-0}`` guard stays 0 and the bare ``git pull`` runs (1:1 before ORCH-112). The ``HYGIENE_REPORT`` path is the HOST view of the deploy-state dir (the host wrapper writes the sentinel there; the container reads it back via ``read_report``). Never raises -> "" (no hygiene env, the safe default). """ try: if not applies(repo): return "" report = report_path_host(repo, work_item_id) return f"CHECKOUT_HYGIENE=1 HYGIENE_REPORT={shlex.quote(report)}" except Exception as e: # noqa: BLE001 - never-raise -> no hygiene env logger.warning("checkout_hygiene.hook_env error for %s/%s: %s", repo, work_item_id, e) return "" # --------------------------------------------------------------------------- # Report sentinel reader (Phase C observability) # --------------------------------------------------------------------------- def read_report(repo: str, work_item_id: str | None) -> dict | None: """Read the ``hygiene`` sentinel the hook wrote (container view of deploy-state). The hook writes the sentinel ONLY when it detected a dirty base, body:: dirty=1 Returns ``{"dirty": True, "paths": [...]}`` when the sentinel exists and reports a dirty base; ``None`` when there is no sentinel (clean base / hygiene disabled / not written yet). Never raises -> None on error. """ try: from . import self_deploy p = os.path.join(self_deploy.container_state_dir(repo, work_item_id), REPORT_NAME) with open(p, "r", encoding="utf-8") as f: raw = f.read() except FileNotFoundError: return None except Exception as e: # noqa: BLE001 - never-raise logger.warning("checkout_hygiene.read_report error for %s/%s: %s", repo, work_item_id, e) return None lines = raw.splitlines() if not any(ln.strip() == "dirty=1" for ln in lines): return None paths = [ ln.strip() for ln in lines if ln.strip() and not ln.strip().startswith("dirty=") ] return {"dirty": True, "paths": paths} # --------------------------------------------------------------------------- # Best-effort Telegram alert (Phase C observability) — D5 / AC-8 # --------------------------------------------------------------------------- def alert_dirty(repo: str, work_item_id: str | None, report: dict | None) -> bool: """Structured log + best-effort Telegram that the deploy-base was dirty and was converged to ``origin/main`` before the pull (D5 / AC-8). Returns True iff an alert was sent. Its failure NEVER crashes the finalizer (never-raise) — observability is best-effort and must not block the conveyor (AC-8 FAIL is "alert crashes deploy"). """ try: if not report or not report.get("dirty"): return False paths = report.get("paths") or [] n = len(paths) logger.warning( "checkout_hygiene: dirty deploy-base converged to origin/main for %s/%s " "(%d path(s)): %s", repo, work_item_id, n, paths[:20], ) from .notifications import link_for, send_telegram send_telegram( f"\U0001f9f9 {link_for(work_item_id)}: грязная deploy-база сведена к " f"origin/main перед прод-деплоем ({n} путь(ей) сброшено)." ) return True except Exception as e: # noqa: BLE001 - never-raise: alert is best-effort logger.warning("checkout_hygiene.alert_dirty error for %s/%s: %s", repo, work_item_id, e) return False # --------------------------------------------------------------------------- # Observability snapshot for GET /queue (D3, optional) # --------------------------------------------------------------------------- def snapshot() -> dict: """Read-only checkout-hygiene summary for GET /queue. Additive block; existing /queue keys are untouched. never-raise -> a minimal dict with the flags on error. """ try: enabled = bool(getattr(settings, "checkout_hygiene_enabled", False)) except Exception: # noqa: BLE001 enabled = False try: repos_cfg = getattr(settings, "checkout_hygiene_repos", "") or "" except Exception: # noqa: BLE001 repos_cfg = "" return { "enabled": enabled, "repos": repos_cfg, "scope": "csv" if (repos_cfg or "").strip() else "self-hosting-only", }