feat(testing): deterministic test-runner replacing LLM tester on the testing stage (ORCH-116)
All checks were successful
CI / test (push) Successful in 1m9s
CI / test (pull_request) Successful in 1m7s

Second realised slice of the determinization-roadmap (ORCH-118 A5,
needs-hybrid-fallback): on the `testing` stage for the self-hosting
`orchestrator` repo the LLM `tester` agent is replaced by a deterministic
test-runner (src/test_runner.py), intercepted in launch_job BEFORE _spawn
(deploy-finalizer / post-deploy-monitor / staging-runner precedent).

It runs the regression `python -m pytest <target>` in the task worktree via
proc_group (tree-kill) + an optional read-only smoke (/health, /status, /queue
+ serial_gate), maps the exit-code -> result: PASS|FAIL via the existing
self_deploy.map_exit_code_to_status contract, writes 13-test-report.md and
initiates the EXISTING check_tests_passed gate exactly as a finished LLM-tester.

Invariant (NFR-1): only the *producer* changes — the artifact contract
(13-test-report.md / result:), the gate check_tests_passed / _parse_tests_verdict,
STAGE_TRANSITIONS and the DB schema are byte-for-byte UNCHANGED. Additive, under
a kill-switch (test_runner_enabled), never-raise, fail-closed, self-hosting scope,
two-level outcome (tool-error DEFER, anti ORCH-110), hybrid (LLM strictly
off-control-path). 52c-`status:` is aligned with the verdict (D6.1) so the
three-field _parse_tests_verdict never false-negatives a PASS.

Docs (ORCH-118 NFR-6, atomic with code): llm-call-sites.md (A5 implemented),
llm-determinization-roadmap.md (rank 2 implemented), llm-usage-policy.md,
README/internals/overview, tester.md, CLAUDE.md, CHANGELOG.md. Coverage:
tests/test_orch116_test_runner.py (TC-01..TC-14); LLM anti-drift tests green.
Full suite: 2137 passed.

Refs: ORCH-116
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 02:59:51 +03:00
parent 04f9b9cbce
commit 80aa2409f7
17 changed files with 1506 additions and 8 deletions

View File

@@ -406,6 +406,17 @@ class AgentLauncher:
from .. import staging_runner
if staging_runner.should_intercept(job):
return self._run_staging_runner_job(job)
# ORCH-116: deterministic test-runner intercept (BEFORE _spawn). The LLM
# ``tester`` on the ``testing`` stage (self-hosting scope, repo with a
# test-contract) is replaced by a DETERMINISTIC test-runner (same precedent
# as deploy-finalizer / post-deploy-monitor / staging-runner). ``tester`` is
# the only agent entering ``testing``; the stage guard is defense-in-depth.
# Kill-switch off / out of scope / no contract -> should_intercept False ->
# the prior LLM tester runs via _spawn byte-for-byte.
if job.get("agent") == "tester":
from .. import test_runner
if test_runner.should_intercept(job):
return self._run_test_runner_job(job)
return self._spawn(
job["agent"],
job["repo"],
@@ -457,6 +468,28 @@ class AgentLauncher:
pass
return None
def _run_test_runner_job(self, job: dict):
"""ORCH-116: run the deterministic test gate for a tester job on `testing`.
Not an LLM spawn — there is no subprocess/monitor of an agent, so we mark the
jobs row done/failed here (mirror of _run_staging_runner_job). The runner
never-raises, but we guard anyway so a runner fault can't wedge the worker.
Returns None (no agent_run row, _spawn not called).
"""
from ..db import mark_job
from .. import test_runner
try:
test_runner.run_test_gate(job)
mark_job(job["id"], "done")
logger.info(f"test-runner job {job['id']} done")
except Exception as e:
logger.error(f"test-runner job {job['id']} failed: {e}")
try:
mark_job(job["id"], "failed", error=f"test-runner error: {e}")
except Exception:
pass
return None
def _run_post_deploy_monitor_job(self, job: dict):
"""ORCH-021: run one deterministic post-deploy monitor tick for a job.

View File

@@ -458,6 +458,57 @@ class Settings(BaseSettings):
staging_runner_infra_max_retries: int = 2
staging_runner_infra_retry_delay_s: int = 30
# ORCH-116: deterministic test-runner replacing the LLM `tester` agent on the
# `testing` stage for the self-hosting orchestrator (second realised slice of the
# determinization-roadmap, A5/tester needs-hybrid-fallback; first was ORCH-115/
# deployer). A new leaf src/test_runner.py (never-raise) is intercepted in
# launch_job BEFORE _spawn (mirroring the deploy-finalizer / post-deploy-monitor /
# staging-runner precedent, launcher.py:397/402/405): it runs the SAME regression
# `python -m pytest <target>` in the task worktree via proc_group (tree-kill) +
# the read-only smoke (/health,/status,/queue + serial_gate block), maps the
# exit-code -> result: PASS|FAIL via the existing self_deploy.map_exit_code_to_status
# contract, writes 13-test-report.md, and initiates the EXISTING check_tests_passed
# gate exactly as a finished LLM-tester would. The artifact contract, the gate,
# STAGE_TRANSITIONS and the DB schema are byte-for-byte UNCHANGED — this only
# replaces the *producer* of the artifact. Pattern = staging_runner_* / coverage_gate_*.
# See docs/work-items/ORCH-116/06-adr/ADR-001-deterministic-test-runner.md and
# docs/architecture/adr/adr-0049-deterministic-test-runner.md.
# test_runner_enabled -> SINGLE kill-switch (env ORCH_TEST_RUNNER_ENABLED).
# False -> the intercept never fires -> the prior
# LLM tester runs on testing via _spawn byte-for-byte
# as before ORCH-116 (D8/AC-7).
# test_runner_repos -> CSV scope (env ORCH_TEST_RUNNER_REPOS). Empty ->
# self-hosting only (orchestrator) via
# is_self_hosting_repo; non-empty -> membership.
# A repo with no resolvable test-contract is never
# intercepted even if listed (BR-9, _has_test_contract).
# test_runner_target -> pytest target of the test-contract (env
# ORCH_TEST_RUNNER_TARGET; convention merge_retest_target).
# test_runner_timeout_s -> wall-clock budget for the pytest regression (env
# ORCH_TEST_RUNNER_TIMEOUT_S). Malformed/non-positive ->
# default + WARNING (never-break). Aligned with the
# cross-cutting budget invariant ORCH-065/109/110 WITHOUT
# touching reaper_max_running_s (D9): it replaces the
# <=1800s LLM testing window (agent_timeout_seconds) with a
# bounded <=900s deterministic one (Σ on the edge does not grow).
# test_runner_smoke_enabled -> optional read-only smoke (env ORCH_TEST_RUNNER_SMOKE_ENABLED).
# False -> pytest exit-code is the sole signal (smoke skipped).
# test_runner_infra_max_retries -> tool-error (suite did NOT execute: spawn-error / timeout /
# returncode None) bounded DEFER budget before a fail-closed
# FAIL (env ORCH_TEST_RUNNER_INFRA_MAX_RETRIES). Mirrors
# staging_runner_infra_max_retries — infra hiccup is NOT a
# code-fault, so it never burns a developer-retry until the
# budget is exhausted (D5, anti ORCH-110).
# test_runner_infra_retry_delay_s-> delay before the re-queued tester job (env
# ORCH_TEST_RUNNER_INFRA_RETRY_DELAY_S).
test_runner_enabled: bool = True
test_runner_repos: str = ""
test_runner_target: str = "tests/"
test_runner_timeout_s: int = 900
test_runner_smoke_enabled: bool = True
test_runner_infra_max_retries: int = 2
test_runner_infra_retry_delay_s: int = 30
# ORCH-098 (FND/F2): machine lessons-journal — additive `lessons` table + leaf
# src/lessons.py (never-raise observer, by образцу serial_gate/coverage_gate/
# metrics). The journal is an OBSERVER, never a Quality Gate: writing a lesson

View File

@@ -236,6 +236,7 @@ async def queue():
from . import checkout_hygiene
from . import transition_lease
from . import staging_runner
from . import test_runner
from .disk_watchdog import disk_watchdog
from .build_cache_pruner import build_cache_pruner
return {
@@ -289,6 +290,11 @@ async def queue():
# failed/tool_error/deferred counters, so a code-fail FAILED is distinguishable
# from an infra tool-error. Additive block; never-raise.
"staging_runner": staging_runner.snapshot(),
# ORCH-116 (FR-8 / AC-13): deterministic test-runner observability (read-only)
# — kill-switch, scope, target, timeout/smoke/infra budget + run/pass/fail/
# tool_error/deferred counters, so a code-fail FAIL is distinguishable from an
# infra tool-error. Additive block; never-raise.
"test_runner": test_runner.snapshot(),
# ORCH-098 (FR-4 / AC-4): lessons-journal observability (read-only) —
# kill-switch + counts by type/status + last N lessons. Additive block;
# never-raise (snapshot() returns {"enabled": ...} minimum on error).

705
src/test_runner.py Normal file
View File

@@ -0,0 +1,705 @@
"""Deterministic test-runner (ORCH-116).
The ``testing`` stage for the self-hosting ``orchestrator`` repo was driven by the
LLM ``tester`` agent, but its PASS/FAIL core is purely deterministic
(``.openclaw/agents/tester.md`` steps 1-3): run the regression ``pytest tests/`` in
the task worktree, do a read-only smoke (``/health`` / ``/status`` / ``/queue`` +
``serial_gate`` block), map the exit-code to a verdict (``0 -> PASS``, else ``FAIL``)
and write ``13-test-report.md`` with the machine frontmatter ``result: PASS|FAIL``.
This leaf replaces that LLM consultation with deterministic code, intercepted in
``launcher.launch_job`` BEFORE ``_spawn`` (the ``deploy-finalizer`` /
``post-deploy-monitor`` / ``staging-runner`` reserved-agent precedent,
``launcher.py:397/402/405``). It is the second realised slice of the
determinization-roadmap (A5/tester, ``needs-hybrid-fallback``; the first was
ORCH-115/deployer — ``src/staging_runner.py``).
Hybrid nature (the key difference from ORCH-115): tester is
``needs-hybrid-fallback``, not ``replace-deterministic-now``. Its PASS/FAIL core is
fully derivable (pytest exit-code + smoke) and that is what this deterministic runner
owns; a future OPTIONAL **off-control-path** LLM triage/diagnosis after a
deterministic FAIL stays allowed (a separate role/job that never writes/overrides
``result:`` and never adds a STAGE_TRANSITIONS edge). Phase 1 does NOT implement the
triage but the architecture does not forbid it (D11/FR-9/AC-12).
What is and is NOT changed (NFR-1, the critical invariant):
* UNCHANGED — the artifact contract (``13-test-report.md`` with ``result:
PASS|FAIL``), the gate ``check_tests_passed`` / ``_parse_tests_verdict``,
``STAGE_TRANSITIONS``, the DB schema. This module replaces only the *producer* of
the artifact, never the gate that reads it.
* NEW — a deterministic producer + a launch_job intercept. Under a kill-switch +
repo-scope CSV + a test-contract resolve; fail-safe back to the LLM path when off
/ out of scope / no contract.
This module is a **leaf** (mirror of ``staging_runner`` / ``self_deploy`` /
``proc_group``): it imports only ``config`` / ``logging`` / ``proc_group`` at module
load; ``db`` / ``git_worktree`` / ``self_deploy.map_exit_code_to_status`` /
``qg.checks`` / ``stage_engine.advance_stage`` / ``notifications`` are imported LAZILY
inside functions so the heavy ``stage_engine`` is never pulled at import and no import
cycle forms. Every public function honours a **never-raise** contract (AC-9): a test
infra hiccup can never crash the worker / wedge the queue.
Two-level outcome (D5 — the key safety decision, anti ORCH-110):
* the suite EXECUTED (a real exit-code, 0 or non-zero) -> trust the code:
``0 -> PASS -> advance``; ``!=0 -> FAIL -> the existing rollback testing ->
development`` (same developer-retry path as a FAIL LLM verdict). A smoke failure
AND-s into ``FAIL`` (deterministic), it is NOT a tool-error.
* the suite did NOT execute (tool-error: spawn-error / timeout / ``returncode is
None``) -> an infra fault, NOT a code fault -> a bounded DEFER (re-queue a fresh
``tester`` job with a delay + a restart-safe marker). On budget exhaustion ->
fail-closed ``FAIL`` + advance + alert. So the runner NEVER does a silent advance
/ false green, and NEVER wedges the queue, but does NOT burn a developer-retry on
transient infra.
"""
import logging
import time
import urllib.error
import urllib.request
from .config import settings
from . import proc_group
logger = logging.getLogger("orchestrator.test_runner")
# Default wall-clock budget for the pytest regression (D9). Kept <= the LLM testing
# window it replaces (agent_timeout_seconds=1800) so Σ(work on the testing edge) does
# not grow and the cross-cutting reaper invariant (ORCH-065/109/110) holds WITHOUT
# touching reaper_max_running_s. 900s = ~74% headroom over the ~517s regression suite.
_DEFAULT_TIMEOUT_S = 900
_GIT_TIMEOUT = 60
# Read-only smoke knobs (D3). Transient unreachability (connection refused / timeout)
# is retried briefly before a FAIL so a single prod-8500 blip does not roll back a
# healthy branch; a reachable-but-wrong-form response (non-200 / no serial_gate block)
# is an immediate FAIL.
_SMOKE_TIMEOUT_S = 5
_SMOKE_MAX_ATTEMPTS = 3
_SMOKE_BACKOFF_S = 1.0
# Restart-safe DEFER marker (counted from the persisted jobs queue, mirror of
# staging_runner._INFRA_RETRY_MARKER / stage_engine._merge_infra_retry_count). Embedded
# verbatim in the re-queued job's task_content so a service restart never resets the
# infra-retry budget.
_INFRA_RETRY_MARKER = "test-runner infra-retry"
# In-process observability counters (mirror staging_runner._STAGING_RUNNER_COUNTERS).
_TEST_RUNNER_COUNTERS: dict = {
"runs": 0, # run_test_gate entered
"pass": 0, # suite ran, exit 0 + smoke ok -> PASS
"fail": 0, # suite ran non-zero / smoke failed, OR infra budget exhausted -> FAIL
"tool_error": 0, # suite did NOT execute (spawn-error / timeout / None)
"deferred": 0, # bounded infra DEFER (re-queued)
}
def _bump(key: str) -> None:
"""Increment an observability counter. Never raises."""
try:
_TEST_RUNNER_COUNTERS[key] += 1
except Exception: # noqa: BLE001 - observability must never break a decision
pass
# ---------------------------------------------------------------------------
# Conditionality (D8 / FR-7 / AC-7 / AC-8)
# ---------------------------------------------------------------------------
def _has_test_contract(repo: str) -> bool:
"""Whether a test-contract (regression + smoke) is resolvable for ``repo`` (BR-9).
In Phase 1 the contract is known by default ONLY for the self-hosting repo
(``orchestrator`` — ``pytest tests/`` + the read-only smoke); for every other repo
there is no contract yet (Phase 2 project test-contract) -> ``applies`` is False ->
the prior LLM-tester runs (fail-safe, AC-8). This makes AC-8 checkable: even if a
non-self repo (e.g. ``enduro-trails``) is hand-added to ``test_runner_repos``,
``_has_test_contract`` returns False -> it is never intercepted. never-raise.
"""
try:
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("test_runner._has_test_contract error for %s: %s", repo, e)
return False
def applies(repo: str) -> bool:
"""Whether the deterministic test-runner is REAL for ``repo``.
Mirrors ``staging_runner.applies`` / ``coverage_gate``:
* ``test_runner_enabled=False`` -> always False (global kill-switch); the legacy
LLM-tester path runs on ``testing`` via ``_spawn``.
* ``test_runner_repos`` (CSV) non-empty -> only the listed repos.
* empty CSV -> only the self-hosting repo (``orchestrator``).
* AND a test-contract is resolvable for the repo (BR-9, ``_has_test_contract``).
Checked FIRST in ``should_intercept`` (local, no network, no DB) so a disabled flag
costs nothing. never-raise -> False (fail-safe to the prior LLM path).
"""
try:
if not settings.test_runner_enabled:
return False
raw = (settings.test_runner_repos or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
in_scope = (repo or "").strip().lower() in allowed
else:
# Lazy import keeps this module a leaf (no qg import at module load).
from .qg.checks import is_self_hosting_repo
in_scope = is_self_hosting_repo(repo)
if not in_scope:
return False
return _has_test_contract(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("test_runner.applies error for %s: %s", repo, e)
return False
def should_intercept(job: dict) -> bool:
"""True iff this ``tester`` job is the deterministic test-suite job (D1).
``tester`` is the ONLY agent that enters the ``testing`` stage
(``STAGE_TRANSITIONS["review"]["agent"]``), so there is no stage collision as
there was for the shared ``deployer`` role (ORCH-115); the
``tasks.stage == "testing"`` guard is kept as defense-in-depth (R-1) — it stops a
stray future ``tester`` job outside ``testing`` from being intercepted. Intercept
iff ``agent == "tester"`` AND ``applies(repo)`` AND ``tasks.stage == "testing"``.
never-raise -> False (a DB-lookup failure falls through to ``_spawn``, fail-safe to
the prior LLM path).
"""
try:
if (job.get("agent") or "") != "tester":
return False
# applies() FIRST (local, no DB): disabled flag -> zero DB overhead.
if not applies(job.get("repo")):
return False
task_id = job.get("task_id")
if task_id is None:
return False
from .db import get_db
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
if not row:
return False
return (row[0] or "") == "testing"
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("test_runner.should_intercept error: %s", e)
return False
# ---------------------------------------------------------------------------
# Suite execution (D3 / FR-2 / NFR-3 / AC-9 / AC-11)
# ---------------------------------------------------------------------------
def build_test_command() -> list[str]:
"""Build the canonical regression argv (the same command the LLM-tester ran).
``python -m pytest <test_runner_target> -q`` (default ``tests/``, convention
``merge_retest_target``). Self-hosting safety (BR-7 / AC-10 / TC-13): NO restart of
8500, NO ``docker compose up orchestrator`` / ``--build``, NO force-push, NO
``.env`` edit — the runner only executes pytest in the task worktree and does
read-only GETs.
"""
target = (settings.test_runner_target or "tests/").strip() or "tests/"
return ["python", "-m", "pytest", target, "-q"]
def _resolve_timeout() -> int:
"""Resolve ``test_runner_timeout_s`` (malformed/non-positive -> default + WARNING,
never-break — mirror of ``staging_runner._resolve_timeout`` /
``merge_gate._resolve_retest_timeout``)."""
raw = getattr(settings, "test_runner_timeout_s", _DEFAULT_TIMEOUT_S)
try:
t = int(raw)
if t > 0:
return t
logger.warning(
"test_runner_timeout_s non-positive (%r) -> default %ds", raw, _DEFAULT_TIMEOUT_S
)
except (TypeError, ValueError):
logger.warning(
"test_runner_timeout_s malformed (%r) -> default %ds", raw, _DEFAULT_TIMEOUT_S
)
return _DEFAULT_TIMEOUT_S
def run_test_suite(repo: str, branch: str) -> proc_group.ProcResult:
"""Execute the pytest regression IN THE TASK WORKTREE, tree-killed on timeout.
Runs in ``git_worktree.get_worktree_path(repo, branch)`` (NOT the shared
``/repos/orchestrator`` — anti checkout-race, the same context coverage-gate /
merge-gate re-test use) through ``proc_group.run_in_process_group`` (ORCH-110) so a
hung pytest subtree is killed whole (no orphans, AC-11). Never raises (proc_group
degrades any OS error to a safe ``ProcResult``; a missing worktree -> a ProcResult
with ``returncode is None`` -> the tool-error DEFER path).
"""
cmd = build_test_command()
timeout = _resolve_timeout()
try:
grace = float(getattr(settings, "agent_kill_grace_seconds", 5) or 5)
except (TypeError, ValueError):
grace = 5.0
try:
from .git_worktree import get_worktree_path
wt = get_worktree_path(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise -> tool-error DEFER
logger.error("run_test_suite: worktree error for %s/%s: %s", repo, branch, e)
return proc_group.ProcResult(returncode=None, stdout="", stderr=str(e), timed_out=False)
return proc_group.run_in_process_group(
cmd,
cwd=wt,
timeout=timeout,
grace_s=grace,
tree_kill=bool(getattr(settings, "subprocess_tree_kill_enabled", True)),
)
# ---------------------------------------------------------------------------
# Read-only smoke (D3 / FR-2 / AC-10) — stdlib urllib, never-raise
# ---------------------------------------------------------------------------
def _http_get(url: str) -> tuple[int, str]:
"""GET ``url`` -> (http_code, body). Network/timeout -> (0, ""). Never raises.
Mirror of ``post_deploy._http_status`` (stdlib only, no httpx import at module
load — keeps the leaf pure). ``urllib`` raises ``HTTPError`` for >=400 responses;
that is treated as a real status code (so a 5xx is observed, not swallowed)."""
try:
with urllib.request.urlopen(url, timeout=_SMOKE_TIMEOUT_S) as resp: # noqa: S310
body = resp.read(8192).decode("utf-8", "replace")
return int(getattr(resp, "status", resp.getcode())), body
except urllib.error.HTTPError as e:
try:
body = e.read(8192).decode("utf-8", "replace")
except Exception: # noqa: BLE001
body = ""
return int(e.code), body
except Exception as e: # noqa: BLE001 - URLError / socket timeout / anything
logger.warning("test_runner smoke GET error for %s: %s", url, e)
return 0, ""
def run_smoke() -> tuple[bool, str]:
"""Read-only smoke against the running orchestrator (D3). Returns (ok, detail).
Strictly read-only GETs (BR-7/AC-10): ``/health`` (HTTP 200), ``/status`` (HTTP
200), ``/queue`` (HTTP 200 AND the ``serial_gate`` block present, ORCH-088). A
transient unreachability (code 0 = connection refused / timeout) is retried up to
``_SMOKE_MAX_ATTEMPTS`` with a short backoff before FAIL (anti-flap, so a single
prod blip does not roll back a healthy branch); a reachable-but-wrong-form response
(non-200 / missing serial_gate) is an immediate FAIL. never-raise -> (False, detail)
on any unexpected error."""
try:
base = (settings.post_deploy_base_url or "http://localhost:8500").rstrip("/")
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("test_runner.run_smoke: base url error: %s", e)
return False, f"smoke base-url error: {e}"
# (path, validator(code, body) -> (ok, immediate_fail, note))
def _ok_200(code: int, body: str) -> tuple[bool, bool, str]:
if code == 200:
return True, False, ""
if code == 0:
return False, False, "unreachable" # transient -> retryable
return False, True, f"HTTP {code}" # reachable wrong form -> immediate FAIL
def _queue_ok(code: int, body: str) -> tuple[bool, bool, str]:
if code == 0:
return False, False, "unreachable"
if code != 200:
return False, True, f"HTTP {code}"
if "serial_gate" not in (body or ""):
return False, True, "no serial_gate block"
return True, False, ""
checks = (("/health", _ok_200), ("/status", _ok_200), ("/queue", _queue_ok))
for path, validator in checks:
url = base + path
note = "unreachable"
for attempt in range(1, _SMOKE_MAX_ATTEMPTS + 1):
code, body = _http_get(url)
ok, immediate_fail, note = validator(code, body)
if ok:
break
if immediate_fail:
return False, f"smoke {path} failed: {note}"
# transient (unreachable) -> bounded retry before declaring FAIL.
if attempt < _SMOKE_MAX_ATTEMPTS:
time.sleep(_SMOKE_BACKOFF_S)
else:
return False, f"smoke {path} {note} after {_SMOKE_MAX_ATTEMPTS} attempts"
return True, "smoke ok (/health, /status, /queue + serial_gate)"
# ---------------------------------------------------------------------------
# exit-code -> result: (D4 / FR-3 / AC-3) — reuse the single contract, no 2nd mapping
# ---------------------------------------------------------------------------
def map_exit_code_to_result(exit_code) -> str:
"""``0 -> PASS``; non-zero / None / non-int -> ``FAIL`` (fail-closed).
A thin token-translator over ``self_deploy.map_exit_code_to_status`` — the SAME
pure, unit-tested ``0->SUCCESS`` contract the deploy-finalizer / staging-runner use
(BR-4: no second, drifting mapping). Only the tokens differ (``result:`` uses
``PASS``/``FAIL``, the ``_TESTS_POSITIVE_TOKENS`` / ``_TESTS_NEGATIVE_TOKENS`` the
gate reads), not the logic."""
from .self_deploy import map_exit_code_to_status as _map
return "PASS" if _map(exit_code) == "SUCCESS" else "FAIL"
# ---------------------------------------------------------------------------
# Artifact 13-test-report.md (D6 / FR-4 / AC-2) — mirror write_staging_log
# ---------------------------------------------------------------------------
def build_test_report(
work_item_id: str, exit_code, result: str, stdout: str = "",
smoke: str = "skipped", *, tool_error: bool = False,
) -> str:
"""Render a ``13-test-report.md`` body whose ``result:`` frontmatter is the verdict
``check_tests_passed`` / ``_parse_tests_verdict`` reads (contract UNCHANGED, AC-2).
Carries the mandatory 52c schema; ``author_agent: test-runner`` / ``model_used:
n/a`` honestly reflect the DETERMINISTIC producer. The machine key ``result:`` and
its UPPERCASE value are NOT changed.
D6.1 (the tester-specific mine, absent in ORCH-115): ``_parse_tests_verdict`` reads
the verdict from THREE equal-rank fields — ``verdict:`` / ``status:`` / ``result:``
— with negative-token priority. The 52c-mandatory ``status:`` is therefore read by
the SAME parser, so it MUST be aligned with the verdict and never contradict
``result:``: ``PASS -> status: success`` (``SUCCESS`` is neither a positive nor a
negative token; the positive ``PASS`` comes from ``result:`` -> the parser gives
True); ``FAIL -> status: failed`` (``FAILED`` is a negative token, consistent with
``FAIL`` -> False). A ``status:`` literal carrying a negative token at ``result:
PASS`` would be a false FAIL of a healthy run (reviewer ≥P1).
Written as a literal block (mirror of ``staging_runner.build_staging_log``) so the
machine-read frontmatter is byte-exact; only the frontmatter is machine-read, the
body is informational."""
import datetime
created = datetime.date.today().isoformat()
sub_status = "success" if result == "PASS" else "failed"
tail = ""
if stdout:
tail_text = stdout.strip()[-1500:]
if tail_text:
tail = f"\npytest stdout (tail):\n```\n{tail_text}\n```\n"
note = (
"Regression suite did NOT execute (tool-error) and the infra-retry budget was "
"exhausted -> fail-closed FAIL."
if tool_error
else f"pytest exit-code `{exit_code}` -> `result: {result}` (smoke: {smoke})."
)
return (
"---\n"
f"result: {result}\n"
f"work_item: {work_item_id}\n"
"stage: testing\n"
"author_agent: test-runner\n"
f"status: {sub_status}\n"
f"created_at: {created}\n"
"model_used: n/a\n"
f"exit_code: {exit_code}\n"
f"smoke: {smoke}\n"
"---\n\n"
"# Test Gate Log (deterministic runner, ORCH-116)\n\n"
f"{note}\n\n"
"Вердикт зафиксирован детерминированным test-раннером (ORCH-116), не LLM. "
"PASS/FAIL = exit-код `pytest` + read-only smoke (`/health`, `/status`, "
"`/queue` + блок `serial_gate`).\n"
f"{tail}"
)
def write_test_report(
repo: str, work_item_id: str, branch: str, exit_code, result: str,
stdout: str = "", smoke: str = "skipped", *, tool_error: bool = False,
) -> bool:
"""Write ``13-test-report.md`` into the task worktree (so ``check_tests_passed``
reads it) and best-effort commit+push it to the FEATURE BRANCH. Returns True iff
the file was written. Never raises.
Mirror of ``staging_runner.write_staging_log``: the actor name is ``test-runner``
and the log is pushed only to the feature branch — there is NO separate PR-merge of
the log into ``main`` (the gate reads the worktree first via ``_repo_path``;
excluding any direct work on ``main`` strengthens AC-10 / BR-7). The feature branch
is merged into ``main`` later by the normal merge-gate path."""
import os
import subprocess
from .git_worktree import get_worktree_path
rel = f"docs/work-items/{work_item_id}/13-test-report.md"
try:
wt = get_worktree_path(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise
logger.error("write_test_report: worktree error for %s/%s: %s", repo, branch, e)
return False
path = os.path.join(wt, rel)
content = build_test_report(
work_item_id, exit_code, result, stdout, smoke, tool_error=tool_error
)
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
except OSError as e:
logger.error("write_test_report: write error at %s: %s", path, e)
return False
# Best-effort commit + push to the feature branch (the gate also falls back to
# origin/main). ORCH-101: HOME + email domain from Settings; the actor NAME stays
# the platform literal `test-runner` (deterministic system-actor commits).
_email = f"test-runner@{settings.git_email_domain}"
git_env = {
**os.environ,
"HOME": settings.agent_home_dir,
"GIT_AUTHOR_NAME": "test-runner",
"GIT_AUTHOR_EMAIL": _email,
"GIT_COMMITTER_NAME": "test-runner",
"GIT_COMMITTER_EMAIL": _email,
}
try:
subprocess.run(["git", "-C", wt, "add", rel],
capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
commit = subprocess.run(
["git", "-C", wt, "commit", "-m",
f"test(ORCH-116): test gate {result} for {work_item_id}"],
capture_output=True, text=True, timeout=_GIT_TIMEOUT, env=git_env,
)
if commit.returncode == 0:
subprocess.run(["git", "-C", wt, "push", "origin", branch],
capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("write_test_report: git commit/push best-effort failed: %s", e)
return True
# ---------------------------------------------------------------------------
# Existing-gate initiation (D7 / FR-5 / AC-4) — no new routing branch
# ---------------------------------------------------------------------------
def _advance(task_id, repo: str, work_item_id: str, branch: str) -> None:
"""Initiate the SAME ``advance_stage`` evaluation a finished LLM-tester would
(``finished_agent="tester"`` on ``testing``): PASS -> ``testing -> deploy-staging``;
FAIL -> the existing rollback ``testing -> development`` + developer-retry
(``stage_engine.py:849``, matched by ``agent == "tester" and qg_name ==
"check_tests_passed"`` — ``finished_agent="tester"`` is MANDATORY, R-2). No new
routing branch. The transition-lease (ORCH-114) is taken INSIDE advance_stage on
the side-effectful edge — the runner never touches it (task boundary O1).
never-raise."""
try:
from . import stage_engine
stage_engine.advance_stage(
task_id=task_id,
current_stage="testing",
repo=repo,
work_item_id=work_item_id,
branch=branch,
finished_agent="tester",
)
except Exception as e: # noqa: BLE001 - never-raise into the worker
logger.error(
"test_runner._advance: advance_stage failed for task %s (%s): %s",
task_id, work_item_id, e,
)
# ---------------------------------------------------------------------------
# Two-level outcome (D5) — tool-error DEFER bookkeeping
# ---------------------------------------------------------------------------
def _infra_retry_count(task_id) -> int:
"""How many times this task was re-queued by the tool-error DEFER path
(restart-safe; counted from the persisted jobs queue by the marker — mirror of
``staging_runner._infra_retry_count``). Never raises -> 0 on error."""
try:
from .db import get_db
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND task_content LIKE ?",
(task_id, f"%{_INFRA_RETRY_MARKER}%"),
).fetchone()[0]
conn.close()
return int(n)
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("test_runner._infra_retry_count error for %s: %s", task_id, e)
return 0
def _handle_tool_error(
task_id, repo: str, work_item_id: str, branch: str, result: proc_group.ProcResult
) -> None:
"""Suite did NOT execute (tool-error) -> bounded DEFER, then fail-closed (D5).
Anti ORCH-110: an infra fault is NOT a code fault, so we re-queue a fresh
``tester`` job (which re-enters this runner on the still-``testing`` task) with a
delay instead of an immediate FAIL-rollback that would burn a developer-retry. On
budget exhaustion -> write ``result: FAIL`` + advance (the existing rollback) + an
INFRA-specific alert (explicitly "not a code defect"). Never a silent advance /
false green; never wedges the queue. never-raise."""
retries = _infra_retry_count(task_id)
try:
max_retries = int(settings.test_runner_infra_max_retries)
except (TypeError, ValueError):
max_retries = 2
try:
delay = int(settings.test_runner_infra_retry_delay_s)
except (TypeError, ValueError):
delay = 30
if retries < max_retries:
_bump("deferred")
reason = "timeout" if result.timed_out else "suite did not execute (tool-error)"
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: testing\nNote: {_INFRA_RETRY_MARKER} "
f"(attempt {retries + 1}/{max_retries}) — {reason}, retrying after {delay}s."
)
try:
from .db import enqueue_job
new_job = enqueue_job(
"tester", repo, task_desc, task_id=task_id, available_at_delay_s=delay,
)
logger.warning(
"Task %s (%s): regression suite did not execute (%s) -> infra-DEFER "
"(job_id=%s, attempt %d/%d)",
task_id, work_item_id, reason, new_job, retries + 1, max_retries,
)
except Exception as e: # noqa: BLE001 - never-raise
logger.error("test_runner: infra-DEFER enqueue failed for %s: %s", task_id, e)
return
# Budget exhausted -> fail-closed FAIL (terminal, never a false green).
_bump("fail")
logger.error(
"Task %s (%s): test tool-error DEFER budget exhausted (%d) -> fail-closed FAIL",
task_id, work_item_id, max_retries,
)
write_test_report(repo, work_item_id, branch, result.returncode, "FAIL",
result.stdout, "skipped", tool_error=True)
_alert_infra_exhausted(work_item_id, max_retries)
_advance(task_id, repo, work_item_id, branch)
def _alert_infra_exhausted(work_item_id: str, max_retries: int) -> None:
"""Best-effort Telegram alert that the regression suite never executed (infra, NOT
a code defect) after the retry budget. never-raise."""
try:
from .notifications import send_telegram, link_for
send_telegram(
f"\U0001f6a8 {link_for(work_item_id)}: регресс-сюита не запустилась "
f"(инфра, НЕ дефект кода) после {max_retries} попыток — fail-closed FAIL, "
f"откат на development. Нужно проверить тест-окружение."
)
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("test_runner: infra-exhausted alert failed for %s: %s", work_item_id, e)
# ---------------------------------------------------------------------------
# Entry point (D2) — owns the full deterministic flow, mirror run_staging_gate
# ---------------------------------------------------------------------------
def run_test_gate(job: dict) -> None:
"""Deterministic test gate for a ``tester`` job on ``testing``.
Flow (mirror of ``staging_runner.run_staging_gate``):
1. resolve ``work_item_id`` / ``branch`` by ``task_id``;
2. execute the regression suite in the worktree (D3) -> ProcResult;
3. suite EXECUTED -> map exit-code (+ smoke) -> ``result:``, write
``13-test-report.md``, initiate the existing gate via ``advance_stage`` (D7);
4. suite did NOT execute (tool-error) -> bounded DEFER / fail-closed (D5);
5. observability counters + one structured verdict log (D10).
Never raises into the caller (the launcher marks the job done/failed)."""
started = time.time()
_bump("runs")
task_id = job.get("task_id")
repo = job.get("repo")
# 1. resolve task fields.
work_item_id, branch = None, None
try:
from .db import get_db
conn = get_db()
row = conn.execute(
"SELECT work_item_id, branch FROM tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
if row:
work_item_id, branch = row[0], row[1]
except Exception as e: # noqa: BLE001 - never-raise
logger.error("test_runner: task lookup failed for task_id=%s: %s", task_id, e)
if not work_item_id or not branch:
logger.error(
"test_runner: missing work_item_id/branch for task_id=%s — aborting", task_id
)
return
# 2-4. execute + classify + route — guarded so AC-9 (never-raise) holds even if an
# unexpected error escapes a sub-step (the worker must never crash on test infra;
# the task is left on testing for the reconciler/reaper to re-drive).
try:
result = run_test_suite(repo, branch)
duration_s = round(time.time() - started, 1)
suite_ran = (result.returncode is not None) and (not result.timed_out)
if suite_ran:
# 3. trust the exit-code; AND-in the read-only smoke (D3).
exit_verdict = map_exit_code_to_result(result.returncode)
smoke_state = "skipped"
verdict = exit_verdict
if exit_verdict == "PASS" and bool(getattr(settings, "test_runner_smoke_enabled", True)):
smoke_ok, smoke_detail = run_smoke()
smoke_state = "ok" if smoke_ok else "failed"
if not smoke_ok:
verdict = "FAIL"
logger.warning(
"test_runner: pytest green but smoke FAILED for %s: %s",
work_item_id, smoke_detail,
)
_bump("pass" if verdict == "PASS" else "fail")
logger.info(
"test_runner verdict: work_item=%s repo=%s exit_code=%s result=%s "
"smoke=%s duration_s=%s outcome=%s",
work_item_id, repo, result.returncode, verdict, smoke_state, duration_s,
"code-pass" if verdict == "PASS" else "code-fail",
)
write_test_report(repo, work_item_id, branch, result.returncode, verdict,
result.stdout, smoke_state)
_advance(task_id, repo, work_item_id, branch)
return
# 4. tool-error (suite did not execute) -> DEFER / fail-closed (D5).
_bump("tool_error")
logger.warning(
"test_runner verdict: work_item=%s repo=%s exit_code=%s result=%s "
"duration_s=%s outcome=tool-error (timed_out=%s)",
work_item_id, repo, result.returncode, "TOOL-ERROR", duration_s, result.timed_out,
)
_handle_tool_error(task_id, repo, work_item_id, branch, result)
except Exception as e: # noqa: BLE001 - never-raise into the worker (AC-9)
logger.error(
"test_runner.run_test_gate: unexpected error for task %s (%s): %s",
task_id, work_item_id, e,
)
# ---------------------------------------------------------------------------
# Observability (D10 / FR-8 / AC-13)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
"""Read-only test-runner summary for ``GET /queue`` (FR-8 / AC-13).
Additive block; existing ``/queue`` keys are untouched. never-raise: any error ->
a minimal dict with the kill-switch state."""
try:
return {
"enabled": bool(settings.test_runner_enabled),
"repos": getattr(settings, "test_runner_repos", "") or "",
"target": getattr(settings, "test_runner_target", "tests/") or "tests/",
"timeout_s": getattr(settings, "test_runner_timeout_s", _DEFAULT_TIMEOUT_S),
"smoke_enabled": bool(getattr(settings, "test_runner_smoke_enabled", True)),
"infra_max_retries": getattr(settings, "test_runner_infra_max_retries", 2),
"runs": _TEST_RUNNER_COUNTERS["runs"],
"pass": _TEST_RUNNER_COUNTERS["pass"],
"fail": _TEST_RUNNER_COUNTERS["fail"],
"tool_error": _TEST_RUNNER_COUNTERS["tool_error"],
"deferred": _TEST_RUNNER_COUNTERS["deferred"],
}
except Exception as e: # noqa: BLE001 - never-raise -> minimal dict
logger.warning("test_runner.snapshot error: %s", e)
return {"enabled": False}