Files
orchestrator/src/test_runner.py
claude-bot 9d16ee473a feat(testing): deterministic test-runner replacing LLM tester on the testing stage (ORCH-116)
Second realised slice of the determinization-roadmap (ORCH-118 A5,
needs-hybrid-fallback): on the `testing` stage for the self-hosting
`orchestrator` repo the LLM `tester` agent is replaced by a deterministic
test-runner (src/test_runner.py), intercepted in launch_job BEFORE _spawn
(deploy-finalizer / post-deploy-monitor / staging-runner precedent).

It runs the regression `python -m pytest <target>` in the task worktree via
proc_group (tree-kill) + an optional read-only smoke (/health, /status, /queue
+ serial_gate), maps the exit-code -> result: PASS|FAIL via the existing
self_deploy.map_exit_code_to_status contract, writes 13-test-report.md and
initiates the EXISTING check_tests_passed gate exactly as a finished LLM-tester.

Invariant (NFR-1): only the *producer* changes — the artifact contract
(13-test-report.md / result:), the gate check_tests_passed / _parse_tests_verdict,
STAGE_TRANSITIONS and the DB schema are byte-for-byte UNCHANGED. Additive, under
a kill-switch (test_runner_enabled), never-raise, fail-closed, self-hosting scope,
two-level outcome (tool-error DEFER, anti ORCH-110), hybrid (LLM strictly
off-control-path). 52c-`status:` is aligned with the verdict (D6.1) so the
three-field _parse_tests_verdict never false-negatives a PASS.

Docs (ORCH-118 NFR-6, atomic with code): llm-call-sites.md (A5 implemented),
llm-determinization-roadmap.md (rank 2 implemented), llm-usage-policy.md,
README/internals/overview, tester.md, CLAUDE.md, CHANGELOG.md. Coverage:
tests/test_orch116_test_runner.py (TC-01..TC-14); LLM anti-drift tests green.
Full suite: 2137 passed.

Refs: ORCH-116
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 09:37:40 +03:00
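
The D6.1 alignment described in the commit message (a `status:` value that must never contradict `result:`) can be illustrated with a minimal sketch. The real `_parse_tests_verdict` is not shown here, so `parse_verdict`, `VERDICT_FIELDS` and both token sets are hypothetical stand-ins. Only the behaviour named in the source is assumed: three equal-rank fields, negative-token priority, `PASS` positive, `FAIL` / `FAILED` negative, `SUCCESS` neutral.

```python
# Hypothetical token sets: the source confirms only PASS (positive) and
# FAIL / FAILED (negative); the real gate may recognise more tokens.
POSITIVE_TOKENS = {"PASS"}
NEGATIVE_TOKENS = {"FAIL", "FAILED"}
VERDICT_FIELDS = ("verdict", "status", "result")


def parse_verdict(frontmatter: str) -> bool:
    """Return True only if some field is positive and no field is negative."""
    values = []
    for line in frontmatter.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip().lower() in VERDICT_FIELDS:
            values.append(value.strip().upper())
    if any(v in NEGATIVE_TOKENS for v in values):
        return False  # negative-token priority: one negative field wins
    return any(v in POSITIVE_TOKENS for v in values)


aligned_pass = "result: PASS\nstatus: success\n"
aligned_fail = "result: FAIL\nstatus: failed\n"
misaligned = "result: PASS\nstatus: failed\n"

print(parse_verdict(aligned_pass))  # True
print(parse_verdict(aligned_fail))  # False
print(parse_verdict(misaligned))    # False: a negative status overrides PASS
```

Under these assumptions the third case shows why the runner writes `status: success` for a PASS: any negative token in `status:` would turn a healthy run into a false FAIL.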

706 lines
33 KiB
Python
"""Deterministic test-runner (ORCH-116).

The ``testing`` stage for the self-hosting ``orchestrator`` repo was driven by the
LLM ``tester`` agent, but its PASS/FAIL core is purely deterministic
(``.openclaw/agents/tester.md`` steps 1-3): run the regression ``pytest tests/`` in
the task worktree, do a read-only smoke (``/health`` / ``/status`` / ``/queue`` +
``serial_gate`` block), map the exit-code to a verdict (``0 -> PASS``, else ``FAIL``)
and write ``13-test-report.md`` with the machine frontmatter ``result: PASS|FAIL``.

This leaf replaces that LLM consultation with deterministic code, intercepted in
``launcher.launch_job`` BEFORE ``_spawn`` (the ``deploy-finalizer`` /
``post-deploy-monitor`` / ``staging-runner`` reserved-agent precedent,
``launcher.py:397/402/405``). It is the second realised slice of the
determinization-roadmap (A5/tester, ``needs-hybrid-fallback``; the first was
ORCH-115/deployer, ``src/staging_runner.py``).

Hybrid nature (the key difference from ORCH-115): tester is
``needs-hybrid-fallback``, not ``replace-deterministic-now``. Its PASS/FAIL core is
fully derivable (pytest exit-code + smoke) and that is what this deterministic runner
owns; a future OPTIONAL **off-control-path** LLM triage/diagnosis after a
deterministic FAIL stays allowed (a separate role/job that never writes/overrides
``result:`` and never adds a STAGE_TRANSITIONS edge). Phase 1 does NOT implement the
triage but the architecture does not forbid it (D11/FR-9/AC-12).

What is and is NOT changed (NFR-1, the critical invariant):

* UNCHANGED: the artifact contract (``13-test-report.md`` with ``result:
  PASS|FAIL``), the gate ``check_tests_passed`` / ``_parse_tests_verdict``,
  ``STAGE_TRANSITIONS``, the DB schema. This module replaces only the *producer* of
  the artifact, never the gate that reads it.
* NEW: a deterministic producer + a launch_job intercept. Under a kill-switch +
  repo-scope CSV + a test-contract resolve; fail-safe back to the LLM path when off
  / out of scope / no contract.

This module is a **leaf** (mirror of ``staging_runner`` / ``self_deploy`` /
``proc_group``): it imports only ``config`` / ``logging`` / ``proc_group`` at module
load; ``db`` / ``git_worktree`` / ``self_deploy.map_exit_code_to_status`` /
``qg.checks`` / ``stage_engine.advance_stage`` / ``notifications`` are imported LAZILY
inside functions so the heavy ``stage_engine`` is never pulled at import and no import
cycle forms. Every public function honours a **never-raise** contract (AC-9): a test
infra hiccup can never crash the worker / wedge the queue.

Two-level outcome (D5, the key safety decision, anti ORCH-110):

* the suite EXECUTED (a real exit-code, 0 or non-zero) -> trust the code:
  ``0 -> PASS -> advance``; ``!=0 -> FAIL -> the existing rollback testing ->
  development`` (same developer-retry path as a FAIL LLM verdict). A smoke failure
  AND-s into ``FAIL`` (deterministic), it is NOT a tool-error.
* the suite did NOT execute (tool-error: spawn-error / timeout / ``returncode is
  None``) -> an infra fault, NOT a code fault -> a bounded DEFER (re-queue a fresh
  ``tester`` job with a delay + a restart-safe marker). On budget exhaustion ->
  fail-closed ``FAIL`` + advance + alert. So the runner NEVER does a silent advance
  / false green, and NEVER wedges the queue, but does NOT burn a developer-retry on
  transient infra.
"""
import logging
import time
import urllib.error
import urllib.request
from .config import settings
from . import proc_group
logger = logging.getLogger("orchestrator.test_runner")
# Default wall-clock budget for the pytest regression (D9). Kept <= the LLM testing
# window it replaces (agent_timeout_seconds=1800) so Σ(work on the testing edge) does
# not grow and the cross-cutting reaper invariant (ORCH-065/109/110) holds WITHOUT
# touching reaper_max_running_s. 900s = ~74% headroom over the ~517s regression suite.
_DEFAULT_TIMEOUT_S = 900
_GIT_TIMEOUT = 60
# Read-only smoke knobs (D3). Transient unreachability (connection refused / timeout)
# is retried briefly before a FAIL so a single prod-8500 blip does not roll back a
# healthy branch; a reachable-but-wrong-form response (non-200 / no serial_gate block)
# is an immediate FAIL.
_SMOKE_TIMEOUT_S = 5
_SMOKE_MAX_ATTEMPTS = 3
_SMOKE_BACKOFF_S = 1.0
# Restart-safe DEFER marker (counted from the persisted jobs queue, mirror of
# staging_runner._INFRA_RETRY_MARKER / stage_engine._merge_infra_retry_count). Embedded
# verbatim in the re-queued job's task_content so a service restart never resets the
# infra-retry budget.
_INFRA_RETRY_MARKER = "test-runner infra-retry"
# In-process observability counters (mirror staging_runner._STAGING_RUNNER_COUNTERS).
_TEST_RUNNER_COUNTERS: dict = {
    "runs": 0,        # run_test_gate entered
    "pass": 0,        # suite ran, exit 0 + smoke ok -> PASS
    "fail": 0,        # suite ran non-zero / smoke failed, OR infra budget exhausted -> FAIL
    "tool_error": 0,  # suite did NOT execute (spawn-error / timeout / None)
    "deferred": 0,    # bounded infra DEFER (re-queued)
}


def _bump(key: str) -> None:
    """Increment an observability counter. Never raises."""
    try:
        _TEST_RUNNER_COUNTERS[key] += 1
    except Exception:  # noqa: BLE001 - observability must never break a decision
        pass


# ---------------------------------------------------------------------------
# Conditionality (D8 / FR-7 / AC-7 / AC-8)
# ---------------------------------------------------------------------------
def _has_test_contract(repo: str) -> bool:
    """Whether a test-contract (regression + smoke) is resolvable for ``repo`` (BR-9).

    In Phase 1 the contract is known by default ONLY for the self-hosting repo
    (``orchestrator``: ``pytest tests/`` + the read-only smoke); for every other repo
    there is no contract yet (Phase 2 project test-contract) -> ``applies`` is False ->
    the prior LLM-tester runs (fail-safe, AC-8). This makes AC-8 checkable: even if a
    non-self repo (e.g. ``enduro-trails``) is hand-added to ``test_runner_repos``,
    ``_has_test_contract`` returns False -> it is never intercepted. never-raise.
    """
    try:
        from .qg.checks import is_self_hosting_repo
        return is_self_hosting_repo(repo)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("test_runner._has_test_contract error for %s: %s", repo, e)
        return False


def applies(repo: str) -> bool:
    """Whether the deterministic test-runner is REAL for ``repo``.

    Mirrors ``staging_runner.applies`` / ``coverage_gate``:

    * ``test_runner_enabled=False`` -> always False (global kill-switch); the legacy
      LLM-tester path runs on ``testing`` via ``_spawn``.
    * ``test_runner_repos`` (CSV) non-empty -> only the listed repos.
    * empty CSV -> only the self-hosting repo (``orchestrator``).
    * AND a test-contract is resolvable for the repo (BR-9, ``_has_test_contract``).

    Checked FIRST in ``should_intercept`` (local, no network, no DB) so a disabled flag
    costs nothing. never-raise -> False (fail-safe to the prior LLM path).
    """
    try:
        if not settings.test_runner_enabled:
            return False
        raw = (settings.test_runner_repos or "").strip()
        if raw:
            allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
            in_scope = (repo or "").strip().lower() in allowed
        else:
            # Lazy import keeps this module a leaf (no qg import at module load).
            from .qg.checks import is_self_hosting_repo
            in_scope = is_self_hosting_repo(repo)
        if not in_scope:
            return False
        return _has_test_contract(repo)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("test_runner.applies error for %s: %s", repo, e)
        return False


def should_intercept(job: dict) -> bool:
    """True iff this ``tester`` job is the deterministic test-suite job (D1).

    ``tester`` is the ONLY agent that enters the ``testing`` stage
    (``STAGE_TRANSITIONS["review"]["agent"]``), so there is no stage collision as
    there was for the shared ``deployer`` role (ORCH-115); the
    ``tasks.stage == "testing"`` guard is kept as defense-in-depth (R-1): it stops a
    stray future ``tester`` job outside ``testing`` from being intercepted. Intercept
    iff ``agent == "tester"`` AND ``applies(repo)`` AND ``tasks.stage == "testing"``.
    never-raise -> False (a DB-lookup failure falls through to ``_spawn``, fail-safe to
    the prior LLM path).
    """
    try:
        if (job.get("agent") or "") != "tester":
            return False
        # applies() FIRST (local, no DB): disabled flag -> zero DB overhead.
        if not applies(job.get("repo")):
            return False
        task_id = job.get("task_id")
        if task_id is None:
            return False
        from .db import get_db
        conn = get_db()
        row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
        conn.close()
        if not row:
            return False
        return (row[0] or "") == "testing"
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("test_runner.should_intercept error: %s", e)
        return False


# ---------------------------------------------------------------------------
# Suite execution (D3 / FR-2 / NFR-3 / AC-9 / AC-11)
# ---------------------------------------------------------------------------
def build_test_command() -> list[str]:
    """Build the canonical regression argv (the same command the LLM-tester ran).

    ``python -m pytest <test_runner_target> -q`` (default ``tests/``, convention
    ``merge_retest_target``). Self-hosting safety (BR-7 / AC-10 / TC-13): NO restart of
    8500, NO ``docker compose up orchestrator`` / ``--build``, NO force-push, NO
    ``.env`` edit: the runner only executes pytest in the task worktree and does
    read-only GETs.
    """
    target = (settings.test_runner_target or "tests/").strip() or "tests/"
    return ["python", "-m", "pytest", target, "-q"]


def _resolve_timeout() -> int:
    """Resolve ``test_runner_timeout_s`` (malformed/non-positive -> default + WARNING,
    never-break; mirror of ``staging_runner._resolve_timeout`` /
    ``merge_gate._resolve_retest_timeout``)."""
    raw = getattr(settings, "test_runner_timeout_s", _DEFAULT_TIMEOUT_S)
    try:
        t = int(raw)
        if t > 0:
            return t
        logger.warning(
            "test_runner_timeout_s non-positive (%r) -> default %ds", raw, _DEFAULT_TIMEOUT_S
        )
    except (TypeError, ValueError):
        logger.warning(
            "test_runner_timeout_s malformed (%r) -> default %ds", raw, _DEFAULT_TIMEOUT_S
        )
    return _DEFAULT_TIMEOUT_S


def run_test_suite(repo: str, branch: str) -> proc_group.ProcResult:
    """Execute the pytest regression IN THE TASK WORKTREE, tree-killed on timeout.

    Runs in ``git_worktree.get_worktree_path(repo, branch)`` (NOT the shared
    ``/repos/orchestrator``: anti checkout-race, the same context coverage-gate /
    merge-gate re-test use) through ``proc_group.run_in_process_group`` (ORCH-110) so a
    hung pytest subtree is killed whole (no orphans, AC-11). Never raises (proc_group
    degrades any OS error to a safe ``ProcResult``; a missing worktree -> a ProcResult
    with ``returncode is None`` -> the tool-error DEFER path).
    """
    cmd = build_test_command()
    timeout = _resolve_timeout()
    try:
        grace = float(getattr(settings, "agent_kill_grace_seconds", 5) or 5)
    except (TypeError, ValueError):
        grace = 5.0
    try:
        from .git_worktree import get_worktree_path
        wt = get_worktree_path(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise -> tool-error DEFER
        logger.error("run_test_suite: worktree error for %s/%s: %s", repo, branch, e)
        return proc_group.ProcResult(returncode=None, stdout="", stderr=str(e), timed_out=False)
    return proc_group.run_in_process_group(
        cmd,
        cwd=wt,
        timeout=timeout,
        grace_s=grace,
        tree_kill=bool(getattr(settings, "subprocess_tree_kill_enabled", True)),
    )


# ---------------------------------------------------------------------------
# Read-only smoke (D3 / FR-2 / AC-10) — stdlib urllib, never-raise
# ---------------------------------------------------------------------------
def _http_get(url: str) -> tuple[int, str]:
    """GET ``url`` -> (http_code, body). Network/timeout -> (0, ""). Never raises.

    Mirror of ``post_deploy._http_status`` (stdlib only, no httpx import at module
    load, which keeps the leaf pure). ``urllib`` raises ``HTTPError`` for >=400 responses;
    that is treated as a real status code (so a 5xx is observed, not swallowed)."""
    try:
        with urllib.request.urlopen(url, timeout=_SMOKE_TIMEOUT_S) as resp:  # noqa: S310
            body = resp.read(8192).decode("utf-8", "replace")
            return int(getattr(resp, "status", resp.getcode())), body
    except urllib.error.HTTPError as e:
        try:
            body = e.read(8192).decode("utf-8", "replace")
        except Exception:  # noqa: BLE001
            body = ""
        return int(e.code), body
    except Exception as e:  # noqa: BLE001 - URLError / socket timeout / anything
        logger.warning("test_runner smoke GET error for %s: %s", url, e)
        return 0, ""


def run_smoke() -> tuple[bool, str]:
    """Read-only smoke against the running orchestrator (D3). Returns (ok, detail).

    Strictly read-only GETs (BR-7/AC-10): ``/health`` (HTTP 200), ``/status`` (HTTP
    200), ``/queue`` (HTTP 200 AND the ``serial_gate`` block present, ORCH-088). A
    transient unreachability (code 0 = connection refused / timeout) is retried up to
    ``_SMOKE_MAX_ATTEMPTS`` with a short backoff before FAIL (anti-flap, so a single
    prod blip does not roll back a healthy branch); a reachable-but-wrong-form response
    (non-200 / missing serial_gate) is an immediate FAIL. never-raise -> (False, detail)
    on any unexpected error."""
    try:
        base = (settings.post_deploy_base_url or "http://localhost:8500").rstrip("/")
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("test_runner.run_smoke: base url error: %s", e)
        return False, f"smoke base-url error: {e}"

    # (path, validator(code, body) -> (ok, immediate_fail, note))
    def _ok_200(code: int, body: str) -> tuple[bool, bool, str]:
        if code == 200:
            return True, False, ""
        if code == 0:
            return False, False, "unreachable"  # transient -> retryable
        return False, True, f"HTTP {code}"  # reachable wrong form -> immediate FAIL

    def _queue_ok(code: int, body: str) -> tuple[bool, bool, str]:
        if code == 0:
            return False, False, "unreachable"
        if code != 200:
            return False, True, f"HTTP {code}"
        if "serial_gate" not in (body or ""):
            return False, True, "no serial_gate block"
        return True, False, ""

    checks = (("/health", _ok_200), ("/status", _ok_200), ("/queue", _queue_ok))
    for path, validator in checks:
        url = base + path
        note = "unreachable"
        for attempt in range(1, _SMOKE_MAX_ATTEMPTS + 1):
            code, body = _http_get(url)
            ok, immediate_fail, note = validator(code, body)
            if ok:
                break
            if immediate_fail:
                return False, f"smoke {path} failed: {note}"
            # transient (unreachable) -> bounded retry before declaring FAIL.
            if attempt < _SMOKE_MAX_ATTEMPTS:
                time.sleep(_SMOKE_BACKOFF_S)
            else:
                return False, f"smoke {path} {note} after {_SMOKE_MAX_ATTEMPTS} attempts"
    return True, "smoke ok (/health, /status, /queue + serial_gate)"


# ---------------------------------------------------------------------------
# exit-code -> result: (D4 / FR-3 / AC-3) — reuse the single contract, no 2nd mapping
# ---------------------------------------------------------------------------
def map_exit_code_to_result(exit_code) -> str:
    """``0 -> PASS``; non-zero / None / non-int -> ``FAIL`` (fail-closed).

    A thin token-translator over ``self_deploy.map_exit_code_to_status``, the SAME
    pure, unit-tested ``0->SUCCESS`` contract the deploy-finalizer / staging-runner use
    (BR-4: no second, drifting mapping). Only the tokens differ (``result:`` uses
    ``PASS``/``FAIL``, the ``_TESTS_POSITIVE_TOKENS`` / ``_TESTS_NEGATIVE_TOKENS`` the
    gate reads), not the logic."""
    from .self_deploy import map_exit_code_to_status as _map
    return "PASS" if _map(exit_code) == "SUCCESS" else "FAIL"


# ---------------------------------------------------------------------------
# Artifact 13-test-report.md (D6 / FR-4 / AC-2) — mirror write_staging_log
# ---------------------------------------------------------------------------
def build_test_report(
    work_item_id: str, exit_code, result: str, stdout: str = "",
    smoke: str = "skipped", *, tool_error: bool = False,
) -> str:
    """Render a ``13-test-report.md`` body whose ``result:`` frontmatter is the verdict
    ``check_tests_passed`` / ``_parse_tests_verdict`` reads (contract UNCHANGED, AC-2).

    Carries the mandatory 52c schema; ``author_agent: test-runner`` / ``model_used:
    n/a`` honestly reflect the DETERMINISTIC producer. The machine key ``result:`` and
    its UPPERCASE value are NOT changed.

    D6.1 (the tester-specific mine, absent in ORCH-115): ``_parse_tests_verdict`` reads
    the verdict from THREE equal-rank fields (``verdict:`` / ``status:`` / ``result:``)
    with negative-token priority. The 52c-mandatory ``status:`` is therefore read by
    the SAME parser, so it MUST be aligned with the verdict and never contradict
    ``result:``: ``PASS -> status: success`` (``SUCCESS`` is neither a positive nor a
    negative token; the positive ``PASS`` comes from ``result:`` -> the parser gives
    True); ``FAIL -> status: failed`` (``FAILED`` is a negative token, consistent with
    ``FAIL`` -> False). A ``status:`` literal carrying a negative token at ``result:
    PASS`` would be a false FAIL of a healthy run (reviewer ≥P1).

    Written as a literal block (mirror of ``staging_runner.build_staging_log``) so the
    machine-read frontmatter is byte-exact; only the frontmatter is machine-read, the
    body is informational."""
    import datetime
    created = datetime.date.today().isoformat()
    sub_status = "success" if result == "PASS" else "failed"
    tail = ""
    if stdout:
        tail_text = stdout.strip()[-1500:]
        if tail_text:
            tail = f"\npytest stdout (tail):\n```\n{tail_text}\n```\n"
    note = (
        "Regression suite did NOT execute (tool-error) and the infra-retry budget was "
        "exhausted -> fail-closed FAIL."
        if tool_error
        else f"pytest exit-code `{exit_code}` -> `result: {result}` (smoke: {smoke})."
    )
    return (
        "---\n"
        f"result: {result}\n"
        f"work_item: {work_item_id}\n"
        "stage: testing\n"
        "author_agent: test-runner\n"
        f"status: {sub_status}\n"
        f"created_at: {created}\n"
        "model_used: n/a\n"
        f"exit_code: {exit_code}\n"
        f"smoke: {smoke}\n"
        "---\n\n"
        "# Test Gate Log (deterministic runner, ORCH-116)\n\n"
        f"{note}\n\n"
        "Verdict recorded by the deterministic test-runner (ORCH-116), not an LLM. "
        "PASS/FAIL = `pytest` exit code + read-only smoke (`/health`, `/status`, "
        "`/queue` + the `serial_gate` block).\n"
        f"{tail}"
    )


def write_test_report(
    repo: str, work_item_id: str, branch: str, exit_code, result: str,
    stdout: str = "", smoke: str = "skipped", *, tool_error: bool = False,
) -> bool:
    """Write ``13-test-report.md`` into the task worktree (so ``check_tests_passed``
    reads it) and best-effort commit+push it to the FEATURE BRANCH. Returns True iff
    the file was written. Never raises.

    Mirror of ``staging_runner.write_staging_log``: the actor name is ``test-runner``
    and the log is pushed only to the feature branch; there is NO separate PR-merge of
    the log into ``main`` (the gate reads the worktree first via ``_repo_path``;
    excluding any direct work on ``main`` strengthens AC-10 / BR-7). The feature branch
    is merged into ``main`` later by the normal merge-gate path."""
    import os
    import subprocess
    rel = f"docs/work-items/{work_item_id}/13-test-report.md"
    try:
        # Lazy import inside the guard so an import failure cannot break never-raise
        # (same pattern as run_test_suite).
        from .git_worktree import get_worktree_path
        wt = get_worktree_path(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("write_test_report: worktree error for %s/%s: %s", repo, branch, e)
        return False
    path = os.path.join(wt, rel)
    content = build_test_report(
        work_item_id, exit_code, result, stdout, smoke, tool_error=tool_error
    )
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
    except OSError as e:
        logger.error("write_test_report: write error at %s: %s", path, e)
        return False
    # Best-effort commit + push to the feature branch (the gate also falls back to
    # origin/main). ORCH-101: HOME + email domain from Settings; the actor NAME stays
    # the platform literal `test-runner` (deterministic system-actor commits).
    _email = f"test-runner@{settings.git_email_domain}"
    git_env = {
        **os.environ,
        "HOME": settings.agent_home_dir,
        "GIT_AUTHOR_NAME": "test-runner",
        "GIT_AUTHOR_EMAIL": _email,
        "GIT_COMMITTER_NAME": "test-runner",
        "GIT_COMMITTER_EMAIL": _email,
    }
    try:
        subprocess.run(["git", "-C", wt, "add", rel],
                       capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
        commit = subprocess.run(
            ["git", "-C", wt, "commit", "-m",
             f"test(ORCH-116): test gate {result} for {work_item_id}"],
            capture_output=True, text=True, timeout=_GIT_TIMEOUT, env=git_env,
        )
        if commit.returncode == 0:
            subprocess.run(["git", "-C", wt, "push", "origin", branch],
                           capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("write_test_report: git commit/push best-effort failed: %s", e)
    return True


# ---------------------------------------------------------------------------
# Existing-gate initiation (D7 / FR-5 / AC-4) — no new routing branch
# ---------------------------------------------------------------------------
def _advance(task_id, repo: str, work_item_id: str, branch: str) -> None:
    """Initiate the SAME ``advance_stage`` evaluation a finished LLM-tester would
    (``finished_agent="tester"`` on ``testing``): PASS -> ``testing -> deploy-staging``;
    FAIL -> the existing rollback ``testing -> development`` + developer-retry
    (``stage_engine.py:849``, matched by ``agent == "tester" and qg_name ==
    "check_tests_passed"``; ``finished_agent="tester"`` is MANDATORY, R-2). No new
    routing branch. The transition-lease (ORCH-114) is taken INSIDE advance_stage on
    the side-effectful edge; the runner never touches it (task boundary O1).
    never-raise."""
    try:
        from . import stage_engine
        stage_engine.advance_stage(
            task_id=task_id,
            current_stage="testing",
            repo=repo,
            work_item_id=work_item_id,
            branch=branch,
            finished_agent="tester",
        )
    except Exception as e:  # noqa: BLE001 - never-raise into the worker
        logger.error(
            "test_runner._advance: advance_stage failed for task %s (%s): %s",
            task_id, work_item_id, e,
        )


# ---------------------------------------------------------------------------
# Two-level outcome (D5) — tool-error DEFER bookkeeping
# ---------------------------------------------------------------------------
def _infra_retry_count(task_id) -> int:
    """How many times this task was re-queued by the tool-error DEFER path
    (restart-safe; counted from the persisted jobs queue by the marker, mirror of
    ``staging_runner._infra_retry_count``). Never raises -> 0 on error."""
    try:
        from .db import get_db
        conn = get_db()
        n = conn.execute(
            "SELECT COUNT(*) FROM jobs WHERE task_id=? AND task_content LIKE ?",
            (task_id, f"%{_INFRA_RETRY_MARKER}%"),
        ).fetchone()[0]
        conn.close()
        return int(n)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("test_runner._infra_retry_count error for %s: %s", task_id, e)
        return 0


def _handle_tool_error(
    task_id, repo: str, work_item_id: str, branch: str, result: proc_group.ProcResult
) -> None:
    """Suite did NOT execute (tool-error) -> bounded DEFER, then fail-closed (D5).

    Anti ORCH-110: an infra fault is NOT a code fault, so we re-queue a fresh
    ``tester`` job (which re-enters this runner on the still-``testing`` task) with a
    delay instead of an immediate FAIL-rollback that would burn a developer-retry. On
    budget exhaustion -> write ``result: FAIL`` + advance (the existing rollback) + an
    INFRA-specific alert (explicitly "not a code defect"). Never a silent advance /
    false green; never wedges the queue. never-raise."""
    retries = _infra_retry_count(task_id)
    try:
        max_retries = int(settings.test_runner_infra_max_retries)
    except (TypeError, ValueError):
        max_retries = 2
    try:
        delay = int(settings.test_runner_infra_retry_delay_s)
    except (TypeError, ValueError):
        delay = 30
    if retries < max_retries:
        _bump("deferred")
        reason = "timeout" if result.timed_out else "suite did not execute (tool-error)"
        task_desc = (
            f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
            f"Stage: testing\nNote: {_INFRA_RETRY_MARKER} "
            f"(attempt {retries + 1}/{max_retries}): {reason}, retrying after {delay}s."
        )
        try:
            from .db import enqueue_job
            new_job = enqueue_job(
                "tester", repo, task_desc, task_id=task_id, available_at_delay_s=delay,
            )
            logger.warning(
                "Task %s (%s): regression suite did not execute (%s) -> infra-DEFER "
                "(job_id=%s, attempt %d/%d)",
                task_id, work_item_id, reason, new_job, retries + 1, max_retries,
            )
        except Exception as e:  # noqa: BLE001 - never-raise
            logger.error("test_runner: infra-DEFER enqueue failed for %s: %s", task_id, e)
        return
    # Budget exhausted -> fail-closed FAIL (terminal, never a false green).
    _bump("fail")
    logger.error(
        "Task %s (%s): test tool-error DEFER budget exhausted (%d) -> fail-closed FAIL",
        task_id, work_item_id, max_retries,
    )
    write_test_report(repo, work_item_id, branch, result.returncode, "FAIL",
                      result.stdout, "skipped", tool_error=True)
    _alert_infra_exhausted(work_item_id, max_retries)
    _advance(task_id, repo, work_item_id, branch)


def _alert_infra_exhausted(work_item_id: str, max_retries: int) -> None:
    """Best-effort Telegram alert that the regression suite never executed (infra, NOT
    a code defect) after the retry budget. never-raise."""
    try:
        from .notifications import send_telegram, link_for
        send_telegram(
            f"\U0001f6a8 {link_for(work_item_id)}: the regression suite did not execute "
            f"(infra, NOT a code defect) after {max_retries} attempts: fail-closed FAIL, "
            "rolling back to development. Check the test environment."
        )
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("test_runner: infra-exhausted alert failed for %s: %s", work_item_id, e)


# ---------------------------------------------------------------------------
# Entry point (D2) — owns the full deterministic flow, mirror run_staging_gate
# ---------------------------------------------------------------------------
def run_test_gate(job: dict) -> None:
    """Deterministic test gate for a ``tester`` job on ``testing``.

    Flow (mirror of ``staging_runner.run_staging_gate``):

    1. resolve ``work_item_id`` / ``branch`` by ``task_id``;
    2. execute the regression suite in the worktree (D3) -> ProcResult;
    3. suite EXECUTED -> map exit-code (+ smoke) -> ``result:``, write
       ``13-test-report.md``, initiate the existing gate via ``advance_stage`` (D7);
    4. suite did NOT execute (tool-error) -> bounded DEFER / fail-closed (D5);
    5. observability counters + one structured verdict log (D10).

    Never raises into the caller (the launcher marks the job done/failed)."""
    started = time.time()
    _bump("runs")
    task_id = job.get("task_id")
    repo = job.get("repo")
    # 1. resolve task fields.
    work_item_id, branch = None, None
    try:
        from .db import get_db
        conn = get_db()
        row = conn.execute(
            "SELECT work_item_id, branch FROM tasks WHERE id=?", (task_id,)
        ).fetchone()
        conn.close()
        if row:
            work_item_id, branch = row[0], row[1]
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("test_runner: task lookup failed for task_id=%s: %s", task_id, e)
    if not work_item_id or not branch:
        logger.error(
            "test_runner: missing work_item_id/branch for task_id=%s: aborting", task_id
        )
        return
    # 2-4. execute + classify + route: guarded so AC-9 (never-raise) holds even if an
    # unexpected error escapes a sub-step (the worker must never crash on test infra;
    # the task is left on testing for the reconciler/reaper to re-drive).
    try:
        result = run_test_suite(repo, branch)
        duration_s = round(time.time() - started, 1)
        suite_ran = (result.returncode is not None) and (not result.timed_out)
        if suite_ran:
            # 3. trust the exit-code; AND-in the read-only smoke (D3).
            exit_verdict = map_exit_code_to_result(result.returncode)
            smoke_state = "skipped"
            verdict = exit_verdict
            if exit_verdict == "PASS" and bool(getattr(settings, "test_runner_smoke_enabled", True)):
                smoke_ok, smoke_detail = run_smoke()
                smoke_state = "ok" if smoke_ok else "failed"
                if not smoke_ok:
                    verdict = "FAIL"
                    logger.warning(
                        "test_runner: pytest green but smoke FAILED for %s: %s",
                        work_item_id, smoke_detail,
                    )
            _bump("pass" if verdict == "PASS" else "fail")
            logger.info(
                "test_runner verdict: work_item=%s repo=%s exit_code=%s result=%s "
                "smoke=%s duration_s=%s outcome=%s",
                work_item_id, repo, result.returncode, verdict, smoke_state, duration_s,
                "code-pass" if verdict == "PASS" else "code-fail",
            )
            write_test_report(repo, work_item_id, branch, result.returncode, verdict,
                              result.stdout, smoke_state)
            _advance(task_id, repo, work_item_id, branch)
            return
        # 4. tool-error (suite did not execute) -> DEFER / fail-closed (D5).
        _bump("tool_error")
        logger.warning(
            "test_runner verdict: work_item=%s repo=%s exit_code=%s result=%s "
            "duration_s=%s outcome=tool-error (timed_out=%s)",
            work_item_id, repo, result.returncode, "TOOL-ERROR", duration_s, result.timed_out,
        )
        _handle_tool_error(task_id, repo, work_item_id, branch, result)
    except Exception as e:  # noqa: BLE001 - never-raise into the worker (AC-9)
        logger.error(
            "test_runner.run_test_gate: unexpected error for task %s (%s): %s",
            task_id, work_item_id, e,
        )


# ---------------------------------------------------------------------------
# Observability (D10 / FR-8 / AC-13)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
    """Read-only test-runner summary for ``GET /queue`` (FR-8 / AC-13).

    Additive block; existing ``/queue`` keys are untouched. never-raise: any error ->
    a minimal dict with the kill-switch state."""
    try:
        return {
            "enabled": bool(settings.test_runner_enabled),
            "repos": getattr(settings, "test_runner_repos", "") or "",
            "target": getattr(settings, "test_runner_target", "tests/") or "tests/",
            "timeout_s": getattr(settings, "test_runner_timeout_s", _DEFAULT_TIMEOUT_S),
            "smoke_enabled": bool(getattr(settings, "test_runner_smoke_enabled", True)),
            "infra_max_retries": getattr(settings, "test_runner_infra_max_retries", 2),
            "runs": _TEST_RUNNER_COUNTERS["runs"],
            "pass": _TEST_RUNNER_COUNTERS["pass"],
            "fail": _TEST_RUNNER_COUNTERS["fail"],
            "tool_error": _TEST_RUNNER_COUNTERS["tool_error"],
            "deferred": _TEST_RUNNER_COUNTERS["deferred"],
        }
    except Exception as e:  # noqa: BLE001 - never-raise -> minimal dict
        logger.warning("test_runner.snapshot error: %s", e)
        return {"enabled": False}
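

The two-level outcome (D5) that `run_test_gate` and `_handle_tool_error` implement can be condensed into one pure decision function. This is a standalone sketch, not part of the module: `FakeProcResult` and `classify` are hypothetical names, and the smoke result is passed in as a plain boolean rather than probed over HTTP.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeProcResult:
    """Hypothetical stand-in carrying the two fields the decision depends on."""
    returncode: Optional[int]
    timed_out: bool = False


def classify(result: FakeProcResult, smoke_ok: bool, retries: int, max_retries: int) -> str:
    """Return "PASS", "FAIL" or "DEFER" for one run of the gate."""
    suite_ran = result.returncode is not None and not result.timed_out
    if suite_ran:
        # Level 1: the suite executed, so the exit code (AND the smoke) is trusted.
        return "PASS" if result.returncode == 0 and smoke_ok else "FAIL"
    # Level 2: tool-error (infra fault). Defer while the retry budget lasts,
    # then fail closed so the run can never become a false green.
    return "DEFER" if retries < max_retries else "FAIL"


print(classify(FakeProcResult(0), True, 0, 2))                     # PASS
print(classify(FakeProcResult(1), True, 0, 2))                     # FAIL
print(classify(FakeProcResult(0), False, 0, 2))                    # FAIL
print(classify(FakeProcResult(None), True, 0, 2))                  # DEFER
print(classify(FakeProcResult(None, timed_out=True), True, 2, 2))  # FAIL
```

Keeping the decision free of I/O, as in this sketch, makes each branch checkable without a worktree, a database or a running service.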