Files
orchestrator/watchdog/signals.py
claude-bot 259b507906 feat(watchdog): sidecar-watchdog F1b — monitoring brain in a separate container (ORCH-100)
Add the `watchdog/` package (thin Python-3.12 stdlib-only daemon) and the
`orchestrator-watchdog` compose service — the brain half of the domain-0
observability pair. F1a (ORCH-099) exposes GET /metrics raw signal; F1b reads it,
augments with host / container / dependency probes, runs each signal through a
generalised pure decision function (decide(signal_active, prev, now, cooldown),
a strict superset of disk_watchdog.decide_action) with per-signal in-memory
dedup/throttle/recovery, and alerts over its OWN independent Telegram channel.

Key properties (ADR-001):
- Observer separated from observed: separate container; /metrics not answering is
  itself the master `orch_down` alarm (debounced K ticks — no flap on a hiccup).
- Strictly read-only: docker.sock GET-only + mounted :ro (double guard), host
  paths :ro, no DB/disk writes, no process control — self-hosting-safe.
- never-raise on three levels (per-source/per-tick/per-send) + WATCHDOG_ENABLED
  kill-switch (disabled -> inert idle-loop, not exit).
- Disk anti-duplicate (D6): disk_watchdog (ORCH-063) stays sole owner of the 85%
  alert; sidecar carries orch_down + an opt-in 97% ceiling (default off).
- NO import from src/** (C-1); src/**, STAGE_TRANSITIONS, QG_CHECKS, check_*, DB
  schema — untouched. env_file optional so a missing .env.watchdog never breaks
  `docker compose up` for the prod orchestrator.

Tests: tests/watchdog/ (TC-01…TC-13) + full tests/ regression green (TC-14).
Docs: CHANGELOG, .env.example canon (WATCHDOG_*); architecture README + adr-0033
authored at the architecture stage.

Refs: ORCH-100

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 09:36:02 +03:00

284 lines
10 KiB
Python

"""Pure signal builders: turn collected raw inputs into ``Signal`` objects (D5).
A ``Signal`` is ``(key, active, title, detail, edge)``. ``key`` identifies the
signal for per-key anti-spam state: a scalar (``"orch_down"``, ``"host_mem"``)
or a tuple for per-entity signals (``("agent_hung", run_id)``,
``("container_down", name)``, ``("stage_stuck", work_item)``,
``("dep_down", name)``).
These builders are PURE — given the envelope / host readings / prev-sample state
they return signals + the next sample state, with no I/O — so the whole decision
surface is unit-testable without a container, a socket or a timer (TC-01…TC-11).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from .collectors import containers as containers_mod
from .collectors import orch as orch_mod
from .config import Config
logger = logging.getLogger("watchdog.signals")
@dataclass
class Signal:
    """A single evaluated signal, ready to be fed to the decision function.

    Fields:
        key: Identity used for per-key anti-spam state. Either a scalar such
            as ``"orch_down"`` or a tuple such as ``("dep_down", name)`` for
            per-entity signals.
        active: Whether the monitored condition currently holds.
        title: Short human-readable heading of the alert.
        detail: Longer human-readable description of the alert.
        edge: ``True`` for event-style signals (e.g. ``job_failed``). Those
            fire on every new occurrence and have no sustained "recovery",
            so the dispatcher keeps no alerting state for them.
        cooldown_s: Optional per-signal override of the global cooldown;
            ``None`` means "use the global value".
    """

    key: object
    active: bool
    title: str
    detail: str
    # Event-style flag; off by default (state-style signal).
    edge: bool = False
    # Per-signal override of the global cooldown.
    cooldown_s: float | None = None
@dataclass
class AgentSample:
    """Last observed CPU reading of one running agent (D5).

    Stored per ``run_id`` between polls so the next poll can compute a CPU
    fraction from the difference of two samples.
    """

    # Cumulative CPU tick counter reported for the agent.
    cpu_ticks: int
    # Epoch timestamp of the ``/metrics`` envelope the counter came from.
    generated_at: float
@dataclass
class EnvelopeEval:
    """Outcome of evaluating one ``/metrics`` envelope.

    Bundles the signals derived from the envelope together with the
    cross-tick state the caller has to keep for the next evaluation.
    """

    # Signals produced by this evaluation, in emission order.
    signals: list = field(default_factory=list)
    # Next per-agent samples: run_id -> AgentSample.
    agent_samples: dict = field(default_factory=dict)
    # Failed-job counter to carry into the next tick (``None`` if unknown).
    failed_count: int | None = None
def _cpu_fraction(
    cur_ticks: int,
    cur_gen: float,
    prev: AgentSample,
    clk_tck: int,
) -> float | None:
    """Return the CPU fraction one agent used between two ``/metrics`` polls (D5).

    The value is ``(tick delta / clk_tck) / wall-clock delta``.

    Args:
        cur_ticks: CPU tick counter from the current poll.
        cur_gen: Timestamp of the current poll.
        prev: Sample recorded for the same agent on the previous poll.
        clk_tck: Number of ticks per second used to convert ticks to seconds.

    Returns:
        The CPU fraction, or ``None`` when the two samples cannot be compared
        (no wall time elapsed, missing or non-positive ``clk_tck``, tick
        counter went backwards, or any unexpected error). Returning ``None``
        keeps a degenerate sample from producing a false "hung" verdict.
    """
    try:
        elapsed_s = cur_gen - prev.generated_at
        if elapsed_s <= 0:
            return None
        if not clk_tck or clk_tck <= 0:
            return None
        busy_s = (cur_ticks - prev.cpu_ticks) / clk_tck
        # A counter that moved backwards is not comparable with the old sample.
        return None if busy_s < 0 else busy_s / elapsed_s
    except Exception as e:  # noqa: BLE001 - degenerate sample, no verdict
        logger.warning("watchdog: cpu_fraction error: %s", e)
        return None
def _eval_agents(
    envelope: dict,
    cfg: Config,
    prev_agents: dict,
    clk_tck: object,
    gen_at: float | None,
) -> tuple[list, dict]:
    """Build ``agent_hung`` signals and the next per-agent samples.

    A verdict needs two polls for the same ``run_id``: the previous sample
    from ``prev_agents`` and the current reading from the envelope.

    Never raises: on an unexpected error a warning is logged and whatever was
    accumulated before the error is returned.

    Returns:
        ``(signals, samples)`` where ``samples`` maps run_id -> AgentSample.
    """
    signals: list = []
    samples: dict = {}
    try:
        for a in envelope.get("agents") or []:
            run_id = a.get("run_id")
            cpu_ticks = a.get("cpu_ticks")
            runtime_s = a.get("runtime_s")
            if run_id is None:
                continue
            if cpu_ticks is None or gen_at is None:
                # pid dead / non-Linux / no timestamp -> cannot judge; skip.
                continue
            # Record the sample first so the NEXT poll can judge this agent
            # even when this poll cannot.
            samples[run_id] = AgentSample(int(cpu_ticks), gen_at)
            prev = prev_agents.get(run_id)
            if prev is None or not isinstance(clk_tck, int):
                continue
            frac = _cpu_fraction(int(cpu_ticks), gen_at, prev, clk_tck)
            if frac is None or runtime_s is None:
                continue
            # Hung = running for long enough AND using almost no CPU.
            hung = (runtime_s > cfg.agent_hung_s) and (frac < cfg.agent_cpu_floor)
            if hung:
                signals.append(
                    Signal(
                        key=("agent_hung", run_id),
                        active=True,
                        title="Агент завис",
                        detail=(
                            f"agent={a.get('agent')} run_id={run_id} "
                            f"runtime={int(runtime_s)}s cpu={frac:.4f} "
                            f"(< {cfg.agent_cpu_floor})"
                        ),
                    )
                )
    except Exception as e:  # noqa: BLE001 - degrade agent family only
        logger.warning("watchdog: eval agents error: %s", e)
    return signals, samples


def _eval_stages(envelope: dict, cfg: Config) -> list:
    """Build ``stage_stuck`` signals, one per work item over the age threshold.

    Never raises: on an unexpected error a warning is logged and the signals
    collected so far are returned.
    """
    signals: list = []
    try:
        for s in envelope.get("stages") or []:
            age = s.get("age_in_stage_s")
            wi = s.get("work_item")
            if age is None or wi is None:
                continue
            if age > cfg.stage_stuck_s:
                signals.append(
                    Signal(
                        key=("stage_stuck", wi),
                        active=True,
                        title="Стадия застряла",
                        detail=(
                            f"{wi} в стадии {s.get('stage')} уже {int(age)}s "
                            f"(порог {int(cfg.stage_stuck_s)}s)"
                        ),
                    )
                )
    except Exception as e:  # noqa: BLE001
        logger.warning("watchdog: eval stages error: %s", e)
    return signals


def _eval_queue(
    envelope: dict,
    cfg: Config,
    prev_failed: int | None,
) -> tuple[list, int | None]:
    """Build ``queue_depth`` and edge-style ``job_failed`` signals.

    ``job_failed`` fires only when the failed counter grew compared with
    ``prev_failed``; with no previous value nothing is emitted.

    Never raises: on an unexpected error a warning is logged and whatever was
    accumulated before the error is returned.

    Returns:
        ``(signals, failed_now)`` where ``failed_now`` is the failed counter
        to carry into the next tick (``prev_failed`` when the envelope has no
        integer counter).
    """
    signals: list = []
    failed_now: int | None = prev_failed
    try:
        queue = envelope.get("queue") or {}
        depth = queue.get("depth")
        if isinstance(depth, int) and depth >= cfg.queue_depth:
            signals.append(
                Signal(
                    key="queue_depth",
                    active=True,
                    title="Очередь растёт",
                    detail=f"глубина очереди {depth} (порог {cfg.queue_depth})",
                )
            )
        counts = queue.get("counts") or {}
        failed = counts.get("failed")
        if isinstance(failed, int):
            # Remember the new counter before comparing, so the carried state
            # advances even if the comparison below fails.
            failed_now = failed
            if prev_failed is not None and failed > prev_failed:
                signals.append(
                    Signal(
                        key="job_failed",
                        active=True,
                        title="Job упал",
                        detail=(
                            f"failed-джобов стало {failed} "
                            f"(было {prev_failed}, +{failed - prev_failed})"
                        ),
                        edge=True,
                    )
                )
    except Exception as e:  # noqa: BLE001
        logger.warning("watchdog: eval queue error: %s", e)
    return signals, failed_now


def eval_envelope(
    envelope: dict,
    cfg: Config,
    prev_agents: dict,
    prev_failed: int | None,
) -> EnvelopeEval:
    """Derive agent_hung / stage_stuck / job_failed / queue_depth signals (D5).

    Args:
        envelope: Decoded ``/metrics`` payload. Anything that is not a dict
            yields no signals and leaves the carried state unchanged.
        cfg: Thresholds (``agent_hung_s``, ``agent_cpu_floor``,
            ``stage_stuck_s``, ``queue_depth``).
        prev_agents: Previous per-agent samples, run_id -> :class:`AgentSample`.
            ``None`` is accepted and treated as "no previous samples".
        prev_failed: Failed-job counter seen on the previous tick, if any.

    Returns:
        An :class:`EnvelopeEval` with the signals (agents, then stages, then
        queue) and the NEXT state to persist.

    Each signal family is evaluated by its own helper, which catches and logs
    its own errors: a bad sub-section degrades that family only, the rest
    still evaluate.
    """
    out = EnvelopeEval()
    # Normalise a missing sample map so neither the early-return copy below
    # nor the per-agent lookup can raise on ``None``.
    prev_agents = prev_agents or {}
    if not isinstance(envelope, dict):
        out.agent_samples = dict(prev_agents)
        out.failed_count = prev_failed
        return out

    clk_tck = envelope.get("clk_tck")
    gen_at = orch_mod.parse_generated_at(envelope)

    # -- agent_hung (needs two polls; per run_id) -------------------------
    agent_signals, out.agent_samples = _eval_agents(
        envelope, cfg, prev_agents, clk_tck, gen_at
    )
    out.signals.extend(agent_signals)

    # -- stage_stuck (per work_item) -------------------------------------
    out.signals.extend(_eval_stages(envelope, cfg))

    # -- queue depth + job_failed (edge) ---------------------------------
    queue_signals, out.failed_count = _eval_queue(envelope, cfg, prev_failed)
    out.signals.extend(queue_signals)
    return out
def host_signals(cfg: Config, mem_pct: float | None, disk: tuple | None) -> list:
    """Build the host-level signals: memory and the opt-in disk ceiling (D5/D6).

    Args:
        cfg: Thresholds (``mem_pct``, ``disk_crit_pct``) and the
            ``disk_crit_enabled`` switch.
        mem_pct: Host memory usage in percent, or ``None`` when unavailable
            (then no memory signal is produced).
        disk: ``(path, pct)`` pair for the monitored filesystem, or ``None``.

    Returns:
        A list with zero, one or two :class:`Signal` objects. Each carries
        ``active`` True/False depending on its threshold. Pure: no I/O.
    """
    built: list = []

    if mem_pct is not None:
        mem_signal = Signal(
            key="host_mem",
            active=mem_pct >= cfg.mem_pct,
            title="Память хоста",
            detail=f"память хоста {mem_pct}% (порог {cfg.mem_pct}%)",
        )
        built.append(mem_signal)

    # The disk ceiling is OPT-IN (D6). disk_watchdog (ORCH-063) owns the 85%
    # alert; the sidecar adds an independent HIGHER ceiling only when it is
    # explicitly enabled, so one fill event never alerts twice (FR-9/AC-5).
    if not cfg.disk_crit_enabled or disk is None:
        return built

    path, pct = disk
    over_ceiling = pct >= cfg.disk_crit_pct
    ceiling_text = (
        f"диск {path} {pct}% (критический потолок {cfg.disk_crit_pct}%, "
        f"независимый канал sidecar)"
    )
    built.append(
        Signal(
            key="host_disk_crit",
            active=over_ceiling,
            title="Диск (критический потолок)",
            detail=ceiling_text,
        )
    )
    return built
def container_signals(cfg: Config, statuses: dict) -> list:
    """Turn a ``{name: status}`` mapping into per-container down signals.

    One :class:`Signal` is produced for every container in ``statuses``;
    whether it is active is decided by ``containers_mod.container_alarm``
    applied to the status. ``cfg`` is accepted but not consulted here.
    Pure: no I/O.
    """
    return [
        Signal(
            key=("container_down", container),
            active=containers_mod.container_alarm(state),
            title="Контейнер не в норме",
            detail=f"контейнер {container}: статус '{state}'",
        )
        for container, state in statuses.items()
    ]
def dep_signals(reachability: dict) -> list:
    """Turn a ``{name: reachable}`` mapping into per-dependency down signals.

    One :class:`Signal` is produced for every dependency; it is active when
    the dependency's ``reachable`` value is falsy. Pure: no I/O.
    """
    return [
        Signal(
            key=("dep_down", dep),
            active=not is_up,
            title="Зависимость недоступна",
            detail=f"зависимость {dep} не отвечает",
        )
        for dep, is_up in reachability.items()
    ]
def orch_down_signal(consecutive_failures: int, cfg: Config, error: str | None) -> Signal:
    """Build the master ``orch_down`` signal (FR-3).

    Args:
        consecutive_failures: How many times in a row ``/metrics`` has failed.
        cfg: Supplies ``orch_down_ticks``, the number of consecutive failures
            required before the signal becomes active.
        error: Text of the last failure; a generic placeholder is used when
            it is empty or ``None``.

    Returns:
        A :class:`Signal` keyed ``"orch_down"``. It is active only once the
        failure streak reaches the threshold, so a single transient hiccup
        does not flap. The text tells the operator that the in-process guards
        (disk / reaper / reconciler) are dead too and the host must be
        checked directly (D6).
    """
    threshold_reached = consecutive_failures >= cfg.orch_down_ticks
    message = (
        f"GET /metrics не отвечает {consecutive_failures} тик(ов) подряд "
        f"(порог {cfg.orch_down_ticks}): {error or 'недоступен'}. "
        "In-process стражи (disk/reaper/reconciler) тоже мертвы — проверьте "
        "хост (вкл. диск) и контейнер orchestrator."
    )
    return Signal(
        key="orch_down",
        active=threshold_reached,
        title="Орк не отвечает",
        detail=message,
    )