"""ORCH-111 TC-01…TC-06: the proc_blocking signal builder + decision surface. Pure / deterministic — no real ``/proc``, no container, no socket, no timer. The collector is exercised here only for its never-raise / read-only contract (TC-04); its ``/proc`` parsing fixtures live in ``test_proc_collector.py``. TC-01 is the REGRESS anchor: before ORCH-111 there was no ``proc_blocking`` builder/dispatch at all, so a long-lived orphaned pytest raised no alert; this asserts the active signal is now produced (red→green). """ import ast as _ast import inspect as _inspect from watchdog.collectors import proc as proc_mod from watchdog.config import Config from watchdog.core import Watchdog from watchdog.decision import ( ACTION_ALERT, ACTION_NONE, ACTION_REALERT, ACTION_RECOVERY, ) from watchdog.signals import proc_signals def _cfg(**kw) -> Config: base = {"WATCHDOG_PROC_ENABLED": "true", "WATCHDOG_PROC_AGE_MIN": "60"} return Config.from_env({**base, **kw}) def _candidate(pid=4242, age_s=7200.0, cmdline="python3 -m pytest tests/", cpu_s=1234.0): return {"pid": pid, "cmdline": cmdline, "age_s": age_s, "cpu_s": cpu_s, "start_ticks": 1} # -- TC-01: regress — active signal for a long-lived blocking process --------- def test_tc01_builder_emits_active_proc_blocking_signal(): cfg = _cfg() # proc_age_s == 3600 sigs = proc_signals(cfg, [_candidate(pid=4242, age_s=7200.0)]) assert len(sigs) == 1 sig = sigs[0] assert sig.key == ("proc_blocking", 4242) assert sig.active is True # 7200 > 3600 # AC-2: actionable detail — PID, age in seconds, cmdline fragment, CPU time. assert "4242" in sig.detail assert "7200" in sig.detail assert "pytest" in sig.detail assert "CPU" in sig.detail assert sig.cooldown_s == cfg.proc_cooldown_s # -- TC-02: anti-false-positive — below the threshold -> inactive ------------- def test_tc02_below_threshold_is_inactive(): cfg = _cfg() # proc_age_s == 3600 sigs = proc_signals(cfg, [_candidate(age_s=600.0)]) # within a 600s test budget assert len(sigs) == 1 assert sigs[0].active is False # 600 < 3600 -> no alert (BR-4 / AC-4) def test_tc02_boundary_is_strict_greater_than(): cfg = _cfg() at_threshold = proc_signals(cfg, [_candidate(age_s=cfg.proc_age_s)]) assert at_threshold[0].active is False # strict `>`: exactly at threshold is OK over = proc_signals(cfg, [_candidate(age_s=cfg.proc_age_s + 1)]) assert over[0].active is True # -- TC-03: config / kill-switch + default threshold > test-run budget -------- def test_tc03_defaults_are_off_and_safe(): cfg = Config.from_env({}) assert cfg.proc_enabled is False # default-OFF (opt-in, D5) assert cfg.proc_patterns == ["pytest"] assert cfg.proc_cooldown_s == 1800.0 # Cross-invariant (D2): default age threshold MUST exceed the max legitimate # test-run budget max(merge_retest_timeout_s=600, coverage_run_timeout_s=900). assert cfg.proc_age_s > 900.0 def test_tc03_env_overrides_and_malformed_degrade(): cfg = Config.from_env( { "WATCHDOG_PROC_ENABLED": "true", "WATCHDOG_PROC_AGE_MIN": "30", "WATCHDOG_PROC_PATTERNS": "pytest,coverage run", "WATCHDOG_PROC_COOLDOWN_S": "600", } ) assert cfg.proc_enabled is True assert cfg.proc_age_s == 30 * 60.0 assert cfg.proc_patterns == ["pytest", "coverage run"] assert cfg.proc_cooldown_s == 600.0 # malformed numerics degrade to defaults (never-raise config). bad = Config.from_env({"WATCHDOG_PROC_AGE_MIN": "abc", "WATCHDOG_PROC_COOLDOWN_S": ""}) assert bad.proc_age_min == 60.0 assert bad.proc_cooldown_s == 1800.0 def test_tc03_killswitch_off_makes_collector_inert(): cfg = Config.from_env({"WATCHDOG_PROC_ENABLED": "false"}) dog = Watchdog(cfg, notifier=_Notifier(), docker=_StubDocker(), now_provider=lambda: 0.0) # The gated collector returns [] without ever touching /proc (zero overhead). assert dog._collect_proc(now=0.0) == [] # -- TC-04: collector never-raise / read-only --------------------------------- def test_tc04_collector_degrades_to_empty_on_broken_source(): # Missing /proc root -> [] (one signal skipped), no exception. assert proc_mod.collect_candidates(["pytest"], now=0.0, proc_root="/no/such/proc") == [] def test_tc04_collector_empty_when_btime_unreadable(tmp_path): # /proc with no parseable btime -> [] (cannot compute age -> no bogus signal). (tmp_path / "stat").write_text("cpu 1 2 3\nintr 0\n") assert proc_mod.collect_candidates(["pytest"], now=0.0, proc_root=str(tmp_path)) == [] def _docstring_node_ids(tree) -> set: """ids of the Constant nodes that are module/func/class docstrings (prose).""" out = set() for node in _ast.walk(tree): if isinstance(node, (_ast.Module, _ast.FunctionDef, _ast.AsyncFunctionDef, _ast.ClassDef)): body = getattr(node, "body", []) if ( body and isinstance(body[0], _ast.Expr) and isinstance(body[0].value, _ast.Constant) and isinstance(body[0].value.value, str) ): out.add(id(body[0].value)) return out def test_tc04_collector_source_is_read_only(): # AC-3 / NFR-2: the EXECUTABLE code (not the prose describing the contract) # carries no kill / signal / subprocess / environ-read. Scan the AST so the # docstring that documents the ban does not trip the check. tree = _ast.parse(_inspect.getsource(proc_mod)) docstrings = _docstring_node_ids(tree) violations: list[str] = [] _MUTATING_ATTRS = {"kill", "system", "Popen", "popen", "run", "send_signal", "terminate"} for node in _ast.walk(tree): if isinstance(node, _ast.Import): for a in node.names: if a.name.split(".")[0] in {"subprocess", "signal"}: violations.append(f"import {a.name}") elif isinstance(node, _ast.ImportFrom): if (node.module or "").split(".")[0] in {"subprocess", "signal"}: violations.append(f"from {node.module}") elif isinstance(node, _ast.Attribute) and node.attr in _MUTATING_ATTRS: violations.append(f".{node.attr}") elif isinstance(node, _ast.Constant) and isinstance(node.value, str): if id(node) not in docstrings and "environ" in node.value: violations.append("reads /proc//environ") assert not violations, f"read-only contract violated in proc.py: {violations}" def test_tc04_builder_skips_records_missing_fields(): cfg = _cfg() sigs = proc_signals(cfg, [{"pid": None}, {"cmdline": "pytest"}, _candidate()]) assert [s.key for s in sigs] == [("proc_blocking", 4242)] # only the valid record # -- TC-05: anti-spam / recovery through decide()/AlertState ------------------ def test_tc05_alert_throttle_realert_then_recovery(): seq = {"candidates": [_candidate(pid=7, age_s=7200.0)]} cfg = _cfg(WATCHDOG_PROC_COOLDOWN_S="1000") t = {"v": 0.0} notifier = _Notifier() dog = Watchdog(cfg, notifier=notifier, docker=_StubDocker(), now_provider=lambda: t["v"]) dog._collect_proc = lambda now: list(seq["candidates"]) # inject collector def proc_alerts(): return [m for m in notifier.sent if "Блокирующий процесс" in m] def actions(): return [a for a, s in dog.tick() if getattr(s, "key", (None,))[0] == "proc_blocking"] # tick 1: threshold crossed -> exactly one ALERT. assert ACTION_ALERT in actions() assert len(proc_alerts()) == 1 # tick 2: still alive, within cooldown -> NONE (anti-spam, no new alert). t["v"] = 100.0 assert actions() == [ACTION_NONE] assert len(proc_alerts()) == 1 # tick 3: cooldown elapsed -> REALERT. t["v"] = 1100.0 assert ACTION_REALERT in actions() assert len(proc_alerts()) == 2 # tick 4: the process vanished -> exactly one RECOVERY (synthesised, D4). seq["candidates"] = [] t["v"] = 1200.0 assert ACTION_RECOVERY in actions() recoveries = [m for m in notifier.sent if "восстановление" in m and "Блокирующий" in m] assert len(recoveries) == 1 # tick 5: still gone -> no repeated recovery (state cleared). t["v"] = 1300.0 dog.tick() assert len([m for m in notifier.sent if "восстановление" in m and "Блокирующий" in m]) == 1 # -- TC-06: no duplicate with agent_hung (cmdline partition) ------------------ def test_tc06_claude_agent_cmdline_never_matches_pytest_pattern(): # A claude agent process (covered by agent_hung) is excluded by the collector # pattern scope -> proc_blocking never fires for it (NFR-4 / AC-5, by construction). assert proc_mod.matches_patterns("claude --model claude-opus-4-8 -p ...", ["pytest"]) is False assert proc_mod.matches_patterns("python3 -m pytest tests/", ["pytest"]) is True def test_tc06_collector_excludes_non_matching_processes(tmp_path): _write_fake_proc( tmp_path, btime=1_000_000, procs={ "100": ("claude --model claude-opus-4-8", _stat_line(start_ticks=0)), "200": ("python3 -m pytest tests/test_x.py", _stat_line(start_ticks=0)), }, ) recs = proc_mod.collect_candidates( ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 ) assert [r["pid"] for r in recs] == [200] # only the pytest process # -- shared fakes ------------------------------------------------------------- class _Notifier: def __init__(self): self.sent = [] def send(self, text): self.sent.append(text) return True class _StubDocker: def inspect(self, name): return {"State": {"Status": "running"}} def _stat_line(start_ticks: int, utime: int = 0, stime: int = 0) -> str: # /proc//stat: pid (comm) state ppid ... utime(14) stime(15) ... starttime(22) ... fields = ["0"] * 52 fields[0] = "999" fields[1] = "(python3)" fields[2] = "S" fields[13] = str(utime) # field 14 fields[14] = str(stime) # field 15 fields[21] = str(start_ticks) # field 22 return " ".join(fields) def _write_fake_proc(root, *, btime: int, procs: dict): (root / "stat").write_text(f"cpu 1 2 3\nbtime {btime}\nintr 0\n") for pid, (cmdline, stat_line) in procs.items(): d = root / pid d.mkdir() (d / "cmdline").write_bytes(cmdline.replace(" ", "\x00").encode() + b"\x00") (d / "stat").write_text(stat_line)