Add the `watchdog/` package (thin Python-3.12 stdlib-only daemon) and the `orchestrator-watchdog` compose service — the brain half of the domain-0 observability pair. F1a (ORCH-099) exposes GET /metrics raw signal; F1b reads it, augments with host / container / dependency probes, runs each signal through a generalised pure decision function (decide(signal_active, prev, now, cooldown), a strict superset of disk_watchdog.decide_action) with per-signal in-memory dedup/throttle/recovery, and alerts over its OWN independent Telegram channel. Key properties (ADR-001): - Observer separated from observed: separate container; /metrics not answering is itself the master `orch_down` alarm (debounced K ticks — no flap on a hiccup). - Strictly read-only: docker.sock GET-only + mounted :ro (double guard), host paths :ro, no DB/disk writes, no process control — self-hosting-safe. - never-raise on three levels (per-source/per-tick/per-send) + WATCHDOG_ENABLED kill-switch (disabled -> inert idle-loop, not exit). - Disk anti-duplicate (D6): disk_watchdog (ORCH-063) stays sole owner of the 85% alert; sidecar carries orch_down + an opt-in 97% ceiling (default off). - NO import from src/** (C-1); src/**, STAGE_TRANSITIONS, QG_CHECKS, check_*, DB schema — untouched. env_file optional so a missing .env.watchdog never breaks `docker compose up` for the prod orchestrator. Tests: tests/watchdog/ (TC-01…TC-13) + full tests/ regression green (TC-14). Docs: CHANGELOG, .env.example canon (WATCHDOG_*); architecture README + adr-0033 authored at the architecture stage. Refs: ORCH-100 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
160 lines
6.0 KiB
Python
160 lines
6.0 KiB
Python
"""Read ``WATCHDOG_*`` env into a frozen config (thresholds / intervals / tokens /
URLs / kill-switch), with safe defaults (D1/D8, FR-10).

Every parser is never-raise: a missing / malformed value degrades to its
documented default, so a bad env never crashes the process (the same spirit as
``disk_watchdog.parse_paths``). ``.env.example`` is the canonical reference for
the keys.
"""
|
|
from __future__ import annotations

import math
import os
from dataclasses import dataclass, field
|
|
|
|
|
|
def _str(env: dict, key: str, default: str) -> str:
|
|
try:
|
|
v = env.get(key)
|
|
if v is None or not str(v).strip():
|
|
return default
|
|
return str(v).strip()
|
|
except Exception: # noqa: BLE001 - never break config on a bad env
|
|
return default
|
|
|
|
|
|
def _int(env: dict, key: str, default: int) -> int:
|
|
try:
|
|
v = env.get(key)
|
|
if v is None or not str(v).strip():
|
|
return default
|
|
return int(str(v).strip())
|
|
except Exception: # noqa: BLE001
|
|
return default
|
|
|
|
|
|
def _float(env: dict, key: str, default: float) -> float:
|
|
try:
|
|
v = env.get(key)
|
|
if v is None or not str(v).strip():
|
|
return default
|
|
return float(str(v).strip())
|
|
except Exception: # noqa: BLE001
|
|
return default
|
|
|
|
|
|
def _bool(env: dict, key: str, default: bool) -> bool:
|
|
try:
|
|
v = env.get(key)
|
|
if v is None or not str(v).strip():
|
|
return default
|
|
return str(v).strip().lower() in ("1", "true", "yes", "on")
|
|
except Exception: # noqa: BLE001
|
|
return default
|
|
|
|
|
|
def _csv(env: dict, key: str, default: list[str]) -> list[str]:
|
|
try:
|
|
v = env.get(key)
|
|
if v is None or not str(v).strip():
|
|
return list(default)
|
|
out = [p.strip() for p in str(v).split(",") if p.strip()]
|
|
return out or list(default)
|
|
except Exception: # noqa: BLE001
|
|
return list(default)
|
|
|
|
|
|
def _deps(env: dict, key: str) -> dict[str, str]:
|
|
"""Parse ``name=url,name=url`` dependency pings (FR-6). Empty -> no pings.
|
|
|
|
Default is empty (fail-safe: no hardcoded network), the canonical example
|
|
URLs live in ``.env.example`` so the operator opts in explicitly.
|
|
"""
|
|
out: dict[str, str] = {}
|
|
try:
|
|
raw = env.get(key)
|
|
if not raw or not str(raw).strip():
|
|
return out
|
|
for pair in str(raw).split(","):
|
|
pair = pair.strip()
|
|
if not pair or "=" not in pair:
|
|
continue
|
|
name, _, url = pair.partition("=")
|
|
name, url = name.strip(), url.strip()
|
|
if name and url:
|
|
out[name] = url
|
|
except Exception: # noqa: BLE001
|
|
return {}
|
|
return out
|
|
|
|
|
|
@dataclass(frozen=True)
class Config:
    """Immutable sidecar config built from the environment (FR-10).

    The field defaults below are the single source of truth for every knob.
    :meth:`from_env` reads its fallbacks from a default-constructed instance,
    so a default only ever has to change in one place.
    """

    # -- lifecycle / loop -------------------------------------------------
    enabled: bool = True
    interval_s: float = 30.0
    http_timeout_s: float = 5.0
    cooldown_s: float = 1800.0  # re-alert throttle for sustained signals

    # -- orchestrator /metrics -------------------------------------------
    metrics_url: str = "http://127.0.0.1:8500/metrics"
    orch_down_ticks: int = 3  # K consecutive failures before orch_down fires

    # -- host -------------------------------------------------------------
    mem_pct: float = 90.0
    disk_paths: list[str] = field(default_factory=lambda: ["/repos", "/app/data"])
    disk_crit_enabled: bool = False  # opt-in independent disk ceiling (D6)
    disk_crit_pct: float = 97.0

    # -- agents / queue / stages (derived from the /metrics envelope) -----
    agent_hung_min: float = 20.0  # minutes of runtime before "hung" is considered
    agent_cpu_floor: float = 0.01  # CPU fraction below which a long agent is "hung"
    stage_stuck_min: float = 120.0  # minutes a task may sit in one stage
    queue_depth: int = 20

    # -- containers (docker.sock, read-only) ------------------------------
    containers: list[str] = field(default_factory=lambda: ["orchestrator"])
    docker_sock: str = "/var/run/docker.sock"

    # -- external dependencies -------------------------------------------
    deps: dict[str, str] = field(default_factory=dict)

    # -- independent Telegram transport ----------------------------------
    tg_bot_token: str = ""
    tg_chat_id: str = ""

    # -- derived helpers --------------------------------------------------
    @property
    def agent_hung_s(self) -> float:
        """``agent_hung_min`` converted to seconds."""
        return self.agent_hung_min * 60.0

    @property
    def stage_stuck_s(self) -> float:
        """``stage_stuck_min`` converted to seconds."""
        return self.stage_stuck_min * 60.0

    @classmethod
    def from_env(cls, env: dict | None = None) -> "Config":
        """Build a Config from ``env`` (defaults to ``os.environ``). never-raise.

        Each key is parsed by its type-specific helper, which falls back to the
        field default on a missing / malformed value. Values that parse but are
        out of range also degrade to the field default:

        * ``WATCHDOG_INTERVAL_S`` and ``WATCHDOG_HTTP_TIMEOUT_S`` must be finite
          and > 0. A zero / negative poll interval or HTTP timeout is never a
          usable setting.
        * ``WATCHDOG_COOLDOWN_S`` must be finite and >= 0.
        * ``WATCHDOG_ORCH_DOWN_TICKS`` must be >= 1. It counts consecutive
          failures, so fewer than one is meaningless.
        """
        e = dict(os.environ if env is None else env)
        d = cls()  # field defaults double as the parse fallbacks
        inf = float("inf")

        def _positive(value: float, fallback: float) -> float:
            # `0 < nan` is False, so NaN also lands on the fallback.
            return value if 0.0 < value < inf else fallback

        def _non_negative(value: float, fallback: float) -> float:
            return value if 0.0 <= value < inf else fallback

        orch_down_ticks = _int(e, "WATCHDOG_ORCH_DOWN_TICKS", d.orch_down_ticks)

        return cls(
            enabled=_bool(e, "WATCHDOG_ENABLED", d.enabled),
            interval_s=_positive(
                _float(e, "WATCHDOG_INTERVAL_S", d.interval_s), d.interval_s
            ),
            http_timeout_s=_positive(
                _float(e, "WATCHDOG_HTTP_TIMEOUT_S", d.http_timeout_s),
                d.http_timeout_s,
            ),
            cooldown_s=_non_negative(
                _float(e, "WATCHDOG_COOLDOWN_S", d.cooldown_s), d.cooldown_s
            ),
            metrics_url=_str(e, "WATCHDOG_METRICS_URL", d.metrics_url),
            orch_down_ticks=(
                orch_down_ticks if orch_down_ticks >= 1 else d.orch_down_ticks
            ),
            mem_pct=_float(e, "WATCHDOG_MEM_PCT", d.mem_pct),
            disk_paths=_csv(e, "WATCHDOG_DISK_PATHS", d.disk_paths),
            disk_crit_enabled=_bool(e, "WATCHDOG_DISK_CRIT_ENABLED", d.disk_crit_enabled),
            disk_crit_pct=_float(e, "WATCHDOG_DISK_CRIT_PCT", d.disk_crit_pct),
            agent_hung_min=_float(e, "WATCHDOG_AGENT_HUNG_MIN", d.agent_hung_min),
            agent_cpu_floor=_float(e, "WATCHDOG_AGENT_CPU_FLOOR", d.agent_cpu_floor),
            stage_stuck_min=_float(e, "WATCHDOG_STAGE_STUCK_MIN", d.stage_stuck_min),
            queue_depth=_int(e, "WATCHDOG_QUEUE_DEPTH", d.queue_depth),
            containers=_csv(e, "WATCHDOG_CONTAINERS", d.containers),
            docker_sock=_str(e, "WATCHDOG_DOCKER_SOCK", d.docker_sock),
            deps=_deps(e, "WATCHDOG_DEPS"),
            tg_bot_token=_str(e, "WATCHDOG_TG_BOT_TOKEN", d.tg_bot_token),
            tg_chat_id=_str(e, "WATCHDOG_TG_CHAT_ID", d.tg_chat_id),
        )
|