Files
orchestrator/watchdog/signals.py
claude-bot 2e73ccf090 feat(watchdog): proc_blocking alert for orphaned long-lived test processes
Close the observability gap between agent_hung (only tracked jobs by jobs.pid)
and orphaned pytest subprocesses the orchestrator launches itself
(merge_gate.retest_branch / coverage_gate.measure_coverage). On a timeout-kill of
the agent (-9, ORCH-109) the grand-child pytest reparents onto tini and keeps
running for days, starving CPU and failing merge-gate re-test — with no alert.

Strictly inside the observer (watchdog/** + the watchdog compose service):
- watchdog/collectors/proc.py: stdlib-only /proc scan (under pid: host),
  read-only, never-raise -> []; pure parsers split from I/O (tested on a fake
  /proc tree). Never reads /proc/<pid>/environ.
- watchdog/signals.py: pure proc_signals builder, per-entity
  ("proc_blocking", pid), active iff age_s > proc_age_s; actionable RU detail.
- watchdog/core.py: opt-in tick block (gated on proc_enabled -> zero overhead /
  byte-for-byte when off) + RECOVERY synthesis for a vanished process through the
  existing decide()/AlertState (no new anti-spam logic).
- watchdog/config.py: WATCHDOG_PROC_{ENABLED(false),AGE_MIN(60),PATTERNS(pytest),
  COOLDOWN_S(1800)}; default threshold > max(merge_retest_timeout_s=600,
  coverage_run_timeout_s=900) so a legit in-flight run never crosses it.
- docker-compose.yml: pid: host on orchestrator-watchdog ONLY (read-only privilege).

Anti-false-positive and no overlap with agent_hung are by construction (cmdline
scope + age threshold), not fragile cross-namespace PID matching.

Canon synced: WATCHDOG_PROC_* in .env.watchdog.example <-> .env.example block;
documented in LITE_SETUP.md and docs/architecture/README.md (architect). src/**,
/metrics, schema_version, STAGE_TRANSITIONS, QG_CHECKS, check_*, machine-verdict
and the DB schema are untouched; deploy rebuilds only the sidecar, prod
orchestrator is not restarted (NFR-3).

Tests: tests/watchdog/test_proc_blocking_signal.py (TC-01..TC-06),
test_proc_collector.py (/proc parsing), test_tick_proc_blocking_integration.py
(TC-07), plus pid: host and proc-config assertions. Full pytest tests/ green (1930).

Refs: ORCH-111
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 02:14:17 +03:00

332 lines
12 KiB
Python

"""Pure signal builders: turn collected raw inputs into ``Signal`` objects (D5).
A ``Signal`` is ``(key, active, title, detail, edge)``. ``key`` identifies the
signal for per-key anti-spam state: a scalar (``"orch_down"``, ``"host_mem"``)
or a tuple for per-entity signals (``("agent_hung", run_id)``,
``("container_down", name)``, ``("stage_stuck", work_item)``,
``("dep_down", name)``, ``("proc_blocking", pid)``).
These builders are PURE — given the envelope / host readings / prev-sample state
they return signals + the next sample state, with no I/O — so the whole decision
surface is unit-testable without a container, a socket or a timer (TC-01…TC-11).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from .collectors import containers as containers_mod
from .collectors import orch as orch_mod
from .config import Config
logger = logging.getLogger("watchdog.signals")
@dataclass
class Signal:
    """A single evaluated signal on its way into the decision function.

    Fields, in positional order: ``key``, ``active``, ``title``, ``detail``,
    ``edge``, ``cooldown_s``.
    """

    # Identity used for per-key anti-spam state: a scalar or a per-entity tuple.
    key: object
    # Whether the signal is currently active.
    active: bool
    # Short heading and longer body of the alert text.
    title: str
    detail: str
    # Marks event-style signals (e.g. ``job_failed``): they fire on each new
    # occurrence and have no sustained "recovery", so the dispatcher does not
    # persist alerting state for them.
    edge: bool = False
    # Per-signal override of the global cooldown; ``None`` means no override.
    cooldown_s: float | None = None
@dataclass
class AgentSample:
    """Last seen ``(cpu_ticks, generated_at_epoch)`` pair for one running agent (D5)."""

    # CPU tick counter reported for the agent at the time of the sample.
    cpu_ticks: int
    # Timestamp of the envelope the sample was taken from.
    generated_at: float
@dataclass
class EnvelopeEval:
    """Outcome of evaluating one ``/metrics`` envelope: signals plus carried state."""

    # Signals derived from the envelope.
    signals: list = field(default_factory=list)
    # run_id -> AgentSample
    agent_samples: dict = field(default_factory=dict)
    # Failed-job counter to carry forward; ``None`` when none is known.
    failed_count: int | None = None
def _cpu_fraction(
    cur_ticks: int,
    cur_gen: float,
    prev: AgentSample,
    clk_tck: int,
) -> float | None:
    """Return the CPU fraction one agent used between two ``/metrics`` polls (D5).

    Computed as ``(Δticks / clk_tck) / Δseconds``, where the deltas are taken
    against ``prev``.

    Returns ``None`` instead of a number when the sample is degenerate — no
    wall-time elapsed, a missing or non-positive ``clk_tck``, a tick counter
    that went backwards, or any error while computing — so that a bad sample
    never produces a false "hung" verdict.
    """
    try:
        elapsed = cur_gen - prev.generated_at
        if elapsed <= 0 or not clk_tck or clk_tck <= 0:
            return None
        cpu_seconds = (cur_ticks - prev.cpu_ticks) / clk_tck
        # A negative delta means the counter went backwards: no verdict.
        return None if cpu_seconds < 0 else cpu_seconds / elapsed
    except Exception as e:  # noqa: BLE001 - degenerate sample, no verdict
        logger.warning("watchdog: cpu_fraction error: %s", e)
        return None
def eval_envelope(
    envelope: dict,
    cfg: Config,
    prev_agents: dict,
    prev_failed: int | None,
) -> EnvelopeEval:
    """Turn one ``/metrics`` envelope into agent/stage/queue signals (D5).

    Produces ``agent_hung`` (per run_id), ``stage_stuck`` (per work_item),
    ``queue_depth`` and the edge-style ``job_failed`` signals.

    Pure, no I/O. ``prev_agents`` (run_id -> :class:`AgentSample`) and
    ``prev_failed`` are the cross-tick state owned by the sidecar; the returned
    :class:`EnvelopeEval` carries the NEXT state to persist. Never raises: each
    family is evaluated inside its own ``try`` so a malformed sub-section only
    degrades that family and the others still evaluate.
    """
    result = EnvelopeEval()
    if not isinstance(envelope, dict):
        # Unusable envelope: hand the previous state back unchanged.
        result.agent_samples = dict(prev_agents)
        result.failed_count = prev_failed
        return result

    clk_tck = envelope.get("clk_tck")
    gen_at = orch_mod.parse_generated_at(envelope)

    # -- agent_hung (needs two polls; per run_id) -------------------------
    samples: dict = {}
    try:
        for agent in envelope.get("agents") or []:
            run_id = agent.get("run_id")
            cpu_ticks = agent.get("cpu_ticks")
            runtime_s = agent.get("runtime_s")
            # No id, or pid dead / non-Linux / no timestamp -> cannot judge.
            if run_id is None or cpu_ticks is None or gen_at is None:
                continue
            # Record the sample before any verdict so the next tick has a baseline.
            samples[run_id] = AgentSample(int(cpu_ticks), gen_at)
            prev = prev_agents.get(run_id)
            if prev is None or not isinstance(clk_tck, int):
                continue
            frac = _cpu_fraction(int(cpu_ticks), gen_at, prev, clk_tck)
            if frac is None or runtime_s is None:
                continue
            # Hung = has run longer than the budget AND is burning almost no CPU.
            if not (runtime_s > cfg.agent_hung_s and frac < cfg.agent_cpu_floor):
                continue
            hung_detail = (
                f"agent={agent.get('agent')} run_id={run_id} "
                f"runtime={int(runtime_s)}s cpu={frac:.4f} "
                f"(< {cfg.agent_cpu_floor})"
            )
            hung_signal = Signal(
                key=("agent_hung", run_id),
                active=True,
                title="Агент завис",
                detail=hung_detail,
            )
            result.signals.append(hung_signal)
    except Exception as e:  # noqa: BLE001 - degrade agent family only
        logger.warning("watchdog: eval agents error: %s", e)
    # Samples collected before a failure are still persisted.
    result.agent_samples = samples

    # -- stage_stuck (per work_item) -------------------------------------
    try:
        for stage in envelope.get("stages") or []:
            age = stage.get("age_in_stage_s")
            wi = stage.get("work_item")
            if age is None or wi is None or not (age > cfg.stage_stuck_s):
                continue
            stuck_detail = (
                f"{wi} в стадии {stage.get('stage')} уже {int(age)}s "
                f"(порог {int(cfg.stage_stuck_s)}s)"
            )
            stuck_signal = Signal(
                key=("stage_stuck", wi),
                active=True,
                title="Стадия застряла",
                detail=stuck_detail,
            )
            result.signals.append(stuck_signal)
    except Exception as e:  # noqa: BLE001
        logger.warning("watchdog: eval stages error: %s", e)

    # -- queue depth + job_failed (edge) ---------------------------------
    failed_now = prev_failed
    try:
        queue = envelope.get("queue") or {}
        depth = queue.get("depth")
        if isinstance(depth, int) and depth >= cfg.queue_depth:
            depth_signal = Signal(
                key="queue_depth",
                active=True,
                title="Очередь растёт",
                detail=f"глубина очереди {depth} (порог {cfg.queue_depth})",
            )
            result.signals.append(depth_signal)
        failed = (queue.get("counts") or {}).get("failed")
        if isinstance(failed, int):
            # Update the carried counter first; the edge fires only on growth
            # relative to a known previous value.
            failed_now = failed
            if prev_failed is not None and failed > prev_failed:
                failed_detail = (
                    f"failed-джобов стало {failed} "
                    f"(было {prev_failed}, +{failed - prev_failed})"
                )
                failed_signal = Signal(
                    key="job_failed",
                    active=True,
                    title="Job упал",
                    detail=failed_detail,
                    edge=True,
                )
                result.signals.append(failed_signal)
    except Exception as e:  # noqa: BLE001
        logger.warning("watchdog: eval queue error: %s", e)
    result.failed_count = failed_now
    return result
def host_signals(cfg: Config, mem_pct: float | None, disk: tuple | None) -> list:
    """Build the host-memory signal and the opt-in disk-ceiling signal (D5/D6). Pure.

    ``mem_pct`` of ``None`` yields no memory signal. ``disk`` is unpacked as a
    ``(path, pct)`` pair and is only consulted when ``cfg.disk_crit_enabled``
    is set.
    """
    built: list = []
    if mem_pct is not None:
        mem_signal = Signal(
            key="host_mem",
            active=mem_pct >= cfg.mem_pct,
            title="Память хоста",
            detail=f"память хоста {mem_pct}% (порог {cfg.mem_pct}%)",
        )
        built.append(mem_signal)

    # Disk ceiling is OPT-IN (D6): disk_watchdog (ORCH-063) owns the 85% alert;
    # the sidecar only carries an independent HIGHER ceiling when explicitly
    # enabled, so there is no double-alert on the same fill event (FR-9/AC-5).
    if not cfg.disk_crit_enabled or disk is None:
        return built

    path, pct = disk
    disk_detail = (
        f"диск {path} {pct}% (критический потолок {cfg.disk_crit_pct}%, "
        f"независимый канал sidecar)"
    )
    built.append(
        Signal(
            key="host_disk_crit",
            active=pct >= cfg.disk_crit_pct,
            title="Диск (критический потолок)",
            detail=disk_detail,
        )
    )
    return built
def container_signals(cfg: Config, statuses: dict) -> list:
    """Build one ``("container_down", name)`` signal per ``{name: status}`` entry. Pure.

    Each signal's ``active`` flag is whatever ``containers_mod.container_alarm``
    returns for that status. ``cfg`` is accepted but not read here.
    """
    return [
        Signal(
            key=("container_down", name),
            active=containers_mod.container_alarm(status),
            title="Контейнер не в норме",
            detail=f"контейнер {name}: статус '{status}'",
        )
        for name, status in statuses.items()
    ]
# Max cmdline length surfaced in an alert: truncate so a long arg list does not
# leak random arguments into the Telegram channel (ADR-001 D4 / R-2).
_PROC_CMDLINE_MAX = 120


def _truncate_cmdline(cmdline: str, limit: int = _PROC_CMDLINE_MAX) -> str:
    """Return ``cmdline`` cut to ``limit`` characters, with "…" appended when cut."""
    if len(cmdline) <= limit:
        return cmdline
    # The marker tells the reader that the alert shows only a prefix.
    return cmdline[:limit] + "…"


def proc_signals(cfg: Config, candidates: list) -> list:
    """Build per-process ``proc_blocking`` signals from candidate records. Pure.

    Each candidate is ``{pid, cmdline, age_s, cpu_s?, start_ticks?}`` already
    filtered to the test-class by the collector (D4). The signal is
    ``active`` iff ``age_s > cfg.proc_age_s`` (the threshold is set above the max
    legitimate test-run budget, so an in-flight run is never active — AC-4). Key
    is per-entity ``("proc_blocking", pid)`` (mirror of ``("container_down",
    name)``) so ``AlertState`` / cooldown work per process. The detail is
    actionable (RU): truncated cmdline + PID + age (s) + accumulated CPU (s).

    Records without a ``pid`` or ``age_s`` are skipped. A record that raises
    while being processed is logged and skipped; the remaining records still
    produce signals. ``candidates`` may be ``None`` or empty, giving ``[]``.
    """
    sigs: list = []
    for rec in candidates or []:
        try:
            pid = rec.get("pid")
            age_s = rec.get("age_s")
            if pid is None or age_s is None:
                continue
            # A command line longer than the cap is cut and visibly marked.
            frag = _truncate_cmdline((rec.get("cmdline") or "").strip())
            detail = (
                f"PID {pid} живёт {int(age_s)}s "
                f"(порог {int(cfg.proc_age_s)}s): {frag}"
            )
            cpu_s = rec.get("cpu_s")
            if cpu_s is not None:
                detail += f" · CPU {int(cpu_s)}s"
            sigs.append(
                Signal(
                    key=("proc_blocking", pid),
                    active=age_s > cfg.proc_age_s,
                    title="Блокирующий процесс",
                    detail=detail,
                    cooldown_s=cfg.proc_cooldown_s,
                )
            )
        except Exception as e:  # noqa: BLE001 - one bad record, others still build
            logger.warning("watchdog: proc signal build error: %s", e)
    return sigs
def dep_signals(reachability: dict) -> list:
    """Build one ``("dep_down", name)`` signal per ``{name: reachable}`` entry. Pure.

    A signal is active when its dependency is reported as not reachable.
    """
    return [
        Signal(
            key=("dep_down", name),
            active=not reachable,
            title="Зависимость недоступна",
            detail=f"зависимость {name} не отвечает",
        )
        for name, reachable in reachability.items()
    ]
def orch_down_signal(consecutive_failures: int, cfg: Config, error: str | None) -> Signal:
    """Build the master ``orch_down`` signal (FR-3).

    The signal becomes active once ``/metrics`` has failed
    ``cfg.orch_down_ticks`` times in a row, so a single transient hiccup does
    not flap. The message states that the in-process guards
    (disk / reaper / reconciler) are dead too, pointing the operator at the
    host itself (D6). A falsy ``error`` is rendered as a generic
    "unavailable" placeholder.
    """
    is_down = consecutive_failures >= cfg.orch_down_ticks
    message = (
        f"GET /metrics не отвечает {consecutive_failures} тик(ов) подряд "
        f"(порог {cfg.orch_down_ticks}): {error or 'недоступен'}. "
        f"In-process стражи (disk/reaper/reconciler) тоже мертвы — проверьте "
        f"хост (вкл. диск) и контейнер orchestrator."
    )
    return Signal(
        key="orch_down",
        active=is_down,
        title="Орк не отвечает",
        detail=message,
    )