Close the observability gap between agent_hung (only tracked jobs by jobs.pid)
and orphaned pytest subprocesses the orchestrator launches itself
(merge_gate.retest_branch / coverage_gate.measure_coverage). On a timeout-kill of
the agent (-9, ORCH-109) the grand-child pytest reparents onto tini and keeps
running for days, starving CPU and failing merge-gate re-test — with no alert.
Strictly inside the observer (watchdog/** + the watchdog compose service):
- watchdog/collectors/proc.py: stdlib-only /proc scan (under pid: host),
read-only, never-raise -> []; pure parsers split from I/O (tested on a fake
/proc tree). Never reads /proc/<pid>/environ.
- watchdog/signals.py: pure proc_signals builder, per-entity
("proc_blocking", pid), active iff age_s > proc_age_s; actionable RU detail.
- watchdog/core.py: opt-in tick block (gated on proc_enabled -> zero overhead /
byte-for-byte when off) + RECOVERY synthesis for a vanished process through the
existing decide()/AlertState (no new anti-spam logic).
- watchdog/config.py: WATCHDOG_PROC_{ENABLED(false),AGE_MIN(60),PATTERNS(pytest),
COOLDOWN_S(1800)}; default threshold > max(merge_retest_timeout_s=600,
coverage_run_timeout_s=900) so a legit in-flight run never crosses it.
- docker-compose.yml: pid: host on orchestrator-watchdog ONLY (read-only privilege).
Anti-false-positive and no overlap with agent_hung are by construction (cmdline
scope + age threshold), not fragile cross-namespace PID matching.
Canon synced: WATCHDOG_PROC_* in .env.watchdog.example <-> .env.example block;
documented in LITE_SETUP.md and docs/architecture/README.md (architect). src/**,
/metrics, schema_version, STAGE_TRANSITIONS, QG_CHECKS, check_*, machine-verdict
and the DB schema are untouched; deploy rebuilds only the sidecar, prod
orchestrator is not restarted (NFR-3).
Tests: tests/watchdog/test_proc_blocking_signal.py (TC-01..TC-06),
test_proc_collector.py (/proc parsing), test_tick_proc_blocking_integration.py
(TC-07), plus pid: host and proc-config assertions. Full pytest tests/ green (1930).
Refs: ORCH-111
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
240 lines
10 KiB
Python
240 lines
10 KiB
Python
"""The sidecar tick orchestration: collect -> evaluate -> decide -> dispatch (D3).

The ``Watchdog`` owns the cross-tick state the sidecar is responsible for:

* ``_states`` — per signal_key :class:`AlertState` (anti-spam / recovery);
* ``_agents`` — per run_id :class:`AgentSample` (cpu_ticks, generated_at);
* ``_failed`` — last seen ``queue.counts.failed`` (job_failed edge);
* ``_orch_fail`` — consecutive ``/metrics`` failures (orch_down debounce).

All collection is wrapped per-source and the whole ``tick`` is wrapped per-tick
(never-raise, D8). ``now_provider`` is injectable for deterministic tests.
"""
|
|
from __future__ import annotations

import logging
import time

from . import decision
from .collectors import containers as containers_mod
from .collectors import deps as deps_mod
from .collectors import host as host_mod
from .collectors import orch as orch_mod
from .collectors import proc as proc_mod
from .config import Config
from .notify import Notifier
from . import signals as signals_mod

# Module-level logger shared by every method of the Watchdog class below.
logger = logging.getLogger("watchdog.core")
|
|
|
|
|
|
class Watchdog:
    """Stateful observer: one ``tick`` collects every source and dispatches alerts.

    Cross-tick state kept on the instance:

    * ``_states`` — per signal key :class:`decision.AlertState`;
    * ``_agents`` — agent samples returned by the previous envelope evaluation;
    * ``_failed`` — failed-job count returned by the previous envelope evaluation;
    * ``_orch_fail`` — consecutive failed ``/metrics`` fetches;
    * ``_proc_scan_failed`` — whether the latest ``/proc`` scan raised;
    * ``last_run_ts`` — timestamp of the last completed ``tick``.
    """

    def __init__(
        self,
        cfg: Config,
        notifier: Notifier | None = None,
        docker: containers_mod.DockerSockReader | None = None,
        now_provider=None,
    ):
        """Wire the collaborators and reset the cross-tick state.

        Args:
            cfg: sidecar configuration read by every collector and builder.
            notifier: alert channel; built from ``cfg`` when omitted.
            docker: docker socket reader; built from ``cfg`` when omitted.
            now_provider: zero-arg callable returning the current time;
                defaults to ``time.time``.
        """
        self.cfg = cfg
        self._now = now_provider or time.time
        self._notifier = notifier or Notifier(
            cfg.tg_bot_token, cfg.tg_chat_id, cfg.http_timeout_s
        )
        self._docker = docker or containers_mod.DockerSockReader(
            cfg.docker_sock, cfg.http_timeout_s
        )
        # cross-tick state owned by the sidecar
        self._states: dict[object, decision.AlertState] = {}
        self._agents: dict[object, signals_mod.AgentSample] = {}
        self._failed: int | None = None
        self._orch_fail: int = 0
        # True when the most recent /proc scan raised; lets ``tick`` tell
        # "scan failed" apart from "no candidate processes".
        self._proc_scan_failed: bool = False
        self.last_run_ts: float | None = None

    # -- collection (each source guarded; per-source never-raise) ---------
    def _collect_orch(self) -> orch_mod.FetchResult:
        """Fetch the orchestrator ``/metrics``; any error becomes a failed result."""
        try:
            return orch_mod.fetch_metrics(self.cfg.metrics_url, self.cfg.http_timeout_s)
        except Exception as e:  # noqa: BLE001 - treat as down, never crash the tick
            logger.warning("watchdog: orch collect error: %s", e)
            return orch_mod.FetchResult(ok=False, error=str(e))

    def _collect_host_mem(self) -> float | None:
        """Return the host memory usage reading, or ``None`` if reading it raised."""
        try:
            return host_mod.read_mem_used_pct()
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: host mem collect error: %s", e)
            return None

    def _collect_disk(self) -> tuple | None:
        """Return the disk usage reading, or ``None`` when disabled or on error."""
        if not self.cfg.disk_crit_enabled:
            return None
        try:
            return host_mod.max_disk_used_pct(self.cfg.disk_paths)
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: disk collect error: %s", e)
            return None

    def _collect_containers(self) -> dict:
        """Classify every configured container; a failing one maps to ``"unknown"``."""
        out: dict[str, str] = {}
        for name in self.cfg.containers:
            try:
                inspect = self._docker.inspect(name)
                out[name] = containers_mod.classify_container(inspect)
            except Exception as e:  # noqa: BLE001 - one container degrades, others continue
                logger.warning("watchdog: container %s collect error: %s", name, e)
                out[name] = "unknown"
        return out

    def _collect_deps(self) -> dict:
        """Ping the configured dependencies; returns ``{}`` if the ping call raised."""
        try:
            return deps_mod.ping_all(self.cfg.deps, self.cfg.http_timeout_s)
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: deps collect error: %s", e)
            return {}

    def _collect_proc(self, now: float) -> list:
        """Scan for candidate processes matching ``cfg.proc_patterns``.

        Returns ``[]`` when the feature is disabled or the scan raised. The
        outcome is also recorded in ``self._proc_scan_failed`` so the caller can
        distinguish a failed scan from a scan that found nothing.
        """
        self._proc_scan_failed = False
        # Opt-in: when WATCHDOG_PROC_ENABLED is false the scan is NOT called
        # (gate mirrors _collect_disk on disk_crit_enabled) -> zero overhead and
        # byte-for-byte tick behaviour as before ORCH-111 (D5 / AC-7).
        if not self.cfg.proc_enabled:
            return []
        try:
            return proc_mod.collect_candidates(self.cfg.proc_patterns, now=now)
        except Exception as e:  # noqa: BLE001 - never-raise: one signal skipped
            self._proc_scan_failed = True
            logger.warning("watchdog: proc collect error: %s", e)
            return []

    # -- one tick ---------------------------------------------------------
    def tick(self) -> list:
        """Run one full pass; returns the dispatched ``(action, Signal)`` list.

        Per-source collection is independently guarded so a broken source (orch
        down / docker unreachable / dep timeout) degrades ONE signal and the rest
        of the tick still runs (D8). The orchestrator being down is itself the
        ``orchestrator_down`` signal, not a failed tick (FR-3).

        NOTE(review): the ``signals_mod`` builders called here are not wrapped;
        presumably the caller guards the whole tick — confirm.
        """
        now = self._now()
        built: list[signals_mod.Signal] = []

        # 1) orchestrator /metrics (+ orch_down debounce)
        fetch = self._collect_orch()
        if fetch.ok and fetch.envelope is not None:
            self._orch_fail = 0
            ev = signals_mod.eval_envelope(
                fetch.envelope, self.cfg, self._agents, self._failed
            )
            self._agents = ev.agent_samples
            self._failed = ev.failed_count
            built.extend(ev.signals)
        else:
            self._orch_fail += 1
            built.append(
                signals_mod.orch_down_signal(self._orch_fail, self.cfg, fetch.error)
            )

        # 2) host memory + opt-in disk ceiling
        built.extend(
            signals_mod.host_signals(
                self.cfg, self._collect_host_mem(), self._collect_disk()
            )
        )

        # 3) containers (read-only docker.sock)
        built.extend(signals_mod.container_signals(self.cfg, self._collect_containers()))

        # 4) external dependency pings
        built.extend(signals_mod.dep_signals(self._collect_deps()))

        # 5) long-lived blocking test/child processes (opt-in; pid: host /proc).
        # Gated entirely on proc_enabled so a disabled sidecar is byte-for-byte
        # as before ORCH-111 (D5/AC-7); RECOVERY for a vanished process is
        # synthesised through the SAME decide()/AlertState machinery (D4).
        if self.cfg.proc_enabled:
            proc_sigs = signals_mod.proc_signals(self.cfg, self._collect_proc(now))
            # A scan that raised yields [] just like "nothing found". Treating
            # that as "every process vanished" would send a false RECOVERY for
            # each alerting PID and reset its cooldown, so recovery synthesis
            # is skipped for this tick and retried on the next good scan.
            if not getattr(self, "_proc_scan_failed", False):
                proc_sigs.extend(self._synthesize_proc_recoveries(proc_sigs))
            built.extend(proc_sigs)

        dispatched = self._dispatch(built, now)
        self.last_run_ts = now
        return dispatched

    def _synthesize_proc_recoveries(self, current_sigs: list) -> list:
        """Synthesise an inactive ``Signal`` for every vanished proc_blocking key.

        ``proc_signals`` emits a signal ONLY for a currently observed candidate,
        so a process that disappeared leaves an alerting :class:`AlertState` with
        no fresh signal and would never recover. Reusing ``decide()``/
        ``AlertState`` (FR-5 — no separate anti-spam logic), we emit an
        ``active=False`` signal for each alerting ``("proc_blocking", …)`` key
        absent from ``current_sigs``, so the normal dispatch path can turn it
        into a RECOVERY. This is per-family bookkeeping, not new throttling.

        Never raises: on any error the recoveries built so far are returned.
        """
        out: list = []
        try:
            current_keys = {s.key for s in current_sigs}
            # Iterate over a snapshot of the state items.
            for key, state in list(self._states.items()):
                if (
                    isinstance(key, tuple)
                    and key
                    and key[0] == "proc_blocking"
                    and state.alerting
                    and key not in current_keys
                ):
                    out.append(
                        signals_mod.Signal(
                            key=key,
                            active=False,
                            title="Блокирующий процесс",
                            detail=f"процесс PID {key[1]} завершился",
                        )
                    )
        except Exception as e:  # noqa: BLE001 - never-raise: skip recovery synthesis
            logger.warning("watchdog: proc recovery synth error: %s", e)
        return out

    # -- decision + dispatch ----------------------------------------------
    def _dispatch(self, built: list, now: float) -> list:
        """Run each signal through ``decide`` and send alert/realert/recovery.

        Returns one ``(action, signal)`` pair per signal that was processed
        without error; a signal whose handling raised is logged and skipped.
        """
        results: list = []
        for sig in built:
            try:
                # A per-signal cooldown overrides the global one when set.
                cooldown = sig.cooldown_s if sig.cooldown_s is not None else self.cfg.cooldown_s
                if sig.edge:
                    # Edge signals (job_failed) fire on each new occurrence and
                    # keep no sustained state: a fresh empty prev -> ALERT iff active.
                    prev = decision.AlertState()
                else:
                    prev = self._states.get(sig.key) or decision.AlertState()
                action = decision.decide(sig.active, prev, now, cooldown)
                if action in (decision.ACTION_ALERT, decision.ACTION_REALERT):
                    self._send(self._format(sig, action))
                    if not sig.edge:
                        self._states[sig.key] = decision.AlertState(
                            alerting=True, last_alert_at=now
                        )
                elif action == decision.ACTION_RECOVERY:
                    self._send(self._format(sig, action))
                    self._states[sig.key] = decision.AlertState(
                        alerting=False, last_alert_at=None
                    )
                results.append((action, sig))
            except Exception as e:  # noqa: BLE001 - one signal degrades, others dispatch
                logger.warning("watchdog: dispatch error for %s: %s", sig.key, e)
        return results

    @staticmethod
    def _format(sig: signals_mod.Signal, action: str) -> str:
        """Render the message text: green for recovery, red for alert, repeat otherwise."""
        if action == decision.ACTION_RECOVERY:
            return f"\U0001f7e2 {sig.title}: восстановление. {sig.detail}"
        prefix = "\U0001f534" if action == decision.ACTION_ALERT else "\U0001f501"
        return f"{prefix} {sig.title}: {sig.detail}"

    def _send(self, text: str) -> None:
        """Best-effort dispatch through the sidecar's own channel. never-raise."""
        try:
            self._notifier.send(text)
        except Exception as e:  # noqa: BLE001 - per-send never-raise (D8)
            logger.warning("watchdog: send failed: %s", e)
|