From 6c8319117a1812172603611c39c4f35f7adc8f07 Mon Sep 17 00:00:00 2001 From: claude-bot Date: Mon, 15 Jun 2026 01:46:09 +0300 Subject: [PATCH] feat(watchdog): proc_blocking alert for orphaned long-lived test processes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close the observability gap between agent_hung (only tracked jobs by jobs.pid) and orphaned pytest subprocesses the orchestrator launches itself (merge_gate.retest_branch / coverage_gate.measure_coverage). On a timeout-kill of the agent (-9, ORCH-109) the grand-child pytest reparents onto tini and keeps running for days, starving CPU and failing merge-gate re-test — with no alert. Strictly inside the observer (watchdog/** + the watchdog compose service): - watchdog/collectors/proc.py: stdlib-only /proc scan (under pid: host), read-only, never-raise -> []; pure parsers split from I/O (tested on a fake /proc tree). Never reads /proc//environ. - watchdog/signals.py: pure proc_signals builder, per-entity ("proc_blocking", pid), active iff age_s > proc_age_s; actionable RU detail. - watchdog/core.py: opt-in tick block (gated on proc_enabled -> zero overhead / byte-for-byte when off) + RECOVERY synthesis for a vanished process through the existing decide()/AlertState (no new anti-spam logic). - watchdog/config.py: WATCHDOG_PROC_{ENABLED(false),AGE_MIN(60),PATTERNS(pytest), COOLDOWN_S(1800)}; default threshold > max(merge_retest_timeout_s=600, coverage_run_timeout_s=900) so a legit in-flight run never crosses it. - docker-compose.yml: pid: host on orchestrator-watchdog ONLY (read-only privilege). Anti-false-positive and no overlap with agent_hung are by construction (cmdline scope + age threshold), not fragile cross-namespace PID matching. Canon synced: WATCHDOG_PROC_* in .env.watchdog.example <-> .env.example block; documented in LITE_SETUP.md and docs/architecture/README.md (architect). src/**, /metrics, schema_version, STAGE_TRANSITIONS, QG_CHECKS, check_*, machine-verdict and the DB schema are untouched; deploy rebuilds only the sidecar, prod orchestrator is not restarted (NFR-3). Tests: tests/watchdog/test_proc_blocking_signal.py (TC-01..TC-06), test_proc_collector.py (/proc parsing), test_tick_proc_blocking_integration.py (TC-07), plus pid: host and proc-config assertions. Full pytest tests/ green (1930). Refs: ORCH-111 Co-Authored-By: Claude Opus 4.8 --- .env.example | 10 + .env.watchdog.example | 10 + .task-dev.md | 4 +- CHANGELOG.md | 6 + docker-compose.yml | 6 + docs/deployment/LITE_SETUP.md | 12 + tests/watchdog/test_compose_service.py | 11 + tests/watchdog/test_config_killswitch.py | 32 +++ tests/watchdog/test_proc_blocking_signal.py | 256 ++++++++++++++++++ tests/watchdog/test_proc_collector.py | 148 ++++++++++ .../test_tick_proc_blocking_integration.py | 128 +++++++++ watchdog/collectors/proc.py | 205 ++++++++++++++ watchdog/config.py | 18 ++ watchdog/core.py | 56 ++++ watchdog/signals.py | 48 ++++ 15 files changed, 948 insertions(+), 2 deletions(-) create mode 100644 tests/watchdog/test_proc_blocking_signal.py create mode 100644 tests/watchdog/test_proc_collector.py create mode 100644 tests/watchdog/test_tick_proc_blocking_integration.py create mode 100644 watchdog/collectors/proc.py diff --git a/.env.example b/.env.example index d5d4305..4920ee6 100644 --- a/.env.example +++ b/.env.example @@ -569,6 +569,12 @@ ORCH_QG0_TITLE_MAX=200 # CONTAINERS -> CSV of container names to watch (status != running/healthy). # DOCKER_SOCK -> path to the read-only docker.sock inside the container. # DEPS -> CSV of name=url dependency pings (empty -> no pings). +# PROC_ENABLED -> ORCH-111 opt-in: alert on a long-lived test process (pytest) +# orphaned on the host (needs `pid: host`, default OFF). +# PROC_AGE_MIN -> minutes a test process may live before alerting; MUST exceed +# max(merge_retest_timeout_s, coverage_run_timeout_s)/60. +# PROC_PATTERNS -> CSV of cmdline substrings that mark the test-class (pytest). +# PROC_COOLDOWN_S-> per-signal re-alert throttle for proc_blocking. # TG_BOT_TOKEN / TG_CHAT_ID -> the sidecar's OWN Telegram bot/chat (independent # of the orchestrator's; absent -> logs, does not send). WATCHDOG_ENABLED=true @@ -588,5 +594,9 @@ WATCHDOG_QUEUE_DEPTH=20 WATCHDOG_CONTAINERS=orchestrator WATCHDOG_DOCKER_SOCK=/var/run/docker.sock WATCHDOG_DEPS= +WATCHDOG_PROC_ENABLED=false +WATCHDOG_PROC_AGE_MIN=60 +WATCHDOG_PROC_PATTERNS=pytest +WATCHDOG_PROC_COOLDOWN_S=1800 WATCHDOG_TG_BOT_TOKEN= WATCHDOG_TG_CHAT_ID= diff --git a/.env.watchdog.example b/.env.watchdog.example index 08fcfe0..0b2d9ea 100644 --- a/.env.watchdog.example +++ b/.env.watchdog.example @@ -38,5 +38,15 @@ WATCHDOG_QUEUE_DEPTH=20 WATCHDOG_CONTAINERS=orchestrator WATCHDOG_DOCKER_SOCK=/var/run/docker.sock WATCHDOG_DEPS= +# proc_blocking (ORCH-111): opt-in алерт на долго живущий осиротевший тест-процесс +# (pytest), репарентированный на хост. Требует `pid: host` на сервисе +# orchestrator-watchdog (compose) — привилегия только у наблюдателя, read-only. +# Дефолт-off → нулевая регрессия. PROC_AGE_MIN ОБЯЗАН превышать +# max(merge_retest_timeout_s=600, coverage_run_timeout_s=900)/60 = 15 мин, иначе +# легитимный прогон даст ложный алерт. 60 мин = 4× запас. +WATCHDOG_PROC_ENABLED=false +WATCHDOG_PROC_AGE_MIN=60 +WATCHDOG_PROC_PATTERNS=pytest +WATCHDOG_PROC_COOLDOWN_S=1800 WATCHDOG_TG_BOT_TOKEN= WATCHDOG_TG_CHAT_ID= diff --git a/.task-dev.md b/.task-dev.md index ed13550..56db51d 100644 --- a/.task-dev.md +++ b/.task-dev.md @@ -1,4 +1,4 @@ -Work item: ORCH-011 +Work item: ORCH-111 Repo: orchestrator -Branch: feature/ORCH-011- +Branch: feature/ORCH-111-bug-watchdog-must-alert-on-lon Stage: development \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 45529da..f440101 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Watchdog-сигнал `proc_blocking`: алерт на долго живущий осиротевший тест-процесс** (ORCH-111, `feat`): закрыта слепая зона наблюдаемости между `agent_hung` (видит только треканые джобы по `jobs.pid`) и осиротевшими субпроцессами `pytest`, которые орк запускает сам (`merge_gate.retest_branch`/`coverage_gate.measure_coverage`) и которые при timeout-kill агента (`-9`, ORCH-109) репарентируются на tini и живут сутками, грузя CPU и валя merge-gate re-test (инцидент: процессы `test_install_lite_script.py` жили >2 суток без единого алерта). Изменения **строго внутри наблюдателя** (`watchdog/**` + сервис watchdog в compose); `src/**`/`/metrics`/`schema_version`/`STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*`/machine-verdict/схема БД — **байт-в-байт не тронуты**; выкат пересобирает **только** `orchestrator-watchdog`, прод `orchestrator` не рестартится (NFR-3). ADR: `docs/work-items/ORCH-111/06-adr/ADR-001-watchdog-orphan-test-process-alert.md`, сквозной `docs/architecture/adr/adr-0041-watchdog-orphan-test-process-alert.md`. + - **Коллектор `watchdog/collectors/proc.py` (D3):** новый stdlib-only `/proc`-скан (под `pid: host` контейнерный `/proc` отражает хост-namespace) — читает `/proc/stat` (`btime`) + `os.sysconf("SC_CLK_TCK")`, итерирует числовые `/proc/`, матчит `/proc//cmdline` по паттерну тест-класса, парсит `/proc//stat` (поле 22 `starttime` → `age_s`, поля 14+15 `utime+stime` → `cpu_s` информационно). Строго **read-only** (никаких `os.kill`/сигналов/`subprocess`; **никогда** не читает `/proc//environ` — секреты); **never-raise** (per-pid гонка «процесс умер между listdir и read» пропускается, top-level → `[]`); чистый разбор отделён от I/O (тестируется на фейковом `/proc`-дереве). + - **Чистый builder `proc_signals` + синтез RECOVERY (D4):** per-entity `Signal("proc_blocking", pid)` active ⇔ `age_s > cfg.proc_age_s` (cmdline уже отфильтрована коллектором); действенный RU-`detail` (PID + возраст + усечённый фрагмент cmdline + CPU-время). Исчезновение процесса не оставляет «висящего» алерта: в `core.tick()` для каждого alerting-ключа без свежего сигнала **синтезируется** `Signal(active=False)` → существующая `decision.decide()`/`AlertState` даёт **однократный** RECOVERY и чистит состояние (никакой новой анти-спам-логики — FR-5). + - **Анти-false-positive и отсутствие дубля с `agent_hung` — по построению (D2):** cmdline-скоуп (`claude`-агент ≠ `pytest` → нулевое пересечение, NFR-4/AC-5) + дефолтный порог возраста (60 мин) **превышает** макс. легитимный бюджет тест-прогона `max(merge_retest_timeout_s=600, coverage_run_timeout_s=900)` → in-flight прогон физически не перерастает порог (BR-4/AC-4). Без хрупкого кросс-namespace матчинга PID. + - **Конфиг + kill-switch (D5):** ключи `WATCHDOG_PROC_ENABLED` (дефолт **false** — opt-in) / `WATCHDOG_PROC_AGE_MIN` (60) / `WATCHDOG_PROC_PATTERNS` (`pytest`) / `WATCHDOG_PROC_COOLDOWN_S` (1800), never-raise парсеры. При выключенном флаге коллектор в `tick()` **не вызывается** → нулевой оверхед и байт-в-байт прежний тик (AC-7). Топология (D6): аддитивный `pid: host` **только** на сервисе `orchestrator-watchdog` (привилегия read-only, меньше уже-смонтированного `docker.sock`; не volume → инвариант read-only-маунтов цел). + - **Канон тиража (NFR-5):** новые `WATCHDOG_PROC_*` синхронизированы в `.env.watchdog.example` ↔ блок `WATCHDOG_*` `.env.example` (key-sync тест зелёный), описаны в `docs/deployment/LITE_SETUP.md` §4 и `docs/architecture/README.md` (§ proc_blocking). Покрытие — `tests/watchdog/test_proc_blocking_signal.py` (TC-01…TC-06), `test_proc_collector.py` (парсинг `/proc`), `test_tick_proc_blocking_integration.py` (TC-07 tick→dispatch + flag-off), позитивный `pid: host` в `test_compose_service.py`, proc-конфиг в `test_config_killswitch.py`. Полный `pytest tests/` зелёный (1930). - **Timeout-бюджеты developer/reviewer + launch-стамп модели в телеметрии** (ORCH-109, `fix`): две аддитивные изолированные правки подсистемы запуска агентов (инцидент ORCH-104, runs 658/659/660), **без** касания `STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*`/machine-verdict/схемы БД. ADR: `docs/work-items/ORCH-109/06-adr/ADR-001-agent-timeout-budgets-and-launch-model-stamp.md`, сквозной `docs/architecture/adr/adr-0040-agent-timeout-budgets-and-launch-model-stamp.md`. - **Launch-стамп модели (D1, FR-1):** резолвенная `resolve_agent_model(...)` пишется в `agent_runs.model` в **момент launch** объединённым `UPDATE agent_runs SET model=?, effort=? WHERE id=?` рядом со стампом эффорта (ORCH-087) в `launcher._spawn`. Раньше модель писалась только постфактум из финального usage-JSON (`record_usage`, `model=COALESCE(?, model)`), а убитый по тайм-ауту прогон этот JSON не эмитит → модель оставалась `NULL` ровно тогда, когда нужна для разбора инцидента. Теперь модель присутствует с launch, **переживает timeout-kill (`exit_code=-9`)**, видна in-flight в `GET /metrics`/`GET /queue` (`get_running_agents` уже отдаёт `model`) и в строке Telegram-карточки. Пустой резолв (CLI-дефолт без `--model`) → `NULL` (симметрично `effort or None`). Постфактум `record_usage` остаётся **обогащением** (COALESCE сохраняет launch-стамп при `model=None`). never-raise: сбой стампа изолирован `try/except` + WARNING, launch продолжается. - **Поднятые per-role wall-clock бюджеты (D3/D4, FR-3):** выделенные типизированные ключи `agent_timeout_developer_s=3600` (60м) / `agent_timeout_reviewer_s=3000` (50м) (env `ORCH_AGENT_TIMEOUT_DEVELOPER_S`/`_REVIEWER_S`). `_resolve_timeout(agent)` получил детерминированную лестницу: `agent_timeout_overrides_json[agent]` (операторский escape-hatch, высший приоритет, BC) → выделенный ключ роли → `agent_timeout_seconds=1800` (прочие роли — байт-в-байт). Малформный JSON / непозитивный/нечисловой выделенный ключ → откат на глобальный дефолт + WARNING (never-break). Дефолты = боевым значениям (канон ORCH-101): пустой `.env` воспроизводит поднятые бюджеты. **Кросс-инвариант reaper ORCH-065** сохранён синхронным поднятием `reaper_max_running_s` 3600 → **5400** (`5400 > max(timeout)3600 + grace20 = 3620`). diff --git a/docker-compose.yml b/docker-compose.yml index 67f8ee9..1338403 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -79,6 +79,12 @@ services: restart: unless-stopped init: true network_mode: host + # ORCH-111 (adr-0041 D6): share the host PID-namespace so the sidecar's /proc + # reflects the host and the proc_blocking collector can see orphaned pytest + # subprocesses. Privilege is read-only and ONLY on the observer; the signal + # is default-off (WATCHDOG_PROC_ENABLED=false) -> no behaviour change unless + # opted in. NOT a volume, so the host-paths-read-only compose test is unaffected. + pid: host mem_limit: 128m mem_reservation: 32m volumes: diff --git a/docs/deployment/LITE_SETUP.md b/docs/deployment/LITE_SETUP.md index 50bc5b0..9265dc7 100644 --- a/docs/deployment/LITE_SETUP.md +++ b/docs/deployment/LITE_SETUP.md @@ -163,6 +163,18 @@ cp .env.watchdog.example .env.watchdog # заполнить два ключа: WATCHDOG_TG_BOT_TOKEN / WATCHDOG_TG_CHAT_ID (бота создадим в §8) ``` +**Опционально (ORCH-111): алерт на осиротевший тест-процесс.** Watchdog умеет +поднимать сигнал `proc_blocking` на долго живущий процесс тест-класса (по умолчанию +`pytest`), репарентированный на хост и грузящий CPU. По умолчанию **выключен** +(`WATCHDOG_PROC_ENABLED=false`) — нулевая регрессия. Чтобы включить, в `.env.watchdog`: +`WATCHDOG_PROC_ENABLED=true`, при необходимости подстройте `WATCHDOG_PROC_AGE_MIN` +(минуты; **обязан** превышать `max(merge_retest_timeout_s, coverage_run_timeout_s)/60`, +дефолт 60), `WATCHDOG_PROC_PATTERNS` (CSV cmdline-подстрок), `WATCHDOG_PROC_COOLDOWN_S`. +Для видимости процессов хоста сервису `orchestrator-watchdog` в `docker-compose.yml` +задан `pid: host` (привилегия только у наблюдателя, read-only). **Проверка:** +`grep -E '^WATCHDOG_PROC_ENABLED=' .env.watchdog` — если `true`, после рестарта только +sidecar (`docker compose up -d orchestrator-watchdog`) в его логах виден тик без ошибок. + **Проверка (резолв всей конфигурации):** ```bash diff --git a/tests/watchdog/test_compose_service.py b/tests/watchdog/test_compose_service.py index 359a950..71352c1 100644 --- a/tests/watchdog/test_compose_service.py +++ b/tests/watchdog/test_compose_service.py @@ -48,6 +48,17 @@ def test_host_paths_mounted_read_only(): assert v.endswith(":ro"), f"watchdog mount must be :ro: {v}" +def test_watchdog_shares_host_pid_namespace(): + # ORCH-111 (adr-0041 D6): the sidecar shares the host PID-namespace so its + # /proc reflects the host (proc_blocking collector). `pid: host` is NOT a + # volume, so the read-only-mounts invariant above is unaffected. + wd = _compose()["services"]["orchestrator-watchdog"] + assert wd.get("pid") == "host", "orchestrator-watchdog must declare `pid: host`" + # The privilege stays on the OBSERVER only — prod orchestrator must NOT get it. + orch = _compose()["services"]["orchestrator"] + assert "pid" not in orch, "the prod orchestrator service must not share the host PID-namespace" + + def test_env_file_is_optional(): # A missing .env.watchdog must not break `docker compose up` (self-hosting). wd = _compose()["services"]["orchestrator-watchdog"] diff --git a/tests/watchdog/test_config_killswitch.py b/tests/watchdog/test_config_killswitch.py index 001d14a..bf3a0de 100644 --- a/tests/watchdog/test_config_killswitch.py +++ b/tests/watchdog/test_config_killswitch.py @@ -67,3 +67,35 @@ def test_malformed_env_degrades_to_default(): cfg = Config.from_env({"WATCHDOG_INTERVAL_S": "abc", "WATCHDOG_MEM_PCT": ""}) assert cfg.interval_s == 30.0 assert cfg.mem_pct == 90.0 + + +# -- ORCH-111: proc_blocking config (kill-switch default-off + safe threshold) -- +def test_proc_blocking_defaults_off_and_safe(): + cfg = Config.from_env({}) + assert cfg.proc_enabled is False # opt-in (needs `pid: host`) + assert cfg.proc_patterns == ["pytest"] + assert cfg.proc_cooldown_s == 1800.0 + # Cross-invariant (adr-0041 D2): the default age threshold MUST exceed the max + # legitimate test-run budget max(merge_retest_timeout_s=600, coverage=900). + assert cfg.proc_age_s > 900.0 + + +def test_proc_blocking_thresholds_read_from_env(): + cfg = Config.from_env( + { + "WATCHDOG_PROC_ENABLED": "true", + "WATCHDOG_PROC_AGE_MIN": "45", + "WATCHDOG_PROC_PATTERNS": "pytest,coverage run", + "WATCHDOG_PROC_COOLDOWN_S": "900", + } + ) + assert cfg.proc_enabled is True + assert cfg.proc_age_s == 45 * 60.0 + assert cfg.proc_patterns == ["pytest", "coverage run"] + assert cfg.proc_cooldown_s == 900.0 + + +def test_proc_blocking_malformed_env_degrades(): + cfg = Config.from_env({"WATCHDOG_PROC_AGE_MIN": "nope", "WATCHDOG_PROC_ENABLED": ""}) + assert cfg.proc_age_min == 60.0 + assert cfg.proc_enabled is False diff --git a/tests/watchdog/test_proc_blocking_signal.py b/tests/watchdog/test_proc_blocking_signal.py new file mode 100644 index 0000000..38e86d9 --- /dev/null +++ b/tests/watchdog/test_proc_blocking_signal.py @@ -0,0 +1,256 @@ +"""ORCH-111 TC-01…TC-06: the proc_blocking signal builder + decision surface. + +Pure / deterministic — no real ``/proc``, no container, no socket, no timer. The +collector is exercised here only for its never-raise / read-only contract +(TC-04); its ``/proc`` parsing fixtures live in ``test_proc_collector.py``. + +TC-01 is the REGRESS anchor: before ORCH-111 there was no ``proc_blocking`` +builder/dispatch at all, so a long-lived orphaned pytest raised no alert; this +asserts the active signal is now produced (red→green). +""" +import ast as _ast +import inspect as _inspect + +from watchdog.collectors import proc as proc_mod +from watchdog.config import Config +from watchdog.core import Watchdog +from watchdog.decision import ( + ACTION_ALERT, + ACTION_NONE, + ACTION_REALERT, + ACTION_RECOVERY, +) +from watchdog.signals import proc_signals + + +def _cfg(**kw) -> Config: + base = {"WATCHDOG_PROC_ENABLED": "true", "WATCHDOG_PROC_AGE_MIN": "60"} + return Config.from_env({**base, **kw}) + + +def _candidate(pid=4242, age_s=7200.0, cmdline="python3 -m pytest tests/", cpu_s=1234.0): + return {"pid": pid, "cmdline": cmdline, "age_s": age_s, "cpu_s": cpu_s, "start_ticks": 1} + + +# -- TC-01: regress — active signal for a long-lived blocking process --------- +def test_tc01_builder_emits_active_proc_blocking_signal(): + cfg = _cfg() # proc_age_s == 3600 + sigs = proc_signals(cfg, [_candidate(pid=4242, age_s=7200.0)]) + assert len(sigs) == 1 + sig = sigs[0] + assert sig.key == ("proc_blocking", 4242) + assert sig.active is True # 7200 > 3600 + # AC-2: actionable detail — PID, age in seconds, cmdline fragment, CPU time. + assert "4242" in sig.detail + assert "7200" in sig.detail + assert "pytest" in sig.detail + assert "CPU" in sig.detail + assert sig.cooldown_s == cfg.proc_cooldown_s + + +# -- TC-02: anti-false-positive — below the threshold -> inactive ------------- +def test_tc02_below_threshold_is_inactive(): + cfg = _cfg() # proc_age_s == 3600 + sigs = proc_signals(cfg, [_candidate(age_s=600.0)]) # within a 600s test budget + assert len(sigs) == 1 + assert sigs[0].active is False # 600 < 3600 -> no alert (BR-4 / AC-4) + + +def test_tc02_boundary_is_strict_greater_than(): + cfg = _cfg() + at_threshold = proc_signals(cfg, [_candidate(age_s=cfg.proc_age_s)]) + assert at_threshold[0].active is False # strict `>`: exactly at threshold is OK + over = proc_signals(cfg, [_candidate(age_s=cfg.proc_age_s + 1)]) + assert over[0].active is True + + +# -- TC-03: config / kill-switch + default threshold > test-run budget -------- +def test_tc03_defaults_are_off_and_safe(): + cfg = Config.from_env({}) + assert cfg.proc_enabled is False # default-OFF (opt-in, D5) + assert cfg.proc_patterns == ["pytest"] + assert cfg.proc_cooldown_s == 1800.0 + # Cross-invariant (D2): default age threshold MUST exceed the max legitimate + # test-run budget max(merge_retest_timeout_s=600, coverage_run_timeout_s=900). + assert cfg.proc_age_s > 900.0 + + +def test_tc03_env_overrides_and_malformed_degrade(): + cfg = Config.from_env( + { + "WATCHDOG_PROC_ENABLED": "true", + "WATCHDOG_PROC_AGE_MIN": "30", + "WATCHDOG_PROC_PATTERNS": "pytest,coverage run", + "WATCHDOG_PROC_COOLDOWN_S": "600", + } + ) + assert cfg.proc_enabled is True + assert cfg.proc_age_s == 30 * 60.0 + assert cfg.proc_patterns == ["pytest", "coverage run"] + assert cfg.proc_cooldown_s == 600.0 + # malformed numerics degrade to defaults (never-raise config). + bad = Config.from_env({"WATCHDOG_PROC_AGE_MIN": "abc", "WATCHDOG_PROC_COOLDOWN_S": ""}) + assert bad.proc_age_min == 60.0 + assert bad.proc_cooldown_s == 1800.0 + + +def test_tc03_killswitch_off_makes_collector_inert(): + cfg = Config.from_env({"WATCHDOG_PROC_ENABLED": "false"}) + dog = Watchdog(cfg, notifier=_Notifier(), docker=_StubDocker(), now_provider=lambda: 0.0) + # The gated collector returns [] without ever touching /proc (zero overhead). + assert dog._collect_proc(now=0.0) == [] + + +# -- TC-04: collector never-raise / read-only --------------------------------- +def test_tc04_collector_degrades_to_empty_on_broken_source(): + # Missing /proc root -> [] (one signal skipped), no exception. + assert proc_mod.collect_candidates(["pytest"], now=0.0, proc_root="/no/such/proc") == [] + + +def test_tc04_collector_empty_when_btime_unreadable(tmp_path): + # /proc with no parseable btime -> [] (cannot compute age -> no bogus signal). + (tmp_path / "stat").write_text("cpu 1 2 3\nintr 0\n") + assert proc_mod.collect_candidates(["pytest"], now=0.0, proc_root=str(tmp_path)) == [] + + +def _docstring_node_ids(tree) -> set: + """ids of the Constant nodes that are module/func/class docstrings (prose).""" + out = set() + for node in _ast.walk(tree): + if isinstance(node, (_ast.Module, _ast.FunctionDef, _ast.AsyncFunctionDef, _ast.ClassDef)): + body = getattr(node, "body", []) + if ( + body + and isinstance(body[0], _ast.Expr) + and isinstance(body[0].value, _ast.Constant) + and isinstance(body[0].value.value, str) + ): + out.add(id(body[0].value)) + return out + + +def test_tc04_collector_source_is_read_only(): + # AC-3 / NFR-2: the EXECUTABLE code (not the prose describing the contract) + # carries no kill / signal / subprocess / environ-read. Scan the AST so the + # docstring that documents the ban does not trip the check. + tree = _ast.parse(_inspect.getsource(proc_mod)) + docstrings = _docstring_node_ids(tree) + violations: list[str] = [] + _MUTATING_ATTRS = {"kill", "system", "Popen", "popen", "run", "send_signal", "terminate"} + for node in _ast.walk(tree): + if isinstance(node, _ast.Import): + for a in node.names: + if a.name.split(".")[0] in {"subprocess", "signal"}: + violations.append(f"import {a.name}") + elif isinstance(node, _ast.ImportFrom): + if (node.module or "").split(".")[0] in {"subprocess", "signal"}: + violations.append(f"from {node.module}") + elif isinstance(node, _ast.Attribute) and node.attr in _MUTATING_ATTRS: + violations.append(f".{node.attr}") + elif isinstance(node, _ast.Constant) and isinstance(node.value, str): + if id(node) not in docstrings and "environ" in node.value: + violations.append("reads /proc//environ") + assert not violations, f"read-only contract violated in proc.py: {violations}" + + +def test_tc04_builder_skips_records_missing_fields(): + cfg = _cfg() + sigs = proc_signals(cfg, [{"pid": None}, {"cmdline": "pytest"}, _candidate()]) + assert [s.key for s in sigs] == [("proc_blocking", 4242)] # only the valid record + + +# -- TC-05: anti-spam / recovery through decide()/AlertState ------------------ +def test_tc05_alert_throttle_realert_then_recovery(): + seq = {"candidates": [_candidate(pid=7, age_s=7200.0)]} + cfg = _cfg(WATCHDOG_PROC_COOLDOWN_S="1000") + t = {"v": 0.0} + notifier = _Notifier() + dog = Watchdog(cfg, notifier=notifier, docker=_StubDocker(), now_provider=lambda: t["v"]) + dog._collect_proc = lambda now: list(seq["candidates"]) # inject collector + + def proc_alerts(): + return [m for m in notifier.sent if "Блокирующий процесс" in m] + + def actions(): + return [a for a, s in dog.tick() if getattr(s, "key", (None,))[0] == "proc_blocking"] + + # tick 1: threshold crossed -> exactly one ALERT. + assert ACTION_ALERT in actions() + assert len(proc_alerts()) == 1 + # tick 2: still alive, within cooldown -> NONE (anti-spam, no new alert). + t["v"] = 100.0 + assert actions() == [ACTION_NONE] + assert len(proc_alerts()) == 1 + # tick 3: cooldown elapsed -> REALERT. + t["v"] = 1100.0 + assert ACTION_REALERT in actions() + assert len(proc_alerts()) == 2 + # tick 4: the process vanished -> exactly one RECOVERY (synthesised, D4). + seq["candidates"] = [] + t["v"] = 1200.0 + assert ACTION_RECOVERY in actions() + recoveries = [m for m in notifier.sent if "восстановление" in m and "Блокирующий" in m] + assert len(recoveries) == 1 + # tick 5: still gone -> no repeated recovery (state cleared). + t["v"] = 1300.0 + dog.tick() + assert len([m for m in notifier.sent if "восстановление" in m and "Блокирующий" in m]) == 1 + + +# -- TC-06: no duplicate with agent_hung (cmdline partition) ------------------ +def test_tc06_claude_agent_cmdline_never_matches_pytest_pattern(): + # A claude agent process (covered by agent_hung) is excluded by the collector + # pattern scope -> proc_blocking never fires for it (NFR-4 / AC-5, by construction). + assert proc_mod.matches_patterns("claude --model claude-opus-4-8 -p ...", ["pytest"]) is False + assert proc_mod.matches_patterns("python3 -m pytest tests/", ["pytest"]) is True + + +def test_tc06_collector_excludes_non_matching_processes(tmp_path): + _write_fake_proc( + tmp_path, + btime=1_000_000, + procs={ + "100": ("claude --model claude-opus-4-8", _stat_line(start_ticks=0)), + "200": ("python3 -m pytest tests/test_x.py", _stat_line(start_ticks=0)), + }, + ) + recs = proc_mod.collect_candidates( + ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 + ) + assert [r["pid"] for r in recs] == [200] # only the pytest process + + +# -- shared fakes ------------------------------------------------------------- +class _Notifier: + def __init__(self): + self.sent = [] + + def send(self, text): + self.sent.append(text) + return True + + +class _StubDocker: + def inspect(self, name): + return {"State": {"Status": "running"}} + + +def _stat_line(start_ticks: int, utime: int = 0, stime: int = 0) -> str: + # /proc//stat: pid (comm) state ppid ... utime(14) stime(15) ... starttime(22) ... + fields = ["0"] * 52 + fields[0] = "999" + fields[1] = "(python3)" + fields[2] = "S" + fields[13] = str(utime) # field 14 + fields[14] = str(stime) # field 15 + fields[21] = str(start_ticks) # field 22 + return " ".join(fields) + + +def _write_fake_proc(root, *, btime: int, procs: dict): + (root / "stat").write_text(f"cpu 1 2 3\nbtime {btime}\nintr 0\n") + for pid, (cmdline, stat_line) in procs.items(): + d = root / pid + d.mkdir() + (d / "cmdline").write_bytes(cmdline.replace(" ", "\x00").encode() + b"\x00") + (d / "stat").write_text(stat_line) diff --git a/tests/watchdog/test_proc_collector.py b/tests/watchdog/test_proc_collector.py new file mode 100644 index 0000000..aa6010c --- /dev/null +++ b/tests/watchdog/test_proc_collector.py @@ -0,0 +1,148 @@ +"""ORCH-111: the /proc collector — pure parsing + a fake /proc tree (never-raise). + +Mirrors ``test_host_collector.py``: the pure parsers are unit-tested on text +fixtures and ``collect_candidates`` is driven against a temporary ``/proc`` tree, +so no real host / Linux kernel is required. +""" +from watchdog.collectors import proc as proc_mod + + +# -- parse_btime -------------------------------------------------------------- +def test_parse_btime_reads_the_btime_line(): + text = "cpu 1 2 3 4\nbtime 1700000000\nprocesses 99\n" + assert proc_mod.parse_btime(text) == 1700000000 + + +def test_parse_btime_absent_is_none(): + assert proc_mod.parse_btime("cpu 1 2 3\nintr 0\n") is None + + +def test_parse_btime_garbage_is_none(): + assert proc_mod.parse_btime("btime not-a-number\n") is None + assert proc_mod.parse_btime("") is None + + +# -- parse_pid_stat (comm may contain spaces/parens) -------------------------- +def test_parse_pid_stat_simple(): + # field 14 utime, 15 stime, 22 starttime. + fields = ["0"] * 52 + fields[0], fields[1], fields[2] = "1234", "(python3)", "R" + fields[13], fields[14], fields[21] = "50", "25", "9000" + st = proc_mod.parse_pid_stat(" ".join(fields)) + assert st == {"utime": 50, "stime": 25, "starttime": 9000} + + +def test_parse_pid_stat_comm_with_spaces_and_parens(): + # A pathological comm "(py (test) x)" must not break field indexing — we + # split after the LAST ')'. + fields = ["0"] * 52 + fields[13], fields[14], fields[21] = "7", "3", "4242" + tail = " ".join(fields[2:]) + line = f"1234 (py (test) x) {tail}" + st = proc_mod.parse_pid_stat(line) + assert st == {"utime": 7, "stime": 3, "starttime": 4242} + + +def test_parse_pid_stat_truncated_is_none(): + assert proc_mod.parse_pid_stat("1234 (python3) R 1 2 3") is None + assert proc_mod.parse_pid_stat("no parens here") is None + assert proc_mod.parse_pid_stat("") is None + + +# -- decode_cmdline ----------------------------------------------------------- +def test_decode_cmdline_nul_separated(): + raw = b"python3\x00-m\x00pytest\x00tests/test_x.py\x00" + assert proc_mod.decode_cmdline(raw) == "python3 -m pytest tests/test_x.py" + + +def test_decode_cmdline_empty_for_kernel_thread(): + assert proc_mod.decode_cmdline(b"") == "" + assert proc_mod.decode_cmdline(None) == "" + + +# -- matches_patterns --------------------------------------------------------- +def test_matches_patterns_substring_any(): + assert proc_mod.matches_patterns("python3 -m pytest x", ["pytest"]) is True + assert proc_mod.matches_patterns("python3 -m coverage run", ["pytest", "coverage run"]) is True + assert proc_mod.matches_patterns("bash -c sleep", ["pytest"]) is False + assert proc_mod.matches_patterns("", ["pytest"]) is False + assert proc_mod.matches_patterns("pytest", []) is False + + +# -- collect_candidates (fake /proc tree) ------------------------------------- +def _stat_line(start_ticks, utime=0, stime=0): + fields = ["0"] * 52 + fields[0], fields[1], fields[2] = "999", "(python3)", "S" + fields[13], fields[14], fields[21] = str(utime), str(stime), str(start_ticks) + return " ".join(fields) + + +def _write_proc(root, btime, procs): + (root / "stat").write_text(f"cpu 1 2 3\nbtime {btime}\n") + for pid, (cmdline, stat) in procs.items(): + d = root / pid + d.mkdir() + (d / "cmdline").write_bytes(cmdline.replace(" ", "\x00").encode() + b"\x00") + (d / "stat").write_text(stat) + + +def test_collect_candidates_computes_age_and_cpu(tmp_path): + # btime=1_000_000, starttime=200_000 ticks @ 100 Hz -> start epoch = 1_002_000. + # now=1_010_000 -> age 8000s; utime+stime=300 ticks @100Hz -> cpu 3s. + _write_proc( + tmp_path, + btime=1_000_000, + procs={"200": ("python3 -m pytest tests/", _stat_line(200_000, utime=200, stime=100))}, + ) + recs = proc_mod.collect_candidates( + ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 + ) + assert len(recs) == 1 + r = recs[0] + assert r["pid"] == 200 + assert r["cmdline"] == "python3 -m pytest tests/" + assert abs(r["age_s"] - 8000.0) < 1e-6 + assert abs(r["cpu_s"] - 3.0) < 1e-6 + + +def test_collect_candidates_filters_by_pattern(tmp_path): + _write_proc( + tmp_path, + btime=1_000_000, + procs={ + "100": ("claude --model x", _stat_line(0)), + "200": ("python3 -m pytest a", _stat_line(0)), + "300": ("/usr/bin/dockerd", _stat_line(0)), + }, + ) + recs = proc_mod.collect_candidates( + ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 + ) + assert [r["pid"] for r in recs] == [200] + + +def test_collect_candidates_skips_unreadable_pid(tmp_path): + # A matching pid whose stat is unparseable (race: vanished mid-scan) is + # skipped without dropping the rest. + _write_proc( + tmp_path, + btime=1_000_000, + procs={ + "200": ("python3 -m pytest a", "garbage no parens"), + "201": ("python3 -m pytest b", _stat_line(0)), + }, + ) + recs = proc_mod.collect_candidates( + ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 + ) + assert [r["pid"] for r in recs] == [201] + + +def test_collect_candidates_ignores_non_numeric_entries(tmp_path): + _write_proc(tmp_path, btime=1_000_000, procs={"200": ("pytest", _stat_line(0))}) + (tmp_path / "self").mkdir() # non-numeric -> ignored + (tmp_path / "meminfo").write_text("noise") + recs = proc_mod.collect_candidates( + ["pytest"], now=1_000_000.0, proc_root=str(tmp_path), clk_tck=100 + ) + assert [r["pid"] for r in recs] == [200] diff --git a/tests/watchdog/test_tick_proc_blocking_integration.py b/tests/watchdog/test_tick_proc_blocking_integration.py new file mode 100644 index 0000000..0aa0aa7 --- /dev/null +++ b/tests/watchdog/test_tick_proc_blocking_integration.py @@ -0,0 +1,128 @@ +"""ORCH-111 TC-07: full tick -> dispatch of the proc_blocking alert (integration). + +REGRESS: ``Watchdog.tick()`` with a collector that returns a long-lived blocking +process must dispatch exactly one ``proc_blocking`` alert through the fake +Notifier — even though ``/metrics`` reports no ``stuck`` stage and no hung agent. +With the kill-switch OFF the path is inert (byte-for-byte as before ORCH-111). + +The orchestrator ``/metrics`` envelope is stubbed healthy so ONLY the new signal +can fire; the proc collector is stubbed at the module boundary so the real +``_collect_proc`` gate + wrapper still execute. +""" +from watchdog.collectors import orch as orch_mod +from watchdog.collectors import proc as proc_mod +from watchdog.config import Config +from watchdog.core import Watchdog + + +class _Notifier: + def __init__(self): + self.sent = [] + + def send(self, text): + self.sent.append(text) + return True + + +class _StubDocker: + def inspect(self, name): + return {"State": {"Status": "running"}} + + +def _healthy_metrics(monkeypatch): + env = { + "schema_version": 1, + "generated_at": "2026-06-15T00:00:00Z", + "clk_tck": 100, + "agents": [], + "stages": [], + "queue": {"depth": 0, "counts": {"failed": 0}}, + } + monkeypatch.setattr( + orch_mod, "fetch_metrics", + lambda *a, **k: orch_mod.FetchResult(ok=True, envelope=env), + ) + + +def _cfg(**kw): + base = { + "WATCHDOG_TG_BOT_TOKEN": "t", + "WATCHDOG_TG_CHAT_ID": "c", + "WATCHDOG_PROC_ENABLED": "true", + "WATCHDOG_PROC_AGE_MIN": "60", # proc_age_s == 3600 + "WATCHDOG_CONTAINERS": "orchestrator", + } + return Config.from_env({**base, **kw}) + + +def _blocking(monkeypatch, age_s=7200.0): + rec = {"pid": 4242, "cmdline": "python3 -m pytest tests/test_install_lite_script.py", + "age_s": age_s, "cpu_s": 99999.0, "start_ticks": 1} + monkeypatch.setattr(proc_mod, "collect_candidates", lambda *a, **k: [rec]) + + +def _proc_alerts(notifier): + return [m for m in notifier.sent if "Блокирующий процесс" in m] + + +def test_tc07_tick_dispatches_proc_blocking_alert(monkeypatch): + _healthy_metrics(monkeypatch) + _blocking(monkeypatch) + notifier = _Notifier() + dog = Watchdog(_cfg(), notifier=notifier, docker=_StubDocker(), now_provider=lambda: 0.0) + + dog.tick() + + alerts = _proc_alerts(notifier) + assert len(alerts) == 1 + assert "4242" in alerts[0] + assert "pytest" in alerts[0] + assert alerts[0].startswith("\U0001f534") # red ALERT prefix + + +def test_tc07_killswitch_off_dispatches_nothing(monkeypatch): + _healthy_metrics(monkeypatch) + # Even if the collector WOULD return a blocking process, the gate skips it. + called = {"n": 0} + + def _boom(*a, **k): + called["n"] += 1 + return [{"pid": 1, "cmdline": "pytest", "age_s": 9e9, "cpu_s": 0.0}] + + monkeypatch.setattr(proc_mod, "collect_candidates", _boom) + notifier = _Notifier() + dog = Watchdog( + _cfg(WATCHDOG_PROC_ENABLED="false"), + notifier=notifier, docker=_StubDocker(), now_provider=lambda: 0.0, + ) + + dog.tick() + + assert _proc_alerts(notifier) == [] + assert called["n"] == 0 # collector never invoked when disabled (zero overhead) + + +def test_tc07_in_budget_process_does_not_alert(monkeypatch): + # A process below the threshold (legitimate in-flight run) -> no alert (AC-4). + _healthy_metrics(monkeypatch) + _blocking(monkeypatch, age_s=600.0) + notifier = _Notifier() + dog = Watchdog(_cfg(), notifier=notifier, docker=_StubDocker(), now_provider=lambda: 0.0) + + dog.tick() + + assert _proc_alerts(notifier) == [] + + +def test_tc07_tick_never_raises_when_collector_explodes(monkeypatch): + _healthy_metrics(monkeypatch) + + def _explode(*a, **k): + raise RuntimeError("boom") + + monkeypatch.setattr(proc_mod, "collect_candidates", _explode) + notifier = _Notifier() + dog = Watchdog(_cfg(), notifier=notifier, docker=_StubDocker(), now_provider=lambda: 0.0) + + dog.tick() # must not raise — collector error degrades to one skipped signal + assert _proc_alerts(notifier) == [] diff --git a/watchdog/collectors/proc.py b/watchdog/collectors/proc.py new file mode 100644 index 0000000..c6c1730 --- /dev/null +++ b/watchdog/collectors/proc.py @@ -0,0 +1,205 @@ +"""Collector: long-lived host processes whose cmdline matches a test-class (ORCH-111). + +stdlib-only ``/proc`` scan (ADR-001 D3). Under ``pid: host`` (D6) the container's +``/proc`` reflects the host PID-namespace, so the sidecar sees the orphaned +``pytest`` subprocess regardless of which container spawned it (the merge-gate / +coverage-gate re-test the orchestrator launches itself; on a timeout-kill of the +agent — ``exit_code=-9``, ORCH-109 — the grand-child ``pytest`` reparents onto +tini and keeps running for days). + +Strictly **READ-ONLY** (BR-3 / NFR-2): opens only ``/proc/stat``, +``/proc//stat`` and ``/proc//cmdline`` for reading. There is **no** +``os.kill``, signal-send, ``subprocess`` or any mutation on this path, and it +**never** reads ``/proc//environ`` (secrets, ADR-001 D3 / R-2). + +**never-raise** (NFR-1): a per-pid race — the process died between ``listdir`` +and ``read`` — skips that pid without breaking the list; any top-level failure +(non-Linux / missing ``/proc`` / unreadable ``/proc/stat``) degrades the whole +scan to ``[]`` (one signal skipped, the tick lives, D8). + +Pure parsing (``parse_btime`` / ``parse_pid_stat`` / ``decode_cmdline`` / +``matches_patterns``) is split from the I/O orchestration (``collect_candidates``) +so the scan is testable against a fake ``/proc`` tree, no real host needed. +""" +from __future__ import annotations + +import logging +import os + +logger = logging.getLogger("watchdog.collectors.proc") + +# /proc//stat field indices, 0-based AFTER the trailing ')' of `comm`. +# /proc//stat is: `pid (comm) state ppid ... utime stime ... starttime ...`. +# Fields are 1-based in proc(5); field 3 (state) is the first token after the +# last ')'. So field N (>=3) lives at index N-3 of the post-')'-split: +# utime = field 14 -> index 11 +# stime = field 15 -> index 12 +# starttime = field 22 -> index 19 +_STAT_UTIME_IDX = 11 +_STAT_STIME_IDX = 12 +_STAT_STARTTIME_IDX = 19 +_STAT_MIN_FIELDS = _STAT_STARTTIME_IDX + 1 # need starttime present + +_DEFAULT_CLK_TCK = 100 + + +def parse_btime(stat_text: str) -> int | None: + """Boot time (epoch seconds) from the ``btime `` line of ``/proc/stat``. + + Returns ``None`` when the line is absent / unparseable (never raises) so the + caller degrades the whole scan to ``[]`` rather than emitting a bogus age. + """ + try: + for line in stat_text.splitlines(): + if line.startswith("btime "): + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) + except Exception as e: # noqa: BLE001 - tolerant: no btime -> no scan + logger.warning("watchdog: cannot parse /proc/stat btime: %s", e) + return None + + +def parse_pid_stat(stat_text: str) -> dict | None: + """Parse ``/proc//stat`` -> ``{utime, stime, starttime}`` (clock ticks). + + The ``comm`` field (2) is wrapped in parens and may itself contain spaces or + parens (e.g. ``(python -m) ()``), so we split AFTER the **last** ``')'`` and + index the remaining space-separated fields. Returns ``None`` on a malformed / + truncated line (never raises). + """ + try: + rparen = stat_text.rfind(")") + if rparen < 0: + return None + rest = stat_text[rparen + 1:].split() + if len(rest) < _STAT_MIN_FIELDS: + return None + return { + "utime": int(rest[_STAT_UTIME_IDX]), + "stime": int(rest[_STAT_STIME_IDX]), + "starttime": int(rest[_STAT_STARTTIME_IDX]), + } + except Exception as e: # noqa: BLE001 - one bad line, skip this pid + logger.debug("watchdog: cannot parse pid stat: %s", e) + return None + + +def decode_cmdline(raw: bytes | str | None) -> str: + """NUL-separated ``/proc//cmdline`` -> a space-joined string. + + Empty for kernel threads (they carry no cmdline) -> never matches a pattern. + Tolerant of bytes / str / ``None`` and undecodable bytes (never raises). + """ + try: + if raw is None: + return "" + if isinstance(raw, str): + raw = raw.encode("utf-8", "replace") + text = raw.decode("utf-8", "replace") + parts = [p for p in text.split("\x00") if p] + return " ".join(parts) + except Exception: # noqa: BLE001 - undecodable cmdline -> treat as empty + return "" + + +def matches_patterns(cmdline: str, patterns: list[str]) -> bool: + """``True`` iff ``cmdline`` contains ANY pattern as a substring. + + This is the test-class scope (ADR-001 D4): pattern-filtering happens in the + collector, so the signal builder only applies the age threshold. The default + pattern ``pytest`` never matches a ``claude`` agent cmdline -> zero overlap + with ``agent_hung`` by construction (NFR-4 / AC-5). + """ + if not cmdline: + return False + for p in patterns or []: + if p and p in cmdline: + return True + return False + + +def _clk_tck() -> int: + """``os.sysconf('SC_CLK_TCK')`` with a safe fallback (never raises).""" + try: + v = os.sysconf("SC_CLK_TCK") + return int(v) if v and int(v) > 0 else _DEFAULT_CLK_TCK + except Exception: # noqa: BLE001 - non-Linux / unsupported -> conventional 100 + return _DEFAULT_CLK_TCK + + +def _read_text(path: str) -> str | None: + try: + with open(path, "r") as f: + return f.read() + except Exception: # noqa: BLE001 - missing / unreadable -> None (per-pid race) + return None + + +def _read_bytes(path: str) -> bytes: + try: + with open(path, "rb") as f: + return f.read() + except Exception: # noqa: BLE001 - missing / unreadable -> empty cmdline + return b"" + + +def collect_candidates( + patterns: list[str], + *, + now: float, + proc_root: str = "/proc", + clk_tck: int | None = None, + read_text=None, + read_bytes=None, +) -> list[dict]: + """Scan ``/proc`` for live processes whose cmdline matches ``patterns``. + + Returns one ``{pid, cmdline, age_s, cpu_s, start_ticks}`` record per matching + live process. Pattern-filtering happens HERE (D4); the builder applies the + age threshold. ``age_s = now - (btime + starttime/clk_tck)``; + ``cpu_s = (utime + stime)/clk_tck`` (accumulated CPU time — informational for + BR-2, NOT part of activation). + + never-raise (D8): a top-level failure -> ``[]``; a per-pid race (vanished + process / unreadable file) is skipped silently. ``proc_root`` / ``now`` / + ``clk_tck`` / ``read_*`` are injectable so the scan is unit-testable against a + fake ``/proc`` tree with no real host. + """ + out: list[dict] = [] + try: + rt = read_text or _read_text + rb = read_bytes or _read_bytes + ticks = clk_tck if (clk_tck and clk_tck > 0) else _clk_tck() + btime = parse_btime(rt(os.path.join(proc_root, "stat")) or "") + if btime is None: + return [] + for entry in os.listdir(proc_root): + if not entry.isdigit(): + continue + try: + cmdline = decode_cmdline(rb(os.path.join(proc_root, entry, "cmdline"))) + if not matches_patterns(cmdline, patterns): + continue + st = parse_pid_stat(rt(os.path.join(proc_root, entry, "stat")) or "") + if st is None: + continue + start_ticks = st["starttime"] + age_s = now - (btime + start_ticks / ticks) + cpu_s = (st["utime"] + st["stime"]) / ticks + out.append( + { + "pid": int(entry), + "cmdline": cmdline, + "age_s": age_s, + "cpu_s": cpu_s, + "start_ticks": start_ticks, + } + ) + except Exception as e: # noqa: BLE001 - per-pid race, skip and continue + logger.debug("watchdog: skip /proc/%s: %s", entry, e) + continue + except Exception as e: # noqa: BLE001 - non-Linux / no /proc -> one signal tih + logger.warning("watchdog: proc scan error: %s", e) + return [] + return out diff --git a/watchdog/config.py b/watchdog/config.py index 4402910..e05f40e 100644 --- a/watchdog/config.py +++ b/watchdog/config.py @@ -116,6 +116,16 @@ class Config: containers: list[str] = field(default_factory=lambda: ["orchestrator"]) docker_sock: str = "/var/run/docker.sock" + # -- blocking test/child processes (opt-in; pid: host /proc scan, D5) -- + # Default-OFF: the signal needs the `pid: host` privilege (D6) and a + # conscious opt-in (mirror of disk_crit_enabled). proc_age_min MUST exceed + # max(merge_retest_timeout_s, coverage_run_timeout_s)/60 so a legitimate + # in-flight test run never crosses the threshold (D2 / AC-4). + proc_enabled: bool = False + proc_age_min: float = 60.0 # minutes a test process may live before alerting + proc_patterns: list[str] = field(default_factory=lambda: ["pytest"]) + proc_cooldown_s: float = 1800.0 # per-signal re-alert throttle + # -- external dependencies ------------------------------------------- deps: dict[str, str] = field(default_factory=dict) @@ -132,6 +142,10 @@ class Config: def stage_stuck_s(self) -> float: return self.stage_stuck_min * 60.0 + @property + def proc_age_s(self) -> float: + return self.proc_age_min * 60.0 + @classmethod def from_env(cls, env: dict | None = None) -> "Config": """Build a Config from ``env`` (defaults to ``os.environ``). never-raise.""" @@ -153,6 +167,10 @@ class Config: queue_depth=_int(e, "WATCHDOG_QUEUE_DEPTH", 20), containers=_csv(e, "WATCHDOG_CONTAINERS", ["orchestrator"]), docker_sock=_str(e, "WATCHDOG_DOCKER_SOCK", "/var/run/docker.sock"), + proc_enabled=_bool(e, "WATCHDOG_PROC_ENABLED", False), + proc_age_min=_float(e, "WATCHDOG_PROC_AGE_MIN", 60.0), + proc_patterns=_csv(e, "WATCHDOG_PROC_PATTERNS", ["pytest"]), + proc_cooldown_s=_float(e, "WATCHDOG_PROC_COOLDOWN_S", 1800.0), deps=_deps(e, "WATCHDOG_DEPS"), tg_bot_token=_str(e, "WATCHDOG_TG_BOT_TOKEN", ""), tg_chat_id=_str(e, "WATCHDOG_TG_CHAT_ID", ""), diff --git a/watchdog/core.py b/watchdog/core.py index 726ef32..b7c27fb 100644 --- a/watchdog/core.py +++ b/watchdog/core.py @@ -19,6 +19,7 @@ from .collectors import containers as containers_mod from .collectors import deps as deps_mod from .collectors import host as host_mod from .collectors import orch as orch_mod +from .collectors import proc as proc_mod from .config import Config from .notify import Notifier from . import signals as signals_mod @@ -93,6 +94,18 @@ class Watchdog: logger.warning("watchdog: deps collect error: %s", e) return {} + def _collect_proc(self, now: float) -> list: + # Opt-in: when WATCHDOG_PROC_ENABLED is false the scan is NOT called + # (gate mirrors _collect_disk on disk_crit_enabled) -> zero overhead and + # byte-for-byte tick behaviour as before ORCH-111 (D5 / AC-7). + if not self.cfg.proc_enabled: + return [] + try: + return proc_mod.collect_candidates(self.cfg.proc_patterns, now=now) + except Exception as e: # noqa: BLE001 - never-raise: one signal skipped + logger.warning("watchdog: proc collect error: %s", e) + return [] + # -- one tick --------------------------------------------------------- def tick(self) -> list: """Run one full pass; returns the dispatched ``(action, Signal)`` list. @@ -134,10 +147,53 @@ class Watchdog: # 4) external dependency pings built.extend(signals_mod.dep_signals(self._collect_deps())) + # 5) long-lived blocking test/child processes (opt-in; pid: host /proc). + # Gated entirely on proc_enabled so a disabled sidecar is byte-for-byte + # as before ORCH-111 (D5/AC-7); RECOVERY for a vanished process is + # synthesised through the SAME decide()/AlertState machinery (D4). + if self.cfg.proc_enabled: + proc_sigs = signals_mod.proc_signals(self.cfg, self._collect_proc(now)) + proc_sigs.extend(self._synthesize_proc_recoveries(proc_sigs)) + built.extend(proc_sigs) + dispatched = self._dispatch(built, now) self.last_run_ts = now return dispatched + def _synthesize_proc_recoveries(self, current_sigs: list) -> list: + """Synthesise an inactive ``Signal`` for every vanished proc_blocking key. + + ``proc_signals`` emits a signal ONLY for a currently observed candidate, + so a process that disappeared leaves an alerting :class:`AlertState` with + no fresh signal and would never recover. Reusing ``decide()``/ + ``AlertState`` (FR-5 — no separate anti-spam logic), we emit an + ``active=False`` signal for each alerting ``("proc_blocking", …)`` key + absent from the current set -> ``decide`` yields exactly one RECOVERY and + clears the state. This is per-family bookkeeping, not new throttling. + """ + out: list = [] + try: + current_keys = {s.key for s in current_sigs} + for key, state in list(self._states.items()): + if ( + isinstance(key, tuple) + and key + and key[0] == "proc_blocking" + and state.alerting + and key not in current_keys + ): + out.append( + signals_mod.Signal( + key=key, + active=False, + title="Блокирующий процесс", + detail=f"процесс PID {key[1]} завершился", + ) + ) + except Exception as e: # noqa: BLE001 - never-raise: skip recovery synthesis + logger.warning("watchdog: proc recovery synth error: %s", e) + return out + # -- decision + dispatch ---------------------------------------------- def _dispatch(self, built: list, now: float) -> list: """Run each signal through ``decide`` and send alert/realert/recovery.""" diff --git a/watchdog/signals.py b/watchdog/signals.py index 6613f66..da70eeb 100644 --- a/watchdog/signals.py +++ b/watchdog/signals.py @@ -246,6 +246,54 @@ def container_signals(cfg: Config, statuses: dict) -> list: return sigs +# Max cmdline length surfaced in an alert: truncate so a long arg list does not +# leak random arguments into the Telegram channel (ADR-001 D4 / R-2). +_PROC_CMDLINE_MAX = 120 + + +def proc_signals(cfg: Config, candidates: list) -> list: + """Build per-process ``proc_blocking`` signals from candidate records. Pure. + + Each candidate is ``{pid, cmdline, age_s, cpu_s?, start_ticks?}`` already + filtered to the test-class by the collector (D4). The signal is + ``active`` iff ``age_s > cfg.proc_age_s`` (the threshold is set above the max + legitimate test-run budget, so an in-flight run is never active — AC-4). Key + is per-entity ``("proc_blocking", pid)`` (mirror of ``("container_down", + name)``) so ``AlertState`` / cooldown work per process. The detail is + actionable (RU): truncated cmdline + PID + age (s) + accumulated CPU (s). + """ + sigs: list = [] + for rec in candidates or []: + try: + pid = rec.get("pid") + age_s = rec.get("age_s") + if pid is None or age_s is None: + continue + cmdline = (rec.get("cmdline") or "").strip() + frag = cmdline[:_PROC_CMDLINE_MAX] + if len(cmdline) > _PROC_CMDLINE_MAX: + frag += "…" + detail = ( + f"PID {pid} живёт {int(age_s)}s " + f"(порог {int(cfg.proc_age_s)}s): {frag}" + ) + cpu_s = rec.get("cpu_s") + if cpu_s is not None: + detail += f" · CPU {int(cpu_s)}s" + sigs.append( + Signal( + key=("proc_blocking", pid), + active=age_s > cfg.proc_age_s, + title="Блокирующий процесс", + detail=detail, + cooldown_s=cfg.proc_cooldown_s, + ) + ) + except Exception as e: # noqa: BLE001 - one bad record, others still build + logger.warning("watchdog: proc signal build error: %s", e) + return sigs + + def dep_signals(reachability: dict) -> list: """Build per-dependency down signals from ``{name: reachable}``. Pure.""" sigs: list = []