Second realised slice of the determinization-roadmap (ORCH-118 A5, needs-hybrid-fallback): on the `testing` stage for the self-hosting `orchestrator` repo the LLM `tester` agent is replaced by a deterministic test-runner (src/test_runner.py), intercepted in launch_job BEFORE _spawn (deploy-finalizer / post-deploy-monitor / staging-runner precedent). It runs the regression `python -m pytest <target>` in the task worktree via proc_group (tree-kill) + an optional read-only smoke (/health, /status, /queue + serial_gate), maps the exit-code -> result: PASS|FAIL via the existing self_deploy.map_exit_code_to_status contract, writes 13-test-report.md and initiates the EXISTING check_tests_passed gate exactly as a finished LLM-tester. Invariant (NFR-1): only the *producer* changes — the artifact contract (13-test-report.md / result:), the gate check_tests_passed / _parse_tests_verdict, STAGE_TRANSITIONS and the DB schema are byte-for-byte UNCHANGED. Additive, under a kill-switch (test_runner_enabled), never-raise, fail-closed, self-hosting scope, two-level outcome (tool-error DEFER, anti ORCH-110), hybrid (LLM strictly off-control-path). 52c-`status:` is aligned with the verdict (D6.1) so the three-field _parse_tests_verdict never false-negatives a PASS. Docs (ORCH-118 NFR-6, atomic with code): llm-call-sites.md (A5 implemented), llm-determinization-roadmap.md (rank 2 implemented), llm-usage-policy.md, README/internals/overview, tester.md, CLAUDE.md, CHANGELOG.md. Coverage: tests/test_orch116_test_runner.py (TC-01..TC-14); LLM anti-drift tests green. Full suite: 2137 passed. Refs: ORCH-116 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
561 lines
26 KiB
Python
561 lines
26 KiB
Python
"""ORCH-116 — deterministic test-runner replacing the LLM tester on the `testing`
|
|
stage (self-hosting orchestrator).
|
|
|
|
Covers Phase 1 (04-test-plan.yaml TC-01…TC-14): the launch_job intercept BEFORE
|
|
_spawn, the exit-code -> result: PASS|FAIL mapping, 13-test-report.md render + the
|
|
UNCHANGED gate contract (incl. the tester-specific D6.1 52c-`status:`↔parser
|
|
anti-collision), advance_stage initiation, kill-switch / scope / backward-compat for a
|
|
repo without a test-contract, never-raise / fail-safe, the bounded tool-error DEFER
|
|
(anti ORCH-110), process timeout (proc_group / worktree), read-only smoke, self-hosting
|
|
safety, the anti-drift invariant (STAGE_TRANSITIONS / QG_CHECKS / check_tests_passed /
|
|
_parse_tests_verdict / DB schema untouched), and the /queue observability block.
|
|
|
|
No live Claude CLI, network or worktree git: the pytest subprocess, the smoke GETs and
|
|
advance_stage are mocked; the pure mapping/render is tested directly. (TC-15 — the LLM
|
|
call-site map anti-drift — lives in tests/test_llm_call_site_inventory.py /
|
|
test_llm_determinization_docs.py and is asserted green here too.)
|
|
"""
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
# Test-isolation environment. These statements sit above the `src.*` imports
# (which carry `# noqa: E402`), presumably because the project settings are read
# from the environment at import time -- NOTE(review): confirm against src.config.
_test_db = os.path.join(tempfile.gettempdir(), "test_orch116_test_runner.db")
# Always overridden: the DB file and the repos root point at the temp directory.
os.environ.update({
    "ORCH_DB_PATH": _test_db,
    "ORCH_REPOS_DIR": tempfile.gettempdir(),
})
# Tokens only get a placeholder when the caller's environment has none.
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
|
|
|
from unittest.mock import MagicMock # noqa: E402
|
|
|
|
import src.db as _db # noqa: E402
|
|
from src.db import init_db, get_db, enqueue_job # noqa: E402
|
|
from src import test_runner # noqa: E402
|
|
from src import stage_engine # noqa: E402
|
|
from src import config as cfg # noqa: E402
|
|
from src.proc_group import ProcResult # noqa: E402
|
|
from src.agents.launcher import AgentLauncher # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
    """Autouse fixture: fresh DB file plus a known runner configuration per test.

    Steps, in order: point ``_db.settings.db_path`` at the module-level test DB,
    delete any leftover file, run ``init_db()``, redirect the worktrees directory
    to ``tmp_path``, apply the runner settings below and zero the in-process
    counters. Nothing runs after the ``yield``; monkeypatch undoes the patches.
    """
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()

    # Worktree artefacts land in tmp; git commit/push degrade gracefully (no repo).
    monkeypatch.setattr(
        "src.git_worktree.settings.worktrees_dir", str(tmp_path), raising=False
    )

    # Runner ON, self-hosting scope (empty CSV). Smoke OFF by default so the PASS
    # path does not hit the network; the dedicated smoke tests re-enable + mock it.
    runner_overrides = {
        "test_runner_enabled": True,
        "test_runner_repos": "",
        "test_runner_smoke_enabled": False,
        "test_runner_infra_max_retries": 2,
    }
    for attr_name, attr_value in runner_overrides.items():
        monkeypatch.setattr(cfg.settings, attr_name, attr_value, raising=False)

    # Reset in-process counters between tests (values only; keys are untouched).
    counters = test_runner._TEST_RUNNER_COUNTERS
    for counter_name in counters:
        counters[counter_name] = 0

    yield
|
|
|
|
|
|
def _make_task(stage, repo="orchestrator", wi="ORCH-116", branch="feature/ORCH-116-x"):
    """Insert one row into ``tasks`` and return its row id.

    Args:
        stage: value stored in the ``stage`` column.
        repo: value stored in the ``repo`` column.
        wi: work-item id; also used to build ``plane_id`` as ``plane-<wi>``.
        branch: value stored in the ``branch`` column.

    Returns:
        ``lastrowid`` of the INSERT cursor.

    The connection is closed in a ``finally`` so a failing INSERT/commit does not
    leave an open handle on the shared test DB file (which the next test's
    fixture deletes).
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
            (f"plane-{wi}", wi, repo, branch, stage),
        )
        tid = cur.lastrowid
        conn.commit()
        return tid
    finally:
        conn.close()
|
|
|
|
|
|
def _make_job(agent, repo, task_id, content="x"):
    """Enqueue a job via ``enqueue_job`` and return whatever that call returns.

    ``content`` is passed positionally as the third argument; ``task_id`` is
    passed by keyword.
    """
    queued = enqueue_job(agent, repo, content, task_id=task_id)
    return queued
|
|
|
|
|
|
def _job_dict(job_id, agent, repo, task_id):
    """Build the job mapping handed to the launcher / runner in these tests.

    Keys: ``id``, ``agent``, ``repo``, ``task_id`` and a fixed
    ``task_content`` of ``"x"``.
    """
    return dict(
        id=job_id,
        agent=agent,
        repo=repo,
        task_id=task_id,
        task_content="x",
    )
|
|
|
|
|
|
def _read_report(repo, branch, wi):
    """Return the text of ``docs/work-items/<wi>/13-test-report.md`` in the worktree.

    The worktree root comes from ``get_worktree_path(repo, branch)``; the file is
    read as UTF-8. Raises whatever ``open`` raises if the report is missing.
    """
    from src.git_worktree import get_worktree_path

    worktree_root = get_worktree_path(repo, branch)
    report_path = os.path.join(worktree_root, f"docs/work-items/{wi}/13-test-report.md")
    with open(report_path, encoding="utf-8") as handle:
        text = handle.read()
    return text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-01 — applies(): kill-switch / scope / contract / fail-safe (FR-7 / AC-6/AC-7/AC-8)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc01_applies_killswitch_scope_and_contract(monkeypatch):
    """``applies()`` under the kill-switch, the default scope and a hand-edited CSV."""
    # enabled=False -> False (fall back to the LLM path) regardless of repo.
    monkeypatch.setattr(cfg.settings, "test_runner_enabled", False)
    assert test_runner.applies("orchestrator") is False

    # enabled, empty CSV -> self-hosting only (and a test-contract resolves for it).
    monkeypatch.setattr(cfg.settings, "test_runner_enabled", True)
    monkeypatch.setattr(cfg.settings, "test_runner_repos", "")
    default_scope = (
        ("orchestrator", True),
        ("enduro-trails", False),
        ("", False),
    )
    for repo_name, expected in default_scope:
        assert test_runner.applies(repo_name) is expected

    # BR-9 backward-compat: even hand-adding a non-self repo to the CSV -> False,
    # because _has_test_contract is unresolved for it (Phase 1 = self only).
    monkeypatch.setattr(cfg.settings, "test_runner_repos", "enduro-trails, orchestrator")
    csv_scope = (
        ("ENDURO-TRAILS", False),   # no contract -> LLM-tester
        ("orchestrator", True),     # in scope + contract
        ("other-repo", False),
    )
    for repo_name, expected in csv_scope:
        assert test_runner.applies(repo_name) is expected
|
|
|
|
|
|
def test_tc01_applies_never_raises(monkeypatch):
    """``applies()`` returns False when reading ``test_runner_enabled`` raises."""

    class _ExplodingSettings:
        # Reading the kill-switch attribute raises instead of returning a bool.
        @property
        def test_runner_enabled(self):
            raise RuntimeError("boom")

    monkeypatch.setattr(test_runner, "settings", _ExplodingSettings())
    verdict = test_runner.applies("orchestrator")
    assert verdict is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-02 — exit-code -> verdict, single shared contract (FR-3 / AC-3)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc02_map_exit_code():
    """Exit code 0 maps to PASS; every other value (or non-code) maps to FAIL."""
    from src import self_deploy

    to_result = test_runner.map_exit_code_to_result

    assert to_result(0) == "PASS"
    # Non-zero exit codes first, then values that are not exit codes at all.
    failing_inputs = (1, 2, 137, -9) + (None, "x", object())
    for value in failing_inputs:
        assert to_result(value) == "FAIL"

    # Same underlying contract as the deploy-finalizer / staging-runner (no 2nd mapping):
    # PASS iff SUCCESS, FAIL iff FAILED.
    for value in (0, 1, None, "garbage"):
        if self_deploy.map_exit_code_to_status(value) == "SUCCESS":
            expect = "PASS"
        else:
            expect = "FAIL"
        assert to_result(value) == expect
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-03 — 13-test-report.md render: machine key + 52c schema + status alignment (FR-4 / AC-2)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc03_report_render_schema_and_status_alignment():
    """The rendered report carries ``result:``, the 52c fields and an aligned ``status:``."""
    passed = test_runner.build_test_report("ORCH-116", 0, "PASS", "23 passed", smoke="ok")
    assert "result: PASS" in passed  # UPPERCASE machine key

    required_fields = (
        "work_item: ORCH-116",
        "stage: testing",
        "author_agent: test-runner",
        "status: success",
        "created_at:",
        "model_used: n/a",
    )
    for field in required_fields:
        assert field in passed, f"missing 52c field: {field}"
    assert "smoke: ok" in passed
    assert "23 passed" in passed  # stdout tail copied into the body

    failed = test_runner.build_test_report("ORCH-116", 1, "FAIL", "boom", smoke="skipped")
    assert "result: FAIL" in failed
    # D6.1: status MUST be aligned with the verdict (failed, never `success`).
    assert "status: failed" in failed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-04 — generated report read by the UNCHANGED _parse_tests_verdict (AC-2 / D6.1)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc04_gate_parser_unchanged():
    """``_parse_tests_verdict`` reads the generated PASS and FAIL reports correctly."""
    from src.qg.checks import _parse_tests_verdict

    pass_report = test_runner.build_test_report("ORCH-116", 0, "PASS", "", smoke="ok")
    ok, reason = _parse_tests_verdict(pass_report)
    assert ok is True and "PASS" in reason

    fail_report = test_runner.build_test_report("ORCH-116", 2, "FAIL", "", smoke="skipped")
    bad, reason2 = _parse_tests_verdict(fail_report)
    assert bad is False and "FAIL" in reason2
|
|
|
|
|
|
def test_tc04_status_field_never_false_negatives_a_pass():
    """Guard the D6.1 collision between the 52c ``status:`` field and the parser.

    ``status:`` is mandatory in the report and, per the original note, is read by
    ``_parse_tests_verdict`` with negative-token priority. A PASS report must
    therefore not carry a status whose UPPERCASE form contains a negative token
    (otherwise a healthy run would read as FAIL).
    """
    from src.qg.checks import _parse_tests_verdict, _TESTS_NEGATIVE_TOKENS

    report = test_runner.build_test_report("ORCH-116", 0, "PASS", "", smoke="ok")

    # Pull the first `status:` line and normalise its value for the token check.
    status_line = next(filter(lambda ln: ln.startswith("status:"), report.splitlines()))
    status_upper = status_line.partition(":")[2].strip().upper()

    for token in _TESTS_NEGATIVE_TOKENS:
        assert token not in status_upper, f"PASS report status carries negative token {token!r}"
    assert _parse_tests_verdict(report)[0] is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-05 — launch_job intercepts tester on testing BEFORE _spawn (AC-1)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc05_launch_job_intercepts_before_spawn(monkeypatch):
    """A tester job on ``testing`` runs the mocked runner, never ``_spawn``, and ends ``done``."""
    task_id = _make_task("testing")
    job_id = _make_job("tester", "orchestrator", task_id)

    launcher = AgentLauncher()
    spawn_mock = MagicMock(return_value=999)
    gate_mock = MagicMock()
    monkeypatch.setattr(launcher, "_spawn", spawn_mock)
    monkeypatch.setattr(test_runner, "run_test_gate", gate_mock)

    outcome = launcher.launch_job(_job_dict(job_id, "tester", "orchestrator", task_id))

    assert outcome is None               # no agent_runs row
    spawn_mock.assert_not_called()       # LLM path NOT taken
    gate_mock.assert_called_once()       # deterministic runner ran

    conn = get_db()
    row = conn.execute("SELECT status FROM jobs WHERE id=?", (job_id,)).fetchone()
    job_status = row[0]
    conn.close()
    assert job_status == "done"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-06 — discriminator: tester off-`testing` / non-tester / never-raise (AC-1 / R-1)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc06_stage_discriminator_off_testing_not_intercepted(monkeypatch):
    """A tester job whose task is on ``development`` is not intercepted and reaches ``_spawn``."""
    task_id = _make_task("development")  # tester job but not the testing stage
    job_id = _make_job("tester", "orchestrator", task_id)
    job = _job_dict(job_id, "tester", "orchestrator", task_id)

    assert test_runner.should_intercept(job) is False

    # and launch_job falls through to _spawn for it.
    launcher = AgentLauncher()
    spawn_mock = MagicMock(return_value=42)
    monkeypatch.setattr(launcher, "_spawn", spawn_mock)
    outcome = launcher.launch_job(_job_dict(job_id, "tester", "orchestrator", task_id))
    assert outcome == 42
    spawn_mock.assert_called_once()
|
|
|
|
|
|
def test_tc06_non_tester_not_intercepted():
    """A ``developer`` job on the ``testing`` stage is not intercepted."""
    task_id = _make_task("testing")
    job_id = _make_job("developer", "orchestrator", task_id)
    job = _job_dict(job_id, "developer", "orchestrator", task_id)
    assert test_runner.should_intercept(job) is False
|
|
|
|
|
|
def test_tc06_non_self_repo_not_intercepted():
    """A tester job for the ``enduro-trails`` repo is not intercepted."""
    task_id = _make_task("testing", repo="enduro-trails", wi="ET-9", branch="feature/ET-9-x")
    job_id = _make_job("tester", "enduro-trails", task_id)
    job = _job_dict(job_id, "tester", "enduro-trails", task_id)
    assert test_runner.should_intercept(job) is False
|
|
|
|
|
|
def test_tc06_should_intercept_never_raises(monkeypatch):
    """``should_intercept`` returns False (fall-through) when the DB lookup blows up."""
    # applies() True, but the DB lookup explodes -> should_intercept False.
    monkeypatch.setattr(test_runner, "applies", lambda repo: True)
    # Break both possible lookup routes: a name bound on the runner module and
    # the function on src.db itself.
    monkeypatch.setattr(test_runner, "get_db", None, raising=False)

    def boom_get_db():
        raise RuntimeError("db down")

    import src.db as dbmod
    monkeypatch.setattr(dbmod, "get_db", boom_get_db)

    job = {"agent": "tester", "repo": "orchestrator", "task_id": 1}
    assert test_runner.should_intercept(job) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-07 — routing equivalence: PASS -> advance; FAIL -> same path (AC-4)
|
|
# ---------------------------------------------------------------------------
|
|
@pytest.mark.parametrize("rc,expected", [(0, "PASS"), (1, "FAIL")])
def test_tc07_advance_initiated_like_llm(monkeypatch, rc, expected):
    """Both PASS and FAIL call ``advance_stage`` once with the tester's keyword arguments."""
    task_id = _make_task("testing")

    def fake_suite(repo, branch):
        # Stand-in for the pytest subprocess: only the return code varies.
        return ProcResult(returncode=rc, stdout="out", stderr="", timed_out=False)

    advance_mock = MagicMock()
    monkeypatch.setattr(test_runner, "run_test_suite", fake_suite)
    monkeypatch.setattr(stage_engine, "advance_stage", advance_mock)

    test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", task_id))

    advance_mock.assert_called_once()
    call_kwargs = advance_mock.call_args.kwargs
    assert call_kwargs["current_stage"] == "testing"
    assert call_kwargs["finished_agent"] == "tester"
    assert call_kwargs["work_item_id"] == "ORCH-116"

    report = _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
    assert f"result: {expected}" in report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-08 — kill-switch: enabled=False -> LLM path via _spawn (AC-7)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc08_killswitch_falls_back_to_spawn(monkeypatch):
    """With ``test_runner_enabled`` False, ``launch_job`` calls ``_spawn`` and not the runner."""
    monkeypatch.setattr(cfg.settings, "test_runner_enabled", False)
    task_id = _make_task("testing")
    job_id = _make_job("tester", "orchestrator", task_id)

    launcher = AgentLauncher()
    spawn_mock = MagicMock(return_value=7)
    gate_mock = MagicMock()
    monkeypatch.setattr(launcher, "_spawn", spawn_mock)
    monkeypatch.setattr(test_runner, "run_test_gate", gate_mock)

    outcome = launcher.launch_job(_job_dict(job_id, "tester", "orchestrator", task_id))

    assert outcome == 7
    spawn_mock.assert_called_once()  # byte-for-byte the prior LLM-tester path
    gate_mock.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-09 — anti-drift NFR-1: STAGE_TRANSITIONS / QG_CHECKS / schema untouched (AC-6)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc09_pipeline_contract_unchanged():
    """Pin the two stage edges, the gate registration, the table set and the ``result:`` key."""
    from src.stages import STAGE_TRANSITIONS
    from src.qg.checks import QG_CHECKS

    # The review->testing edge (tester) and testing->deploy-staging gate are byte-for-byte.
    pinned_edges = {
        "review": {
            "next": "testing", "agent": "tester", "qg": "check_reviewer_verdict",
        },
        "testing": {
            "next": "deploy-staging", "agent": "deployer", "qg": "check_tests_passed",
        },
    }
    for stage_name, edge in pinned_edges.items():
        assert STAGE_TRANSITIONS[stage_name] == edge
    assert "check_tests_passed" in QG_CHECKS

    # No new ORCH-116 table: only the existing tables exist.
    conn = get_db()
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
    table_names = {row[0] for row in rows}
    conn.close()
    assert not any("test_runner" in name for name in table_names)

    # The machine key is not renamed: the runner emits `result:`.
    assert "result:" in test_runner.build_test_report("ORCH-116", 0, "PASS")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-10 / TC-11 — two-level outcome + never-raise (anti ORCH-110, AC-5/AC-9)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc10_nonzero_exit_is_fail_and_advances(monkeypatch):
    """Return code 3 without a timeout writes ``result: FAIL`` and still advances once."""
    task_id = _make_task("testing")

    def fake_suite(repo, branch):
        return ProcResult(returncode=3, stdout="fail", stderr="", timed_out=False)

    advance_mock = MagicMock()
    monkeypatch.setattr(test_runner, "run_test_suite", fake_suite)
    monkeypatch.setattr(stage_engine, "advance_stage", advance_mock)

    test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", task_id))

    advance_mock.assert_called_once()
    report = _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
    assert "result: FAIL" in report
|
|
|
|
|
|
def test_tc10_timeout_defers_without_advance(monkeypatch):
    """A timed-out suite (tool error) defers: no advance, exactly one tester retry job.

    The retry rows are fetched with a single query and checked with explicit
    assertions, so a missing retry job fails with a readable assertion message
    instead of a ``TypeError`` from indexing ``fetchone()``'s ``None``. The
    connection is closed in a ``finally`` so a failing query cannot leak it.
    """
    tid = _make_task("testing")
    monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 2)
    monkeypatch.setattr(
        test_runner, "run_test_suite",
        lambda repo, branch: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
    advance = MagicMock()
    monkeypatch.setattr(stage_engine, "advance_stage", advance)

    test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", tid))

    advance.assert_not_called()  # NO silent advance / false green on infra hiccup

    conn = get_db()
    try:
        retry_agents = [row[0] for row in conn.execute(
            "SELECT agent FROM jobs WHERE task_id=? AND task_content LIKE ?",
            (tid, f"%{test_runner._INFRA_RETRY_MARKER}%"),
        ).fetchall()]
    finally:
        conn.close()

    assert len(retry_agents) == 1, f"expected exactly one infra-retry job, got {retry_agents!r}"
    # The re-queued job must be a `tester` job (re-enters this runner), not a deployer.
    assert retry_agents[0] == "tester"
    assert test_runner._TEST_RUNNER_COUNTERS["deferred"] == 1
    assert test_runner._TEST_RUNNER_COUNTERS["tool_error"] == 1
|
|
|
|
|
|
def test_tc10_tool_error_budget_exhausted_fails_closed(monkeypatch):
    """With a retry budget of 0, a timeout alerts, writes ``result: FAIL`` and advances."""
    task_id = _make_task("testing")
    monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 0)  # exhausted immediately

    def fake_suite(repo, branch):
        return ProcResult(returncode=None, stdout="", stderr="", timed_out=True)

    advance_mock = MagicMock()
    alert_mock = MagicMock()
    monkeypatch.setattr(test_runner, "run_test_suite", fake_suite)
    monkeypatch.setattr(stage_engine, "advance_stage", advance_mock)
    monkeypatch.setattr(test_runner, "_alert_infra_exhausted", alert_mock)

    test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", task_id))

    advance_mock.assert_called_once()  # fail-closed: write FAIL + advance (existing rollback)
    alert_mock.assert_called_once()
    report = _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
    assert "result: FAIL" in report
|
|
|
|
|
|
def test_tc11_run_gate_never_raises(monkeypatch):
    """``run_test_gate`` returns normally even when ``run_test_suite`` raises."""
    task_id = _make_task("testing")

    def exploding_suite(repo, branch):
        raise RuntimeError("pytest exploded")

    monkeypatch.setattr(test_runner, "run_test_suite", exploding_suite)

    # Must NOT raise (AC-9): the worker is protected even on an unexpected error.
    # There is no assertion; the test passes if the call returns.
    job = _job_dict(1, "tester", "orchestrator", task_id)
    test_runner.run_test_gate(job)
|
|
|
|
|
|
def test_tc11_launcher_contains_runner_fault(monkeypatch):
    """If ``run_test_gate`` raises, ``launch_job`` returns None and the job is ``failed``."""
    task_id = _make_task("testing")
    job_id = _make_job("tester", "orchestrator", task_id)

    launcher = AgentLauncher()
    monkeypatch.setattr(launcher, "_spawn", MagicMock())
    failing_gate = MagicMock(side_effect=RuntimeError("kaboom"))
    monkeypatch.setattr(test_runner, "run_test_gate", failing_gate)

    outcome = launcher.launch_job(_job_dict(job_id, "tester", "orchestrator", task_id))
    assert outcome is None

    conn = get_db()
    row = conn.execute("SELECT status FROM jobs WHERE id=?", (job_id,)).fetchone()
    job_status = row[0]
    conn.close()
    assert job_status == "failed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-12 — timeout resolution + worktree cwd + tree-kill (NFR-3/NFR-4 / AC-11)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc12_resolve_timeout_default_on_bad_value(monkeypatch):
    """A valid timeout is returned as-is; 0, negatives, strings and None fall back to the default."""
    monkeypatch.setattr(cfg.settings, "test_runner_timeout_s", 1234)
    assert test_runner._resolve_timeout() == 1234

    invalid_values = (0, -5, "abc", None)
    for invalid in invalid_values:
        monkeypatch.setattr(cfg.settings, "test_runner_timeout_s", invalid)
        resolved = test_runner._resolve_timeout()
        assert resolved == test_runner._DEFAULT_TIMEOUT_S
|
|
|
|
|
|
def test_tc12_pytest_runs_in_worktree_via_proc_group(monkeypatch):
    """``run_test_suite`` calls ``proc_group.run_in_process_group`` with the worktree cwd.

    Also checks the configured timeout, the tree-kill flag and that the command
    starts with ``python -m pytest``.
    """
    monkeypatch.setattr(cfg.settings, "test_runner_timeout_s", 321)
    monkeypatch.setattr(cfg.settings, "subprocess_tree_kill_enabled", True)
    seen = {}

    def fake_run(cmd, *, cwd, timeout, grace_s, tree_kill):
        # Record the arguments of interest; grace_s is accepted but not checked.
        seen["cmd"] = cmd
        seen["cwd"] = cwd
        seen["timeout"] = timeout
        seen["tree_kill"] = tree_kill
        return ProcResult(returncode=0, stdout="", stderr="", timed_out=False)

    monkeypatch.setattr(test_runner.proc_group, "run_in_process_group", fake_run)

    test_runner.run_test_suite("orchestrator", "feature/ORCH-116-x")

    from src.git_worktree import get_worktree_path
    expected_cwd = get_worktree_path("orchestrator", "feature/ORCH-116-x")
    assert seen["cwd"] == expected_cwd
    assert seen["timeout"] == 321
    assert seen["tree_kill"] is True
    assert seen["cmd"][:3] == ["python", "-m", "pytest"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-13 — self-hosting safety: no forbidden literals; smoke read-only (AC-10)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc13_command_has_no_dangerous_literals():
    """The space-joined runner command contains ``pytest`` and none of the forbidden substrings."""
    cmd = " ".join(test_runner.build_test_command())
    # Plain substring checks against the joined command line.
    forbidden = (
        "compose", "up -d", "--build", "8500", "force", "push", ".env",
        "rm ", "restart", "docker",
    )
    for token in forbidden:
        assert token not in cmd, f"forbidden literal {token!r} in runner command: {cmd}"
    assert "pytest" in cmd
|
|
|
|
|
|
def test_tc13_smoke_is_read_only_and_checks_serial_gate(monkeypatch):
    """With every GET returning 200, ``run_smoke`` succeeds and hits all three endpoints.

    ``_http_get`` is replaced by a recorder, so the only traffic is what goes
    through that GET helper; the ``/queue`` body carries a ``serial_gate`` block.
    """
    monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
    monkeypatch.setattr(test_runner, "_SMOKE_BACKOFF_S", 0)
    requested = []

    def fake_get(url):
        requested.append(url)
        body = '{"serial_gate": {"enabled": true}}' if url.endswith("/queue") else "ok"
        return 200, body

    monkeypatch.setattr(test_runner, "_http_get", fake_get)

    ok, _detail = test_runner.run_smoke()
    assert ok is True

    # All three read-only endpoints hit; serial_gate verified in /queue body.
    for suffix in ("/health", "/status", "/queue"):
        assert any(url.endswith(suffix) for url in requested)
|
|
|
|
|
|
def test_tc13_smoke_missing_serial_gate_fails(monkeypatch):
    """HTTP 200 with a ``{}`` body fails the smoke, and the detail mentions ``serial_gate``."""
    monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
    monkeypatch.setattr(test_runner, "_SMOKE_BACKOFF_S", 0)

    def fake_get(url):
        return 200, "{}"  # 200 but no serial_gate block

    monkeypatch.setattr(test_runner, "_http_get", fake_get)

    ok, detail = test_runner.run_smoke()
    assert ok is False
    assert "serial_gate" in detail
|
|
|
|
|
|
def test_tc13_smoke_unreachable_retries_then_fails(monkeypatch):
    """An always-unreachable endpoint fails the smoke after at least ``_SMOKE_MAX_ATTEMPTS`` GETs."""
    monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)
    monkeypatch.setattr(test_runner, "_SMOKE_BACKOFF_S", 0)
    seen_urls = []

    def fake_get(url):
        seen_urls.append(url)
        return 0, ""  # always unreachable

    monkeypatch.setattr(test_runner, "_http_get", fake_get)

    ok, _detail = test_runner.run_smoke()
    assert ok is False
    # bounded retry: more than one attempt was made before declaring FAIL.
    assert len(seen_urls) >= test_runner._SMOKE_MAX_ATTEMPTS
|
|
|
|
|
|
def test_tc13_pytest_green_but_smoke_fail_is_fail(monkeypatch):
    """A green pytest run with a failing smoke yields ``result: FAIL`` and still advances."""
    task_id = _make_task("testing")
    monkeypatch.setattr(cfg.settings, "test_runner_smoke_enabled", True)

    def green_suite(repo, branch):
        return ProcResult(returncode=0, stdout="ok", stderr="", timed_out=False)

    def failing_smoke():
        return False, "smoke /queue failed"

    advance_mock = MagicMock()
    monkeypatch.setattr(test_runner, "run_test_suite", green_suite)
    monkeypatch.setattr(test_runner, "run_smoke", failing_smoke)
    monkeypatch.setattr(stage_engine, "advance_stage", advance_mock)

    test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", task_id))

    advance_mock.assert_called_once()  # FAIL still advances (existing rollback)
    report = _read_report("orchestrator", "feature/ORCH-116-x", "ORCH-116")
    assert "result: FAIL" in report
    assert "smoke: failed" in report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-14 — observability + hybrid: /queue block + structured verdict log (AC-12/AC-13)
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc14_snapshot_shape():
    """``snapshot()`` exposes every configuration and counter key listed below."""
    snap = test_runner.snapshot()
    config_keys = ("enabled", "repos", "target", "timeout_s", "smoke_enabled",
                   "infra_max_retries")
    counter_keys = ("runs", "pass", "fail", "tool_error", "deferred")
    for k in config_keys + counter_keys:
        assert k in snap, f"snapshot missing key {k}"
|
|
|
|
|
|
def test_tc14_queue_endpoint_includes_block():
    """The ``main.queue()`` coroutine returns a ``test_runner`` block with an ``enabled`` key."""
    import asyncio
    from src import main

    # queue() is awaited directly through asyncio.run; no HTTP client is involved.
    payload = asyncio.run(main.queue())
    assert "test_runner" in payload
    runner_block = payload["test_runner"]
    assert "enabled" in runner_block
|
|
|
|
|
|
def test_tc14_structured_verdict_log_distinguishes_outcomes(monkeypatch, caplog):
    """The runner logs ``outcome=code-pass`` for a green run and ``outcome=tool-error`` for a timeout."""
    task_id = _make_task("testing")
    monkeypatch.setattr(stage_engine, "advance_stage", MagicMock())
    logger_name = "orchestrator.test_runner"

    # code-pass
    def green_suite(repo, branch):
        return ProcResult(returncode=0, stdout="", stderr="", timed_out=False)

    monkeypatch.setattr(test_runner, "run_test_suite", green_suite)
    with caplog.at_level("INFO", logger=logger_name):
        test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", task_id))
    assert any("outcome=code-pass" in record.message for record in caplog.records)

    caplog.clear()

    # tool-error
    def timed_out_suite(repo, branch):
        return ProcResult(returncode=None, stdout="", stderr="", timed_out=True)

    monkeypatch.setattr(cfg.settings, "test_runner_infra_max_retries", 2)
    monkeypatch.setattr(test_runner, "run_test_suite", timed_out_suite)
    with caplog.at_level("WARNING", logger=logger_name):
        test_runner.run_test_gate(_job_dict(1, "tester", "orchestrator", task_id))
    assert any("outcome=tool-error" in record.message for record in caplog.records)
|
|
|
|
|
|
def test_tc14_snapshot_never_raises(monkeypatch):
    """``snapshot()`` still returns a mapping with ``enabled`` False when every settings read raises."""

    class _ExplodingSettings:
        # __getattr__ runs for any attribute not found normally, i.e. all of them here.
        def __getattr__(self, name):
            raise RuntimeError("boom")

    monkeypatch.setattr(test_runner, "settings", _ExplodingSettings())
    snap = test_runner.snapshot()
    assert snap["enabled"] is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TC-15 — anti-drift of the LLM call-site map / roadmap stays green after our edits.
|
|
# ---------------------------------------------------------------------------
|
|
def test_tc15_llm_map_anti_drift_green():
    """Re-run selected anti-drift tests from the two LLM-docs test modules.

    The imported test functions are called directly, in the same order as
    before; any assertion they raise fails this test.
    """
    from tests import test_llm_call_site_inventory as inv
    from tests import test_llm_determinization_docs as docs

    delegated_checks = (
        # tester stays avoidable=yes / axis=C / needs-hybrid-fallback (LLM-branch = fallback).
        inv.test_tc14_avoidable_set_fixed,
        inv.test_tc13_control_path_axis_correct,
        inv.test_tc04_classification_total_and_axis_consistent,
        # roadmap: exactly one first_slice=yes (deployer); tester rank 2 hybrid.
        docs.test_tc07_roadmap_completeness_and_first_slice,
        docs.test_tc11_no_fabricated_followup_ids,
    )
    for check in delegated_checks:
        check()
|