# ORCH-118 (FR-6 / AC-1, AC-3, AC-6, AC-7, AC-10): structural anti-drift tests for # the LLM call-site map (docs/architecture/llm-call-sites.md). # # UNIT OF INVENTORY = an *LLM consultation* (a control-path consumes an LLM # judgment), NOT "a process is spawned / Claude CLI exists" (R4, BRD §0). The # discriminator of every check below is therefore **"consults an LLM"**, never # "spawns a subprocess": the orchestrator spawns dozens of deterministic tools # (git / pytest / docker / ssh / scanners / staging_check.py) and those are # explicitly NOT matched — otherwise the test would degenerate into "count every # Popen" (the corruption these tests exist to avoid). # # These tests are fully offline and deterministic: no network, no LLM, no # subprocess-to-a-model. They read repository source + the canonical machine # blocks embedded in the map and assert the map stays in sync with the code. # # The map carries two machine-readable blocks the parser keys on: # * ORCH-118-INVENTORY-BLOCK — the call-site table (8 columns, D2) # * ORCH-118-KEEP-JUSTIFICATION-BLOCK — keep-LLM named-judgment list (TC-05) # Both are human-readable markdown AND machine-parseable (stdlib split), per # ADR-001 D2/D5 (no brittle prose regex). import re from pathlib import Path from src.qg.checks import QG_CHECKS from src.stages import STAGE_TRANSITIONS REPO_ROOT = Path(__file__).resolve().parents[1] SRC = REPO_ROOT / "src" WATCHDOG = REPO_ROOT / "watchdog" AGENTS_DIR = REPO_ROOT / ".openclaw" / "agents" MAP = REPO_ROOT / "docs" / "architecture" / "llm-call-sites.md" # The single allowed transport (S0): launcher._spawn builds + launches the Claude CLI. TRANSPORT_FILE = "src/agents/launcher.py" # Roles split by control-path axis (§0-bis). Ground truth lives in src/qg/checks.py # (the deterministic consumers) — the map must mirror it. 
# P = artifact-producing roles, C = control-path (verdict-producing) roles.
P_ROLES = frozenset(("analyst", "architect", "developer"))
C_ROLES = frozenset(("reviewer", "tester", "deployer"))
AGENT_ROLES = P_ROLES.union(C_ROLES)

# Consumers of P-role output: deterministic gates that judge an ARTIFACT (file
# presence / CI) and never rely on what the LLM reports about itself.
P_CONSUMERS = frozenset(
    (
        "check_analysis_complete",
        "check_architecture_done",
        "check_ci_green",
    )
)
# Consumers of C-role output: verdict-parsers that READ the machine-verdict the
# LLM wrote, so the LLM judgment decides the branch (PASS->advance / FAIL->rollback).
C_CONSUMERS = frozenset(
    (
        "check_reviewer_verdict",
        "_parse_tests_verdict",
        "_parse_staging_status",
        "_parse_deploy_status",
    )
)

# Closed taxonomy of classification labels a call-site row may carry.
ALLOWED_CLASSES = frozenset(
    (
        "keep-LLM",
        "replace-deterministic-now",
        "replace-later/risky",
        "needs-hybrid-fallback",
        "already-deterministic",
    )
)

# Agent call-sites are A1..A6; the full inventory adds the transport (S0) and the
# two D-rows.
AGENT_IDS = frozenset(f"A{n}" for n in range(1, 7))
ALL_IDS = AGENT_IDS | {"S0", "D1", "D2"}

# Deterministic leaf modules / routing that must NOT consult an LLM (FR-3 / AC-3).
DETERMINISTIC_MODULES = (
    "stages", "stage_engine", "serial_gate",
    "merge_gate", "coverage_gate", "security_gate",
    "staging_verdict", "review_parse", "error_classifier",
    "frontmatter", "self_deploy", "post_deploy",
    "transition_lease", "reconciler", "job_reaper",
)

# Signatures of an alternative LLM transport, forbidden anywhere in src/** +
# watchdog/** (TC-12 / FR-6f): an LLM SDK import, or a direct Anthropic/Claude
# HTTP endpoint. The import patterns are anchored per line (MULTILINE).
_FORBIDDEN_TRANSPORT_RE = tuple(
    re.compile(pattern, flags)
    for pattern, flags in (
        (r"^\s*(?:from|import)\s+anthropic\b", re.M),
        (r"^\s*(?:from|import)\s+openai\b", re.M),
        (r"api\.anthropic\.com", 0),
        (r"/v1/messages", 0),
    )
)

# Frozen snapshot of the runtime contract (TC-09 / FR-7 / AC-7). ORCH-118 is
# docs+tests-only; if this drifts, the map task touched the stage machine / gates.
EXPECTED_STAGE_AGENTS = frozenset(
    {"analyst", "architect", "developer", "reviewer", "tester", "deployer"}
)
EXPECTED_QG_CHECKS = frozenset(
    {
        "check_analysis_approved",
        "check_analysis_complete",
        "check_architecture_done",
        "check_ci_green",
        "check_review_approved",
        "check_tests_passed",
        "check_reviewer_verdict",
        "check_tests_local",
        "check_deploy_status",
        "check_staging_status",
        "check_branch_mergeable",
        "check_staging_image_fresh",
        "check_security_gate",
        "check_coverage_gate",
    }
)


# ---------------------------------------------------------------------------
# Helpers (stdlib only)
# ---------------------------------------------------------------------------


def _src_py_files() -> list[Path]:
    """Return every ``*.py`` file under SRC (recursive), sorted."""
    return sorted(SRC.glob("**/*.py"))


def _src_and_watchdog_py_files() -> list[Path]:
    """Return every ``*.py`` file under SRC plus WATCHDOG (when that directory
    exists), sorted as one list."""
    files = list(SRC.glob("**/*.py"))
    if WATCHDOG.is_dir():
        files.extend(WATCHDOG.glob("**/*.py"))
    return sorted(files)


def _rel(p: Path) -> str:
    """Return ``p`` relative to REPO_ROOT as a POSIX-style string."""
    return p.relative_to(REPO_ROOT).as_posix()


def _function_body(source: str, name: str) -> str:
    """Return the source text of ``def <name>(`` up to (excluding) the next
    same-or-lower-indent def/class/decorator. Robust to line drift.

    The first line whose left-stripped text starts with ``def <name>(`` is the
    start; blank lines never terminate the body. Raises AssertionError when no
    such ``def`` exists in ``source``.
    """
    lines = source.splitlines()
    start = None
    indent = 0
    for i, line in enumerate(lines):
        stripped = line.lstrip()
        if stripped.startswith(f"def {name}("):
            start = i
            indent = len(line) - len(stripped)
            break
    assert start is not None, f"def {name}( not found in source"
    body = [lines[start]]
    for line in lines[start + 1 :]:
        if not line.strip():
            body.append(line)
            continue
        cur_indent = len(line) - len(line.lstrip())
        head = line.lstrip()
        # A sibling (or outer) definition/decorator ends the function.
        if cur_indent <= indent and head.startswith(("def ", "class ", "@")):
            break
        body.append(line)
    return "\n".join(body)


def _extract_block(text: str, name: str) -> str:
    """Return the text between the START and END markers of block ``name``.

    Raises AssertionError when the start marker is absent, or when no end
    marker follows it.
    """
    # NOTE(review): the marker literals were empty strings in the reviewed
    # source, which made ``text.split("", 1)`` raise ValueError and left
    # ``name`` unused. They are reconstructed here as HTML comments keyed on
    # ``name`` — confirm the exact spelling against the markers in the map.
    start = f"<!-- {name}:START -->"
    end = f"<!-- {name}:END -->"
    _, found_start, tail = text.partition(start)
    assert found_start, f"missing block start marker {start!r} in map"
    # Search for the end marker only AFTER the start marker, so a stray END
    # earlier in the document cannot satisfy the check.
    inner, found_end, _ = tail.partition(end)
    assert found_end, f"missing block end marker {end!r} in map"
    return inner


def _parse_pipe_table(block: str) -> list[dict]:
    """Parse a GitHub-style pipe table into a list of {column: value} dicts.

    Lines not starting with ``|`` are ignored, separator rows are skipped, the
    first remaining row becomes the (lower-cased) header, and each later row is
    zipped against that header.
    """
    header = None
    rows: list[dict] = []
    for raw in block.splitlines():
        line = raw.strip()
        if not line.startswith("|"):
            continue
        cells = [c.strip() for c in line.strip("|").split("|")]
        joined = "".join(cells)
        if joined and set(joined) <= set("-: "):
            continue  # separator row |---|---|
        if header is None:
            header = [c.lower() for c in cells]
            continue
        rows.append(dict(zip(header, cells)))
    return rows


def _inventory_rows() -> list[dict]:
    """Read MAP and return the parsed rows of the ORCH-118-INVENTORY-BLOCK table.

    Raises AssertionError when the block parses to zero rows.
    """
    block = _extract_block(MAP.read_text(encoding="utf-8"), "ORCH-118-INVENTORY-BLOCK")
    rows = _parse_pipe_table(block)
    assert rows, "inventory block parsed to zero rows"
    return rows


def _agent_rows() -> list[dict]:
    """Return only the inventory rows whose ``id`` is one of AGENT_IDS."""
    return [r for r in _inventory_rows() if r["id"] in AGENT_IDS]


def _by_role() -> dict[str, dict]:
    """Return the agent rows keyed by their ``role`` column."""
    return {r["role"]: r for r in _agent_rows()}


def _parse_justifications() -> dict[str, str]:
    """Parse the keep-LLM named-judgment list: ``- role: justification text``."""
    block = _extract_block(
        MAP.read_text(encoding="utf-8"), "ORCH-118-KEEP-JUSTIFICATION-BLOCK"
    )
    out: dict[str, str] = {}
    for raw in block.splitlines():
        line = raw.strip()
        # A bullet (- or *), a role token, a colon, then non-empty text.
        m = re.match(r"^[-*]\s*([A-Za-z_-]+)\s*:\s*(.+)$", line)
        if m:
            out[m.group(1).strip()] = m.group(2).strip()
    return out


# ---------------------------------------------------------------------------
# TC-01 — single LLM-consultation transport (necessary, completed by TC-12).
# ---------------------------------------------------------------------------


def test_tc01_single_llm_transport():
    """Exactly one place in src/** assembles+launches the Claude CLI, matched by the
    CONJUNCTION of transport signals (CLAUDE_BIN AND --system-prompt AND a process
    launcher) — and it is launcher._spawn. The conjunction is mandatory: bare
    CLAUDE_BIN would false-positive on preflight.py (existence check) and config.py
    (path literal), neither of which consults an LLM (ADR D5a)."""
    hits = []
    for f in _src_py_files():
        text = f.read_text(encoding="utf-8")
        launches = ("Popen" in text) or ('"bash"' in text) or ("'bash'" in text)
        if "--system-prompt" in text and "CLAUDE_BIN" in text and launches:
            hits.append(_rel(f))
    assert hits == [TRANSPORT_FILE], (
        "expected the single LLM-transport to be launcher._spawn; got: " + repr(hits)
    )
    # The transport assembly lives inside _spawn specifically.
    launcher = (SRC / "agents" / "launcher.py").read_text(encoding="utf-8")
    assert "--system-prompt" in _function_body(launcher, "_spawn"), (
        "--system-prompt is not inside def _spawn — transport moved?"
    )


# ---------------------------------------------------------------------------
# TC-12 — no alternative LLM transport (FR-6f / AC-1, AC-6).
# ---------------------------------------------------------------------------


def test_tc12_no_alternative_llm_transport():
    """No LLM-SDK import, no direct Anthropic/Claude HTTP endpoint, and no SECOND
    --system-prompt-bearing subprocess builder anywhere in src/** + watchdog/**.
    Closes the gap 'one _spawn green, a new consultation grew next to it'."""
    sdk_offenders = []
    second_builders = []
    # Single pass: each file is read once and checked against both rules.
    for f in _src_and_watchdog_py_files():
        rel = _rel(f)
        text = f.read_text(encoding="utf-8")
        for rx in _FORBIDDEN_TRANSPORT_RE:
            if rx.search(text):
                sdk_offenders.append(f"{rel}: {rx.pattern}")
        # No second --system-prompt builder outside the allowlisted transport file.
        if "--system-prompt" in text and rel != TRANSPORT_FILE:
            second_builders.append(rel)
    assert not sdk_offenders, (
        "alternative LLM transport found (allowed transport = S0/launcher._spawn "
        "only):\n" + "\n".join(sdk_offenders)
    )
    assert second_builders == [], (
        "a second --system-prompt subprocess builder appeared: " + repr(second_builders)
    )


# ---------------------------------------------------------------------------
# TC-02 — deterministic modules carry no LLM consultation (FR-6b / AC-3).
# ---------------------------------------------------------------------------


def test_tc02_deterministic_modules_no_llm_consultation():
    """The listed routing/leaf modules do not consult an LLM (no _spawn transport,
    no alternative transport). Their git/pytest/docker/ssh/scanner subprocesses are
    deterministic TOOLS, not LLM consultations — discriminator is 'consults LLM',
    not 'spawns subprocess'."""
    offenders = []
    for mod in DETERMINISTIC_MODULES:
        path = SRC / f"{mod}.py"
        assert path.is_file(), f"deterministic module missing: {path}"
        text = path.read_text(encoding="utf-8")
        if "--system-prompt" in text:
            offenders.append(f"{mod}: builds --system-prompt (LLM transport)")
        for rx in _FORBIDDEN_TRANSPORT_RE:
            if rx.search(text):
                offenders.append(f"{mod}: {rx.pattern}")
    assert not offenders, "LLM consultation found in deterministic path:\n" + "\n".join(
        offenders
    )


# ---------------------------------------------------------------------------
# TC-03 — prompt files on disk match the map, both ways (FR-6c / AC-1).
# ---------------------------------------------------------------------------


def test_tc03_prompt_files_match_map():
    """The ``*.md`` stems in AGENTS_DIR must equal AGENT_ROLES, and the roles of
    the map's agent rows must equal those stems."""
    on_disk = set()
    for prompt_file in AGENTS_DIR.glob("*.md"):
        on_disk.add(prompt_file.stem)
    in_map = set()
    for agent_row in _agent_rows():
        in_map.add(agent_row["role"])

    assert on_disk == set(AGENT_ROLES), (
        f"prompt files on disk drifted from the 6 canonical roles: {on_disk}"
    )
    assert in_map == on_disk, (
        f"map agent roles {in_map} != prompt files on disk {on_disk}"
    )


# ---------------------------------------------------------------------------
# TC-04 — totality + axis-consistent classification (FR-6d / FR-2 / AC-2).
# ---------------------------------------------------------------------------


def test_tc04_classification_total_and_axis_consistent():
    """Inventory ids are unique and equal ALL_IDS, every class is inside the
    taxonomy (S0 carries none), and each agent row's class follows from its
    axis / avoidable columns."""
    unclassified = ("-", "—")
    replace_classes = {
        "replace-deterministic-now",
        "replace-later/risky",
        "needs-hybrid-fallback",
    }

    inventory = _inventory_rows()
    ids = []
    for entry in inventory:
        ids.append(entry["id"])
    assert len(ids) == len(set(ids)), f"duplicate call-site ids: {ids}"
    assert set(ids) == set(ALL_IDS), f"call-site id set drifted: {set(ids)}"

    for r in inventory:
        cls = r["classification"]
        if r["id"] == "S0":
            assert cls in unclassified, f"S0 (transport) must not be classified: {cls!r}"
            continue
        assert cls in ALLOWED_CLASSES or cls in unclassified, (
            f"{r['id']} class out of taxonomy: {cls!r}"
        )

    # The class is DERIVED from the axis, never postulated:
    #   P -> keep; C and not avoidable -> keep; C and avoidable -> replace-*/hybrid.
    for r in _agent_rows():
        axis = r["axis"].upper()
        avoidable = r["avoidable"].lower()
        cls = r["classification"]
        if axis == "P":
            assert cls == "keep-LLM", f"{r['role']} is P but class {cls!r}"
            continue
        if axis != "C" or avoidable not in ("no", "yes"):
            raise AssertionError(
                f"{r['role']}: bad axis/avoidable {axis!r}/{avoidable!r}"
            )
        if avoidable == "no":
            assert cls == "keep-LLM", f"{r['role']} is C-keep but class {cls!r}"
        else:
            assert cls in replace_classes, f"{r['role']} is avoidable but class {cls!r}"


# ---------------------------------------------------------------------------
# TC-05 — keep-LLM requires a named judgment; C-keep states non-derivability.
# ---------------------------------------------------------------------------


def test_tc05_keep_llm_named_judgment():
    """The keep-LLM roles are exactly the four expected ones, each has a
    non-empty justification entry, and the reviewer's entry mentions
    (non-)derivability."""
    keep_roles = set()
    for row in _agent_rows():
        if row["classification"] == "keep-LLM":
            keep_roles.add(row["role"])
    expected_keep = {"analyst", "architect", "developer", "reviewer"}
    assert keep_roles == expected_keep, (
        f"keep-LLM role set drifted: {keep_roles}"
    )

    judgments = _parse_justifications()
    for role in keep_roles:
        assert judgments.get(role, "").strip(), f"keep-LLM role {role} has no named judgment"

    # The reviewer is the C-keep case, so its text has to argue NON-derivability.
    reviewer_text = judgments["reviewer"].lower()
    assert "deriv" in reviewer_text, (
        "reviewer (C-keep) justification must state the verdict is NOT derivable "
        "from an exit-code"
    )


# ---------------------------------------------------------------------------
# TC-06 — capability != consultation: D1/D2 intercepted before _spawn (FR-6e).
# ---------------------------------------------------------------------------


def test_tc06_capability_not_consultation():
    """Inside ``launch_job`` both the deploy-finalizer and post-deploy-monitor
    guards appear textually before the ``self._spawn(`` call, and each returns
    through its dedicated ``_run_*_job`` method."""
    launcher_path = SRC / "agents" / "launcher.py"
    body = _function_body(launcher_path.read_text(encoding="utf-8"), "launch_job")

    # (needle to locate, message when it is absent) — checked in this order.
    anchors = (
        ('"deploy-finalizer"', "deploy-finalizer guard not found in launch_job"),
        ('"post-deploy-monitor"', "post-deploy-monitor guard not found in launch_job"),
        ("self._spawn(", "self._spawn( call not found in launch_job"),
    )
    offsets = [body.find(needle) for needle, _ in anchors]
    for offset, (_, missing_message) in zip(offsets, anchors):
        assert offset != -1, missing_message

    finalizer_at, monitor_at, spawn_at = offsets
    assert finalizer_at < spawn_at, "deploy-finalizer guard must precede _spawn"
    assert monitor_at < spawn_at, "post-deploy-monitor guard must precede _spawn"
    assert "return self._run_deploy_finalizer_job" in body
    assert "return self._run_post_deploy_monitor_job" in body


# ---------------------------------------------------------------------------
# TC-09 — runtime contract snapshot unchanged (FR-7 / AC-7).
# ---------------------------------------------------------------------------


def test_tc09_runtime_contract_snapshot():
    """The truthy ``agent`` values of STAGE_TRANSITIONS and the names in
    QG_CHECKS must equal the frozen EXPECTED_* snapshots."""
    agents = set()
    for transition in STAGE_TRANSITIONS.values():
        if transition["agent"]:
            agents.add(transition["agent"])
    expected_agents = set(EXPECTED_STAGE_AGENTS)
    assert agents == expected_agents, (
        f"STAGE_TRANSITIONS agent set changed: {agents}"
    )

    expected_checks = set(EXPECTED_QG_CHECKS)
    assert set(QG_CHECKS) == expected_checks, (
        f"QG_CHECKS name set changed: {set(QG_CHECKS)}"
    )


# ---------------------------------------------------------------------------
# TC-13 — control-path axis is consistent with the real consumer (FR-6g / AC-10).
# ---------------------------------------------------------------------------


def test_tc13_control_path_axis_correct():
    """For every agent row: the named output consumer is a ``def`` in
    src/qg/checks.py, and the row's axis and consumer agree with the P/C split of
    its role. The P and C role sets in the map equal P_ROLES / C_ROLES."""
    checks_path = SRC / "qg" / "checks.py"
    checks_src = checks_path.read_text(encoding="utf-8")
    rows = _agent_rows()

    for row in rows:
        role = row["role"]
        axis = row["axis"].upper()
        # Only the text before the first ":" names the consumer.
        consumer = row["output_consumer"].partition(":")[0].strip()
        assert re.search(rf"def {re.escape(consumer)}\(", checks_src), (
            f"{role}: output_consumer {consumer!r} is not a def in src/qg/checks.py"
        )

        is_p_role = role in P_ROLES
        if not is_p_role and role not in C_ROLES:
            raise AssertionError(f"unexpected agent role in map: {role!r}")
        if is_p_role:
            assert axis == "P", f"{role} must be axis P, got {axis!r}"
            assert consumer in P_CONSUMERS, (
                f"{role} (P) consumer {consumer!r} is not a deterministic artifact gate"
            )
        else:
            assert axis == "C", f"{role} must be axis C, got {axis!r}"
            assert consumer in C_CONSUMERS, (
                f"{role} (C) consumer {consumer!r} is not a verdict-parser"
            )

    p_axis_roles = {row["role"] for row in rows if row["axis"].upper() == "P"}
    assert p_axis_roles == set(P_ROLES)
    c_axis_roles = {row["role"] for row in rows if row["axis"].upper() == "C"}
    assert c_axis_roles == set(C_ROLES)


# ---------------------------------------------------------------------------
# TC-14 — avoidable LLM control-path set is exactly {tester, deployer} (FR-6h).
# ---------------------------------------------------------------------------


def test_tc14_avoidable_set_fixed():
    """Only tester and deployer are marked avoidable; reviewer is axis C and not
    avoidable; analyst / architect / developer are axis P and not avoidable."""
    rows = _agent_rows()
    row_for = {}
    for row in rows:
        row_for[row["role"]] = row
    avoidable = set()
    for row in rows:
        if row["avoidable"].lower() == "yes":
            avoidable.add(row["role"])
    assert avoidable == {"tester", "deployer"}, (
        f"avoidable LLM control-path set drifted from {{tester, deployer}}: {avoidable}"
    )

    # reviewer sits on the control path (C) yet is kept — verdict not derivable.
    reviewer = row_for["reviewer"]
    assert reviewer["axis"].upper() == "C"
    assert reviewer["avoidable"].lower() == "no"

    # analyst / architect / developer produce artifacts (P), not control decisions.
    for role in ("analyst", "architect", "developer"):
        producer = row_for[role]
        assert producer["axis"].upper() == "P", f"{role} must be P (not control path)"
        assert producer["avoidable"].lower() == "no", f"{role} must not be avoidable"