Files
orchestrator/watchdog/core.py
claude-bot 259b507906 feat(watchdog): sidecar-watchdog F1b — monitoring brain in a separate container (ORCH-100)
Add the `watchdog/` package (thin Python-3.12 stdlib-only daemon) and the
`orchestrator-watchdog` compose service — the brain half of the domain-0
observability pair. F1a (ORCH-099) exposes the raw signal via GET /metrics; F1b reads it,
augments with host / container / dependency probes, runs each signal through a
generalised pure decision function (decide(signal_active, prev, now, cooldown),
a strict superset of disk_watchdog.decide_action) with per-signal in-memory
dedup/throttle/recovery, and alerts over its OWN independent Telegram channel.

Key properties (ADR-001):
- Observer separated from observed: separate container; /metrics not answering is
  itself the master `orch_down` alarm (debounced over K ticks, so a single hiccup
  does not make it flap).
- Strictly read-only: docker.sock GET-only + mounted :ro (double guard), host
  paths :ro, no DB/disk writes, no process control — self-hosting-safe.
- never-raise on three levels (per-source/per-tick/per-send) + WATCHDOG_ENABLED
  kill-switch (disabled -> inert idle-loop, not exit).
- Disk anti-duplicate (D6): disk_watchdog (ORCH-063) stays sole owner of the 85%
  alert; sidecar carries orch_down + an opt-in 97% ceiling (default off).
- NO import from src/** (C-1); src/**, STAGE_TRANSITIONS, QG_CHECKS, check_*, DB
  schema — untouched. env_file optional so a missing .env.watchdog never breaks
  `docker compose up` for the prod orchestrator.

Tests: tests/watchdog/ (TC-01…TC-13) + full tests/ regression green (TC-14).
Docs: CHANGELOG, .env.example canon (WATCHDOG_*); architecture README + adr-0033
authored at the architecture stage.

Refs: ORCH-100

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 09:36:02 +03:00

184 lines
7.6 KiB
Python

"""The sidecar tick orchestration: collect -> evaluate -> decide -> dispatch (D3).
The ``Watchdog`` owns the cross-tick state the sidecar is responsible for:
* ``_states`` — per signal_key :class:`AlertState` (anti-spam / recovery);
* ``_agents`` — per run_id :class:`AgentSample` (cpu_ticks, generated_at);
* ``_failed`` — last seen ``queue.counts.failed`` (job_failed edge);
* ``_orch_fail`` — consecutive ``/metrics`` failures (orch_down debounce).
All collection is wrapped per-source and the whole ``tick`` is wrapped per-tick
(never-raise, D8). ``now_provider`` is injectable for deterministic tests.
"""
from __future__ import annotations
import logging
import time
from . import decision
from .collectors import containers as containers_mod
from .collectors import deps as deps_mod
from .collectors import host as host_mod
from .collectors import orch as orch_mod
from .config import Config
from .notify import Notifier
from . import signals as signals_mod
# Module logger for the sidecar core: every per-source, per-signal and per-send
# failure handled in this module is reported through it at WARNING level.
logger = logging.getLogger("watchdog.core")
class Watchdog:
    """Stateful observer: one ``tick`` collects every source and dispatches alerts.

    Cross-tick state owned by the sidecar:

    * ``_states`` -- per ``Signal.key`` :class:`decision.AlertState` for sustained
      (non-edge) signals, feeding the cooldown / recovery decision;
    * ``_agents`` / ``_failed`` -- baselines passed to and returned by
      ``signals.eval_envelope`` (per-agent samples, last seen failed-job count);
    * ``_orch_fail`` -- consecutive ticks without a usable ``/metrics`` envelope
      (input to the ``orch_down`` debounce);
    * ``last_run_ts`` -- the ``now`` of the last completed :meth:`tick`.

    Every collector, every signal evaluator, every dispatch and every send is
    individually guarded, so a broken source degrades only its own signals.
    """

    def __init__(
        self,
        cfg: Config,
        notifier: Notifier | None = None,
        docker: containers_mod.DockerSockReader | None = None,
        now_provider=None,
    ):
        """Wire the observer to its configuration and I/O collaborators.

        Args:
            cfg: sidecar configuration (URLs, timeouts, thresholds, targets).
            notifier: alert channel; built from ``cfg.tg_bot_token`` /
                ``cfg.tg_chat_id`` when omitted (or falsy).
            docker: read-only docker.sock reader; built from ``cfg.docker_sock``
                when omitted (or falsy).
            now_provider: zero-argument clock; defaults to :func:`time.time`.
                Inject a fake one for deterministic tests.
        """
        self.cfg = cfg
        self._now = now_provider or time.time
        self._notifier = notifier or Notifier(
            cfg.tg_bot_token, cfg.tg_chat_id, cfg.http_timeout_s
        )
        self._docker = docker or containers_mod.DockerSockReader(
            cfg.docker_sock, cfg.http_timeout_s
        )
        # cross-tick state owned by the sidecar
        self._states: dict[object, decision.AlertState] = {}
        self._agents: dict[object, signals_mod.AgentSample] = {}
        self._failed: int | None = None
        self._orch_fail: int = 0
        self.last_run_ts: float | None = None

    # -- collection (each source guarded; per-source never-raise) ---------

    def _collect_orch(self) -> orch_mod.FetchResult:
        """Fetch ``/metrics``; any exception becomes a not-ok ``FetchResult``."""
        try:
            return orch_mod.fetch_metrics(self.cfg.metrics_url, self.cfg.http_timeout_s)
        except Exception as e:  # noqa: BLE001 - treat as down, never crash the tick
            logger.warning("watchdog: orch collect error: %s", e)
            return orch_mod.FetchResult(ok=False, error=str(e))

    def _collect_host_mem(self) -> float | None:
        """Return ``host.read_mem_used_pct()``, or ``None`` if reading fails."""
        try:
            return host_mod.read_mem_used_pct()
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: host mem collect error: %s", e)
            return None

    def _collect_disk(self) -> tuple | None:
        """Return ``host.max_disk_used_pct(cfg.disk_paths)`` for the opt-in ceiling.

        Returns ``None`` when ``cfg.disk_crit_enabled`` is off or reading fails.
        """
        if not self.cfg.disk_crit_enabled:
            return None
        try:
            return host_mod.max_disk_used_pct(self.cfg.disk_paths)
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: disk collect error: %s", e)
            return None

    def _collect_containers(self) -> dict[str, str]:
        """Classify every configured container; ``"unknown"`` for any that fails."""
        out: dict[str, str] = {}
        for name in self.cfg.containers:
            try:
                inspect = self._docker.inspect(name)
                out[name] = containers_mod.classify_container(inspect)
            except Exception as e:  # noqa: BLE001 - one container degrades, others continue
                logger.warning("watchdog: container %s collect error: %s", name, e)
                out[name] = "unknown"
        return out

    def _collect_deps(self) -> dict:
        """Ping the configured external dependencies; ``{}`` if the probe fails."""
        try:
            return deps_mod.ping_all(self.cfg.deps, self.cfg.http_timeout_s)
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: deps collect error: %s", e)
            return {}

    # -- evaluation (each source's signal builder guarded) ----------------

    @staticmethod
    def _eval_source(source: str, build) -> list:
        """Run one source's zero-argument signal builder; on error log and yield ``[]``."""
        try:
            # list() also surfaces errors raised lazily while iterating the result
            return list(build())
        except Exception as e:  # noqa: BLE001 - one source degrades, the tick continues
            logger.warning("watchdog: %s eval error: %s", source, e)
            return []

    def _eval_envelope(self, envelope) -> list:
        """Evaluate a ``/metrics`` envelope and advance the agent / failed baselines.

        The baselines are replaced only when evaluation succeeds, so a malformed
        envelope leaves the previous ones in place for the next tick.
        """
        try:
            ev = signals_mod.eval_envelope(
                envelope, self.cfg, self._agents, self._failed
            )
        except Exception as e:  # noqa: BLE001 - a bad envelope degrades orch signals only
            logger.warning("watchdog: orch eval error: %s", e)
            return []
        self._agents = ev.agent_samples
        self._failed = ev.failed_count
        return list(ev.signals)

    # -- one tick ---------------------------------------------------------

    def tick(self) -> list:
        """Run one full pass and return the decided ``(action, Signal)`` pairs.

        The list holds one pair per signal that reached a decision, including
        actions that send nothing. Both collection and signal evaluation are
        guarded per source. A broken source (orchestrator down, docker
        unreachable, dependency timeout, malformed envelope) degrades only that
        source, and the rest of the tick still runs (D8). The orchestrator being
        down is itself the ``orchestrator_down`` signal, not a failed tick (FR-3).
        """
        now = self._now()
        built: list[signals_mod.Signal] = []

        # 1) orchestrator /metrics (+ orch_down debounce)
        fetch = self._collect_orch()
        if fetch.ok and fetch.envelope is not None:
            self._orch_fail = 0
            built.extend(self._eval_envelope(fetch.envelope))
        else:
            self._orch_fail += 1
            built.extend(
                self._eval_source(
                    "orch_down",
                    lambda: [
                        signals_mod.orch_down_signal(
                            self._orch_fail, self.cfg, fetch.error
                        )
                    ],
                )
            )

        # 2) host memory + opt-in disk ceiling
        built.extend(
            self._eval_source(
                "host",
                lambda: signals_mod.host_signals(
                    self.cfg, self._collect_host_mem(), self._collect_disk()
                ),
            )
        )

        # 3) containers (read-only docker.sock)
        built.extend(
            self._eval_source(
                "containers",
                lambda: signals_mod.container_signals(
                    self.cfg, self._collect_containers()
                ),
            )
        )

        # 4) external dependency pings
        built.extend(
            self._eval_source(
                "deps", lambda: signals_mod.dep_signals(self._collect_deps())
            )
        )

        dispatched = self._dispatch(built, now)
        self.last_run_ts = now
        return dispatched

    # -- decision + dispatch ----------------------------------------------

    def _dispatch(self, built: list, now: float) -> list:
        """Run each signal through ``decide`` and send alert/realert/recovery.

        Edge signals (``sig.edge``) are decided against a fresh
        :class:`decision.AlertState` and never stored. Sustained signals read and
        update ``_states``. A signal's own ``cooldown_s`` overrides
        ``cfg.cooldown_s``. Returns one ``(action, Signal)`` pair per signal that
        reached a decision, including actions that send nothing. A signal whose
        processing raises is logged and left out of the result.
        """
        results: list = []
        for sig in built:
            try:
                cooldown = (
                    sig.cooldown_s if sig.cooldown_s is not None else self.cfg.cooldown_s
                )
                if sig.edge:
                    # Edge signals (job_failed) fire on each new occurrence and
                    # keep no sustained state: a fresh empty prev -> ALERT iff active.
                    prev = decision.AlertState()
                else:
                    prev = self._states.get(sig.key) or decision.AlertState()
                action = decision.decide(sig.active, prev, now, cooldown)
                if action in (decision.ACTION_ALERT, decision.ACTION_REALERT):
                    self._send(self._format(sig, action))
                    if not sig.edge:
                        self._states[sig.key] = decision.AlertState(
                            alerting=True, last_alert_at=now
                        )
                elif action == decision.ACTION_RECOVERY:
                    self._send(self._format(sig, action))
                    # same rule as above: edge signals keep no sustained state
                    if not sig.edge:
                        self._states[sig.key] = decision.AlertState(
                            alerting=False, last_alert_at=None
                        )
                results.append((action, sig))
            except Exception as e:  # noqa: BLE001 - one signal degrades, others dispatch
                # getattr: the handler itself must not raise on a malformed signal
                logger.warning(
                    "watchdog: dispatch error for %s: %s", getattr(sig, "key", sig), e
                )
        return results

    @staticmethod
    def _format(sig: signals_mod.Signal, action: str) -> str:
        """Render the alert text: red dot = alert, loop = re-alert, green = recovery."""
        if action == decision.ACTION_RECOVERY:
            return f"\U0001f7e2 {sig.title}: восстановление. {sig.detail}"
        prefix = "\U0001f534" if action == decision.ACTION_ALERT else "\U0001f501"
        return f"{prefix} {sig.title}: {sig.detail}"

    def _send(self, text: str) -> None:
        """Best-effort dispatch through the sidecar's own channel. never-raise."""
        try:
            self._notifier.send(text)
        except Exception as e:  # noqa: BLE001 - per-send never-raise (D8)
            logger.warning("watchdog: send failed: %s", e)