feat(staging): deterministic staging-runner replacing LLM deployer on deploy-staging (ORCH-115)
All checks were successful
CI / test (push) Successful in 1m8s
CI / test (pull_request) Successful in 1m8s

Replace the LLM `deployer` agent on the `deploy-staging` stage (self-hosting
orchestrator) with a deterministic staging-runner intercepted in launch_job
BEFORE _spawn (the deploy-finalizer / post-deploy-monitor reserved-agent
precedent). The runner executes the SAME staging suite, maps the exit-code to
`staging_status:` via the existing self_deploy.map_exit_code_to_status contract,
writes 15-staging-log.md, and initiates the UNCHANGED check_staging_status gate
exactly as a finished LLM-deployer would.

Invariant (NFR-1): this replaces only the *producer* of the artifact — the
artifact contract, the gate / _parse_staging_status / check_staging_status name,
STAGE_TRANSITIONS, the machine-verdict key `staging_status:` and the DB schema are
byte-for-byte unchanged. Additive, under a kill-switch + repo-scope CSV,
never-raise, fail-safe back to the LLM path.

Two-level outcome (D5, anti ORCH-110): suite executed -> verdict -> advance
(FAILED -> the existing deploy-staging -> development rollback + developer-retry,
same as a FAILED LLM verdict); tool-error (suite did not execute) -> bounded DEFER
-> fail-closed FAILED + alert on exhaustion (infra != code fault; never a silent
advance / false green).

First implemented slice of the LLM determinization roadmap (ORCH-118 A6,
replace-deterministic-now).

- New leaf src/staging_runner.py (never-raise; proc_group tree-kill + timeout)
- launch_job intercept + _run_staging_runner_job (mirror _run_deploy_finalizer_job)
- config: ORCH_STAGING_RUNNER_* keys (enabled/repos/timeout/infra-retry budget)
- GET /queue staging_runner observability block
- docs: llm-call-sites/roadmap/usage-policy (A6 implemented; machine blocks +
  single-transport invariant intact), deployer.md (LLM branch -> fallback),
  CLAUDE.md, CHANGELOG.md, overview (tech-pipeline/tech-agents/tech-quality-security),
  .env.example
- tests/test_orch115_staging_runner.py (TC-01..TC-13); LLM anti-drift green (TC-14)

Refs: ORCH-115

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 01:59:43 +03:00
parent f120e4bd8f
commit b50cf1dd08
16 changed files with 1235 additions and 7 deletions

View File

@@ -0,0 +1,438 @@
"""ORCH-115 — deterministic staging-runner replacing the LLM deployer on
`deploy-staging` (self-hosting orchestrator).
Covers Phase 1 (04-test-plan.yaml TC-01…TC-13): the launch_job intercept BEFORE
_spawn, the exit-code -> staging_status: mapping, 15-staging-log.md render + the
UNCHANGED gate contract, advance_stage initiation, kill-switch/scope, never-raise /
fail-safe, the bounded tool-error DEFER, process timeout, self-hosting safety, the
anti-drift invariant (STAGE_TRANSITIONS / QG_CHECKS / DB schema untouched), and the
/queue observability block.
No live Claude CLI, docker or network: the staging subprocess and advance_stage are
mocked; the pure mapping/render is tested directly. (TC-14 — the LLM call-site map
anti-drift — lives in tests/test_llm_call_site_inventory.py / test_llm_determinization_docs.py.)
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch115_staging_runner.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db, enqueue_job # noqa: E402
from src import staging_runner # noqa: E402
from src import stage_engine # noqa: E402
from src import config as cfg # noqa: E402
from src.proc_group import ProcResult # noqa: E402
from src.agents.launcher import AgentLauncher # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Worktree artefacts land in tmp; git commit/push degrade gracefully (no repo).
monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path), raising=False)
# Runner ON, self-hosting scope (default).
monkeypatch.setattr(cfg.settings, "staging_runner_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "staging_runner_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "staging_runner_infra_max_retries", 2, raising=False)
# Reset in-process counters between tests.
for k in staging_runner._STAGING_RUNNER_COUNTERS:
staging_runner._STAGING_RUNNER_COUNTERS[k] = 0
yield
def _make_task(stage, repo="orchestrator", wi="ORCH-115", branch="feature/ORCH-115-x"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _make_job(agent, repo, task_id, content="x"):
return enqueue_job(agent, repo, content, task_id=task_id)
def _job_dict(job_id, agent, repo, task_id):
return {"id": job_id, "agent": agent, "repo": repo, "task_id": task_id, "task_content": "x"}
def _read_log(tmp_path, repo, branch, wi):
from src.git_worktree import get_worktree_path
p = os.path.join(get_worktree_path(repo, branch), f"docs/work-items/{wi}/15-staging-log.md")
with open(p, "r", encoding="utf-8") as f:
return f.read()
# ---------------------------------------------------------------------------
# TC-01 — applies(): kill-switch / scope / fail-safe (FR-6 / AC-6)
# ---------------------------------------------------------------------------
def test_tc01_applies_killswitch_and_scope(monkeypatch):
# enabled=False -> False (fall back to the LLM path) regardless of repo.
monkeypatch.setattr(cfg.settings, "staging_runner_enabled", False)
assert staging_runner.applies("orchestrator") is False
# enabled, empty CSV -> self-hosting only.
monkeypatch.setattr(cfg.settings, "staging_runner_enabled", True)
monkeypatch.setattr(cfg.settings, "staging_runner_repos", "")
assert staging_runner.applies("orchestrator") is True
assert staging_runner.applies("enduro-trails") is False
assert staging_runner.applies("") is False
# non-empty CSV -> membership (case-insensitive).
monkeypatch.setattr(cfg.settings, "staging_runner_repos", "enduro-trails, orchestrator")
assert staging_runner.applies("ENDURO-TRAILS") is True
assert staging_runner.applies("other-repo") is False
def test_tc01_applies_never_raises(monkeypatch):
# A settings attribute that explodes on read -> applies() degrades to False.
class Boom:
@property
def staging_runner_enabled(self):
raise RuntimeError("boom")
monkeypatch.setattr(staging_runner, "settings", Boom())
assert staging_runner.applies("orchestrator") is False
# ---------------------------------------------------------------------------
# TC-02 — exit-code -> verdict, single shared contract (FR-3 / AC-3)
# ---------------------------------------------------------------------------
def test_tc02_map_exit_code():
from src import self_deploy
assert staging_runner.map_exit_code_to_status(0) == "SUCCESS"
for bad in (1, 2, 137, -9):
assert staging_runner.map_exit_code_to_status(bad) == "FAILED"
for noncode in (None, "x", object()):
assert staging_runner.map_exit_code_to_status(noncode) == "FAILED"
# Same contract as the deploy-finalizer (no second, drifting mapping).
for v in (0, 1, None, "garbage"):
assert staging_runner.map_exit_code_to_status(v) == self_deploy.map_exit_code_to_status(v)
# ---------------------------------------------------------------------------
# TC-03 — 15-staging-log.md render: machine key + 52c schema + INFRA-WAIVED (FR-4 / AC-2)
# ---------------------------------------------------------------------------
def test_tc03_log_render_schema_and_infra_waived():
stdout = "B6 OK\nINFRA-WAIVED: B9 image-build skipped (sandbox)\nDONE"
body = staging_runner.build_staging_log("ORCH-115", 0, "SUCCESS", stdout)
assert "staging_status: SUCCESS" in body # UPPERCASE machine key
for field in ("work_item: ORCH-115", "stage: deploy-staging",
"author_agent: staging-runner", "status: success",
"created_at:", "model_used: n/a"):
assert field in body, f"missing 52c field: {field}"
# INFRA-WAIVED line copied into the body for observability.
assert "INFRA-WAIVED: B9 image-build skipped (sandbox)" in body
failed = staging_runner.build_staging_log("ORCH-115", 1, "FAILED", "boom")
assert "staging_status: FAILED" in failed
assert "status: failed" in failed
# ---------------------------------------------------------------------------
# TC-04 — generated log read by the UNCHANGED _parse_staging_status (AC-2)
# ---------------------------------------------------------------------------
def test_tc04_gate_parser_unchanged():
from src.qg.checks import _parse_staging_status
ok, reason = _parse_staging_status(staging_runner.build_staging_log("ORCH-115", 0, "SUCCESS"))
assert ok is True and "SUCCESS" in reason
bad, reason2 = _parse_staging_status(staging_runner.build_staging_log("ORCH-115", 2, "FAILED"))
assert bad is False and "FAILED" in reason2
# ---------------------------------------------------------------------------
# TC-05 — launch_job intercepts deployer on deploy-staging BEFORE _spawn (AC-1)
# ---------------------------------------------------------------------------
def test_tc05_launch_job_intercepts_before_spawn(monkeypatch):
tid = _make_task("deploy-staging")
jid = _make_job("deployer", "orchestrator", tid)
lr = AgentLauncher()
spawn = MagicMock(return_value=999)
monkeypatch.setattr(lr, "_spawn", spawn)
run_gate = MagicMock()
monkeypatch.setattr(staging_runner, "run_staging_gate", run_gate)
ret = lr.launch_job(_job_dict(jid, "deployer", "orchestrator", tid))
assert ret is None # no agent_runs row
spawn.assert_not_called() # LLM path NOT taken
run_gate.assert_called_once() # deterministic runner ran
# jobs row driven to done by the launcher (no monitor/agent).
conn = get_db()
status = conn.execute("SELECT status FROM jobs WHERE id=?", (jid,)).fetchone()[0]
conn.close()
assert status == "done"
# ---------------------------------------------------------------------------
# TC-06 — stage discriminator: deployer on `deploy` is NOT intercepted (AC-1 / R-1)
# ---------------------------------------------------------------------------
def test_tc06_stage_discriminator_prod_not_intercepted(monkeypatch):
tid = _make_task("deploy") # prod edge, not deploy-staging
jid = _make_job("deployer", "orchestrator", tid)
lr = AgentLauncher()
spawn = MagicMock(return_value=42)
monkeypatch.setattr(lr, "_spawn", spawn)
run_gate = MagicMock()
monkeypatch.setattr(staging_runner, "run_staging_gate", run_gate)
ret = lr.launch_job(_job_dict(jid, "deployer", "orchestrator", tid))
assert ret == 42 # _spawn path taken
spawn.assert_called_once()
run_gate.assert_not_called()
assert staging_runner.should_intercept(_job_dict(jid, "deployer", "orchestrator", tid)) is False
def test_tc06_non_self_repo_not_intercepted(monkeypatch):
tid = _make_task("deploy-staging", repo="enduro-trails", wi="ET-9", branch="feature/ET-9-x")
jid = _make_job("deployer", "enduro-trails", tid)
assert staging_runner.should_intercept(_job_dict(jid, "deployer", "enduro-trails", tid)) is False
# ---------------------------------------------------------------------------
# TC-07 — routing equivalence: SUCCESS -> advance; FAILED -> same path (AC-4)
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("rc,expected", [(0, "SUCCESS"), (1, "FAILED")])
def test_tc07_advance_initiated_like_llm(monkeypatch, tmp_path, rc, expected):
tid = _make_task("deploy-staging")
monkeypatch.setattr(staging_runner, "run_staging_suite",
lambda: ProcResult(returncode=rc, stdout="out", stderr="", timed_out=False))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
advance.assert_called_once()
kwargs = advance.call_args.kwargs
assert kwargs["current_stage"] == "deploy-staging"
assert kwargs["finished_agent"] == "deployer"
assert kwargs["work_item_id"] == "ORCH-115"
# The log written for the gate carries the matching verdict.
assert f"staging_status: {expected}" in _read_log(tmp_path, "orchestrator", "feature/ORCH-115-x", "ORCH-115")
# ---------------------------------------------------------------------------
# TC-08 — kill-switch: enabled=False -> LLM path via _spawn (AC-6)
# ---------------------------------------------------------------------------
def test_tc08_killswitch_falls_back_to_spawn(monkeypatch):
monkeypatch.setattr(cfg.settings, "staging_runner_enabled", False)
tid = _make_task("deploy-staging")
jid = _make_job("deployer", "orchestrator", tid)
lr = AgentLauncher()
spawn = MagicMock(return_value=7)
monkeypatch.setattr(lr, "_spawn", spawn)
run_gate = MagicMock()
monkeypatch.setattr(staging_runner, "run_staging_gate", run_gate)
ret = lr.launch_job(_job_dict(jid, "deployer", "orchestrator", tid))
assert ret == 7
spawn.assert_called_once() # byte-for-byte the prior LLM-deployer path
run_gate.assert_not_called()
# ---------------------------------------------------------------------------
# TC-09 — anti-drift NFR-1: STAGE_TRANSITIONS / QG_CHECKS / schema untouched (AC-5)
# ---------------------------------------------------------------------------
def test_tc09_pipeline_contract_unchanged():
from src.stages import STAGE_TRANSITIONS
from src.qg.checks import QG_CHECKS
# The deploy-staging edge + its gate are byte-for-byte the prior contract.
assert STAGE_TRANSITIONS["deploy-staging"] == {
"next": "deploy", "agent": "deployer", "qg": "check_staging_status"
}
assert "check_staging_status" in QG_CHECKS
# No new ORCH-115 table/column: only the existing tables exist.
conn = get_db()
tables = {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
conn.close()
assert not any("staging_runner" in t for t in tables)
# The machine key is not renamed: the runner emits `staging_status:`.
assert "staging_status:" in staging_runner.build_staging_log("ORCH-115", 0, "SUCCESS")
# ---------------------------------------------------------------------------
# TC-10 — never-raise / fail-safe: tool-error never a silent advance/false green (AC-7)
# ---------------------------------------------------------------------------
def test_tc10_nonzero_exit_is_failed_and_advances(monkeypatch, tmp_path):
tid = _make_task("deploy-staging")
monkeypatch.setattr(staging_runner, "run_staging_suite",
lambda: ProcResult(returncode=3, stdout="fail", stderr="", timed_out=False))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
advance.assert_called_once()
assert "staging_status: FAILED" in _read_log(tmp_path, "orchestrator", "feature/ORCH-115-x", "ORCH-115")
def test_tc10_timeout_defers_without_advance(monkeypatch):
tid = _make_task("deploy-staging")
monkeypatch.setattr(cfg.settings, "staging_runner_infra_max_retries", 2)
monkeypatch.setattr(staging_runner, "run_staging_suite",
lambda: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
advance.assert_not_called() # NO silent advance / false green on infra hiccup
# A fresh deployer job was re-queued with the restart-safe marker (bounded DEFER).
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND task_content LIKE ?",
(tid, f"%{staging_runner._INFRA_RETRY_MARKER}%"),
).fetchone()[0]
conn.close()
assert n == 1
assert staging_runner._STAGING_RUNNER_COUNTERS["deferred"] == 1
assert staging_runner._STAGING_RUNNER_COUNTERS["tool_error"] == 1
def test_tc10_tool_error_budget_exhausted_fails_closed(monkeypatch, tmp_path):
tid = _make_task("deploy-staging")
monkeypatch.setattr(cfg.settings, "staging_runner_infra_max_retries", 0) # exhausted immediately
monkeypatch.setattr(staging_runner, "run_staging_suite",
lambda: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
advance = MagicMock()
monkeypatch.setattr(stage_engine, "advance_stage", advance)
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
advance.assert_called_once() # fail-closed: write FAILED + advance (existing rollback)
assert "staging_status: FAILED" in _read_log(tmp_path, "orchestrator", "feature/ORCH-115-x", "ORCH-115")
def test_tc10_run_gate_never_raises(monkeypatch):
tid = _make_task("deploy-staging")
def boom():
raise RuntimeError("docker exec exploded")
monkeypatch.setattr(staging_runner, "run_staging_suite", boom)
# Must NOT raise (AC-7): the worker is protected even on an unexpected error.
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
def test_tc10_launcher_contains_runner_fault(monkeypatch):
tid = _make_task("deploy-staging")
jid = _make_job("deployer", "orchestrator", tid)
lr = AgentLauncher()
monkeypatch.setattr(lr, "_spawn", MagicMock())
monkeypatch.setattr(staging_runner, "run_staging_gate",
MagicMock(side_effect=RuntimeError("kaboom")))
# The launcher wrapper contains the fault -> job marked failed, never raises.
ret = lr.launch_job(_job_dict(jid, "deployer", "orchestrator", tid))
assert ret is None
conn = get_db()
status = conn.execute("SELECT status FROM jobs WHERE id=?", (jid,)).fetchone()[0]
conn.close()
assert status == "failed"
# ---------------------------------------------------------------------------
# TC-11 — timeout resolution + propagation (NFR-4 / AC-9)
# ---------------------------------------------------------------------------
def test_tc11_resolve_timeout_default_on_bad_value(monkeypatch):
monkeypatch.setattr(cfg.settings, "staging_runner_timeout_s", 1234)
assert staging_runner._resolve_timeout() == 1234
for bad in (0, -5, "abc", None):
monkeypatch.setattr(cfg.settings, "staging_runner_timeout_s", bad)
assert staging_runner._resolve_timeout() == staging_runner._DEFAULT_TIMEOUT_S
def test_tc11_timeout_passed_to_proc_group(monkeypatch):
monkeypatch.setattr(cfg.settings, "staging_runner_timeout_s", 321)
monkeypatch.setattr(cfg.settings, "subprocess_tree_kill_enabled", True)
captured = {}
def fake_run(cmd, *, cwd, timeout, grace_s, tree_kill):
captured.update(cmd=cmd, timeout=timeout, tree_kill=tree_kill)
return ProcResult(returncode=0, stdout="", stderr="", timed_out=False)
monkeypatch.setattr(staging_runner.proc_group, "run_in_process_group", fake_run)
staging_runner.run_staging_suite()
assert captured["timeout"] == 321
assert captured["tree_kill"] is True
assert captured["cmd"][:2] == ["docker", "exec"]
# ---------------------------------------------------------------------------
# TC-12 — self-hosting safety: no forbidden literals in the runner command (AC-8)
# ---------------------------------------------------------------------------
def test_tc12_command_has_no_dangerous_literals():
cmd = " ".join(staging_runner.build_staging_command())
forbidden = ("compose", "up -d", "--build", "8500", "force", "push", ".env",
"rm ", "restart")
for token in forbidden:
assert token not in cmd, f"forbidden literal {token!r} in runner command: {cmd}"
# It IS the staging suite against 8501 via docker exec.
assert "docker exec" in cmd
assert "staging_check.py" in cmd
assert "8501" in cmd
assert "--mode stub" in cmd
# ---------------------------------------------------------------------------
# TC-13 — observability: /queue block + structured verdict log (AC-10)
# ---------------------------------------------------------------------------
def test_tc13_snapshot_shape():
snap = staging_runner.snapshot()
for k in ("enabled", "repos", "timeout_s", "infra_max_retries",
"runs", "success", "failed", "tool_error", "deferred"):
assert k in snap, f"snapshot missing key {k}"
def test_tc13_queue_endpoint_includes_block(monkeypatch):
import asyncio
from src import main
payload = asyncio.run(main.queue())
assert "staging_runner" in payload
assert "enabled" in payload["staging_runner"]
def test_tc13_structured_verdict_log_distinguishes_outcomes(monkeypatch, caplog, tmp_path):
tid = _make_task("deploy-staging")
monkeypatch.setattr(stage_engine, "advance_stage", MagicMock())
# code-pass
monkeypatch.setattr(staging_runner, "run_staging_suite",
lambda: ProcResult(returncode=0, stdout="", stderr="", timed_out=False))
with caplog.at_level("INFO", logger="orchestrator.staging_runner"):
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
assert any("outcome=code-pass" in r.message for r in caplog.records)
caplog.clear()
# tool-error
monkeypatch.setattr(cfg.settings, "staging_runner_infra_max_retries", 2)
monkeypatch.setattr(staging_runner, "run_staging_suite",
lambda: ProcResult(returncode=None, stdout="", stderr="", timed_out=True))
with caplog.at_level("WARNING", logger="orchestrator.staging_runner"):
staging_runner.run_staging_gate(_job_dict(1, "deployer", "orchestrator", tid))
assert any("outcome=tool-error" in r.message for r in caplog.records)
def test_tc13_snapshot_never_raises(monkeypatch):
class Boom:
def __getattr__(self, name):
raise RuntimeError("boom")
monkeypatch.setattr(staging_runner, "settings", Boom())
snap = staging_runner.snapshot()
assert snap["enabled"] is False