"""The sidecar tick orchestration: collect -> evaluate -> decide -> dispatch (D3). The ``Watchdog`` owns the cross-tick state the sidecar is responsible for: * ``_states`` — per signal_key :class:`AlertState` (anti-spam / recovery); * ``_agents`` — per run_id :class:`AgentSample` (cpu_ticks, generated_at); * ``_failed`` — last seen ``queue.counts.failed`` (job_failed edge); * ``_orch_fail`` — consecutive ``/metrics`` failures (orch_down debounce). All collection is wrapped per-source and the whole ``tick`` is wrapped per-tick (never-raise, D8). ``now_provider`` is injectable for deterministic tests. """ from __future__ import annotations import logging import time from . import decision from .collectors import containers as containers_mod from .collectors import deps as deps_mod from .collectors import host as host_mod from .collectors import orch as orch_mod from .config import Config from .notify import Notifier from . import signals as signals_mod logger = logging.getLogger("watchdog.core") class Watchdog: """Stateful observer: one ``tick`` collects every source and dispatches alerts.""" def __init__( self, cfg: Config, notifier: Notifier | None = None, docker: containers_mod.DockerSockReader | None = None, now_provider=None, ): self.cfg = cfg self._now = now_provider or time.time self._notifier = notifier or Notifier( cfg.tg_bot_token, cfg.tg_chat_id, cfg.http_timeout_s ) self._docker = docker or containers_mod.DockerSockReader( cfg.docker_sock, cfg.http_timeout_s ) # cross-tick state owned by the sidecar self._states: dict[object, decision.AlertState] = {} self._agents: dict[object, signals_mod.AgentSample] = {} self._failed: int | None = None self._orch_fail: int = 0 self.last_run_ts: float | None = None # -- collection (each source guarded; per-source never-raise) --------- def _collect_orch(self) -> orch_mod.FetchResult: try: return orch_mod.fetch_metrics(self.cfg.metrics_url, self.cfg.http_timeout_s) except Exception as e: # noqa: BLE001 - treat as down, never crash the tick logger.warning("watchdog: orch collect error: %s", e) return orch_mod.FetchResult(ok=False, error=str(e)) def _collect_host_mem(self) -> float | None: try: return host_mod.read_mem_used_pct() except Exception as e: # noqa: BLE001 logger.warning("watchdog: host mem collect error: %s", e) return None def _collect_disk(self) -> tuple | None: if not self.cfg.disk_crit_enabled: return None try: return host_mod.max_disk_used_pct(self.cfg.disk_paths) except Exception as e: # noqa: BLE001 logger.warning("watchdog: disk collect error: %s", e) return None def _collect_containers(self) -> dict: out: dict[str, str] = {} for name in self.cfg.containers: try: inspect = self._docker.inspect(name) out[name] = containers_mod.classify_container(inspect) except Exception as e: # noqa: BLE001 - one container degrades, others continue logger.warning("watchdog: container %s collect error: %s", name, e) out[name] = "unknown" return out def _collect_deps(self) -> dict: try: return deps_mod.ping_all(self.cfg.deps, self.cfg.http_timeout_s) except Exception as e: # noqa: BLE001 logger.warning("watchdog: deps collect error: %s", e) return {} # -- one tick --------------------------------------------------------- def tick(self) -> list: """Run one full pass; returns the dispatched ``(action, Signal)`` list. Per-source collection is independently guarded so a broken source (ork down / docker unreachable / dep timeout) degrades ONE signal and the rest of the tick still runs (D8). The orchestrator being down is itself the ``orchestrator_down`` signal, not a failed tick (FR-3). """ now = self._now() built: list[signals_mod.Signal] = [] # 1) orchestrator /metrics (+ orch_down debounce) fetch = self._collect_orch() if fetch.ok and fetch.envelope is not None: self._orch_fail = 0 ev = signals_mod.eval_envelope( fetch.envelope, self.cfg, self._agents, self._failed ) self._agents = ev.agent_samples self._failed = ev.failed_count built.extend(ev.signals) else: self._orch_fail += 1 built.append( signals_mod.orch_down_signal(self._orch_fail, self.cfg, fetch.error) ) # 2) host memory + opt-in disk ceiling built.extend( signals_mod.host_signals( self.cfg, self._collect_host_mem(), self._collect_disk() ) ) # 3) containers (read-only docker.sock) built.extend(signals_mod.container_signals(self.cfg, self._collect_containers())) # 4) external dependency pings built.extend(signals_mod.dep_signals(self._collect_deps())) dispatched = self._dispatch(built, now) self.last_run_ts = now return dispatched # -- decision + dispatch ---------------------------------------------- def _dispatch(self, built: list, now: float) -> list: """Run each signal through ``decide`` and send alert/realert/recovery.""" results: list = [] for sig in built: try: cooldown = sig.cooldown_s if sig.cooldown_s is not None else self.cfg.cooldown_s if sig.edge: # Edge signals (job_failed) fire on each new occurrence and # keep no sustained state: a fresh empty prev -> ALERT iff active. prev = decision.AlertState() else: prev = self._states.get(sig.key) or decision.AlertState() action = decision.decide(sig.active, prev, now, cooldown) if action in (decision.ACTION_ALERT, decision.ACTION_REALERT): self._send(self._format(sig, action)) if not sig.edge: self._states[sig.key] = decision.AlertState( alerting=True, last_alert_at=now ) elif action == decision.ACTION_RECOVERY: self._send(self._format(sig, action)) self._states[sig.key] = decision.AlertState( alerting=False, last_alert_at=None ) results.append((action, sig)) except Exception as e: # noqa: BLE001 - one signal degrades, others dispatch logger.warning("watchdog: dispatch error for %s: %s", sig.key, e) return results @staticmethod def _format(sig: signals_mod.Signal, action: str) -> str: if action == decision.ACTION_RECOVERY: return f"\U0001f7e2 {sig.title}: восстановление. {sig.detail}" prefix = "\U0001f534" if action == decision.ACTION_ALERT else "\U0001f501" return f"{prefix} {sig.title}: {sig.detail}" def _send(self, text: str) -> None: """Best-effort dispatch through the sidecar's own channel. never-raise.""" try: self._notifier.send(text) except Exception as e: # noqa: BLE001 - per-send never-raise (D8) logger.warning("watchdog: send failed: %s", e)