feat(watchdog): sidecar-watchdog F1b — monitoring brain in a separate container (ORCH-100)
All checks were successful
CI / test (push) Successful in 52s
CI / test (pull_request) Successful in 47s

Add the `watchdog/` package (thin Python-3.12 stdlib-only daemon) and the
`orchestrator-watchdog` compose service — the brain half of the domain-0
observability pair. F1a (ORCH-099) exposes GET /metrics raw signal; F1b reads it,
augments with host / container / dependency probes, runs each signal through a
generalised pure decision function (decide(signal_active, prev, now, cooldown),
a strict superset of disk_watchdog.decide_action) with per-signal in-memory
dedup/throttle/recovery, and alerts over its OWN independent Telegram channel.

Key properties (ADR-001):
- Observer separated from observed: separate container; /metrics not answering is
  itself the master `orch_down` alarm (debounced K ticks — no flap on a hiccup).
- Strictly read-only: docker.sock GET-only + mounted :ro (double guard), host
  paths :ro, no DB/disk writes, no process control — self-hosting-safe.
- never-raise on three levels (per-source/per-tick/per-send) + WATCHDOG_ENABLED
  kill-switch (disabled -> inert idle-loop, not exit).
- Disk anti-duplicate (D6): disk_watchdog (ORCH-063) stays sole owner of the 85%
  alert; sidecar carries orch_down + an opt-in 97% ceiling (default off).
- NO import from src/** (C-1); src/**, STAGE_TRANSITIONS, QG_CHECKS, check_*, DB
  schema — untouched. env_file optional so a missing .env.watchdog never breaks
  `docker compose up` for the prod orchestrator.

Tests: tests/watchdog/ (TC-01…TC-13) + full tests/ regression green (TC-14).
Docs: CHANGELOG, .env.example canon (WATCHDOG_*); architecture README + adr-0033
authored at the architecture stage.

Refs: ORCH-100

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 09:05:01 +03:00
parent 70243f9c74
commit 330113a993
31 changed files with 2261 additions and 2 deletions

View File

View File

@@ -0,0 +1,46 @@
"""Shared helpers/fixtures for the watchdog (ORCH-100, F1b) test suite.
A tiny urllib-style fake opener so HTTP collectors / Telegram transport never
touch the network (test plan §scope: all collectors/transport are mocked).
"""
from __future__ import annotations
import io
import urllib.error
class FakeResponse:
"""Context-manager response mimicking ``urllib`` ``addinfourl``."""
def __init__(self, status: int = 200, body: bytes = b"{}"):
self.status = status
self._body = body
def getcode(self):
return self.status
def read(self):
return self._body
def __enter__(self):
return self
def __exit__(self, *a):
return False
def make_opener(*, status=200, body=b"{}", exc=None):
"""Build a fake ``urlopen`` that returns a body or raises ``exc``."""
def _opener(req, timeout=None):
if exc is not None:
raise exc
return FakeResponse(status=status, body=body)
return _opener
def http_error(code: int) -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, fp=io.BytesIO(b"")
)

View File

@@ -0,0 +1,66 @@
"""TC-12: compose invariant — orchestrator-watchdog is a separate service.
It declares its own build (watchdog/Dockerfile), restart policy, mem_limit, and
mounts docker.sock read-only (:ro). Parses the real docker-compose.yml.
"""
import pathlib
import yaml
REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
def _compose():
with open(REPO_ROOT / "docker-compose.yml") as f:
return yaml.safe_load(f)
def test_watchdog_service_declared():
svc = _compose()["services"]
assert "orchestrator-watchdog" in svc
def test_watchdog_builds_from_watchdog_dockerfile():
wd = _compose()["services"]["orchestrator-watchdog"]
build = wd["build"]
assert isinstance(build, dict)
assert build["dockerfile"] == "watchdog/Dockerfile"
assert build["context"] == "."
def test_watchdog_has_restart_and_mem_limit():
wd = _compose()["services"]["orchestrator-watchdog"]
assert wd["restart"] == "unless-stopped"
assert wd["mem_limit"] == "128m" # thin stack, not Grafana/Prometheus
def test_docker_sock_mounted_read_only():
wd = _compose()["services"]["orchestrator-watchdog"]
sock = [v for v in wd["volumes"] if "docker.sock" in v]
assert sock, "docker.sock must be mounted"
assert all(v.endswith(":ro") for v in sock), "docker.sock must be :ro"
def test_host_paths_mounted_read_only():
wd = _compose()["services"]["orchestrator-watchdog"]
# Every bind mount the watchdog uses is read-only (it only reads).
for v in wd["volumes"]:
assert v.endswith(":ro"), f"watchdog mount must be :ro: {v}"
def test_env_file_is_optional():
# A missing .env.watchdog must not break `docker compose up` (self-hosting).
wd = _compose()["services"]["orchestrator-watchdog"]
env_file = wd["env_file"]
assert isinstance(env_file, list)
assert env_file[0]["required"] is False
def test_watchdog_dockerfile_exists_and_is_stdlib_only():
df = REPO_ROOT / "watchdog" / "Dockerfile"
assert df.exists()
text = df.read_text()
# No pip install of third-party deps (stdlib-only, D1).
assert "pip install" not in text
assert "COPY requirements" not in text
assert "requirements.txt" not in text

View File

@@ -0,0 +1,69 @@
"""TC-07: kill-switch + env-driven config (no hardcoded thresholds).
``WATCHDOG_ENABLED=false`` -> the daemon is inert (idle, no ticks). Thresholds /
intervals / timeouts come from env, not constants.
"""
from watchdog.config import Config
def test_killswitch_off_is_inert(monkeypatch):
from watchdog import __main__ as entry
cfg = Config.from_env({"WATCHDOG_ENABLED": "false", "WATCHDOG_INTERVAL_S": "0"})
assert cfg.enabled is False
built = {"n": 0}
class _Dog:
def tick(self):
built["n"] += 1
# If run() ever constructed a Watchdog / ticked while disabled, this would fire.
monkeypatch.setattr(entry, "Watchdog", lambda c: _Dog())
monkeypatch.setattr(entry.time, "sleep", lambda *_: None)
entry.run(cfg=cfg, max_ticks=3)
assert built["n"] == 0 # inert: never ticked
def test_thresholds_read_from_env():
cfg = Config.from_env(
{
"WATCHDOG_INTERVAL_S": "7",
"WATCHDOG_MEM_PCT": "77",
"WATCHDOG_QUEUE_DEPTH": "9",
"WATCHDOG_AGENT_HUNG_MIN": "5",
"WATCHDOG_STAGE_STUCK_MIN": "11",
"WATCHDOG_ORCH_DOWN_TICKS": "4",
"WATCHDOG_COOLDOWN_S": "60",
"WATCHDOG_HTTP_TIMEOUT_S": "2",
"WATCHDOG_CONTAINERS": "orchestrator,plane-app",
"WATCHDOG_DEPS": "gitea=http://g/healthz,plane=http://p/",
}
)
assert cfg.interval_s == 7.0
assert cfg.mem_pct == 77.0
assert cfg.queue_depth == 9
assert cfg.agent_hung_s == 5 * 60.0
assert cfg.stage_stuck_s == 11 * 60.0
assert cfg.orch_down_ticks == 4
assert cfg.cooldown_s == 60.0
assert cfg.http_timeout_s == 2.0
assert cfg.containers == ["orchestrator", "plane-app"]
assert cfg.deps == {"gitea": "http://g/healthz", "plane": "http://p/"}
def test_defaults_when_env_absent():
cfg = Config.from_env({})
assert cfg.enabled is True
assert cfg.interval_s == 30.0
assert cfg.metrics_url.endswith(":8500/metrics")
assert cfg.disk_crit_enabled is False
assert cfg.containers == ["orchestrator"]
assert cfg.deps == {}
def test_malformed_env_degrades_to_default():
# A garbage numeric value must not crash config; it degrades to the default.
cfg = Config.from_env({"WATCHDOG_INTERVAL_S": "abc", "WATCHDOG_MEM_PCT": ""})
assert cfg.interval_s == 30.0
assert cfg.mem_pct == 90.0

View File

@@ -0,0 +1,56 @@
"""TC-01…TC-04: the pure decision function (alert/throttle/realert/recovery).
Mirrors the disk_watchdog.decide_action tests — the generalised ``decide`` is a
strict superset (boolean ``signal_active`` instead of ``used_pct >= threshold``).
"""
from watchdog.decision import (
ACTION_ALERT,
ACTION_NONE,
ACTION_REALERT,
ACTION_RECOVERY,
AlertState,
decide,
)
COOLDOWN = 1800.0
def test_tc01_not_alerting_active_alerts():
# TC-01: not-alerting & signal active -> ALERT (one per crossing).
prev = AlertState(alerting=False)
assert decide(True, prev, now=100.0, cooldown_s=COOLDOWN) == ACTION_ALERT
def test_tc01_not_alerting_inactive_is_none():
prev = AlertState(alerting=False)
assert decide(False, prev, now=100.0, cooldown_s=COOLDOWN) == ACTION_NONE
def test_tc02_alerting_active_in_cooldown_is_none():
# TC-02: alerting & still active & cooldown NOT elapsed -> NONE (anti-spam).
prev = AlertState(alerting=True, last_alert_at=1000.0)
assert decide(True, prev, now=1000.0 + 10.0, cooldown_s=COOLDOWN) == ACTION_NONE
def test_tc03_alerting_active_cooldown_elapsed_realerts():
# TC-03: alerting & still active & cooldown elapsed -> REALERT.
prev = AlertState(alerting=True, last_alert_at=1000.0)
assert decide(True, prev, now=1000.0 + COOLDOWN, cooldown_s=COOLDOWN) == ACTION_REALERT
def test_tc03_alerting_active_no_last_alert_realerts():
# Defensive: alerting but last_alert_at missing -> treat cooldown as elapsed.
prev = AlertState(alerting=True, last_alert_at=None)
assert decide(True, prev, now=5.0, cooldown_s=COOLDOWN) == ACTION_REALERT
def test_tc04_alerting_recovers_when_inactive():
# TC-04: alerting & signal back to normal -> RECOVERY.
prev = AlertState(alerting=True, last_alert_at=1000.0)
assert decide(False, prev, now=1200.0, cooldown_s=COOLDOWN) == ACTION_RECOVERY
def test_cooldown_boundary_is_inclusive():
# Exactly at cooldown boundary -> REALERT (>= semantics, like disk_watchdog).
prev = AlertState(alerting=True, last_alert_at=0.0)
assert decide(True, prev, now=COOLDOWN, cooldown_s=COOLDOWN) == ACTION_REALERT

View File

@@ -0,0 +1,39 @@
"""Dependency ping collector: reachable / unreachable / 5xx (never-raise)."""
from watchdog.collectors import deps as deps_mod
from .conftest import http_error, make_opener
def test_ping_reachable():
assert deps_mod.ping("http://x", 1.0, opener=make_opener(status=200)) is True
def test_ping_4xx_still_reachable():
# A 4xx proves the host is up (we ping for liveness, not auth).
assert deps_mod.ping("http://x", 1.0, opener=make_opener(exc=http_error(404))) is True
def test_ping_5xx_is_down():
assert deps_mod.ping("http://x", 1.0, opener=make_opener(exc=http_error(503))) is False
def test_ping_timeout_is_down():
assert deps_mod.ping(
"http://x", 1.0, opener=make_opener(exc=TimeoutError())
) is False
def test_ping_all_mixed():
def opener_factory(url):
return make_opener(status=200) if "good" in url else make_opener(
exc=ConnectionError()
)
def opener(req, timeout=None):
url = req.full_url if hasattr(req, "full_url") else req
return opener_factory(url)(req, timeout)
res = deps_mod.ping_all(
{"good": "http://good", "bad": "http://bad"}, 1.0, opener=opener
)
assert res == {"good": True, "bad": False}

View File

@@ -0,0 +1,42 @@
"""TC-13: anti-duplicate disk alert (coordinated with ORCH-063 / disk_watchdog).
ADR-001 D6: disk_watchdog (ORCH-063) is the SOLE owner of the 85% disk alert via
the orchestrator's Telegram. The sidecar carries NO disk alert by default
(``WATCHDOG_DISK_CRIT_ENABLED=false``) -> structurally zero double-alert. The
sidecar's contribution is an OPT-IN independent ceiling at a HIGHER threshold
(a different event, separate channel).
"""
from watchdog.config import Config
from watchdog.signals import host_signals
def _cfg(**kw):
return Config.from_env(kw)
def test_disk_signal_absent_by_default():
# Disk full at 90% -> sidecar produces NO disk signal (disk_watchdog owns it).
cfg = _cfg()
assert cfg.disk_crit_enabled is False
sigs = host_signals(cfg, mem_pct=None, disk=("/repos", 90.0))
assert [s for s in sigs if s.key == "host_disk_crit"] == []
def test_opt_in_ceiling_is_separate_higher_event():
cfg = _cfg(WATCHDOG_DISK_CRIT_ENABLED="true", WATCHDOG_DISK_CRIT_PCT="97")
# Below the ceiling (90% < 97%) -> not active even when opted in (no 85% dup).
below = host_signals(cfg, mem_pct=None, disk=("/repos", 90.0))
crit_below = [s for s in below if s.key == "host_disk_crit"]
assert len(crit_below) == 1 and crit_below[0].active is False
# At/over the high ceiling -> active (a DIFFERENT event from disk_watchdog 85%).
over = host_signals(cfg, mem_pct=None, disk=("/repos", 98.0))
crit_over = [s for s in over if s.key == "host_disk_crit"]
assert len(crit_over) == 1 and crit_over[0].active is True
def test_mem_signal_independent_of_disk():
cfg = _cfg(WATCHDOG_MEM_PCT="90")
sigs = host_signals(cfg, mem_pct=95.0, disk=None)
mem = [s for s in sigs if s.key == "host_mem"]
assert len(mem) == 1 and mem[0].active is True

View File

@@ -0,0 +1,79 @@
"""TC-09: self-hosting safety — the Docker client is read-only by construction.
The client exposes ONLY read methods (list/inspect), its single request
primitive hard-codes the ``GET`` HTTP method, and the source carries no
mutating Docker verb (start/stop/restart/kill/exec/POST). ``classify_container``
is a pure status mapper.
"""
import inspect as _inspect
from watchdog.collectors import containers as cmod
def test_request_primitive_is_get_only(monkeypatch):
captured = {}
class _FakeConn:
def __init__(self, *a, **k):
pass
def request(self, method, path):
captured["method"] = method
captured["path"] = path
def getresponse(self):
class _R:
status = 200
def read(self_inner):
return b"[]"
return _R()
def close(self):
pass
monkeypatch.setattr(cmod, "_UnixHTTPConnection", _FakeConn)
reader = cmod.DockerSockReader("/var/run/docker.sock")
reader.list_containers()
assert captured["method"] == "GET"
reader.inspect("orchestrator")
assert captured["method"] == "GET"
def test_no_mutating_verbs_in_source():
src = _inspect.getsource(cmod)
lowered = src.lower()
# No write/control verbs should appear as Docker actions in this module.
for verb in ("/start", "/stop", "/restart", "/kill", "/exec", "\"post\"", "'post'"):
assert verb not in lowered, f"mutating verb leaked into containers.py: {verb}"
def test_reader_exposes_only_read_methods():
public = [
n for n in dir(cmod.DockerSockReader)
if not n.startswith("_")
]
assert set(public) == {"list_containers", "inspect"}
def test_classify_container_pure_mapping():
assert cmod.classify_container({"State": {"Status": "running"}}) == "running"
assert cmod.classify_container({"State": {"Status": "exited"}}) == "exited"
assert cmod.classify_container(
{"State": {"Status": "running", "Health": {"Status": "unhealthy"}}}
) == "unhealthy"
assert cmod.classify_container(
{"State": {"Status": "running", "Health": {"Status": "healthy"}}}
) == "healthy"
assert cmod.classify_container(None) == "unknown"
assert cmod.classify_container({}) == "unknown"
def test_container_alarm_semantics():
assert cmod.container_alarm("running") is False
assert cmod.container_alarm("healthy") is False
assert cmod.container_alarm("exited") is True
assert cmod.container_alarm("restarting") is True
assert cmod.container_alarm("unhealthy") is True
assert cmod.container_alarm("unknown") is True

View File

@@ -0,0 +1,54 @@
"""Host collector: /proc/meminfo parsing + disk reads (never-raise)."""
import os
import tempfile
from watchdog.collectors import host as host_mod
def test_mem_used_pct_from_meminfo():
content = "MemTotal: 1000 kB\nMemFree: 100 kB\nMemAvailable: 250 kB\n"
with tempfile.NamedTemporaryFile("w", suffix=".meminfo", delete=False) as f:
f.write(content)
path = f.name
try:
pct = host_mod.read_mem_used_pct(path)
# used = (1 - 250/1000) * 100 = 75.0
assert pct == 75.0
finally:
os.unlink(path)
def test_mem_used_pct_missing_file_is_none():
assert host_mod.read_mem_used_pct("/no/such/meminfo") is None
def test_mem_used_pct_garbage_is_none():
with tempfile.NamedTemporaryFile("w", delete=False) as f:
f.write("totally not meminfo\n")
path = f.name
try:
assert host_mod.read_mem_used_pct(path) is None
finally:
os.unlink(path)
def test_disk_used_pct_real_path():
pct = host_mod.read_disk_used_pct("/")
assert pct is None or (0.0 <= pct <= 100.0)
def test_disk_used_pct_missing_path_is_none():
assert host_mod.read_disk_used_pct("/no/such/path/xyz") is None
def test_max_disk_used_pct_picks_worst(monkeypatch):
monkeypatch.setattr(
host_mod, "read_disk_used_pct",
lambda p: {"/a": 10.0, "/b": 80.0, "/c": None}.get(p),
)
assert host_mod.max_disk_used_pct(["/a", "/b", "/c"]) == ("/b", 80.0)
def test_max_disk_used_pct_all_unreadable(monkeypatch):
monkeypatch.setattr(host_mod, "read_disk_used_pct", lambda p: None)
assert host_mod.max_disk_used_pct(["/a", "/b"]) is None

View File

@@ -0,0 +1,118 @@
"""TC-11: tolerance to the /metrics contract.
Unknown fields are ignored, a missing optional does not crash, and a
schema_version above the known one logs a warning (no crash). Also covers the
envelope-derived signal evaluation (agent_hung / stage_stuck / job_failed /
queue_depth).
"""
import logging
from watchdog.collectors import orch as orch_mod
from watchdog.config import Config
from watchdog.signals import AgentSample, eval_envelope
def _cfg(**kw):
return Config.from_env(kw)
def test_unknown_field_ignored():
body = '{"schema_version":1,"stages":[],"brand_new_field":42}'
env = orch_mod.parse_envelope(body)
assert env["brand_new_field"] == 42 # tolerated, not a crash
def test_missing_optional_not_an_error():
env = orch_mod.parse_envelope('{"schema_version":1}')
ev = eval_envelope(env, _cfg(), prev_agents={}, prev_failed=None)
assert ev.signals == [] # no stages/agents/queue -> no signals, no crash
def test_non_object_body_raises_valueerror():
import pytest
with pytest.raises(ValueError):
orch_mod.parse_envelope("[1,2,3]")
def test_schema_version_bump_warns(caplog):
env = {"schema_version": 999}
with caplog.at_level(logging.WARNING):
orch_mod.check_schema_version(env)
assert any("schema_version" in r.message for r in caplog.records)
def test_parse_generated_at_roundtrip_and_tolerant():
assert orch_mod.parse_generated_at({"generated_at": "2026-06-10T00:00:00Z"})
assert orch_mod.parse_generated_at({"generated_at": "garbage"}) is None
assert orch_mod.parse_generated_at({}) is None
def test_queue_depth_and_job_failed_signals():
env = {
"schema_version": 1,
"queue": {"depth": 25, "counts": {"failed": 5}},
}
cfg = _cfg(WATCHDOG_QUEUE_DEPTH="20")
# First tick: failed baseline established, depth over threshold fires.
ev = eval_envelope(env, cfg, prev_agents={}, prev_failed=None)
keys = {s.key for s in ev.signals}
assert "queue_depth" in keys
assert "job_failed" not in keys # no prior baseline -> no edge yet
assert ev.failed_count == 5
# Next tick: failed grew 5 -> 7 -> edge job_failed alert.
env2 = {"queue": {"depth": 0, "counts": {"failed": 7}}}
ev2 = eval_envelope(env2, cfg, prev_agents={}, prev_failed=ev.failed_count)
jf = [s for s in ev2.signals if s.key == "job_failed"]
assert len(jf) == 1 and jf[0].edge is True and jf[0].active is True
def test_stage_stuck_signal():
env = {"stages": [{"work_item": "ORCH-1", "stage": "review", "age_in_stage_s": 9999}]}
cfg = _cfg(WATCHDOG_STAGE_STUCK_MIN="1") # 60s threshold
ev = eval_envelope(env, cfg, prev_agents={}, prev_failed=None)
stuck = [s for s in ev.signals if s.key == ("stage_stuck", "ORCH-1")]
assert len(stuck) == 1 and stuck[0].active is True
def test_agent_hung_needs_two_polls_and_low_cpu():
cfg = _cfg(WATCHDOG_AGENT_HUNG_MIN="1", WATCHDOG_AGENT_CPU_FLOOR="0.01")
env = {
"schema_version": 1,
"generated_at": "2026-06-10T00:01:40Z", # +100s vs prev sample below
"clk_tck": 100,
"agents": [{"run_id": 7, "agent": "developer", "runtime_s": 999, "cpu_ticks": 50}],
}
prev_t = orch_mod.parse_generated_at({"generated_at": "2026-06-10T00:00:00Z"})
prev = {7: AgentSample(cpu_ticks=40, generated_at=prev_t)}
# Δticks=10 over clk_tck=100 -> 0.1 CPU-seconds over 100s -> frac 0.001 < floor.
ev = eval_envelope(env, cfg, prev_agents=prev, prev_failed=None)
hung = [s for s in ev.signals if s.key == ("agent_hung", 7)]
assert len(hung) == 1 and hung[0].active is True
def test_agent_hung_skipped_when_cpu_ticks_null():
cfg = _cfg(WATCHDOG_AGENT_HUNG_MIN="1")
env = {
"generated_at": "2026-06-10T00:01:40Z",
"clk_tck": 100,
"agents": [{"run_id": 8, "runtime_s": 999, "cpu_ticks": None}],
}
prev = {8: AgentSample(cpu_ticks=10, generated_at=0.0)}
ev = eval_envelope(env, cfg, prev_agents=prev, prev_failed=None)
assert [s for s in ev.signals if s.key == ("agent_hung", 8)] == []
def test_agent_busy_not_hung():
cfg = _cfg(WATCHDOG_AGENT_HUNG_MIN="1", WATCHDOG_AGENT_CPU_FLOOR="0.01")
env = {
"generated_at": "2026-06-10T00:01:40Z",
"clk_tck": 100,
"agents": [{"run_id": 9, "runtime_s": 999, "cpu_ticks": 5000}],
}
prev_t = orch_mod.parse_generated_at({"generated_at": "2026-06-10T00:00:00Z"})
prev = {9: AgentSample(cpu_ticks=40, generated_at=prev_t)}
# Big Δticks -> high CPU fraction -> not hung.
ev = eval_envelope(env, cfg, prev_agents=prev, prev_failed=None)
assert [s for s in ev.signals if s.key == ("agent_hung", 9)] == []

View File

@@ -0,0 +1,88 @@
"""TC-06: three-level never-raise.
A raising collector (host / containers / deps) degrades ONE signal and the tick
reaches the end collecting the rest; a raising send is swallowed; the daemon
loop survives a raising tick.
"""
from watchdog.config import Config
from watchdog.core import Watchdog
class _BoomDocker:
def inspect(self, name):
raise RuntimeError("docker socket blew up")
class _Notifier:
def __init__(self):
self.sent = []
def send(self, text):
self.sent.append(text)
return True
class _BoomNotifier:
def send(self, text):
raise RuntimeError("telegram blew up")
def _cfg(**kw):
base = {
"WATCHDOG_TG_BOT_TOKEN": "t",
"WATCHDOG_TG_CHAT_ID": "c",
"WATCHDOG_CONTAINERS": "orchestrator",
}
return Config.from_env({**base, **kw})
def _good_fetch_patch(dog, monkeypatch):
from watchdog.collectors import orch as orch_mod
env = {"schema_version": 1, "generated_at": "2026-06-10T00:00:00Z",
"clk_tck": 100, "agents": [], "stages": [],
"queue": {"depth": 0, "counts": {"failed": 0}}}
monkeypatch.setattr(
orch_mod, "fetch_metrics",
lambda *a, **k: orch_mod.FetchResult(ok=True, envelope=env),
)
def test_per_source_broken_container_degrades_one_signal(monkeypatch):
notifier = _Notifier()
dog = Watchdog(_cfg(), notifier=notifier, docker=_BoomDocker())
_good_fetch_patch(dog, monkeypatch)
# Should not raise; tick completes and produces results for other sources.
results = dog.tick()
keys = [getattr(s, "key", None) for _, s in results]
# orch_down evaluated (orch was up -> not active) and container evaluated.
assert "orch_down" in keys
assert ("container_down", "orchestrator") in keys
def test_per_send_failure_is_swallowed(monkeypatch):
# A raising notifier must not break the tick (per-send never-raise).
cfg = _cfg(WATCHDOG_MEM_PCT="0") # mem >= 0 always -> force an alert send
dog = Watchdog(cfg, notifier=_BoomNotifier(), docker=_BoomDocker())
_good_fetch_patch(dog, monkeypatch)
monkeypatch.setattr(
"watchdog.collectors.host.read_mem_used_pct", lambda *a, **k: 50.0
)
# Must not raise despite the notifier exploding on a triggered alert.
dog.tick()
def test_per_tick_loop_survives_raising_tick(monkeypatch):
# The __main__ run loop must survive a tick that raises (outer never-raise).
from watchdog import __main__ as entry
cfg = _cfg(WATCHDOG_INTERVAL_S="0")
class _BoomDog:
def tick(self):
raise RuntimeError("tick blew up")
monkeypatch.setattr(entry, "Watchdog", lambda c: _BoomDog())
monkeypatch.setattr(entry.time, "sleep", lambda *_: None)
# max_ticks bounds the loop; it must return cleanly, not propagate.
entry.run(cfg=cfg, max_ticks=3)

View File

@@ -0,0 +1,84 @@
"""TC-10: independent Telegram transport.
The sidecar sends through its OWN bot_token/chat_id from env and must NOT import
``src.notifications`` or the orchestrator's code (C-1 / BR-8).
"""
import pathlib
from watchdog import notify as notify_mod
from watchdog.notify import Notifier, send_telegram
def test_notify_uses_own_token_and_chat(monkeypatch):
captured = {}
def _fake_opener(req, timeout=None):
captured["url"] = req.full_url
captured["data"] = req.data
class _R:
status = 200
def getcode(self):
return 200
def __enter__(self_inner):
return self_inner
def __exit__(self_inner, *a):
return False
return _R()
ok = send_telegram(
"MYTOKEN", "MYCHAT", "hello", opener=_fake_opener, api_base="https://tg.test"
)
assert ok is True
assert "botMYTOKEN" in captured["url"]
assert b"MYCHAT" in captured["data"]
def test_missing_credentials_is_failsafe_no_send():
# Absent token/chat -> logs and returns False, never raises (fail-safe).
assert send_telegram("", "chat", "x") is False
assert send_telegram("tok", "", "x") is False
def test_send_failure_is_swallowed():
def _boom(req, timeout=None):
raise OSError("network down")
assert send_telegram("t", "c", "x", opener=_boom) is False
def test_notifier_wraps_credentials(monkeypatch):
sent = {}
monkeypatch.setattr(
notify_mod, "send_telegram",
lambda tok, chat, text, timeout: sent.update(tok=tok, chat=chat, text=text) or True,
)
Notifier("TOK", "CHAT").send("body")
assert sent == {"tok": "TOK", "chat": "CHAT", "text": "body"}
def test_watchdog_package_does_not_import_src():
# No watchdog/*.py file may reference the orchestrator's src package (C-1).
# (Source scan, not sys.modules: the global test conftest imports src.* for
# every test, so a runtime check would be polluted.)
pkg_root = pathlib.Path(notify_mod.__file__).resolve().parent
offenders = []
for py in pkg_root.rglob("*.py"):
text = py.read_text(encoding="utf-8")
for needle in ("import src", "from src", "src.notifications"):
if needle in text:
offenders.append(f"{py.name}: {needle}")
assert offenders == [], f"watchdog references the orchestrator src: {offenders}"
def test_notify_source_has_no_src_notifications_import():
import inspect
src = inspect.getsource(notify_mod)
assert "src.notifications" not in src
assert "from src" not in src
assert "import src" not in src

View File

@@ -0,0 +1,67 @@
"""TC-05: orchestrator-down detection.
A ``/metrics`` timeout / connection-refused / 5xx / unreadable body -> the
``orchestrator_down`` signal -> ALERT "орк не отвечает" once the debounce
threshold of consecutive failures is reached (FR-3).
"""
from watchdog.collectors import orch as orch_mod
from watchdog.config import Config
from watchdog.signals import orch_down_signal
from .conftest import http_error, make_opener
def _cfg(**kw):
return Config.from_env({**{"WATCHDOG_ORCH_DOWN_TICKS": "3"}, **kw})
def test_fetch_timeout_is_not_ok():
opener = make_opener(exc=TimeoutError("timed out"))
res = orch_mod.fetch_metrics("http://x/metrics", 1.0, opener=opener)
assert res.ok is False
assert res.envelope is None
assert res.error
def test_fetch_connection_refused_is_not_ok():
opener = make_opener(exc=ConnectionRefusedError("refused"))
res = orch_mod.fetch_metrics("http://x/metrics", 1.0, opener=opener)
assert res.ok is False
def test_fetch_5xx_is_not_ok():
opener = make_opener(status=503, body=b"oops")
res = orch_mod.fetch_metrics("http://x/metrics", 1.0, opener=opener)
assert res.ok is False
assert "503" in (res.error or "")
def test_fetch_httperror_5xx_is_not_ok():
opener = make_opener(exc=http_error(502))
res = orch_mod.fetch_metrics("http://x/metrics", 1.0, opener=opener)
assert res.ok is False
def test_fetch_unreadable_body_is_not_ok():
opener = make_opener(status=200, body=b"not-json{{{")
res = orch_mod.fetch_metrics("http://x/metrics", 1.0, opener=opener)
assert res.ok is False
def test_fetch_good_body_is_ok():
opener = make_opener(status=200, body=b'{"schema_version":1,"stages":[]}')
res = orch_mod.fetch_metrics("http://x/metrics", 1.0, opener=opener)
assert res.ok is True
assert res.envelope["schema_version"] == 1
def test_orch_down_signal_debounce_then_alert():
cfg = _cfg()
# Single transient failure -> NOT active (does not flap).
assert orch_down_signal(1, cfg, "timeout").active is False
assert orch_down_signal(2, cfg, "timeout").active is False
# K-th consecutive failure -> active alarm.
sig = orch_down_signal(3, cfg, "timeout")
assert sig.active is True
assert sig.key == "orch_down"
assert "не отвечает" in sig.detail

View File

@@ -0,0 +1,106 @@
"""TC-08: full tick with the orchestrator down (integration).
With ``/metrics`` failing, the tick must not crash, must still collect host /
containers / deps, must produce EXACTLY ONE ``orchestrator_down`` alert (after
the debounce), suppress within cooldown, and emit recovery on restoration.
"""
from watchdog.collectors import orch as orch_mod
from watchdog.config import Config
from watchdog.core import Watchdog
class _Notifier:
def __init__(self):
self.sent = []
def send(self, text):
self.sent.append(text)
return True
class _StubDocker:
def inspect(self, name):
return {"State": {"Status": "running"}}
def _cfg(**kw):
base = {
"WATCHDOG_TG_BOT_TOKEN": "t",
"WATCHDOG_TG_CHAT_ID": "c",
"WATCHDOG_ORCH_DOWN_TICKS": "2",
"WATCHDOG_COOLDOWN_S": "1000",
"WATCHDOG_CONTAINERS": "orchestrator",
}
return Config.from_env({**base, **kw})
def _clock():
t = {"v": 0.0}
def now():
return t["v"]
return t, now
def _down(monkeypatch):
monkeypatch.setattr(
orch_mod, "fetch_metrics",
lambda *a, **k: orch_mod.FetchResult(ok=False, error="timeout"),
)
def _up(monkeypatch):
env = {"schema_version": 1, "generated_at": "2026-06-10T00:00:00Z",
"clk_tck": 100, "agents": [], "stages": [],
"queue": {"depth": 0, "counts": {"failed": 0}}}
monkeypatch.setattr(
orch_mod, "fetch_metrics",
lambda *a, **k: orch_mod.FetchResult(ok=True, envelope=env),
)
def _orch_down_alerts(notifier):
return [m for m in notifier.sent if "не отвечает" in m]
def test_tick_orch_down_one_alert_then_throttle_then_recovery(monkeypatch):
notifier = _Notifier()
t, now = _clock()
dog = Watchdog(_cfg(), notifier=notifier, docker=_StubDocker(), now_provider=now)
_down(monkeypatch)
# tick 1: first failure -> debounced, NOT yet active -> no alert.
dog.tick()
assert _orch_down_alerts(notifier) == []
# tick 2: second consecutive failure -> active -> EXACTLY ONE alert.
t["v"] = 30.0
dog.tick()
assert len(_orch_down_alerts(notifier)) == 1
# tick 3: still down, within cooldown -> throttled (no new alert).
t["v"] = 60.0
dog.tick()
assert len(_orch_down_alerts(notifier)) == 1
# restore: orchestrator answers again -> recovery message.
_up(monkeypatch)
t["v"] = 90.0
dog.tick()
recoveries = [m for m in notifier.sent if "восстановление" in m and "Орк" in m]
assert len(recoveries) == 1
def test_tick_does_not_crash_when_everything_breaks(monkeypatch):
# orch down + docker raising + no deps: tick still completes.
class _BoomDocker:
def inspect(self, name):
raise RuntimeError("boom")
notifier = _Notifier()
dog = Watchdog(_cfg(), notifier=notifier, docker=_BoomDocker())
_down(monkeypatch)
dog.tick() # must not raise
dog.tick()
assert len(_orch_down_alerts(notifier)) == 1