Files
orchestrator/tests/test_orch116_test_runner.py
claude-bot 80aa2409f7
All checks were successful
CI / test (push) Successful in 1m9s
CI / test (pull_request) Successful in 1m7s
feat(testing): deterministic test-runner replacing LLM tester on the testing stage (ORCH-116)
Second realised slice of the determinization-roadmap (ORCH-118 A5,
needs-hybrid-fallback): on the `testing` stage for the self-hosting
`orchestrator` repo the LLM `tester` agent is replaced by a deterministic
test-runner (src/test_runner.py), intercepted in launch_job BEFORE _spawn
(deploy-finalizer / post-deploy-monitor / staging-runner precedent).

It runs the regression `python -m pytest <target>` in the task worktree via
proc_group (tree-kill) + an optional read-only smoke (/health, /status, /queue
+ serial_gate), maps the exit-code -> result: PASS|FAIL via the existing
self_deploy.map_exit_code_to_status contract, writes 13-test-report.md and
initiates the EXISTING check_tests_passed gate exactly as a finished LLM-tester.

Invariant (NFR-1): only the *producer* changes — the artifact contract
(13-test-report.md / result:), the gate check_tests_passed / _parse_tests_verdict,
STAGE_TRANSITIONS and the DB schema are byte-for-byte UNCHANGED. Additive, under
a kill-switch (test_runner_enabled), never-raise, fail-closed, self-hosting scope,
two-level outcome (tool-error DEFER, anti ORCH-110), hybrid (LLM strictly
off-control-path). 52c-`status:` is aligned with the verdict (D6.1) so the
three-field _parse_tests_verdict never false-negatives a PASS.

Docs (ORCH-118 NFR-6, atomic with code): llm-call-sites.md (A5 implemented),
llm-determinization-roadmap.md (rank 2 implemented), llm-usage-policy.md,
README/internals/overview, tester.md, CLAUDE.md, CHANGELOG.md. Coverage:
tests/test_orch116_test_runner.py (TC-01..TC-14); LLM anti-drift tests green.
Full suite: 2137 passed.

Refs: ORCH-116
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 02:59:51 +03:00

561 lines
26 KiB
Python

"""ORCH-116 — deterministic test-runner replacing the LLM tester on the `testing`
stage (self-hosting orchestrator).
Covers Phase 1 (04-test-plan.yaml TC-01…TC-14): the launch_job intercept BEFORE
_spawn, the exit-code -> result: PASS|FAIL mapping, 13-test-report.md render + the
UNCHANGED gate contract (incl. the tester-specific D6.1 52c-`status:`↔parser
anti-collision), advance_stage initiation, kill-switch / scope / backward-compat for a
repo without a test-contract, never-raise / fail-safe, the bounded tool-error DEFER
(anti ORCH-110), process timeout (proc_group / worktree), read-only smoke, self-hosting
safety, the anti-drift invariant (STAGE_TRANSITIONS / QG_CHECKS / check_tests_passed /
_parse_tests_verdict / DB schema untouched), and the /queue observability block.
No live Claude CLI, network or worktree git: the pytest subprocess, the smoke GETs and
advance_stage are mocked; the pure mapping/render is tested directly. (TC-15 — the LLM
call-site map anti-drift — lives in tests/test_llm_call_site_inventory.py /
test_llm_determinization_docs.py and is asserted green here too.)
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch116_test_runner.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db, enqueue_job # noqa: E402
from src import test_runner # noqa: E402
from src import stage_engine # noqa: E402
from src import config as cfg # noqa: E402
from src.proc_group import ProcResult # noqa: E402
from src.agents.launcher import AgentLauncher # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Worktree artefacts land in tmp; git commit/push degrade gracefully (no repo).
monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path), raising=False)
# Runner ON, self-hosting scope (default). Smoke OFF by default so the PASS path
# does not hit the network; the dedicated smoke tests re-enable + mock it.
monkeypatch.setattr(cfg.settings, "test_runner_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "test_runner_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", False, raising=False)
monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 2, raising=False)
# Reset in-process counters between tests.
for k in test_runner._TEST_RUNNER_COUNTERS:
test_runner._TEST_RUNNER_COUNTERS[k] = 0
yield
def _make_task(stage, repo="orchestrator", wi="ORCH-116", branch="feature/ORCH-116-x"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _make_job(agent, repo, task_id, content="x"):
return enqueue_job(agent, repo, content, task_id=task_id)
def _job_dict(job_id, agent, repo, task_id):
return {"id": job_id, "agent": agent, "repo": repo, "task_id": task_id, "task_content": "x"}
def _read_report(repo, branch, wi):
from src.git_worktree import get_worktree_path
p = os.path.join(get_worktree_path(repo, branch), f"docs/work-items/{wi}/13-test-report.md")
with open(p, "r", encoding="utf-8") as f:
return f.read()
# ---------------------------------------------------------------------------
# TC-01 — applies(): kill-switch / scope / contract / fail-safe (FR-7 / AC-6/AC-7/AC-8)
# ---------------------------------------------------------------------------
def test_tc01_applies_killswitch_scope_and_contract(monkeypatch):
# enabled=False -> False (fall back to the LLM path) regardless of repo.
monkeypatch.setattr(cfg.settings, "test_runner_enabled", False)
assert test_runner.applies("orchestrator") is False
# enabled, empty CSV -> self-hosting only (and a test-contract resolves for it).
monkeypatch.setattr(cfg.settings, "test_runner_enabled", True)
monkeypatch.setattr(cfg.settings, "test_runner_repos", "")
assert test_runner.applies("orchestrator") is True
assert test_runner.applies("enduro-trails") is False
assert test_runner.applies("") is False
# BR-9 backward-compat: even hand-adding a non-self repo to the CSV -> False,
# because _has_test_contract is unresolved for it (Phase 1 = self only).
monkeypatch.setattr(cfg.settings, "test_runner_repos", "enduro-trails, orchestrator")
assert test_runner.applies("ENDURO-TRAILS") is False # no contract -> LLM-tester
assert test_runner.applies("orchestrator") is True # in scope + contract
assert test_runner.applies("other-repo") is False
def test_tc01_applies_never_raises(monkeypatch):
class Boom:
@property
def test_runner_enabled(self):
raise RuntimeError("boom")
monkeypatch.setattr(test_runner, "settings", Boom())
assert test_runner.applies("orchestrator") is False
# ---------------------------------------------------------------------------
# TC-02 — exit-code -> verdict, single shared contract (FR-3 / AC-3)
# ---------------------------------------------------------------------------
def test_tc02_map_exit_code():
from src import self_deploy
assert test_runner.map_exit_code_to_result(0) == "PASS"
for bad in (1, 2, 137, -9):
assert test_runner.map_exit_code_to_result(bad) == "FAIL"
for noncode in (None, "x", object()):
assert test_runner.map_exit_code_to_result(noncode) == "FAIL"
# Same underlying contract as the deploy-finalizer / staging-runner (no 2nd mapping):
# PASS iff SUCCESS, FAIL iff FAILED.
for v in (0, 1, None, "garbage"):
expect = "PASS" if self_deploy.map_exit_code_to_status(v) == "SUCCESS" else "FAIL"
assert test_runner.map_exit_code_to_result(v) == expect
# ---------------------------------------------------------------------------
# TC-03 — 13-test-report.md render: machine key + 52c schema + status alignment (FR-4 / AC-2)
# ---------------------------------------------------------------------------
def test_tc03_report_render_schema_and_status_alignment():
body = test_runner.build_test_report("ORCH-116", 0, "PASS", "23 passed", smoke="ok")
assert "result: PASS" in body # UPPERCASE machine key
for field in ("work_item: ORCH-116", "stage: testing",
"author_agent: test-runner", "status: success",
"created_at:", "model_used: n/a"):
assert field in body, f"missing 52c field: {field}"
assert "smoke: ok" in body
assert "23 passed" in body # stdout tail copied into the body
failed = test_runner.build_test_report("ORCH-116", 1, "FAIL", "boom", smoke="skipped")
assert "result: FAIL" in failed
# D6.1: status MUST be aligned with the verdict (failed, never `success`).
assert "status: failed" in failed
# ---------------------------------------------------------------------------
# TC-04 — generated report read by the UNCHANGED _parse_tests_verdict (AC-2 / D6.1)
# ---------------------------------------------------------------------------
def test_tc04_gate_parser_unchanged():
from src.qg.checks import _parse_tests_verdict
ok, reason = _parse_tests_verdict(
test_runner.build_test_report("ORCH-116", 0, "PASS", "", smoke="ok"))
assert ok is True and "PASS" in reason
bad, reason2 = _parse_tests_verdict(
test_runner.build_test_report("ORCH-116", 2, "FAIL", "", smoke="skipped"))
assert bad is False and "FAIL" in reason2
def test_tc04_status_field_never_false_negatives_a_pass():
"""The D6.1 mine: the 52c-mandatory `status:` is read by _parse_tests_verdict with
negative-token priority. A PASS report must NOT carry a status whose UPPERCASE is a
negative token (else a healthy run reads as FAIL)."""
from src.qg.checks import _parse_tests_verdict, _TESTS_NEGATIVE_TOKENS
body = test_runner.build_test_report("ORCH-116", 0, "PASS", "", smoke="ok")
# Extract the status: line from the frontmatter and assert it is token-safe.
status_line = next(ln for ln in body.splitlines() if ln.startswith("status:"))
status_val = status_line.split(":", 1)[1].strip().upper()
for neg in _TESTS_NEGATIVE_TOKENS:
assert neg not in status_val, f"PASS report status carries negative token {neg!r}"
assert _parse_tests_verdict(body)[0] is True
# ---------------------------------------------------------------------------
# TC-05 — launch_job intercepts tester on testing BEFORE _spawn (AC-1)
# ---------------------------------------------------------------------------
def test_tc05_launch_job_intercepts_before_spawn(monkeypatch):
tid = _make_task("testing")
jid = _make_job("tester", "orchestrator", tid)
lr = AgentLauncher()
spawn = MagicMock(return_value=999)
monkeypatch.setattr(lr, "_spawn", spawn)
run_gate = MagicMock()
monkeypatch.setattr(test_runner, "run_test_gate", run_gate)
ret = lr.launch_job(_job_dict(jid, "tester", "orchestrator", tid))
assert ret is None # no agent_runs row
spawn.assert_not_called() # LLM path NOT taken
run_gate.assert_called_once() # deterministic runner ran
conn = get_db()
status = conn.execute("SELECT status FROM jobs WHERE id=?", (jid,)).fetchone()[0]
conn.close()
assert status == "done"
# ---------------------------------------------------------------------------
# TC-06 — discriminator: tester off-`testing` / non-tester / never-raise (AC-1 / R-1)
# ---------------------------------------------------------------------------
def test_tc06_stage_discriminator_off_testing_not_intercepted(monkeypatch):
tid = _make_task("development") # tester job but not the testing stage
jid = _make_job("tester", "orchestrator", tid)
assert test_runner.should_intercept(_job_dict(jid, "tester", "orchestrator", tid)) is False
# and launch_job falls through to _spawn for it.
lr = AgentLauncher()
spawn = MagicMock(return_value=42)
monkeypatch.setattr(lr, "_spawn", spawn)
ret = lr.launch_job(_job_dict(jid, "tester", "orchestrator", tid))
assert ret == 42
spawn.assert_called_once()
def test_tc06_non_tester_not_intercepted():
tid = _make_task("testing")
jid = _make_job("developer", "orchestrator", tid)
assert test_runner.should_intercept(_job_dict(jid, "developer", "orchestrator", tid)) is False
def test_tc06_non_self_repo_not_intercepted():
tid = _make_task("testing", repo="enduro-trails", wi="ET-9", branch="feature/ET-9-x")
jid = _make_job("tester", "enduro-trails", tid)
assert test_runner.should_intercept(_job_dict(jid, "tester", "enduro-trails", tid)) is False
def test_tc06_should_intercept_never_raises(monkeypatch):
# applies() True, but the DB lookup explodes -> should_intercept False (fall-through).
monkeypatch.setattr(test_runner, "applies", lambda repo: True)
monkeypatch.setattr(test_runner, "get_db", None, raising=False)
def boom_get_db():
raise RuntimeError("db down")
import src.db as dbmod
monkeypatch.setattr(dbmod, "get_db", boom_get_db)
assert test_runner.should_intercept(
{"agent": "tester", "repo": "orchestrator", "task_id": 1}) is False
# ---------------------------------------------------------------------------
# TC-07 — routing equivalence: PASS -> advance; FAIL -> same path (AC-4)
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("rc,expected", [(0, "PASS"), (1, "FAIL")])
def test_tc07_advance_initiated_like_llm(monkeypatch, rc, expected):
tid = _make_task("testing")
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=rc, stdout="out", stderr="", timed_out=False))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
advance.assert_called_once()
kwargs = advance.call_args.kwargs
assert kwargs["current_stage"] == "testing"
assert kwargs["finished_agent"] == "tester"
assert kwargs["work_item_id"] == "ORCH-116"
assert f"result: {expected}" in _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
# ---------------------------------------------------------------------------
# TC-08 — kill-switch: enabled=False -> LLM path via _spawn (AC-7)
# ---------------------------------------------------------------------------
def test_tc08_killswitch_falls_back_to_spawn(monkeypatch):
monkeypatch.setattr(cfg.settings, "test_runner_enabled", False)
tid = _make_task("testing")
jid = _make_job("tester", "orchestrator", tid)
lr = AgentLauncher()
spawn = MagicMock(return_value=7)
monkeypatch.setattr(lr, "_spawn", spawn)
run_gate = MagicMock()
monkeypatch.setattr(test_runner, "run_test_gate", run_gate)
ret = lr.launch_job(_job_dict(jid, "tester", "orchestrator", tid))
assert ret == 7
spawn.assert_called_once() # byte-for-byte the prior LLM-tester path
run_gate.assert_not_called()
# ---------------------------------------------------------------------------
# TC-09 — anti-drift NFR-1: STAGE_TRANSITIONS / QG_CHECKS / schema untouched (AC-6)
# ---------------------------------------------------------------------------
def test_tc09_pipeline_contract_unchanged():
from src.stages import STAGE_TRANSITIONS
from src.qg.checks import QG_CHECKS
# The review->testing edge (tester) and testing->deploy-staging gate are byte-for-byte.
assert STAGE_TRANSITIONS["review"] == {
"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"
}
assert STAGE_TRANSITIONS["testing"] == {
"next": "deploy-staging", "agent": "deployer", "qg": "check_tests_passed"
}
assert "check_tests_passed" in QG_CHECKS
# No new ORCH-116 table: only the existing tables exist.
conn = get_db()
tables = {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
conn.close()
assert not any("test_runner" in t for t in tables)
# The machine key is not renamed: the runner emits `result:`.
assert "result:" in test_runner.build_test_report("ORCH-116", 0, "PASS")
# ---------------------------------------------------------------------------
# TC-10 / TC-11 — two-level outcome + never-raise (anti ORCH-110, AC-5/AC-9)
# ---------------------------------------------------------------------------
def test_tc10_nonzero_exit_is_fail_and_advances(monkeypatch):
tid = _make_task("testing")
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=3, stdout="fail", stderr="", timed_out=False))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
advance.assert_called_once()
assert "result: FAIL" in _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
def test_tc10_timeout_defers_without_advance(monkeypatch):
tid = _make_task("testing")
monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 2)
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
advance.assert_not_called() # NO silent advance / false green on infra hiccup
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND task_content LIKE ?",
(tid, f"%{test_runner._INFRA_RETRY_MARKER}%"),
).fetchone()[0]
# The re-queued job must be a `tester` job (re-enters this runner), not a deployer.
agent = conn.execute(
"SELECT agent FROM jobs WHERE task_id=? AND task_content LIKE ?",
(tid, f"%{test_runner._INFRA_RETRY_MARKER}%"),
).fetchone()[0]
conn.close()
assert n == 1
assert agent == "tester"
assert test_runner._TEST_RUNNER_COUNTERS["deferred"] == 1
assert test_runner._TEST_RUNNER_COUNTERS["tool_error"] == 1
def test_tc10_tool_error_budget_exhausted_fails_closed(monkeypatch):
tid = _make_task("testing")
monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 0) # exhausted immediately
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
alert = MagicMock()
monkeypatch.setattr(test_runner, "_alert_infra_exhausted", alert)
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
advance.assert_called_once() # fail-closed: write FAIL + advance (existing rollback)
alert.assert_called_once()
assert "result: FAIL" in _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
def test_tc11_run_gate_never_raises(monkeypatch):
tid = _make_task("testing")
def boom(repo, branch):
raise RuntimeError("pytest exploded")
monkeypatch.setattr(test_runner, "run_test_suite", boom)
# Must NOT raise (AC-9): the worker is protected even on an unexpected error.
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
def test_tc11_launcher_contains_runner_fault(monkeypatch):
tid = _make_task("testing")
jid = _make_job("tester", "orchestrator", tid)
lr = AgentLauncher()
monkeypatch.setattr(lr, "_spawn", MagicMock())
monkeypatch.setattr(test_runner, "run_test_gate",
MagicMock(side_effect=RuntimeError("kaboom")))
ret = lr.launch_job(_job_dict(jid, "tester", "orchestrator", tid))
assert ret is None
conn = get_db()
status = conn.execute("SELECT status FROM jobs WHERE id=?", (jid,)).fetchone()[0]
conn.close()
assert status == "failed"
# ---------------------------------------------------------------------------
# TC-12 — timeout resolution + worktree cwd + tree-kill (NFR-3/NFR-4 / AC-11)
# ---------------------------------------------------------------------------
def test_tc12_resolve_timeout_default_on_bad_value(monkeypatch):
monkeypatch.setattr(cfg.settings, "test_runner_timeout_s", 1234)
assert test_runner._resolve_timeout() == 1234
for bad in (0, -5, "abc", None):
monkeypatch.setattr(cfg.settings, "test_runner_timeout_s", bad)
assert test_runner._resolve_timeout() == test_runner._DEFAULT_TIMEOUT_S
def test_tc12_pytest_runs_in_worktree_via_proc_group(monkeypatch):
monkeypatch.setattr(cfg.settings, "test_runner_timeout_s", 321)
monkeypatch.setattr(cfg.settings, "subprocess_tree_kill_enabled", True)
captured = {}
def fake_run(cmd, *, cwd, timeout, grace_s, tree_kill):
captured.update(cmd=cmd, cwd=cwd, timeout=timeout, tree_kill=tree_kill)
return ProcResult(returncode=0, stdout="", stderr="", timed_out=False)
monkeypatch.setattr(test_runner.proc_group, "run_in_process_group", fake_run)
test_runner.run_test_suite("orchestrator", "feature/ORCH-116-x")
from src.git_worktree import get_worktree_path
assert captured["cwd"] == get_worktree_path("orchestrator", "feature/ORCH-116-x")
assert captured["timeout"] == 321
assert captured["tree_kill"] is True
assert captured["cmd"][:3] == ["python", "-m", "pytest"]
# ---------------------------------------------------------------------------
# TC-13 — self-hosting safety: no forbidden literals; smoke read-only (AC-10)
# ---------------------------------------------------------------------------
def test_tc13_command_has_no_dangerous_literals():
cmd = " ".join(test_runner.build_test_command())
forbidden = ("compose", "up -d", "--build", "8500", "force", "push", ".env",
"rm ", "restart", "docker")
for token in forbidden:
assert token not in cmd, f"forbidden literal {token!r} in runner command: {cmd}"
assert "pytest" in cmd
def test_tc13_smoke_is_read_only_and_checks_serial_gate(monkeypatch):
monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
monkeypatch.setattr(test_runner, "_SMOKE_BACKOFF_S", 0)
calls = []
def fake_get(url):
calls.append(url)
if url.endswith("/queue"):
return 200, '{"serial_gate": {"enabled": true}}'
return 200, "ok"
monkeypatch.setattr(test_runner, "_http_get", fake_get)
ok, detail = test_runner.run_smoke()
assert ok is True
# All three read-only endpoints hit; serial_gate verified in /queue body.
assert any(u.endswith("/health") for u in calls)
assert any(u.endswith("/status") for u in calls)
assert any(u.endswith("/queue") for u in calls)
def test_tc13_smoke_missing_serial_gate_fails(monkeypatch):
monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
monkeypatch.setattr(test_runner, "_SMOKE_BACKOFF_S", 0)
monkeypatch.setattr(test_runner, "_http_get",
lambda url: (200, "{}")) # 200 but no serial_gate block
ok, detail = test_runner.run_smoke()
assert ok is False
assert "serial_gate" in detail
def test_tc13_smoke_unreachable_retries_then_fails(monkeypatch):
monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
monkeypatch.setattr(test_runner, "_SMOKE_BACKOFF_S", 0)
attempts = {"n": 0}
def fake_get(url):
attempts["n"] += 1
return 0, "" # always unreachable
monkeypatch.setattr(test_runner, "_http_get", fake_get)
ok, detail = test_runner.run_smoke()
assert ok is False
# bounded retry: more than one attempt was made before declaring FAIL.
assert attempts["n"] >= test_runner._SMOKE_MAX_ATTEMPTS
def test_tc13_pytest_green_but_smoke_fail_is_fail(monkeypatch):
tid = _make_task("testing")
monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=0, stdout="ok", stderr="", timed_out=False))
monkeypatch.setattr(test_runner, "run_smoke", lambda: (False, "smoke /queue failed"))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
advance.assert_called_once() # FAIL still advances (existing rollback)
body = _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
assert "result: FAIL" in body
assert "smoke: failed" in body
# ---------------------------------------------------------------------------
# TC-14 — observability + hybrid: /queue block + structured verdict log (AC-12/AC-13)
# ---------------------------------------------------------------------------
def test_tc14_snapshot_shape():
snap = test_runner.snapshot()
for k in ("enabled", "repos", "target", "timeout_s", "smoke_enabled",
"infra_max_retries", "runs", "pass", "fail", "tool_error", "deferred"):
assert k in snap, f"snapshot missing key {k}"
def test_tc14_queue_endpoint_includes_block():
import asyncio
from src import main
payload = asyncio.run(main.queue())
assert "test_runner" in payload
assert "enabled" in payload["test_runner"]
def test_tc14_structured_verdict_log_distinguishes_outcomes(monkeypatch, caplog):
tid = _make_task("testing")
monkeypatch.setattr(stage_engine, "advance_stage", MagicMock())
# code-pass
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=0, stdout="", stderr="", timed_out=False))
with caplog.at_level("INFO", logger="orchestrator.test_runner"):
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
assert any("outcome=code-pass" in r.message for r in caplog.records)
caplog.clear()
# tool-error
monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 2)
monkeypatch.setattr(test_runner, "run_test_suite",
lambda repo, branch: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
with caplog.at_level("WARNING", logger="orchestrator.test_runner"):
test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))
assert any("outcome=tool-error" in r.message for r in caplog.records)
def test_tc14_snapshot_never_raises(monkeypatch):
class Boom:
def __getattr__(self, name):
raise RuntimeError("boom")
monkeypatch.setattr(test_runner, "settings", Boom())
snap = test_runner.snapshot()
assert snap["enabled"] is False
# ---------------------------------------------------------------------------
# TC-15 — anti-drift of the LLM call-site map / roadmap stays green after our edits.
# ---------------------------------------------------------------------------
def test_tc15_llm_map_anti_drift_green():
from tests import test_llm_call_site_inventory as inv
from tests import test_llm_determinization_docs as docs
# tester stays avoidable=yes / axis=C / needs-hybrid-fallback (LLM-branch = fallback).
inv.test_tc14_avoidable_set_fixed()
inv.test_tc13_control_path_axis_correct()
inv.test_tc04_classification_total_and_axis_consistent()
# roadmap: exactly one first_slice=yes (deployer); tester rank 2 hybrid.
docs.test_tc07_roadmap_completeness_and_first_slice()
docs.test_tc11_no_fabricated_followup_ids()