Add the `watchdog/` package (thin Python-3.12 stdlib-only daemon) and the `orchestrator-watchdog` compose service — the brain half of the domain-0 observability pair. F1a (ORCH-099) exposes GET /metrics raw signal; F1b reads it, augments with host / container / dependency probes, runs each signal through a generalised pure decision function (decide(signal_active, prev, now, cooldown), a strict superset of disk_watchdog.decide_action) with per-signal in-memory dedup/throttle/recovery, and alerts over its OWN independent Telegram channel. Key properties (ADR-001): - Observer separated from observed: separate container; /metrics not answering is itself the master `orch_down` alarm (debounced K ticks — no flap on a hiccup). - Strictly read-only: docker.sock GET-only + mounted :ro (double guard), host paths :ro, no DB/disk writes, no process control — self-hosting-safe. - never-raise on three levels (per-source/per-tick/per-send) + WATCHDOG_ENABLED kill-switch (disabled -> inert idle-loop, not exit). - Disk anti-duplicate (D6): disk_watchdog (ORCH-063) stays sole owner of the 85% alert; sidecar carries orch_down + an opt-in 97% ceiling (default off). - NO import from src/** (C-1); src/**, STAGE_TRANSITIONS, QG_CHECKS, check_*, DB schema — untouched. env_file optional so a missing .env.watchdog never breaks `docker compose up` for the prod orchestrator. Tests: tests/watchdog/ (TC-01…TC-13) + full tests/ regression green (TC-14). Docs: CHANGELOG, .env.example canon (WATCHDOG_*); architecture README + adr-0033 authored at the architecture stage. Refs: ORCH-100 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
184 lines
7.6 KiB
Python
184 lines
7.6 KiB
Python
"""The sidecar tick orchestration: collect -> evaluate -> decide -> dispatch (D3).
|
|
|
|
The ``Watchdog`` owns the cross-tick state the sidecar is responsible for:

* ``_states``    — per signal_key :class:`AlertState` (anti-spam / recovery);
* ``_agents``    — per run_id :class:`AgentSample` (cpu_ticks, generated_at);
* ``_failed``    — last seen ``queue.counts.failed`` (job_failed edge);
* ``_orch_fail`` — consecutive ``/metrics`` failures (orch_down debounce).

All collection is wrapped per-source and the whole ``tick`` is wrapped per-tick
(never-raise, D8). ``now_provider`` is injectable for deterministic tests.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
|
|
from . import decision
|
|
from .collectors import containers as containers_mod
|
|
from .collectors import deps as deps_mod
|
|
from .collectors import host as host_mod
|
|
from .collectors import orch as orch_mod
|
|
from .config import Config
|
|
from .notify import Notifier
|
|
from . import signals as signals_mod
|
|
|
|
logger = logging.getLogger("watchdog.core")
|
|
|
|
|
|
class Watchdog:
    """Stateful observer: one ``tick`` collects every source and dispatches alerts.

    Cross-tick state kept on the instance:

    * ``_states`` - per signal key ``decision.AlertState`` that is fed back
      into ``decision.decide`` on the next tick;
    * ``_agents`` - agent samples returned by the last successful envelope
      evaluation (passed back into the next evaluation);
    * ``_failed`` - failed count returned by the last successful envelope
      evaluation (passed back into the next evaluation);
    * ``_orch_fail`` - number of consecutive ticks without a usable
      ``/metrics`` envelope;
    * ``last_run_ts`` - ``now`` of the last completed tick, ``None`` before
      the first one.
    """

    def __init__(
        self,
        cfg: Config,
        notifier: Notifier | None = None,
        docker: containers_mod.DockerSockReader | None = None,
        now_provider=None,
    ):
        """Wire the collaborators and start with empty cross-tick state.

        Args:
            cfg: Sidecar configuration; read on every tick.
            notifier: Alert channel. When omitted a ``Notifier`` is built from
                ``cfg.tg_bot_token``, ``cfg.tg_chat_id`` and
                ``cfg.http_timeout_s``.
            docker: Container inspector. When omitted a ``DockerSockReader``
                is built from ``cfg.docker_sock`` and ``cfg.http_timeout_s``.
            now_provider: Zero-argument callable returning the current time;
                defaults to ``time.time``. Injectable for deterministic tests.
        """
        self.cfg = cfg
        self._now = now_provider or time.time
        self._notifier = notifier or Notifier(
            cfg.tg_bot_token, cfg.tg_chat_id, cfg.http_timeout_s
        )
        self._docker = docker or containers_mod.DockerSockReader(
            cfg.docker_sock, cfg.http_timeout_s
        )
        # cross-tick state owned by the sidecar
        self._states: dict[object, decision.AlertState] = {}
        self._agents: dict[object, signals_mod.AgentSample] = {}
        self._failed: int | None = None
        self._orch_fail: int = 0
        self.last_run_ts: float | None = None

    # -- collection (each source guarded; per-source never-raise) ---------
    def _collect_orch(self) -> orch_mod.FetchResult:
        """Fetch ``cfg.metrics_url``; any exception becomes a failed result."""
        try:
            return orch_mod.fetch_metrics(self.cfg.metrics_url, self.cfg.http_timeout_s)
        except Exception as e:  # noqa: BLE001 - treat as down, never crash the tick
            logger.warning("watchdog: orch collect error: %s", e)
            return orch_mod.FetchResult(ok=False, error=str(e))

    def _collect_host_mem(self) -> float | None:
        """Return the host memory reading, or ``None`` if reading it failed."""
        try:
            return host_mod.read_mem_used_pct()
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: host mem collect error: %s", e)
            return None

    def _collect_disk(self) -> tuple | None:
        """Return the disk reading for ``cfg.disk_paths``.

        Returns ``None`` without probing when ``cfg.disk_crit_enabled`` is
        false, and ``None`` when the probe raised.
        """
        if not self.cfg.disk_crit_enabled:
            return None
        try:
            return host_mod.max_disk_used_pct(self.cfg.disk_paths)
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: disk collect error: %s", e)
            return None

    def _collect_containers(self) -> dict[str, str]:
        """Classify every container named in ``cfg.containers``.

        Returns a ``name -> classification`` mapping; a container whose
        inspect/classify step raised is reported as ``"unknown"``.
        """
        out: dict[str, str] = {}
        for name in self.cfg.containers:
            try:
                inspect = self._docker.inspect(name)
                out[name] = containers_mod.classify_container(inspect)
            except Exception as e:  # noqa: BLE001 - one container degrades, others continue
                logger.warning("watchdog: container %s collect error: %s", name, e)
                out[name] = "unknown"
        return out

    def _collect_deps(self) -> dict:
        """Ping ``cfg.deps``; returns an empty dict if the probe raised."""
        try:
            return deps_mod.ping_all(self.cfg.deps, self.cfg.http_timeout_s)
        except Exception as e:  # noqa: BLE001
            logger.warning("watchdog: deps collect error: %s", e)
            return {}

    # -- evaluation (each source guarded; per-source never-raise) ---------
    def _evaluate(self, source: str, build) -> list:
        """Run one signal-building step and return its signals as a list.

        ``build`` is a zero-argument callable returning an iterable of
        signals. If it raises, the error is logged under ``source`` and an
        empty list is returned, so a failing evaluator only drops its own
        signals instead of aborting the tick.
        """
        try:
            return list(build())
        except Exception as e:  # noqa: BLE001 - one source degrades, others continue
            logger.warning("watchdog: %s evaluate error: %s", source, e)
            return []

    def _eval_envelope(self, envelope) -> list:
        """Evaluate a fetched envelope and commit the carried-over state.

        ``_agents`` / ``_failed`` are only overwritten after
        ``signals_mod.eval_envelope`` returned, so a raising evaluation leaves
        the previous samples in place for the next tick.
        """
        ev = signals_mod.eval_envelope(envelope, self.cfg, self._agents, self._failed)
        self._agents = ev.agent_samples
        self._failed = ev.failed_count
        return ev.signals

    # -- one tick ---------------------------------------------------------
    def tick(self) -> list[tuple[str, signals_mod.Signal]]:
        """Run one full pass and return the ``(action, Signal)`` decisions.

        Both the collection and the evaluation of every source are
        independently guarded, so a broken source (orchestrator down / docker
        unreachable / dep timeout / evaluator error) degrades ONE source's
        signals and the rest of the tick still runs (D8). The orchestrator
        being down is itself the ``orchestrator_down`` signal, not a failed
        tick (FR-3).

        Returns:
            One ``(action, signal)`` pair per signal that was decided without
            error, including signals whose action sent no message.
        """
        now = self._now()
        built: list[signals_mod.Signal] = []

        # 1) orchestrator /metrics (+ orch_down debounce)
        fetch = self._collect_orch()
        if fetch.ok and fetch.envelope is not None:
            self._orch_fail = 0
            built.extend(
                self._evaluate("orch", lambda: self._eval_envelope(fetch.envelope))
            )
        else:
            # The consecutive-failure count is handed to orch_down_signal.
            self._orch_fail += 1
            built.extend(
                self._evaluate(
                    "orch_down",
                    lambda: [
                        signals_mod.orch_down_signal(
                            self._orch_fail, self.cfg, fetch.error
                        )
                    ],
                )
            )

        # 2) host memory + opt-in disk ceiling
        built.extend(
            self._evaluate(
                "host",
                lambda: signals_mod.host_signals(
                    self.cfg, self._collect_host_mem(), self._collect_disk()
                ),
            )
        )

        # 3) containers (read-only docker.sock)
        built.extend(
            self._evaluate(
                "containers",
                lambda: signals_mod.container_signals(
                    self.cfg, self._collect_containers()
                ),
            )
        )

        # 4) external dependency pings
        built.extend(
            self._evaluate(
                "deps", lambda: signals_mod.dep_signals(self._collect_deps())
            )
        )

        dispatched = self._dispatch(built, now)
        self.last_run_ts = now
        return dispatched

    # -- decision + dispatch ----------------------------------------------
    def _dispatch(
        self, built: list, now: float
    ) -> list[tuple[str, signals_mod.Signal]]:
        """Run each signal through ``decide`` and send alert/realert/recovery.

        Args:
            built: Signals produced by this tick.
            now: Timestamp of this tick; stored as ``last_alert_at``.

        Returns:
            ``(action, signal)`` for every signal processed without error. A
            signal whose processing raised is logged and left out.
        """
        results: list = []
        for sig in built:
            try:
                # A signal may carry its own cooldown; otherwise use the global one.
                cooldown = sig.cooldown_s if sig.cooldown_s is not None else self.cfg.cooldown_s
                if sig.edge:
                    # Edge signals (job_failed) fire on each new occurrence and
                    # keep no sustained state: a fresh empty prev -> ALERT iff active.
                    prev = decision.AlertState()
                else:
                    prev = self._states.get(sig.key) or decision.AlertState()
                action = decision.decide(sig.active, prev, now, cooldown)
                if action in (decision.ACTION_ALERT, decision.ACTION_REALERT):
                    self._send(self._format(sig, action))
                    if not sig.edge:
                        self._states[sig.key] = decision.AlertState(
                            alerting=True, last_alert_at=now
                        )
                elif action == decision.ACTION_RECOVERY:
                    self._send(self._format(sig, action))
                    self._states[sig.key] = decision.AlertState(
                        alerting=False, last_alert_at=None
                    )
                results.append((action, sig))
            except Exception as e:  # noqa: BLE001 - one signal degrades, others dispatch
                # getattr: a malformed signal without ``key`` must not raise
                # from inside the handler and escape the per-signal guard.
                logger.warning(
                    "watchdog: dispatch error for %s: %s",
                    getattr(sig, "key", "<unknown>"),
                    e,
                )
        return results

    @staticmethod
    def _format(sig: signals_mod.Signal, action: str) -> str:
        """Render the message text for ``sig`` under ``action``.

        Recovery gets a green-circle prefix and a recovery note; an alert gets
        a red circle; any other action (the re-alert path) gets the repeat
        symbol.
        """
        if action == decision.ACTION_RECOVERY:
            return f"\U0001f7e2 {sig.title}: восстановление. {sig.detail}"
        prefix = "\U0001f534" if action == decision.ACTION_ALERT else "\U0001f501"
        return f"{prefix} {sig.title}: {sig.detail}"

    def _send(self, text: str) -> None:
        """Best-effort dispatch through the sidecar's own channel. never-raise."""
        try:
            self._notifier.send(text)
        except Exception as e:  # noqa: BLE001 - per-send never-raise (D8)
            logger.warning("watchdog: send failed: %s", e)
|