"""ORCH-063: disk-watchdog — host-FS fill heartbeat + Telegram alert at >=85%.

On 07.06.2026 the mva154 host disk silently grew to 100% and took down the
WHOLE self-hosting pipeline of every project (one prod ``orchestrator``
instance serves all prod projects from a shared DB/queue). The system had no
proactive signal — the operator only learned of the problem once the instance
was already stuck.

This module is a background daemon thread modelled 1:1 on ``reconciler``
(ORCH-053) and ``job_reaper`` (ORCH-065): a ``threading.Thread(daemon=True)`` +
``threading.Event`` for a clean stop, the ``start()`` / ``stop(timeout)`` /
``status()`` contract, a ``/queue`` snapshot, per-tick never-raise and a
kill-switch (``ORCH_DISK_MONITOR_ENABLED``).

Each tick measures the fill of the mounted **host** bind-paths (``/repos``,
``/app/data``) via stdlib ``shutil.disk_usage`` — NOT the container overlay
``/``, NOT a ``df`` subprocess — deduplicates paths by physical device
(``st_dev``), and through a pure decision function from
``(used_pct, threshold, prev_state, now, realert_s)`` decides to alert
(threshold crossed up), re-alert (cooldown elapsed), send recovery (back below
threshold) or stay silent.

Invariants (TRZ §10 / ADR-001):

* ``STAGE_TRANSITIONS`` / ``QG_CHECKS`` / ``check_*`` / the DB schema are
  UNCHANGED — the watchdog is an operational daemon, not a Quality Gate (like
  ``reconciler`` / ``job_reaper``). No new migration (anti-spam state is
  in-memory, best-effort, may reset on restart — safe: an early signal, not an
  SLA).
* never-raise on three levels: per-path (a broken path is skipped, the rest
  are measured), per-tick (outer ``try/except`` in ``_run``), per-send
  (``send_telegram`` wrapped). A send that raised does not start the anti-spam
  cooldown, so the message is retried on the next tick.
* Self-hosting safety: the watchdog only READS fill and SENDS Telegram — it
  never touches the disk/container, never restarts prod. Safe for
  enduro-trails in the shared instance.
* Kill-switch ``disk_monitor_enabled=False`` -> the daemon does not start
  (``main.lifespan`` guard) and ``/queue`` returns ``{"enabled": false}`` —
  behaviour 1:1 as before.

See docs/work-items/ORCH-063/06-adr/ADR-001-disk-watchdog.md and the
cross-cutting docs/architecture/adr/adr-0024-disk-watchdog.md.
"""

import logging
import os
import shutil
import socket
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone

from .config import settings
from .notifications import send_telegram

logger = logging.getLogger("orchestrator.disk_watchdog")

_BYTES_PER_GB = 1024 ** 3

# Decision actions returned by ``decide_action`` (D3).
ACTION_NONE = "none"
ACTION_ALERT = "alert"
ACTION_REALERT = "realert"
ACTION_RECOVERY = "recovery"


@dataclass
class PathAlertState:
    """In-memory anti-spam state for one logical device/path (D3).

    Best-effort: lives only in the daemon (no DB row, no migration). After a
    process restart ``alerting`` resets to ``False`` -> a still-full disk
    re-alerts once, which is safe (an early signal, not an SLA; TRZ §5/NFR-5).

    ``alerting=True`` together with ``last_alert_at=None`` means the threshold
    was crossed but no alert has been delivered yet; ``decide_action`` answers
    ``ACTION_REALERT`` for that state, so delivery is retried on the next tick.
    """

    # True while the device is considered above the threshold.
    alerting: bool = False
    # Clock value (from the watchdog's ``now_provider``) of the last alert
    # that was actually delivered; ``None`` if none was delivered yet.
    last_alert_at: float | None = None


def _resolve_host() -> str:
    """Best-effort host label for alert text (never raises).

    The prod container runs ``network_mode: host`` so ``gethostname()``
    resolves to the real host (``mva154``). Any failure, or an empty name,
    yields the neutral ``"host"``.
    """
    try:
        name = socket.gethostname()
        return name or "host"
    except Exception:  # noqa: BLE001 - never break the tick
        return "host"


def parse_paths(raw: str) -> list[str]:
    """Parse the ``disk_monitor_paths`` CSV into a clean path list.

    Entries are split on ``,`` and stripped; blank entries are dropped.
    Empty / blank input (or input with only blank entries) -> the default host
    bind-paths (``/repos``, ``/app/data``, TRZ §8). Never raises: any error
    while parsing (e.g. a non-string value) also falls back to the default.
    """
    default = ["/repos", "/app/data"]
    try:
        if not raw or not raw.strip():
            return default
        paths = [p.strip() for p in raw.split(",") if p.strip()]
        return paths or default
    except Exception:  # noqa: BLE001 - never break the tick
        return default


def decide_action(
    used_pct: float,
    threshold: float,
    prev: PathAlertState,
    now: float,
    realert_s: float,
) -> str:
    """Pure alert decision (D3) — testable without a thread or a real timer.

    Returns one of ``ACTION_{NONE,ALERT,REALERT,RECOVERY}`` as a function of
    the current fill, the threshold, the previous per-path state and the
    injected clock:

    * not alerting & ``used_pct >= threshold``       -> ALERT (crossed up)
    * alerting & still ``>= threshold`` & cooldown   -> REALERT (re-alert)
    * alerting & still ``>= threshold`` & in cooldown-> NONE (anti-spam)
    * alerting & ``used_pct < threshold``            -> RECOVERY (crossed down)
    * not alerting & ``used_pct < threshold``        -> NONE (normal)

    Threshold is inclusive: ``used_pct == threshold`` counts as exceeding
    (``>=``, TC-05). An alerting state without a ``last_alert_at`` is treated
    as "cooldown elapsed" and yields REALERT.
    """
    above = used_pct >= threshold
    if not prev.alerting:
        return ACTION_ALERT if above else ACTION_NONE
    # prev.alerting is True
    if not above:
        return ACTION_RECOVERY
    last = prev.last_alert_at
    if last is None or (now - last) >= realert_s:
        return ACTION_REALERT
    return ACTION_NONE


def _measure_one(path: str) -> dict | None:
    """Measure one path via ``shutil.disk_usage`` (D1). Never raises.

    Returns a measurement dict with the keys ``path``, ``total_bytes``,
    ``used_bytes``, ``free_bytes``, ``used_pct``, ``free_pct`` and
    ``free_gb`` (percentages and GB rounded to one decimal), or ``None`` if
    the path is missing / unreadable (``FileNotFoundError`` /
    ``PermissionError`` / ``OSError``) -> the caller skips THIS path and keeps
    measuring the others (FR-2, AC-6: one broken path never fails the whole
    tick).
    """
    try:
        usage = shutil.disk_usage(path)
        total = int(usage.total)
        used = int(usage.used)
        free = int(usage.free)
        # A zero-sized filesystem reports 0.0% instead of dividing by zero.
        used_pct = round(used / total * 100, 1) if total > 0 else 0.0
        free_pct = round(free / total * 100, 1) if total > 0 else 0.0
        return {
            "path": path,
            "total_bytes": total,
            "used_bytes": used,
            "free_bytes": free,
            "used_pct": used_pct,
            "free_pct": free_pct,
            "free_gb": round(free / _BYTES_PER_GB, 1),
        }
    except Exception as e:  # noqa: BLE001 - skip this path, keep the tick alive
        logger.warning("disk-watchdog: cannot measure path %s, skipping: %s", path, e)
        return None


def _dedup_key(path: str) -> object:
    """Physical-device dedup key (D2): ``st_dev`` if resolvable, else the path.

    Paths sharing a device (``/repos`` and ``/app/data`` on the same host
    partition) collapse to one logical partition -> one alert, not two.
    Failure to ``os.stat`` -> fail-open (the path is its own key, measured
    independently).
    """
    try:
        return os.stat(path).st_dev
    except Exception:  # noqa: BLE001 - fail-open, treat as a distinct device
        return path


def measure_paths(paths: list[str]) -> list[dict]:
    """Measure every path, deduplicated by physical device (D1/D2). Never raises.

    For each distinct ``st_dev`` the FIRST successfully-measured path is kept
    and carries a stable ``dedup_key`` (so anti-spam state is per-device). A
    path that fails to measure is skipped (AC-6) and does not mark its device
    as seen, so a later path on the same device can still be measured.
    """
    out: list[dict] = []
    seen: set[object] = set()
    for path in paths:
        key = _dedup_key(path)
        if key in seen:
            continue
        m = _measure_one(path)
        if m is None:
            continue
        seen.add(key)
        m["dedup_key"] = key
        out.append(m)
    return out


def format_alert_message(m: dict, threshold: float, host: str) -> str:
    """Actionable Telegram alert text (FR-3/AC-2): host, path, used %, free, threshold."""
    return (
        f"\U0001f534 Диск {host}: {m['path']} заполнен на {m['used_pct']}% "
        f"(порог {threshold}%). Свободно {m['free_gb']} ГБ ({m['free_pct']}%). "
        f"Освободите место — риск остановки конвейера всех проектов."
    )


def format_recovery_message(m: dict, host: str) -> str:
    """Single recovery message when fill returns below threshold (FR-4/AC-4)."""
    return (
        f"\U0001f7e2 Диск {host}: {m['path']} вернулся ниже порога — "
        f"{m['used_pct']}% (свободно {m['free_gb']} ГБ)."
    )


class DiskWatchdog:
    """Background daemon measuring host-FS fill and alerting on >= threshold.

    Modelled on ``Reconciler`` / ``JobReaper``: a
    ``threading.Thread(daemon=True)`` + a ``threading.Event`` for a clean
    stop. The only in-memory state is the best-effort anti-spam map
    (``_states``), the last-measurement snapshot (``_last``) and
    ``last_run_ts`` — all reset on restart, which is safe (D3).

    ``now_provider`` is injectable so the cooldown / recovery logic is
    testable deterministically without a real timer (AC-3).
    """

    def __init__(self, interval_s: float | None = None, now_provider=None):
        """Create a stopped watchdog.

        ``interval_s`` defaults to ``settings.disk_monitor_interval_s``;
        ``now_provider`` defaults to ``time.time``.
        """
        self.interval_s = (
            interval_s if interval_s is not None else settings.disk_monitor_interval_s
        )
        self._now = now_provider or time.time
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._host = _resolve_host()
        # Best-effort in-memory state, per dedup_key (device/path).
        self._states: dict[object, PathAlertState] = {}
        self._last: dict[object, dict] = {}
        self.last_run_ts: float | None = None

    # -- config helpers ----------------------------------------------------

    @property
    def _threshold(self) -> int:
        """Alert threshold in percent, read from settings on every access."""
        return settings.disk_monitor_threshold_pct

    @property
    def _realert_s(self) -> int:
        """Re-alert cooldown in seconds, read from settings on every access."""
        return settings.disk_monitor_realert_s

    def _paths(self) -> list[str]:
        """Monitored paths parsed from ``settings.disk_monitor_paths``."""
        return parse_paths(settings.disk_monitor_paths)

    # -- tick --------------------------------------------------------------

    def tick(self) -> None:
        """One measurement pass over all monitored paths (never-raise per send).

        Measures every (deduplicated) path, runs the pure ``decide_action``
        per device and dispatches the resulting alert / re-alert / recovery
        via ``send_telegram`` (notifying). Telegram failures are logged and
        swallowed (best-effort delivery, AC-6).

        A send that raised is not counted as delivered: the cooldown clock is
        not started for a failed alert and the state is not reset for a failed
        recovery, so the same message is attempted again on the next tick
        instead of being silenced for a whole ``realert_s`` window.
        """
        threshold = self._threshold
        realert_s = self._realert_s
        now = self._now()
        for m in measure_paths(self._paths()):
            key = m["dedup_key"]
            prev = self._states.get(key) or PathAlertState()
            action = decide_action(m["used_pct"], threshold, prev, now, realert_s)
            if action in (ACTION_ALERT, ACTION_REALERT):
                delivered = self._send(
                    format_alert_message(m, threshold, self._host), notifying=True
                )
                # Start the cooldown only for a delivered alert. On failure
                # keep the previous timestamp (None for a first alert, an
                # already-expired one for a re-alert) so ``decide_action``
                # answers REALERT again on the next tick.
                self._states[key] = PathAlertState(
                    alerting=True,
                    last_alert_at=now if delivered else prev.last_alert_at,
                )
            elif action == ACTION_RECOVERY:
                delivered = self._send(
                    format_recovery_message(m, self._host), notifying=True
                )
                # Leave the alerting state in place on failure so the
                # recovery is retried while the fill stays below threshold.
                if delivered:
                    self._states[key] = PathAlertState(
                        alerting=False, last_alert_at=None
                    )
            # ACTION_NONE: leave prev state untouched (anti-spam / normal).

            # Record the snapshot for /queue observability.
            cur = self._states.get(key) or prev
            self._last[key] = {
                "path": m["path"],
                "used_pct": m["used_pct"],
                "free_gb": m["free_gb"],
                "free_pct": m["free_pct"],
                "alerting": cur.alerting,
                "last_alert_at": cur.last_alert_at,
            }

    def _send(self, text: str, notifying: bool) -> bool:
        """Send a Telegram message. Never raises (AC-6).

        ``notifying=True`` sends with notification sound
        (``disable_notification=False``). Returns ``True`` if
        ``send_telegram`` returned without raising, ``False`` otherwise.

        NOTE(review): only an exception is treated as a failed delivery. If
        ``send_telegram`` reports failure through its return value instead,
        that case is still counted as delivered — confirm against
        ``notifications.send_telegram``.
        """
        try:
            send_telegram(text, disable_notification=not notifying)
        except Exception as e:  # noqa: BLE001 - delivery is best-effort
            logger.warning("disk-watchdog: telegram send failed: %s", e)
            return False
        return True

    # -- loop / lifecycle --------------------------------------------------

    def _tick(self) -> None:
        """Run ``tick()`` and stamp ``last_run_ts`` even if the tick raised."""
        try:
            self.tick()
        finally:
            self.last_run_ts = datetime.now(timezone.utc).timestamp()

    def _run(self) -> None:
        """Thread body: tick, then wait ``interval_s`` or until stopped."""
        logger.info(
            "DiskWatchdog started (interval=%ss, threshold=%s%%, realert=%ss, "
            "paths=%s, enabled=%s)",
            self.interval_s,
            self._threshold,
            self._realert_s,
            self._paths(),
            settings.disk_monitor_enabled,
        )
        while not self._stop.is_set():
            try:
                self._tick()
            except Exception as e:  # noqa: BLE001 - outer never-raise
                # ``exception`` logs at ERROR level like before, but keeps the
                # traceback so a failing tick can actually be diagnosed.
                logger.exception("DiskWatchdog loop error: %s", e)
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop.wait(self.interval_s)
        logger.info("DiskWatchdog stopped")

    def start(self) -> None:
        """Start the daemon thread (idempotent: a live thread is a no-op).

        Honours the kill-switch: ``disk_monitor_enabled=False`` -> no-op (the
        daemon never starts; ``main.lifespan`` also guards, AC-5/TC-09).
        """
        if not settings.disk_monitor_enabled:
            return
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="disk-watchdog", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        """Signal the loop to stop and wait up to ``timeout`` seconds for it."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Disk-monitor snapshot for /queue observability (FR-6/AC-7). Never raises.

        Returns the kill-switch flag, the effective threshold / interval /
        cooldown, ``last_run_ts`` and the per-device snapshots recorded by the
        last ticks. On any error only ``{"enabled": ...}`` is returned.
        """
        try:
            return {
                "enabled": settings.disk_monitor_enabled,
                "threshold_pct": self._threshold,
                "interval_s": self.interval_s,
                "realert_s": self._realert_s,
                "last_run_ts": self.last_run_ts,
                "paths": list(self._last.values()),
            }
        except Exception as e:  # noqa: BLE001 - observability must never raise
            logger.warning("disk-watchdog: status() failed: %s", e)
            return {"enabled": settings.disk_monitor_enabled}


# Module-level singleton used by the FastAPI lifespan.
disk_watchdog = DiskWatchdog()