Add the `watchdog/` package (thin Python-3.12 stdlib-only daemon) and the `orchestrator-watchdog` compose service — the brain half of the domain-0 observability pair. F1a (ORCH-099) exposes GET /metrics raw signal; F1b reads it, augments with host / container / dependency probes, runs each signal through a generalised pure decision function (decide(signal_active, prev, now, cooldown), a strict superset of disk_watchdog.decide_action) with per-signal in-memory dedup/throttle/recovery, and alerts over its OWN independent Telegram channel. Key properties (ADR-001): - Observer separated from observed: separate container; /metrics not answering is itself the master `orch_down` alarm (debounced K ticks — no flap on a hiccup). - Strictly read-only: docker.sock GET-only + mounted :ro (double guard), host paths :ro, no DB/disk writes, no process control — self-hosting-safe. - never-raise on three levels (per-source/per-tick/per-send) + WATCHDOG_ENABLED kill-switch (disabled -> inert idle-loop, not exit). - Disk anti-duplicate (D6): disk_watchdog (ORCH-063) stays sole owner of the 85% alert; sidecar carries orch_down + an opt-in 97% ceiling (default off). - NO import from src/** (C-1); src/**, STAGE_TRANSITIONS, QG_CHECKS, check_*, DB schema — untouched. env_file optional so a missing .env.watchdog never breaks `docker compose up` for the prod orchestrator. Tests: tests/watchdog/ (TC-01…TC-13) + full tests/ regression green (TC-14). Docs: CHANGELOG, .env.example canon (WATCHDOG_*); architecture README + adr-0033 authored at the architecture stage. Refs: ORCH-100 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
284 lines
10 KiB
Python
284 lines
10 KiB
Python
"""Pure signal builders: turn collected raw inputs into ``Signal`` objects (D5).
|
|
|
|
A ``Signal`` is ``(key, active, title, detail, edge)``. ``key`` identifies the
|
|
signal for per-key anti-spam state: a scalar (``"orch_down"``, ``"host_mem"``)
|
|
or a tuple for per-entity signals (``("agent_hung", run_id)``,
|
|
``("container_down", name)``, ``("stage_stuck", work_item)``,
|
|
``("dep_down", name)``).
|
|
|
|
These builders are PURE — given the envelope / host readings / prev-sample state
|
|
they return signals + the next sample state, with no I/O — so the whole decision
|
|
surface is unit-testable without a container, a socket or a timer (TC-01…TC-11).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
|
|
from .collectors import containers as containers_mod
|
|
from .collectors import orch as orch_mod
|
|
from .config import Config
|
|
|
|
logger = logging.getLogger("watchdog.signals")
|
|
|
|
|
|
@dataclass
class Signal:
    """A single evaluated signal on its way into the decision function.

    Event-style signals (for example ``job_failed``) set ``edge``: they fire on
    every new occurrence and have no sustained "recovery", so the dispatcher
    keeps no alerting state for them.
    """

    # Identity used for per-key anti-spam state: a scalar such as "orch_down"
    # or a tuple such as ("agent_hung", run_id) for per-entity signals.
    key: object
    # Whether the condition holds for this evaluation.
    active: bool
    # Human-readable heading of the signal.
    title: str
    # Human-readable description with the measured values.
    detail: str
    # True for event-style signals (see the class docstring).
    edge: bool = False
    # Per-signal override of the global cooldown; None leaves it unset
    # (presumably meaning "use the global value" - confirm in the dispatcher).
    cooldown_s: float | None = None
|
|
|
|
|
|
@dataclass
class AgentSample:
    """Last seen ``(cpu_ticks, generated_at_epoch)`` pair of a running agent (D5)."""

    # Cumulative CPU tick counter reported for the agent at that poll.
    cpu_ticks: int
    # Timestamp of the ``/metrics`` envelope the counter was read from.
    generated_at: float
|
|
|
|
|
|
@dataclass
class EnvelopeEval:
    """Outcome of one ``/metrics`` envelope evaluation: signals plus carried state."""

    # Signals produced by this evaluation.
    signals: list = field(default_factory=list)
    # run_id -> AgentSample, the per-agent samples to carry into the next tick.
    agent_samples: dict = field(default_factory=dict)
    # Failed-job counter to carry into the next tick; None when not known.
    failed_count: int | None = None
|
|
|
|
|
|
def _cpu_fraction(
|
|
cur_ticks: int,
|
|
cur_gen: float,
|
|
prev: AgentSample,
|
|
clk_tck: int,
|
|
) -> float | None:
|
|
"""CPU fraction of one agent across two ``/metrics`` polls (D5).
|
|
|
|
``frac = (Δticks / clk_tck) / Δseconds``. Returns ``None`` if the deltas are
|
|
not usable (no wall-time elapsed, non-positive clk_tck) so a degenerate
|
|
sample never produces a false "hung" verdict.
|
|
"""
|
|
try:
|
|
dt = cur_gen - prev.generated_at
|
|
if dt <= 0 or not clk_tck or clk_tck <= 0:
|
|
return None
|
|
cpu_seconds = (cur_ticks - prev.cpu_ticks) / clk_tck
|
|
if cpu_seconds < 0:
|
|
return None
|
|
return cpu_seconds / dt
|
|
except Exception as e: # noqa: BLE001 - degenerate sample, no verdict
|
|
logger.warning("watchdog: cpu_fraction error: %s", e)
|
|
return None
|
|
|
|
|
|
def _eval_agents(
    envelope: dict,
    cfg: Config,
    prev_agents: dict,
    clk_tck: object,
    gen_at: float | None,
    signals: list,
) -> dict:
    """Append ``agent_hung`` signals to *signals*; return this tick's samples.

    A verdict needs two polls of the same ``run_id``. never-raise: on any error
    the agent family stops evaluating and whatever was collected so far is kept.
    """
    samples: dict = {}
    # Loop-invariant: without an integer tick rate no fraction can be computed.
    clk_ok = isinstance(clk_tck, int)
    try:
        for agent in envelope.get("agents") or []:
            run_id = agent.get("run_id")
            cpu_ticks = agent.get("cpu_ticks")
            runtime_s = agent.get("runtime_s")
            if run_id is None:
                continue
            if cpu_ticks is None or gen_at is None:
                # pid dead / non-Linux / no timestamp -> cannot judge; skip.
                continue
            ticks = int(cpu_ticks)
            # Record the sample first so the NEXT tick has a baseline even
            # when this tick cannot produce a verdict.
            samples[run_id] = AgentSample(ticks, gen_at)
            prev = prev_agents.get(run_id)
            if prev is None or not clk_ok:
                continue
            frac = _cpu_fraction(ticks, gen_at, prev, clk_tck)
            if frac is None or runtime_s is None:
                continue
            # Hung = has been running long enough AND is burning almost no CPU.
            if (runtime_s > cfg.agent_hung_s) and (frac < cfg.agent_cpu_floor):
                signals.append(
                    Signal(
                        key=("agent_hung", run_id),
                        active=True,
                        title="Агент завис",
                        detail=(
                            f"agent={agent.get('agent')} run_id={run_id} "
                            f"runtime={int(runtime_s)}s cpu={frac:.4f} "
                            f"(< {cfg.agent_cpu_floor})"
                        ),
                    )
                )
    except Exception as e:  # noqa: BLE001 - degrade agent family only
        logger.warning("watchdog: eval agents error: %s", e)
    return samples


def _eval_stages(envelope: dict, cfg: Config, signals: list) -> None:
    """Append a ``stage_stuck`` signal per work item over ``cfg.stage_stuck_s``."""
    try:
        for stage in envelope.get("stages") or []:
            age = stage.get("age_in_stage_s")
            wi = stage.get("work_item")
            if age is None or wi is None:
                continue
            if age > cfg.stage_stuck_s:
                signals.append(
                    Signal(
                        key=("stage_stuck", wi),
                        active=True,
                        title="Стадия застряла",
                        detail=(
                            f"{wi} в стадии {stage.get('stage')} уже {int(age)}s "
                            f"(порог {int(cfg.stage_stuck_s)}s)"
                        ),
                    )
                )
    except Exception as e:  # noqa: BLE001 - degrade stage family only
        logger.warning("watchdog: eval stages error: %s", e)


def _eval_queue(
    envelope: dict,
    cfg: Config,
    prev_failed: int | None,
    signals: list,
) -> int | None:
    """Append ``queue_depth`` / ``job_failed`` signals; return the failed count.

    The returned counter is the one to persist for the next tick; it stays at
    ``prev_failed`` when the envelope carries no usable integer counter.
    """
    failed_now: int | None = prev_failed
    try:
        queue = envelope.get("queue") or {}
        depth = queue.get("depth")
        if isinstance(depth, int) and depth >= cfg.queue_depth:
            signals.append(
                Signal(
                    key="queue_depth",
                    active=True,
                    title="Очередь растёт",
                    detail=f"глубина очереди {depth} (порог {cfg.queue_depth})",
                )
            )
        counts = queue.get("counts") or {}
        failed = counts.get("failed")
        if isinstance(failed, int):
            failed_now = failed
            # Edge signal: fires only when the counter grew since the last
            # tick, and never on the very first observation.
            if prev_failed is not None and failed > prev_failed:
                signals.append(
                    Signal(
                        key="job_failed",
                        active=True,
                        title="Job упал",
                        detail=(
                            f"failed-джобов стало {failed} "
                            f"(было {prev_failed}, +{failed - prev_failed})"
                        ),
                        edge=True,
                    )
                )
    except Exception as e:  # noqa: BLE001 - degrade queue family only
        logger.warning("watchdog: eval queue error: %s", e)
    return failed_now


def eval_envelope(
    envelope: dict,
    cfg: Config,
    prev_agents: dict,
    prev_failed: int | None,
) -> EnvelopeEval:
    """Derive agent_hung / stage_stuck / job_failed / queue_depth signals (D5).

    Pure: no I/O. ``prev_agents`` (run_id -> :class:`AgentSample`) and
    ``prev_failed`` carry the cross-tick state the sidecar owns; the returned
    :class:`EnvelopeEval` includes the NEXT state to persist. never-raise: a bad
    sub-section degrades that family of signals, the rest still evaluate.

    A non-dict ``envelope`` yields no signals and hands the previous state back
    unchanged. A ``None`` ``prev_agents`` is treated as "no previous samples".
    """
    # Keep the never-raise promise even if the caller has no state yet.
    prev_agents = prev_agents or {}

    out = EnvelopeEval()
    if not isinstance(envelope, dict):
        out.agent_samples = dict(prev_agents)
        out.failed_count = prev_failed
        return out

    clk_tck = envelope.get("clk_tck")
    gen_at = orch_mod.parse_generated_at(envelope)

    # Each family appends to out.signals in this fixed order:
    # agents, then stages, then queue.
    out.agent_samples = _eval_agents(
        envelope, cfg, prev_agents, clk_tck, gen_at, out.signals
    )
    _eval_stages(envelope, cfg, out.signals)
    out.failed_count = _eval_queue(envelope, cfg, prev_failed, out.signals)
    return out
|
|
|
|
|
|
def host_signals(cfg: Config, mem_pct: float | None, disk: tuple | None) -> list:
    """Return the host-level signals: memory, plus the opt-in disk ceiling (D5/D6).

    Pure. A ``None`` reading produces no signal for that resource. ``disk`` is
    unpacked as a ``(path, pct)`` pair.
    """
    built: list = []

    if mem_pct is not None:
        mem_over = mem_pct >= cfg.mem_pct
        built.append(
            Signal(
                key="host_mem",
                active=mem_over,
                title="Память хоста",
                detail=f"память хоста {mem_pct}% (порог {cfg.mem_pct}%)",
            )
        )

    # Disk ceiling is OPT-IN (D6): disk_watchdog (ORCH-063) owns the 85% alert;
    # the sidecar only carries an independent HIGHER ceiling when explicitly
    # enabled, so there is no double-alert on the same fill event (FR-9/AC-5).
    if not cfg.disk_crit_enabled or disk is None:
        return built

    path, pct = disk
    disk_over = pct >= cfg.disk_crit_pct
    disk_text = (
        f"диск {path} {pct}% (критический потолок {cfg.disk_crit_pct}%, "
        f"независимый канал sidecar)"
    )
    built.append(
        Signal(
            key="host_disk_crit",
            active=disk_over,
            title="Диск (критический потолок)",
            detail=disk_text,
        )
    )
    return built
|
|
|
|
|
|
def container_signals(cfg: Config, statuses: dict) -> list:
    """Return one ``container_down`` signal per entry of ``{name: status}``. Pure.

    Every container gets a signal, active or not; whether a status counts as an
    alarm is decided by ``containers_mod.container_alarm``.
    """
    return [
        Signal(
            key=("container_down", cname),
            active=containers_mod.container_alarm(cstatus),
            title="Контейнер не в норме",
            detail=f"контейнер {cname}: статус '{cstatus}'",
        )
        for cname, cstatus in statuses.items()
    ]
|
|
|
|
|
|
def dep_signals(reachability: dict) -> list:
    """Return one ``dep_down`` signal per entry of ``{name: reachable}``. Pure.

    Every dependency gets a signal; it is active when the dependency was NOT
    reachable.
    """
    return [
        Signal(
            key=("dep_down", dep),
            active=not is_up,
            title="Зависимость недоступна",
            detail=f"зависимость {dep} не отвечает",
        )
        for dep, is_up in reachability.items()
    ]
|
|
|
|
|
|
def orch_down_signal(consecutive_failures: int, cfg: Config, error: str | None) -> Signal:
    """Build the master ``orch_down`` signal (FR-3).

    The signal turns active once ``/metrics`` has failed
    ``cfg.orch_down_ticks`` times in a row, so a single transient hiccup does
    not flap. The text spells out that the in-process guards
    (disk / reaper / reconciler) are dead too, so the operator knows to check
    the host directly (D6). A missing ``error`` is rendered as a generic
    "unavailable" marker.
    """
    threshold = cfg.orch_down_ticks
    reason = error or "недоступен"
    text = (
        f"GET /metrics не отвечает {consecutive_failures} тик(ов) подряд "
        f"(порог {threshold}): {reason}. "
        f"In-process стражи (disk/reaper/reconciler) тоже мертвы — проверьте "
        f"хост (вкл. диск) и контейнер orchestrator."
    )
    return Signal(
        key="orch_down",
        active=consecutive_failures >= threshold,
        title="Орк не отвечает",
        detail=text,
    )
|