"""Read ``WATCHDOG_*`` env into a frozen config (thresholds / intervals / tokens / URLs / kill-switch), with safe defaults (D1/D8, FR-10). Every parser is never-raise: a missing / malformed value degrades to its documented default, the process never crashes on a bad env (the same spirit as ``disk_watchdog.parse_paths``). ``.env.example`` is the canon of the keys. """ from __future__ import annotations import os from dataclasses import dataclass, field def _str(env: dict, key: str, default: str) -> str: try: v = env.get(key) if v is None or not str(v).strip(): return default return str(v).strip() except Exception: # noqa: BLE001 - never break config on a bad env return default def _int(env: dict, key: str, default: int) -> int: try: v = env.get(key) if v is None or not str(v).strip(): return default return int(str(v).strip()) except Exception: # noqa: BLE001 return default def _float(env: dict, key: str, default: float) -> float: try: v = env.get(key) if v is None or not str(v).strip(): return default return float(str(v).strip()) except Exception: # noqa: BLE001 return default def _bool(env: dict, key: str, default: bool) -> bool: try: v = env.get(key) if v is None or not str(v).strip(): return default return str(v).strip().lower() in ("1", "true", "yes", "on") except Exception: # noqa: BLE001 return default def _csv(env: dict, key: str, default: list[str]) -> list[str]: try: v = env.get(key) if v is None or not str(v).strip(): return list(default) out = [p.strip() for p in str(v).split(",") if p.strip()] return out or list(default) except Exception: # noqa: BLE001 return list(default) def _deps(env: dict, key: str) -> dict[str, str]: """Parse ``name=url,name=url`` dependency pings (FR-6). Empty -> no pings. Default is empty (fail-safe: no hardcoded network), the canonical example URLs live in ``.env.example`` so the operator opts in explicitly. """ out: dict[str, str] = {} try: raw = env.get(key) if not raw or not str(raw).strip(): return out for pair in str(raw).split(","): pair = pair.strip() if not pair or "=" not in pair: continue name, _, url = pair.partition("=") name, url = name.strip(), url.strip() if name and url: out[name] = url except Exception: # noqa: BLE001 return {} return out @dataclass(frozen=True) class Config: """Immutable sidecar config built from the environment (FR-10).""" # -- lifecycle / loop ------------------------------------------------- enabled: bool = True interval_s: float = 30.0 http_timeout_s: float = 5.0 cooldown_s: float = 1800.0 # re-alert throttle for sustained signals # -- orchestrator /metrics ------------------------------------------- metrics_url: str = "http://127.0.0.1:8500/metrics" orch_down_ticks: int = 3 # K consecutive failures before orch_down fires # -- host ------------------------------------------------------------- mem_pct: float = 90.0 disk_paths: list[str] = field(default_factory=lambda: ["/repos", "/app/data"]) disk_crit_enabled: bool = False # opt-in independent disk ceiling (D6) disk_crit_pct: float = 97.0 # -- agents / queue / stages (derived from the /metrics envelope) ----- agent_hung_min: float = 20.0 # minutes of runtime before "hung" is considered agent_cpu_floor: float = 0.01 # CPU fraction below which a long agent is "hung" stage_stuck_min: float = 120.0 # minutes a task may sit in one stage queue_depth: int = 20 # -- containers (docker.sock, read-only) ------------------------------ containers: list[str] = field(default_factory=lambda: ["orchestrator"]) docker_sock: str = "/var/run/docker.sock" # -- external dependencies ------------------------------------------- deps: dict[str, str] = field(default_factory=dict) # -- independent Telegram transport ---------------------------------- tg_bot_token: str = "" tg_chat_id: str = "" # -- derived helpers -------------------------------------------------- @property def agent_hung_s(self) -> float: return self.agent_hung_min * 60.0 @property def stage_stuck_s(self) -> float: return self.stage_stuck_min * 60.0 @classmethod def from_env(cls, env: dict | None = None) -> "Config": """Build a Config from ``env`` (defaults to ``os.environ``). never-raise.""" e = dict(os.environ if env is None else env) return cls( enabled=_bool(e, "WATCHDOG_ENABLED", True), interval_s=_float(e, "WATCHDOG_INTERVAL_S", 30.0), http_timeout_s=_float(e, "WATCHDOG_HTTP_TIMEOUT_S", 5.0), cooldown_s=_float(e, "WATCHDOG_COOLDOWN_S", 1800.0), metrics_url=_str(e, "WATCHDOG_METRICS_URL", "http://127.0.0.1:8500/metrics"), orch_down_ticks=_int(e, "WATCHDOG_ORCH_DOWN_TICKS", 3), mem_pct=_float(e, "WATCHDOG_MEM_PCT", 90.0), disk_paths=_csv(e, "WATCHDOG_DISK_PATHS", ["/repos", "/app/data"]), disk_crit_enabled=_bool(e, "WATCHDOG_DISK_CRIT_ENABLED", False), disk_crit_pct=_float(e, "WATCHDOG_DISK_CRIT_PCT", 97.0), agent_hung_min=_float(e, "WATCHDOG_AGENT_HUNG_MIN", 20.0), agent_cpu_floor=_float(e, "WATCHDOG_AGENT_CPU_FLOOR", 0.01), stage_stuck_min=_float(e, "WATCHDOG_STAGE_STUCK_MIN", 120.0), queue_depth=_int(e, "WATCHDOG_QUEUE_DEPTH", 20), containers=_csv(e, "WATCHDOG_CONTAINERS", ["orchestrator"]), docker_sock=_str(e, "WATCHDOG_DOCKER_SOCK", "/var/run/docker.sock"), deps=_deps(e, "WATCHDOG_DEPS"), tg_bot_token=_str(e, "WATCHDOG_TG_BOT_TOKEN", ""), tg_chat_id=_str(e, "WATCHDOG_TG_CHAT_ID", ""), )