From 3251e81aa49e56a1fb87f5e12ee71265cda1b2b8 Mon Sep 17 00:00:00 2001 From: claude-bot Date: Tue, 9 Jun 2026 18:51:37 +0300 Subject: [PATCH] feat(disk-watchdog): host-FS fill heartbeat + Telegram alert at >=85% (ORCH-063) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds src/disk_watchdog.py — a background daemon thread modelled on reconciler/job_reaper that measures host-FS fill via the mounted bind-paths (/repos, /app/data) with shutil.disk_usage and Telegram-alerts the operator at >= threshold (default 85%). The missing proactive signal: on 07.06.2026 the mva154 host disk silently hit 100% and stalled the whole self-hosting pipeline. - Pure decide_action(used_pct, threshold, prev, now, realert_s): alert on crossing up, cooldown re-alert, single recovery below threshold (unit-tested without a thread/timer; clock injected). - measure_paths: shutil.disk_usage per path, dedup by st_dev, per-path never-raise (a broken path never fails the tick). - Config flags ORCH_DISK_MONITOR_* with defensive validation (threshold must be 1..100, intervals must be > 0; an out-of-range value -> default + warning). Kill-switch -> daemon does not start. - Additive disk_monitor block in GET /queue; start/stop in main.lifespan. - never-raise (per-path/per-tick/per-send); STAGE_TRANSITIONS/QG_CHECKS/check_*/ DB schema untouched, no migration (anti-spam state in-memory). Tests: tests/test_disk_watchdog.py (TC-01..TC-12, 18 cases); full suite green (1296). Docs: INFRA.md, .env.example, CHANGELOG.md (architecture/README.md + ADRs authored at architecture stage). 
Refs: ORCH-063 Co-Authored-By: Claude Opus 4.8 --- .env.example | 19 ++ CHANGELOG.md | 6 + docs/operations/INFRA.md | 26 +++ src/config.py | 64 +++++++ src/disk_watchdog.py | 358 ++++++++++++++++++++++++++++++++++++ src/main.py | 15 ++ tests/test_disk_watchdog.py | 329 +++++++++++++++++++++++++++++++++ 7 files changed, 817 insertions(+) create mode 100644 src/disk_watchdog.py create mode 100644 tests/test_disk_watchdog.py diff --git a/.env.example b/.env.example index aa74772..c08340f 100644 --- a/.env.example +++ b/.env.example @@ -267,6 +267,25 @@ ORCH_REAPER_MAX_RUNNING_S=3600 ORCH_REAPER_FINALIZE_GRACE_S=300 ORCH_LEASE_RECLAIM_ENABLED=true +# ORCH-063: disk-watchdog — background heartbeat that measures HOST-FS fill via the +# mounted bind-paths (/repos, /app/data) with shutil.disk_usage (NOT the container +# overlay /) and Telegram-alerts the operator at >= threshold. On 07.06.2026 the +# mva154 host disk silently hit 100% and stalled the WHOLE self-hosting pipeline; +# this is the missing proactive signal. Daemon thread modelled on reconciler/reaper +# (start/stop in main.lifespan, /queue snapshot, never-raise). Anti-spam state is +# in-memory (no DB migration); the watchdog only READS fill and SENDS Telegram — it +# never touches the disk/container or restarts prod (self-hosting safety). +# DISK_MONITOR_ENABLED -> kill-switch; false -> the daemon does not start (1:1 as before). +# DISK_MONITOR_INTERVAL_S -> heartbeat measurement period, seconds (order of minutes). +# DISK_MONITOR_THRESHOLD_PCT -> fill % that triggers the alert (Owner-fixed 85; valid 1..100). +# DISK_MONITOR_REALERT_S -> cooldown between repeat alerts while above threshold (~6h). +# DISK_MONITOR_PATHS -> CSV of monitored HOST bind-paths; empty -> /repos,/app/data. 
+ORCH_DISK_MONITOR_ENABLED=true +ORCH_DISK_MONITOR_INTERVAL_S=300 +ORCH_DISK_MONITOR_THRESHOLD_PCT=85 +ORCH_DISK_MONITOR_REALERT_S=21600 +ORCH_DISK_MONITOR_PATHS=/repos,/app/data + # ORCH-022: security-gate (secret-scanning + dependency audit) on the # deploy-staging -> deploy edge, run FIRST among the edge sub-gates. Deterministic # (no LLM): gitleaks (offline secret-scan, pinned Go binary in the image) + pip-audit diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d0a815..b01e1a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Disk-watchdog: мониторинг заполнения диска mva154 + Telegram-алерт при ≥85%** (ORCH-063, `feat`): новый фоновый daemon-поток `src/disk_watchdog.py` (каркас `reconciler`/`job_reaper`) — недостающий **проактивный** сигнал о заполнении хост-диска (07.06.2026 диск mva154 тихо дорос до 100% и положил весь self-hosting-конвейер всех проектов). **Аддитивно, never-raise:** `STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*`/схема БД — **не тронуты**, новой миграции нет (состояние анти-спама — in-memory). + - **Замер хост-ФС (FR-2/AC-8):** каждые `disk_monitor_interval_s` (дефолт 300с) меряет заполнение **смонтированных хост-bind-путей** (`/repos`, `/app/data`) через stdlib `shutil.disk_usage` — НЕ overlay `/` контейнера, НЕ субпроцесс `df`; дедуп путей по физическому устройству (`st_dev`) → один алерт на раздел. Недоступный путь → пропуск с warning, остальные пути меряются (per-path never-raise). + - **Решение об алерте (FR-3/FR-4/AC-2..AC-4):** pure-функция `decide_action(used_pct, threshold, prev_state, now, realert_s)` (юнит-тестируема без потока/таймера, время инъецируется): алерт на пересечении порога (дефолт **85%**, граница `>=` включительно), cooldown-повтор `disk_monitor_realert_s` (~6ч, анти-спам — не на каждом тике), однократный recovery при возврате ниже порога. Алерт — `send_telegram` (notifying, не silent), best-effort. 
+ - **Конфигурируемость + kill-switch (FR-5/AC-5):** флаги `disk_monitor_enabled`/`_interval_s`/`_threshold_pct`/`_realert_s`/`_paths` (`src/config.py`, env `ORCH_DISK_MONITOR_*`) с defensive-валидацией (порог 1..100, интервалы > 0 → невалидное к дефолту + warning). `disk_monitor_enabled=false` → демон не стартует (старт/стоп в `main.lifespan`, гард), `GET /queue` → `{"enabled": false}` — поведение 1:1 как сейчас. + - **Наблюдаемость (FR-6/AC-7):** аддитивный read-only блок `disk_monitor` в `GET /queue` (`enabled`/`threshold_pct`/`interval_s`/`realert_s`/`last_run_ts`/`paths`[`used_pct`/`free_gb`/`free_pct`/`alerting`/`last_alert_at`]); существующие ключи `/queue` не изменены; `status()` never-raise. + - **Self-hosting безопасность (NFR-6):** watchdog только читает заполнение и шлёт уведомление — не трогает диск/контейнер, не рестартит прод; безопасен для enduro-trails в общем инстансе. Откат тривиален (`ORCH_DISK_MONITOR_ENABLED=false`, миграций нет). Тесты: `tests/test_disk_watchdog.py` (TC-01..TC-12, 18 кейсов); полный регресс `tests/` зелёный (1296). Документация: `docs/architecture/README.md` (компонент + блок `/queue`), `docs/operations/INFRA.md` (что мониторится/порог/как отключить/реакция на алерт), `.env.example`. ADR: `docs/work-items/ORCH-063/06-adr/ADR-001-disk-watchdog.md`, сквозной `docs/architecture/adr/adr-0024-disk-watchdog.md`. - **Промпт-аудит 6 агентов: расхардкод даты/модели, сверка гейтов, escalation, чистка** (ORCH-092 / эпилог эпика ORCH-52, `docs`): точечная правка 6 системных промптов `.openclaw/agents/*.md` + анти-регресс-тестов, устраняющая класс дефектов промптов (хардкод даты/модели в примерах, размазанная эскалация, нереализуемая/конфликтующая инструкция rebase, мёртвая инструкция reviewer, недообогащённый tester). **Docs/prompts-only:** `src/**`, `STAGE_TRANSITIONS`, `QG_CHECKS`, состав machine-verdict ключей и схема БД — **не тронуты**; `frontmatter_validation_strict` остаётся `False`. 
Машинные verdict-ключи (`verdict:`/`result:`/`staging_status:`/`deploy_status:`/`security_status:` + значения APPROVED/REQUEST_CHANGES/PASS/FAIL/SUCCESS/FAILED) и канон 52d/52c/52e (5 секций, 6 полей) — байт-в-байт. - **Расхардкод даты/модели (FR-1/FR-2, AC-1/AC-2):** во всех 6 промптах копируемые примеры frontmatter несут плейсхолдеры `created_at: ` / `model_used: ` + явную врезку «не копируй буквально: подставь `date +%F` и фактическую модель из конфига». Литерал `claude-opus-4-8` остаётся лишь как справка в таблице полей (вне копируемого блока). - **Сверка имён гейтов (FR-3, AC-3):** все `check_*` в 6 промптах сверены с реестром `QG_CHECKS` — несовпадений нет (`check_tests_passed` подтверждён валидным, не «исправлен вслепую»); закреплено интеграционным тестом. diff --git a/docs/operations/INFRA.md b/docs/operations/INFRA.md index 3eb70aa..cf56ade 100644 --- a/docs/operations/INFRA.md +++ b/docs/operations/INFRA.md @@ -58,6 +58,27 @@ ADR `docs/work-items/ORCH-040/06-adr/ADR-001-run-agents-as-host-uid.md` и гл - `~/.orchestrator-ssh` → `/home/slin/.ssh` (ro, деплой по ssh; target в HOME агента, согласован с `HOME=/home/slin` из launcher — ORCH-040, ранее `/root/.ssh`) +### Disk-watchdog: мониторинг заполнения диска mva154 (ORCH-063) +07.06.2026 диск хоста mva154 тихо дорос до 100% и положил **весь конвейер всех проектов** +(один прод-инстанс `orchestrator` на общей БД/очереди). Чтобы такой инцидент сигнализировался +**заранее**, работает фоновый daemon-поток `src/disk_watchdog.py` (каркас `reconciler`/`job_reaper`): +- **Что мониторится:** заполнение **хост-разделов** по смонтированным bind-путям (`/repos` → + host `/home/slin/repos`, `/app/data` → host `./data`) через stdlib `shutil.disk_usage` — НЕ + overlay `/` контейнера (иначе замер ложно-низкий). Пути с одним физическим устройством (`st_dev`) + дедуплицируются → один алерт, не два. 
+- **Порог и период:** при заполнении **≥ 85%** (`ORCH_DISK_MONITOR_THRESHOLD_PCT`) шлётся + Telegram-алерт оператору; замер — раз в 300с (`ORCH_DISK_MONITOR_INTERVAL_S`). Пока диск выше + порога, повтор — не чаще раза в ~6ч (`ORCH_DISK_MONITOR_REALERT_S`, анти-спам). При возврате + ниже порога — однократное recovery-сообщение. +- **Как отключить:** `ORCH_DISK_MONITOR_ENABLED=false` (демон не стартует; `GET /queue` → + `disk_monitor.enabled=false`; поведение 1:1 как сейчас). Наблюдаемость — блок `disk_monitor` в + `GET /queue` (последний замер: `used_pct`/`free_gb`/`alerting`/`last_alert_at` по каждому пути). +- **Что делать при алерте:** watchdog **только сигнализирует** — он не трогает диск/контейнер и не + рестартит прод (self-hosting безопасность). Освобождение места — **ручная** операция оператора: + типовые «пожиратели» — старые worktree-каталоги `/home/slin/repos/_wt/*` завершённых задач, + логи, dangling Docker-образы/слои (`docker image prune`, `docker builder prune`). Авто-очистка — + вне объёма ORCH-063 (отдельная задача). + ## Переменные окружения (карта; значения — в `.env`) | Переменная | Назначение | @@ -91,6 +112,11 @@ ADR `docs/work-items/ORCH-040/06-adr/ADR-001-run-agents-as-host-uid.md` и гл | `ORCH_RECONCILE_GRACE_DEFAULT_S` | порог «застряла» по `tasks.updated_at`, сек; дефолт `600` | | `ORCH_RECONCILE_GRACE_OVERRIDES_JSON` | per-stage пороги, напр. `{"development":300}`; невалидный JSON → дефолт | | `ORCH_RECONCILE_NOTIFY_UNBLOCK` | слать Telegram при разблокировке застрявшей задачи; дефолт `true` | +| `ORCH_DISK_MONITOR_ENABLED` | kill-switch disk-watchdog (ORCH-063); дефолт `true`. 
`false` → демон не стартует, поведение 1:1 как сейчас | +| `ORCH_DISK_MONITOR_INTERVAL_S` | период heartbeat-замера заполнения диска, сек; дефолт `300` | +| `ORCH_DISK_MONITOR_THRESHOLD_PCT` | порог заполнения для алерта, %; дефолт `85` (валидация 1..100, иначе → дефолт) | +| `ORCH_DISK_MONITOR_REALERT_S` | cooldown повторного алерта, пока выше порога, сек; дефолт `21600` (~6 ч) | +| `ORCH_DISK_MONITOR_PATHS` | CSV отслеживаемых **хост**-bind-путей; пусто → `/repos,/app/data` | | `DEPLOY_SSH_USER` / `_HOST` / `DEPLOY_HOOK_SCRIPT` | параметры деплой-хука | **Секреты — только в `.env` / `.env.staging` на хосте, в гит НЕ коммитятся.** Канон — `.env.example`, `.env.staging.example`. diff --git a/src/config.py b/src/config.py index 39c610d..1a9377f 100644 --- a/src/config.py +++ b/src/config.py @@ -1,3 +1,5 @@ +import logging + from pydantic import field_validator from pydantic_settings import BaseSettings @@ -381,6 +383,68 @@ class Settings(BaseSettings): reaper_finalize_grace_s: int = 300 lease_reclaim_enabled: bool = True + # ORCH-063: disk-watchdog — background heartbeat that measures host-FS fill via + # the mounted bind-paths and Telegram-alerts the operator at >= threshold. On + # 07.06.2026 the mva154 host disk silently hit 100% and stalled the WHOLE + # self-hosting pipeline; the watchdog is the missing proactive signal. Modelled + # on reconciler/job_reaper (daemon thread, start/stop in main.lifespan, /queue + # snapshot, never-raise). Anti-spam state is in-memory (no DB migration). + # disk_monitor_enabled -> kill-switch; False -> the daemon does not start + # (zero regression), env ORCH_DISK_MONITOR_ENABLED. + # disk_monitor_interval_s -> heartbeat measurement period, seconds (order of + # minutes; cheap shutil.disk_usage, no df subprocess). + # disk_monitor_threshold_pct -> fill % that triggers the alert (Owner-fixed 85). + # disk_monitor_realert_s -> min interval between repeat alerts while still + # above threshold (anti-spam cooldown, ~6h). 
+ # disk_monitor_paths -> CSV of monitored HOST bind-paths (NOT overlay /); + # empty -> the default set (/repos, /app/data). + # Defensive validation (ADR-001 D7): threshold out of 1..100 or a non-positive + # interval -> default + warning (the process never crashes on a bad env value). + disk_monitor_enabled: bool = True + disk_monitor_interval_s: int = 300 + disk_monitor_threshold_pct: int = 85 + disk_monitor_realert_s: int = 21600 + disk_monitor_paths: str = "/repos,/app/data" + + @field_validator( + "disk_monitor_interval_s", "disk_monitor_realert_s", mode="before" + ) + @classmethod + def _disk_positive_int(cls, v, info): + # Non-positive / non-numeric interval -> the field default (never crash). + _defaults = {"disk_monitor_interval_s": 300, "disk_monitor_realert_s": 21600} + fallback = _defaults.get(info.field_name, 1) + try: + if v is None or (isinstance(v, str) and v.strip() == ""): + return fallback + iv = int(v) + if iv <= 0: + logging.getLogger("orchestrator.config").warning( + "%s must be > 0, got %s; falling back to %s", + info.field_name, v, fallback, + ) + return fallback + return iv + except (TypeError, ValueError): + return fallback + + @field_validator("disk_monitor_threshold_pct", mode="before") + @classmethod + def _disk_threshold_pct(cls, v): + # Threshold must be a percentage in 1..100; otherwise -> default 85. + try: + if v is None or (isinstance(v, str) and v.strip() == ""): + return 85 + iv = int(v) + if 1 <= iv <= 100: + return iv + logging.getLogger("orchestrator.config").warning( + "disk_monitor_threshold_pct must be 1..100, got %s; using 85", v + ) + return 85 + except (TypeError, ValueError): + return 85 + # ORCH-071: merge-verify under-gate on the `deploy -> done` edge. 
For the # self-hosting repo the `deploy` stage runs the DETERMINISTIC self-deploy path # (Phase A/B/C), where the LLM `deployer` agent — historically the ONLY actor diff --git a/src/disk_watchdog.py b/src/disk_watchdog.py new file mode 100644 index 0000000..1ac991b --- /dev/null +++ b/src/disk_watchdog.py @@ -0,0 +1,358 @@ +"""ORCH-063: disk-watchdog — host-FS fill heartbeat + Telegram alert at >=85%. + +On 07.06.2026 the mva154 host disk silently grew to 100% and took down the WHOLE +self-hosting pipeline of every project (one prod ``orchestrator`` instance serves +all prod projects from a shared DB/queue). The system had no proactive signal — +the operator only learned of the problem once the instance was already stuck. + +This module is a background daemon thread modelled 1:1 on ``reconciler`` +(ORCH-053) and ``job_reaper`` (ORCH-065): a ``threading.Thread(daemon=True)`` + +``threading.Event`` for a clean stop, the ``start()`` / ``stop(timeout)`` / +``status()`` contract, a ``/queue`` snapshot, per-tick never-raise and a +kill-switch (``ORCH_DISK_MONITOR_ENABLED``). Each tick measures the fill of the +mounted **host** bind-paths (``/repos``, ``/app/data``) via stdlib +``shutil.disk_usage`` — NOT the container overlay ``/``, NOT a ``df`` subprocess — +deduplicates paths by physical device (``st_dev``), and through a pure decision +function from ``(used_pct, threshold, prev_state, now, realert_s)`` decides to +alert (threshold crossed up), re-alert (cooldown elapsed), send recovery (back +below threshold) or stay silent. + +Invariants (TRZ §10 / ADR-001): + * ``STAGE_TRANSITIONS`` / ``QG_CHECKS`` / ``check_*`` / the DB schema are + UNCHANGED — the watchdog is an operational daemon, not a Quality Gate (like + ``reconciler`` / ``job_reaper``). No new migration (anti-spam state is + in-memory, best-effort, may reset on restart — safe: an early signal, not an + SLA). 
+ * never-raise on three levels: per-path (a broken path is skipped, the rest are + measured), per-tick (outer ``try/except`` in ``_run``), per-send + (``send_telegram`` wrapped). + * Self-hosting safety: the watchdog only READS fill and SENDS Telegram — it + never touches the disk/container, never restarts prod. Safe for enduro-trails + in the shared instance. + * Kill-switch ``disk_monitor_enabled=False`` -> the daemon does not start + (``main.lifespan`` guard) and ``/queue`` returns ``{"enabled": false}`` — + behaviour 1:1 as before. + +See docs/work-items/ORCH-063/06-adr/ADR-001-disk-watchdog.md and the cross-cutting +docs/architecture/adr/adr-0024-disk-watchdog.md. +""" + +import logging +import os +import shutil +import socket +import threading +import time +from dataclasses import dataclass +from datetime import datetime, timezone + +from .config import settings +from .notifications import send_telegram + +logger = logging.getLogger("orchestrator.disk_watchdog") + +_BYTES_PER_GB = 1024 ** 3 + +# Decision actions returned by ``decide_action`` (D3). +ACTION_NONE = "none" +ACTION_ALERT = "alert" +ACTION_REALERT = "realert" +ACTION_RECOVERY = "recovery" + + +@dataclass +class PathAlertState: + """In-memory anti-spam state for one logical device/path (D3). + + Best-effort: lives only in the daemon (no DB row, no migration). After a + process restart ``alerting`` resets to ``False`` -> a still-full disk re-alerts + once, which is safe (an early signal, not an SLA; TRZ §5/NFR-5). + """ + + alerting: bool = False + last_alert_at: float | None = None + + +def _resolve_host() -> str: + """Best-effort host label for alert text (never raises). + + The prod container runs ``network_mode: host`` so ``gethostname()`` resolves + to the real host (``mva154``). Any failure -> the neutral ``"host"``. 
+ """ + try: + name = socket.gethostname() + return name or "host" + except Exception: # noqa: BLE001 - never break the tick + return "host" + + +def parse_paths(raw: str) -> list[str]: + """Parse the ``disk_monitor_paths`` CSV into a clean path list. + + Empty / blank -> the default host bind-paths (``/repos``, ``/app/data``, + TRZ §8). Never raises. + """ + default = ["/repos", "/app/data"] + try: + if not raw or not raw.strip(): + return default + paths = [p.strip() for p in raw.split(",") if p.strip()] + return paths or default + except Exception: # noqa: BLE001 - never break the tick + return default + + +def decide_action( + used_pct: float, + threshold: float, + prev: PathAlertState, + now: float, + realert_s: float, +) -> str: + """Pure alert decision (D3) — testable without a thread or a real timer. + + Returns one of ``ACTION_{NONE,ALERT,REALERT,RECOVERY}`` as a function of the + current fill, the threshold, the previous per-path state and the injected + clock: + + * not alerting & ``used_pct >= threshold`` -> ALERT (crossed up) + * alerting & still ``>= threshold`` & cooldown -> REALERT (re-alert) + * alerting & still ``>= threshold`` & in cooldown-> NONE (anti-spam) + * alerting & ``used_pct < threshold`` -> RECOVERY (crossed down) + * not alerting & ``used_pct < threshold`` -> NONE (normal) + + Threshold is inclusive: ``used_pct == threshold`` counts as exceeding + (``>=``, TC-05). + """ + above = used_pct >= threshold + if not prev.alerting: + return ACTION_ALERT if above else ACTION_NONE + # prev.alerting is True + if not above: + return ACTION_RECOVERY + last = prev.last_alert_at + if last is None or (now - last) >= realert_s: + return ACTION_REALERT + return ACTION_NONE + + +def _measure_one(path: str) -> dict | None: + """Measure one path via ``shutil.disk_usage`` (D1). Never raises. 
+ + Returns a measurement dict, or ``None`` if the path is missing / unreadable + (``FileNotFoundError`` / ``PermissionError`` / ``OSError``) -> the caller skips + THIS path and keeps measuring the others (FR-2, AC-6: one broken path never + fails the whole tick). + """ + try: + usage = shutil.disk_usage(path) + total = int(usage.total) + used = int(usage.used) + free = int(usage.free) + used_pct = round(used / total * 100, 1) if total > 0 else 0.0 + free_pct = round(free / total * 100, 1) if total > 0 else 0.0 + return { + "path": path, + "total_bytes": total, + "used_bytes": used, + "free_bytes": free, + "used_pct": used_pct, + "free_pct": free_pct, + "free_gb": round(free / _BYTES_PER_GB, 1), + } + except Exception as e: # noqa: BLE001 - skip this path, keep the tick alive + logger.warning("disk-watchdog: cannot measure path %s, skipping: %s", path, e) + return None + + +def _dedup_key(path: str) -> object: + """Physical-device dedup key (D2): ``st_dev`` if resolvable, else the path. + + Paths sharing a device (``/repos`` and ``/app/data`` on the same host + partition) collapse to one logical partition -> one alert, not two. Failure to + ``os.stat`` -> fail-open (the path is its own key, measured independently). + """ + try: + return os.stat(path).st_dev + except Exception: # noqa: BLE001 - fail-open, treat as a distinct device + return path + + +def measure_paths(paths: list[str]) -> list[dict]: + """Measure every path, deduplicated by physical device (D1/D2). Never raises. + + For each distinct ``st_dev`` the FIRST successfully-measured path is kept and + carries a stable ``dedup_key`` (so anti-spam state is per-device). A path that + fails to measure is skipped (AC-6). 
+ """ + out: list[dict] = [] + seen: set[object] = set() + for path in paths: + key = _dedup_key(path) + if key in seen: + continue + m = _measure_one(path) + if m is None: + continue + seen.add(key) + m["dedup_key"] = key + out.append(m) + return out + + +def format_alert_message(m: dict, threshold: float, host: str) -> str: + """Actionable Telegram alert text (FR-3/AC-2): host, path, used %, free, threshold.""" + return ( + f"\U0001f534 Диск {host}: {m['path']} заполнен на {m['used_pct']}% " + f"(порог {threshold}%). Свободно {m['free_gb']} ГБ ({m['free_pct']}%). " + f"Освободите место — риск остановки конвейера всех проектов." + ) + + +def format_recovery_message(m: dict, host: str) -> str: + """Single recovery message when fill returns below threshold (FR-4/AC-4).""" + return ( + f"\U0001f7e2 Диск {host}: {m['path']} вернулся ниже порога — " + f"{m['used_pct']}% (свободно {m['free_gb']} ГБ)." + ) + + +class DiskWatchdog: + """Background daemon measuring host-FS fill and alerting on >= threshold. + + Modelled on ``Reconciler`` / ``JobReaper``: a ``threading.Thread(daemon=True)`` + + a ``threading.Event`` for a clean stop. The only in-memory state is the + best-effort anti-spam map (``_states``), the last-measurement snapshot + (``_last``) and ``last_run_ts`` — all reset on restart, which is safe (D3). + + ``now_provider`` is injectable so the cooldown / recovery logic is testable + deterministically without a real timer (AC-3). + """ + + def __init__(self, interval_s: float | None = None, now_provider=None): + self.interval_s = ( + interval_s if interval_s is not None else settings.disk_monitor_interval_s + ) + self._now = now_provider or time.time + self._stop = threading.Event() + self._thread: threading.Thread | None = None + self._host = _resolve_host() + # Best-effort in-memory state, per dedup_key (device/path). 
+ self._states: dict[object, PathAlertState] = {} + self._last: dict[object, dict] = {} + self.last_run_ts: float | None = None + + # -- config helpers ---------------------------------------------------- + @property + def _threshold(self) -> int: + return settings.disk_monitor_threshold_pct + + @property + def _realert_s(self) -> int: + return settings.disk_monitor_realert_s + + def _paths(self) -> list[str]: + return parse_paths(settings.disk_monitor_paths) + + # -- tick -------------------------------------------------------------- + def tick(self) -> None: + """One measurement pass over all monitored paths (never-raise per send). + + Measures every (deduplicated) path, runs the pure ``decide_action`` per + device and dispatches the resulting alert / re-alert / recovery via + ``send_telegram`` (notifying). Telegram failures are logged and swallowed + (best-effort delivery, AC-6). + """ + threshold = self._threshold + realert_s = self._realert_s + now = self._now() + for m in measure_paths(self._paths()): + key = m["dedup_key"] + prev = self._states.get(key) or PathAlertState() + action = decide_action(m["used_pct"], threshold, prev, now, realert_s) + if action in (ACTION_ALERT, ACTION_REALERT): + self._send(format_alert_message(m, threshold, self._host), notifying=True) + self._states[key] = PathAlertState(alerting=True, last_alert_at=now) + elif action == ACTION_RECOVERY: + self._send(format_recovery_message(m, self._host), notifying=True) + self._states[key] = PathAlertState(alerting=False, last_alert_at=None) + # ACTION_NONE: leave prev state untouched (anti-spam / normal). + # Record the snapshot for /queue observability. 
+ cur = self._states.get(key) or prev + self._last[key] = { + "path": m["path"], + "used_pct": m["used_pct"], + "free_gb": m["free_gb"], + "free_pct": m["free_pct"], + "alerting": cur.alerting, + "last_alert_at": cur.last_alert_at, + } + + def _send(self, text: str, notifying: bool) -> None: + """Send a Telegram alert (notifying, not silent). Never raises (AC-6).""" + try: + send_telegram(text, disable_notification=not notifying) + except Exception as e: # noqa: BLE001 - delivery is best-effort + logger.warning("disk-watchdog: telegram send failed: %s", e) + + # -- loop / lifecycle -------------------------------------------------- + def _tick(self) -> None: + try: + self.tick() + finally: + self.last_run_ts = datetime.now(timezone.utc).timestamp() + + def _run(self) -> None: + logger.info( + "DiskWatchdog started (interval=%ss, threshold=%s%%, realert=%ss, " + "paths=%s, enabled=%s)", + self.interval_s, self._threshold, self._realert_s, + self._paths(), settings.disk_monitor_enabled, + ) + while not self._stop.is_set(): + try: + self._tick() + except Exception as e: # noqa: BLE001 - outer never-raise + logger.error("DiskWatchdog loop error: %s", e) + self._stop.wait(self.interval_s) + logger.info("DiskWatchdog stopped") + + def start(self) -> None: + """Start the daemon thread (idempotent: a live thread is a no-op). + + Honours the kill-switch: ``disk_monitor_enabled=False`` -> no-op (the + daemon never starts; ``main.lifespan`` also guards, AC-5/TC-09). + """ + if not settings.disk_monitor_enabled: + return + if self._thread and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread( + target=self._run, name="disk-watchdog", daemon=True + ) + self._thread.start() + + def stop(self, timeout: float = 5.0) -> None: + self._stop.set() + if self._thread: + self._thread.join(timeout=timeout) + + def status(self) -> dict: + """Disk-monitor snapshot for /queue observability (FR-6/AC-7). 
Never raises.""" + try: + return { + "enabled": settings.disk_monitor_enabled, + "threshold_pct": self._threshold, + "interval_s": self.interval_s, + "realert_s": self._realert_s, + "last_run_ts": self.last_run_ts, + "paths": list(self._last.values()), + } + except Exception as e: # noqa: BLE001 - observability must never raise + logger.warning("disk-watchdog: status() failed: %s", e) + return {"enabled": settings.disk_monitor_enabled} + + +# Module-level singleton used by the FastAPI lifespan. +disk_watchdog = DiskWatchdog() diff --git a/src/main.py b/src/main.py index ccb3734..38811c8 100644 --- a/src/main.py +++ b/src/main.py @@ -105,9 +105,19 @@ async def lifespan(app: FastAPI): from .job_reaper import reaper reaper.start() + # ORCH-063: start the disk-watchdog LAST (after the reaper). It is independent + # of the queue/DB — it only reads host-FS fill and Telegram-alerts at >= + # threshold — so the order is not critical, but we follow the daemon + # convention. Honours the kill-switch ORCH_DISK_MONITOR_ENABLED (start() is a + # no-op when disabled, so behaviour is 1:1 as before). + from .disk_watchdog import disk_watchdog + disk_watchdog.start() + try: yield finally: + # ORCH-063: stop the disk-watchdog first (reverse of startup). + disk_watchdog.stop() # Graceful shutdown order mirrors startup in reverse: stop the reaper # first, then the reconciler (it must not enqueue new work while the # worker is winding down), then the worker. Running agents keep going; @@ -151,6 +161,7 @@ async def queue(): from . import task_deps from . import serial_gate from . import labels + from .disk_watchdog import disk_watchdog return { "counts": job_status_counts(), "max_concurrency": worker.max_concurrency, @@ -169,6 +180,10 @@ async def queue(): # ORCH-089 (D7): auto-mode-by-label observability (read-only) — kill-switch, # label names, scope. Additive block. 
"auto_labels": labels.snapshot(), + # ORCH-063 (FR-6 / AC-7): disk-watchdog observability (read-only) — + # enabled, threshold, interval, last measurement per host-path. Additive + # block; never-raise (status() returns {"enabled": ...} minimum on error). + "disk_monitor": disk_watchdog.status(), "recent": recent_jobs(10), } diff --git a/tests/test_disk_watchdog.py b/tests/test_disk_watchdog.py new file mode 100644 index 0000000..328802b --- /dev/null +++ b/tests/test_disk_watchdog.py @@ -0,0 +1,329 @@ +"""ORCH-063: disk-watchdog tests (TC-01..TC-12). + +The watchdog never touches a real disk or Telegram: ``shutil.disk_usage`` is +monkeypatched to set ``used_pct`` deterministically, ``send_telegram`` is captured +via monkeypatch, and the cooldown/recovery clock is injected through +``now_provider`` so time-dependent decisions are tested without a real timer. +""" +import os +import tempfile + +import pytest + +# Override env before importing app modules (same convention as test_reaper.py). 
+os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch_disk.db")) +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.disk_watchdog as dw # noqa: E402 +from src.disk_watchdog import ( # noqa: E402 + ACTION_ALERT, + ACTION_NONE, + ACTION_REALERT, + ACTION_RECOVERY, + DiskWatchdog, + PathAlertState, + decide_action, + format_alert_message, + format_recovery_message, + measure_paths, + parse_paths, +) + + +# --------------------------------------------------------------------------- # +# Helpers +# --------------------------------------------------------------------------- # +def _usage(used_pct: float, total_gb: float = 100.0): + """Build a fake ``shutil.disk_usage`` result with the given fill %.""" + total = int(total_gb * (1024 ** 3)) + used = int(total * used_pct / 100) + free = total - used + + class _U: + pass + + u = _U() + u.total, u.used, u.free = total, used, free + return u + + +@pytest.fixture +def captured_sends(monkeypatch): + """Capture every ``send_telegram`` call made by the watchdog.""" + calls = [] + + def _fake_send(text, disable_notification=False): + calls.append({"text": text, "disable_notification": disable_notification}) + return 1 + + monkeypatch.setattr(dw, "send_telegram", _fake_send) + return calls + + +# --------------------------------------------------------------------------- # +# TC-01..TC-05: pure decision function +# --------------------------------------------------------------------------- # +def test_tc01_alert_on_crossing_up(): + """TC-01: was below, now >= threshold -> ALERT (threshold crossed).""" + prev = PathAlertState(alerting=False, last_alert_at=None) + assert decide_action(90.0, 85, prev, now=1000.0, realert_s=21600) == ACTION_ALERT + + +def test_tc02_antispam_within_cooldown(): + """TC-02: already alerting, above, < realert_s since last -> NONE (anti-spam).""" + prev = PathAlertState(alerting=True, 
last_alert_at=1000.0) + # 1000 s later, cooldown is 21600 -> still suppressed. + assert decide_action(90.0, 85, prev, now=2000.0, realert_s=21600) == ACTION_NONE + + +def test_tc03_realert_after_cooldown(): + """TC-03: already alerting, above, >= realert_s elapsed -> REALERT.""" + prev = PathAlertState(alerting=True, last_alert_at=1000.0) + assert decide_action(90.0, 85, prev, now=1000.0 + 21600, realert_s=21600) == ACTION_REALERT + + +def test_tc04_recovery_and_no_repeat(): + """TC-04: above->below resets state with one RECOVERY; staying below is silent.""" + prev_above = PathAlertState(alerting=True, last_alert_at=1000.0) + assert decide_action(70.0, 85, prev_above, now=5000.0, realert_s=21600) == ACTION_RECOVERY + # After recovery the state is non-alerting; staying below -> NONE (no repeat). + prev_below = PathAlertState(alerting=False, last_alert_at=None) + assert decide_action(70.0, 85, prev_below, now=6000.0, realert_s=21600) == ACTION_NONE + + +def test_tc05_threshold_boundary_inclusive(): + """TC-05: used_pct == threshold counts as exceeding; threshold-1 is silent.""" + below = PathAlertState(alerting=False, last_alert_at=None) + assert decide_action(85.0, 85, below, now=1.0, realert_s=10) == ACTION_ALERT + assert decide_action(84.0, 85, below, now=1.0, realert_s=10) == ACTION_NONE + + +# --------------------------------------------------------------------------- # +# TC-06: measurement + device dedup +# --------------------------------------------------------------------------- # +def test_tc06_measure_and_dedup_by_device(monkeypatch): + """TC-06: per-path used_pct/free computed; same-device paths dedup to one.""" + monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(50.0)) + # Both paths share st_dev=42 -> single logical partition. 
+ monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 42})()) + + out = measure_paths(["/repos", "/app/data"]) + assert len(out) == 1 + m = out[0] + assert m["used_pct"] == 50.0 + assert m["free_bytes"] > 0 and m["free_gb"] > 0 + assert m["dedup_key"] == 42 + + # Distinct devices -> two measurements. + devs = iter([1, 2]) + monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": next(devs)})()) + out2 = measure_paths(["/repos", "/app/data"]) + assert len(out2) == 2 + + +# --------------------------------------------------------------------------- # +# TC-07: never-raise (broken path + send failure) +# --------------------------------------------------------------------------- # +def test_tc07_broken_path_does_not_kill_tick(monkeypatch): + """TC-07: a missing path is skipped; other paths are still measured.""" + def _maybe_raise(path): + if path == "/nope": + raise FileNotFoundError(path) + return _usage(50.0) + + monkeypatch.setattr(dw.shutil, "disk_usage", _maybe_raise) + devs = {"/nope": 1, "/repos": 2} + monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": devs[p]})()) + + out = measure_paths(["/nope", "/repos"]) + assert len(out) == 1 + assert out[0]["path"] == "/repos" + + +def test_tc07_send_failure_does_not_raise(monkeypatch): + """TC-07: an exception in send_telegram is swallowed; the tick completes.""" + monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(95.0)) + monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 7})()) + + def _boom(text, disable_notification=False): + raise RuntimeError("telegram down") + + monkeypatch.setattr(dw, "send_telegram", _boom) + wd = DiskWatchdog(now_provider=lambda: 1000.0) + wd.tick() # must not raise + + +# --------------------------------------------------------------------------- # +# TC-08: alert message format + notifying +# --------------------------------------------------------------------------- # +def 
test_tc08_alert_message_actionable_and_notifying(monkeypatch, captured_sends): + """TC-08: alert carries path/used_pct/free/threshold; sent notifying.""" + monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(87.3)) + monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 9})()) + monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False) + monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False) + + wd = DiskWatchdog(now_provider=lambda: 1000.0) + wd.tick() + + assert len(captured_sends) == 1 + call = captured_sends[0] + text = call["text"] + assert "/repos" in text + assert "87.3" in text + assert "85" in text # threshold + assert "ГБ" in text # free space + assert call["disable_notification"] is False # notifying, not silent + + +def test_tc08_format_helpers(): + """TC-08 (unit): format helpers contain the actionable fields.""" + m = {"path": "/repos", "used_pct": 88.0, "free_gb": 6.2, "free_pct": 12.0} + alert = format_alert_message(m, 85, "mva154") + assert "/repos" in alert and "88.0" in alert and "85" in alert and "6.2" in alert + rec = format_recovery_message(m, "mva154") + assert "/repos" in rec and "88.0" in rec + + +# --------------------------------------------------------------------------- # +# TC-09: kill-switch +# --------------------------------------------------------------------------- # +def test_tc09_killswitch_does_not_start(monkeypatch): + """TC-09: disk_monitor_enabled=False -> start() is a no-op (no thread).""" + monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False) + wd = DiskWatchdog() + wd.start() + assert wd._thread is None + + +def test_tc09_killswitch_status_block(monkeypatch): + """TC-09: status() reports enabled=False under the kill-switch.""" + monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False) + wd = DiskWatchdog() + assert wd.status()["enabled"] is False + + +# 
--------------------------------------------------------------------------- # +# TC-10: status() +# --------------------------------------------------------------------------- # +def test_tc10_status_shape(monkeypatch): + """TC-10: status() returns the expected keys, never-raise with no measurements.""" + monkeypatch.setattr(dw.settings, "disk_monitor_enabled", True, raising=False) + wd = DiskWatchdog() + st = wd.status() + for key in ("enabled", "threshold_pct", "interval_s", "realert_s", "last_run_ts", "paths"): + assert key in st + assert st["paths"] == [] # no tick yet + + +def test_tc10_status_reflects_last_measurement(monkeypatch): + """TC-10: after a tick status().paths carries used_pct/free/alerting/last_alert_at.""" + monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(90.0)) + monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 3})()) + monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False) + monkeypatch.setattr(dw, "send_telegram", lambda *a, **k: 1) + + wd = DiskWatchdog(now_provider=lambda: 1000.0) + wd.tick() + paths = wd.status()["paths"] + assert len(paths) == 1 + p = paths[0] + assert p["path"] == "/repos" + assert p["used_pct"] == 90.0 + assert p["alerting"] is True + assert p["last_alert_at"] == 1000.0 + for key in ("free_gb", "free_pct"): + assert key in p + + +# --------------------------------------------------------------------------- # +# Anti-spam / recovery end-to-end through tick() +# --------------------------------------------------------------------------- # +def test_tick_antispam_then_realert_then_recovery(monkeypatch, captured_sends): + """End-to-end: one alert on crossing, silence within cooldown, realert after + cooldown, then a single recovery — driving the daemon's in-memory state.""" + fill = {"pct": 90.0} + clock = {"t": 1000.0} + monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(fill["pct"])) + monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), 
{"st_dev": 5})()) + monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False) + monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False) + monkeypatch.setattr(dw.settings, "disk_monitor_realert_s", 100, raising=False) + + wd = DiskWatchdog(now_provider=lambda: clock["t"]) + + wd.tick() # crossing up -> ALERT + assert len(captured_sends) == 1 + + clock["t"] += 10 # within cooldown -> silent + wd.tick() + assert len(captured_sends) == 1 + + clock["t"] += 200 # cooldown elapsed -> REALERT + wd.tick() + assert len(captured_sends) == 2 + + fill["pct"] = 70.0 # drop below -> RECOVERY (one message) + clock["t"] += 10 + wd.tick() + assert len(captured_sends) == 3 + assert "ниже порога" in captured_sends[2]["text"] + + wd.tick() # stays below -> silent (no repeat recovery) + assert len(captured_sends) == 3 + + +# --------------------------------------------------------------------------- # +# parse_paths +# --------------------------------------------------------------------------- # +def test_parse_paths_default_and_csv(): + assert parse_paths("") == ["/repos", "/app/data"] + assert parse_paths(" ") == ["/repos", "/app/data"] + assert parse_paths("/a, /b ,/c") == ["/a", "/b", "/c"] + + +# --------------------------------------------------------------------------- # +# TC-11 / TC-12: GET /queue integration +# --------------------------------------------------------------------------- # +def test_tc11_queue_has_disk_monitor_block(monkeypatch): + """TC-11: GET /queue carries an additive disk_monitor block; existing keys kept.""" + import asyncio + import src.db as db + from src.db import init_db + from src import main + + dbfile = os.path.join(tempfile.gettempdir(), "test_disk_queue.db") + monkeypatch.setattr(db.settings, "db_path", dbfile, raising=False) + init_db() + + payload = asyncio.run(main.queue()) + + for key in ( + "counts", "max_concurrency", "poll_interval", "resilience", "reconcile", + "reaper", "post_deploy", 
"merge_verify", "task_deps", "serial_gate", + "auto_labels", "recent", + ): + assert key in payload, f"existing /queue key '{key}' must be preserved" + + assert "disk_monitor" in payload + dm = payload["disk_monitor"] + assert "enabled" in dm and "threshold_pct" in dm and "interval_s" in dm + assert "paths" in dm + + +def test_tc12_queue_disabled_block(monkeypatch): + """TC-12: with the kill-switch off, /queue reports disk_monitor.enabled=false.""" + import asyncio + import src.db as db + from src.db import init_db + from src import main + from src import disk_watchdog as dwmod + + dbfile = os.path.join(tempfile.gettempdir(), "test_disk_queue2.db") + monkeypatch.setattr(db.settings, "db_path", dbfile, raising=False) + monkeypatch.setattr(dwmod.settings, "disk_monitor_enabled", False, raising=False) + init_db() + + payload = asyncio.run(main.queue()) + assert payload["disk_monitor"]["enabled"] is False