"""Pure signal builders: turn collected raw inputs into ``Signal`` objects (D5). A ``Signal`` is ``(key, active, title, detail, edge)``. ``key`` identifies the signal for per-key anti-spam state: a scalar (``"orch_down"``, ``"host_mem"``) or a tuple for per-entity signals (``("agent_hung", run_id)``, ``("container_down", name)``, ``("stage_stuck", work_item)``, ``("dep_down", name)``). These builders are PURE — given the envelope / host readings / prev-sample state they return signals + the next sample state, with no I/O — so the whole decision surface is unit-testable without a container, a socket or a timer (TC-01…TC-11). """ from __future__ import annotations import logging from dataclasses import dataclass, field from .collectors import containers as containers_mod from .collectors import orch as orch_mod from .config import Config logger = logging.getLogger("watchdog.signals") @dataclass class Signal: """One evaluated signal heading into the decision function. ``edge`` marks event-style signals (e.g. ``job_failed``) that fire on each new occurrence and have no sustained "recovery": the dispatcher does not persist alerting state for them. """ key: object active: bool title: str detail: str edge: bool = False cooldown_s: float | None = None # per-signal override of the global cooldown @dataclass class AgentSample: """Previous ``(cpu_ticks, generated_at_epoch)`` for one running agent (D5).""" cpu_ticks: int generated_at: float @dataclass class EnvelopeEval: """Result of evaluating the ``/metrics`` envelope: signals + carried state.""" signals: list = field(default_factory=list) agent_samples: dict = field(default_factory=dict) # run_id -> AgentSample failed_count: int | None = None def _cpu_fraction( cur_ticks: int, cur_gen: float, prev: AgentSample, clk_tck: int, ) -> float | None: """CPU fraction of one agent across two ``/metrics`` polls (D5). ``frac = (Δticks / clk_tck) / Δseconds``. Returns ``None`` if the deltas are not usable (no wall-time elapsed, non-positive clk_tck) so a degenerate sample never produces a false "hung" verdict. """ try: dt = cur_gen - prev.generated_at if dt <= 0 or not clk_tck or clk_tck <= 0: return None cpu_seconds = (cur_ticks - prev.cpu_ticks) / clk_tck if cpu_seconds < 0: return None return cpu_seconds / dt except Exception as e: # noqa: BLE001 - degenerate sample, no verdict logger.warning("watchdog: cpu_fraction error: %s", e) return None def eval_envelope( envelope: dict, cfg: Config, prev_agents: dict, prev_failed: int | None, ) -> EnvelopeEval: """Derive agent_hung / stage_stuck / job_failed / queue_depth signals (D5). Pure: no I/O. ``prev_agents`` (run_id -> :class:`AgentSample`) and ``prev_failed`` carry the cross-tick state the sidecar owns; the returned :class:`EnvelopeEval` includes the NEXT state to persist. never-raise: a bad sub-section degrades that family of signals, the rest still evaluate. """ out = EnvelopeEval() if not isinstance(envelope, dict): out.agent_samples = dict(prev_agents) out.failed_count = prev_failed return out clk_tck = envelope.get("clk_tck") gen_at = orch_mod.parse_generated_at(envelope) # -- agent_hung (needs two polls; per run_id) ------------------------- new_samples: dict = {} try: for a in envelope.get("agents") or []: run_id = a.get("run_id") cpu_ticks = a.get("cpu_ticks") runtime_s = a.get("runtime_s") if run_id is None: continue if cpu_ticks is None or gen_at is None: # pid dead / non-Linux / no timestamp -> cannot judge; skip. continue new_samples[run_id] = AgentSample(int(cpu_ticks), gen_at) prev = prev_agents.get(run_id) if prev is None or not isinstance(clk_tck, int): continue frac = _cpu_fraction(int(cpu_ticks), gen_at, prev, clk_tck) if frac is None or runtime_s is None: continue hung = (runtime_s > cfg.agent_hung_s) and (frac < cfg.agent_cpu_floor) if hung: out.signals.append( Signal( key=("agent_hung", run_id), active=True, title="Агент завис", detail=( f"agent={a.get('agent')} run_id={run_id} " f"runtime={int(runtime_s)}s cpu={frac:.4f} " f"(< {cfg.agent_cpu_floor})" ), ) ) except Exception as e: # noqa: BLE001 - degrade agent family only logger.warning("watchdog: eval agents error: %s", e) out.agent_samples = new_samples # -- stage_stuck (per work_item) ------------------------------------- try: for s in envelope.get("stages") or []: age = s.get("age_in_stage_s") wi = s.get("work_item") if age is None or wi is None: continue if age > cfg.stage_stuck_s: out.signals.append( Signal( key=("stage_stuck", wi), active=True, title="Стадия застряла", detail=( f"{wi} в стадии {s.get('stage')} уже {int(age)}s " f"(порог {int(cfg.stage_stuck_s)}s)" ), ) ) except Exception as e: # noqa: BLE001 logger.warning("watchdog: eval stages error: %s", e) # -- queue depth + job_failed (edge) --------------------------------- failed_now: int | None = prev_failed try: queue = envelope.get("queue") or {} depth = queue.get("depth") if isinstance(depth, int) and depth >= cfg.queue_depth: out.signals.append( Signal( key="queue_depth", active=True, title="Очередь растёт", detail=f"глубина очереди {depth} (порог {cfg.queue_depth})", ) ) counts = queue.get("counts") or {} failed = counts.get("failed") if isinstance(failed, int): failed_now = failed if prev_failed is not None and failed > prev_failed: out.signals.append( Signal( key="job_failed", active=True, title="Job упал", detail=( f"failed-джобов стало {failed} " f"(было {prev_failed}, +{failed - prev_failed})" ), edge=True, ) ) except Exception as e: # noqa: BLE001 logger.warning("watchdog: eval queue error: %s", e) out.failed_count = failed_now return out def host_signals(cfg: Config, mem_pct: float | None, disk: tuple | None) -> list: """Build host memory + opt-in disk-ceiling signals (D5/D6). Pure.""" sigs: list = [] if mem_pct is not None: sigs.append( Signal( key="host_mem", active=mem_pct >= cfg.mem_pct, title="Память хоста", detail=f"память хоста {mem_pct}% (порог {cfg.mem_pct}%)", ) ) # Disk ceiling is OPT-IN (D6): disk_watchdog (ORCH-063) owns the 85% alert; # the sidecar only carries an independent HIGHER ceiling when explicitly # enabled, so there is no double-alert on the same fill event (FR-9/AC-5). if cfg.disk_crit_enabled and disk is not None: path, pct = disk sigs.append( Signal( key="host_disk_crit", active=pct >= cfg.disk_crit_pct, title="Диск (критический потолок)", detail=( f"диск {path} {pct}% (критический потолок {cfg.disk_crit_pct}%, " f"независимый канал sidecar)" ), ) ) return sigs def container_signals(cfg: Config, statuses: dict) -> list: """Build per-container down signals from ``{name: status}``. Pure.""" sigs: list = [] for name, status in statuses.items(): sigs.append( Signal( key=("container_down", name), active=containers_mod.container_alarm(status), title="Контейнер не в норме", detail=f"контейнер {name}: статус '{status}'", ) ) return sigs # Max cmdline length surfaced in an alert: truncate so a long arg list does not # leak random arguments into the Telegram channel (ADR-001 D4 / R-2). _PROC_CMDLINE_MAX = 120 def proc_signals(cfg: Config, candidates: list) -> list: """Build per-process ``proc_blocking`` signals from candidate records. Pure. Each candidate is ``{pid, cmdline, age_s, cpu_s?, start_ticks?}`` already filtered to the test-class by the collector (D4). The signal is ``active`` iff ``age_s > cfg.proc_age_s`` (the threshold is set above the max legitimate test-run budget, so an in-flight run is never active — AC-4). Key is per-entity ``("proc_blocking", pid)`` (mirror of ``("container_down", name)``) so ``AlertState`` / cooldown work per process. The detail is actionable (RU): truncated cmdline + PID + age (s) + accumulated CPU (s). """ sigs: list = [] for rec in candidates or []: try: pid = rec.get("pid") age_s = rec.get("age_s") if pid is None or age_s is None: continue cmdline = (rec.get("cmdline") or "").strip() frag = cmdline[:_PROC_CMDLINE_MAX] if len(cmdline) > _PROC_CMDLINE_MAX: frag += "…" detail = ( f"PID {pid} живёт {int(age_s)}s " f"(порог {int(cfg.proc_age_s)}s): {frag}" ) cpu_s = rec.get("cpu_s") if cpu_s is not None: detail += f" · CPU {int(cpu_s)}s" sigs.append( Signal( key=("proc_blocking", pid), active=age_s > cfg.proc_age_s, title="Блокирующий процесс", detail=detail, cooldown_s=cfg.proc_cooldown_s, ) ) except Exception as e: # noqa: BLE001 - one bad record, others still build logger.warning("watchdog: proc signal build error: %s", e) return sigs def dep_signals(reachability: dict) -> list: """Build per-dependency down signals from ``{name: reachable}``. Pure.""" sigs: list = [] for name, reachable in reachability.items(): sigs.append( Signal( key=("dep_down", name), active=not reachable, title="Зависимость недоступна", detail=f"зависимость {name} не отвечает", ) ) return sigs def orch_down_signal(consecutive_failures: int, cfg: Config, error: str | None) -> Signal: """The master ``orchestrator_down`` signal (FR-3). Active once ``/metrics`` has failed ``orch_down_ticks`` times in a row — a single transient hiccup does not flap. The text explicitly notes that the in-process guards (disk / reaper / reconciler) are dead too, so the operator knows to check the host directly (D6). """ active = consecutive_failures >= cfg.orch_down_ticks return Signal( key="orch_down", active=active, title="Орк не отвечает", detail=( f"GET /metrics не отвечает {consecutive_failures} тик(ов) подряд " f"(порог {cfg.orch_down_ticks}): {error or 'недоступен'}. " f"In-process стражи (disk/reaper/reconciler) тоже мертвы — проверьте " f"хост (вкл. диск) и контейнер orchestrator." ), )