Files
orchestrator/tests/test_orch109_timeout_model.py
claude-bot 6bd7f9ba84 fix(launcher): raise developer/reviewer timeout budgets + stamp model at launch
Two additive, isolated launch-subsystem fixes from incident ORCH-104, without
touching STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict / DB schema.

D1 — launch-time model stamp: write the resolved model into agent_runs.model in
the SAME UPDATE as the effort stamp (ORCH-087), so the model is present from
launch, survives a timeout-kill (exit_code=-9), and is visible in-flight in
/metrics & /queue. record_usage stays an enrichment (model=COALESCE preserves the
launch stamp when the usage JSON model is None). The stamp never raises (isolated try/except).

D3/D4 — dedicated per-role budgets: agent_timeout_developer_s=3600 /
agent_timeout_reviewer_s=3000 with a deterministic _resolve_timeout ladder
(overrides_json[agent] > dedicated role key > agent_timeout_seconds=1800; other
roles resolve byte-for-byte as before). Malformed/non-positive config falls back to the global
default + WARNING (never-break). reaper_max_running_s raised 3600 -> 5400 in
lockstep to keep the ORCH-065 invariant (5400 > 3600 + 20 = 3620).

FR-4 (kill / in-flight visibility) and FR-5 (anti-salvage) are structural in the
existing code; pinned here by regression tests (tests/test_orch109_timeout_model.py,
TC-01..TC-12). Docs: .env.example, config passport, CHANGELOG, CLAUDE.md
(README/internals authored by architect in this branch).

Refs: ORCH-109

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 14:26:11 +03:00

555 lines
24 KiB
Python

"""ORCH-109: timeout budgets + launch-time model telemetry for developer/reviewer.
Covers FR-1..FR-6 / AC-1..AC-10 through TC-01..TC-12 (04-test-plan.yaml). Fully
deterministic: an isolated temp SQLite DB + synthetic agent_runs / jobs rows; no
network, no Claude CLI subprocess. Settings are monkeypatched / overridden.
Two production changes under test (ADR-001):
* D1 — launcher._spawn stamps the resolved model into agent_runs.model in the
SAME UPDATE as the effort stamp, so the model is present from launch and
survives a timeout-kill / is visible in-flight.
* D3/D4 — launcher._resolve_timeout grows a dedicated per-role budget level
(developer 3600 / reviewer 3000) between the JSON escape-hatch and the global
default; reaper_max_running_s raised 3600 -> 5400 in lockstep (ORCH-065).
FR-2 (COALESCE preserve), FR-4/NFR-6 (kill / in-flight visibility) and FR-5
(anti-salvage) are STRUCTURAL guarantees already present in the code — pinned here
as regression tests, not new branches.
"""
import os
import sqlite3
import tempfile
# Placeholder credentials: only set when the environment does not already provide them.
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
# Dedicated database file for this module, kept in the system temp directory.
_test_db = os.path.join(tempfile.gettempdir(), "test_orch109_timeout_model.db")
# Unconditional assignment: any ORCH_DB_PATH already in the environment is replaced.
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_REPOS_DIR", tempfile.gettempdir())
# NOTE(review): presumably these must be set before the src.* imports below
# (hence their E402 suppressions) — confirm that settings are read at import time.
import pytest # noqa: E402
import src.db as db_module # noqa: E402
from src.db import init_db, get_db, get_running_agents # noqa: E402
from src.config import settings, Settings # noqa: E402
from src.agents.launcher import AgentLauncher, resolve_agent_model # noqa: E402
from src import usage as U # noqa: E402
from src import notifications as N # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a freshly initialised, isolated SQLite database.

    Before the test: point the DB settings at the module's temp file, delete any
    leftover file and run ``init_db()``. After the test: delete the file again.
    The tracker's live-status flag is switched off for the duration of the test.
    """

    def _discard_db_file():
        # Remove the on-disk database if an earlier step left one behind.
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    # get_db() reads settings.db_path live; pin it to our isolated DB.
    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _discard_db_file()
    init_db()
    # Render-only tests: never consult the live Plane overlay (no network).
    monkeypatch.setattr(N._get_settings(), "tracker_live_status", False, raising=False)
    yield
    _discard_db_file()
# --------------------------------------------------------------------------- #
# TC-01..TC-03 — _resolve_timeout dedicated-budget ladder (FR-3, AC-3 / AC-4)
# --------------------------------------------------------------------------- #
class TestResolveTimeoutLadder:
    """Resolution priority: overrides_json, then the dedicated role key, then the global default."""

    def _pin(self, monkeypatch, *, dev=3600, rev=3000, default=1800, overrides=""):
        """Pin all four timeout settings so a test starts from known values."""
        pinned = {
            "agent_timeout_seconds": default,
            "agent_timeout_overrides_json": overrides,
            "agent_timeout_developer_s": dev,
            "agent_timeout_reviewer_s": rev,
        }
        for attr, value in pinned.items():
            monkeypatch.setattr(settings, attr, value)

    def test_tc01_developer_reviewer_raised(self, monkeypatch):
        """TC-01/AC-3: developer and reviewer each get their raised dedicated budget."""
        self._pin(monkeypatch)
        resolved = (
            AgentLauncher._resolve_timeout("developer"),
            AgentLauncher._resolve_timeout("reviewer"),
        )
        assert resolved == (3600, 3000)

    def test_tc01_dedicated_keys_are_configurable(self, monkeypatch):
        """TC-01/AC-3: the budgets follow the configured values rather than constants."""
        self._pin(monkeypatch, dev=4200, rev=2400)
        resolved = (
            AgentLauncher._resolve_timeout("developer"),
            AgentLauncher._resolve_timeout("reviewer"),
        )
        assert resolved == (4200, 2400)

    def test_tc02_other_roles_use_global_default(self, monkeypatch):
        """TC-02/AC-3: roles with no dedicated key resolve to the global default (1800)."""
        self._pin(monkeypatch)
        # Known roles first, then an unknown role and None: all fall through.
        fall_through = ("analyst", "architect", "tester", "deployer", "unknown-role", None)
        for role in fall_through:
            assert AgentLauncher._resolve_timeout(role) == 1800

    def test_tc01_overrides_json_wins_over_dedicated(self, monkeypatch):
        """AC-3: the operator JSON escape-hatch outranks the dedicated keys."""
        self._pin(monkeypatch, overrides='{"developer": 1234, "reviewer": 999}')
        resolved = (
            AgentLauncher._resolve_timeout("developer"),
            AgentLauncher._resolve_timeout("reviewer"),
        )
        assert resolved == (1234, 999)

    def test_tc03_malformed_overrides_json_never_raises(self, monkeypatch):
        """TC-03/AC-4: unparsable overrides JSON is ignored and resolution still succeeds."""
        self._pin(monkeypatch, overrides="{not-json")
        # With the JSON ignored, developer resolves through its dedicated key ...
        assert AgentLauncher._resolve_timeout("developer") == 3600
        # ... and a role lacking a dedicated key lands on the global default.
        assert AgentLauncher._resolve_timeout("analyst") == 1800

    @pytest.mark.parametrize("bad", [0, -5, "abc"])
    def test_tc03_non_positive_dedicated_falls_back(self, monkeypatch, bad):
        """TC-03/AC-4: a zero, negative or non-integer dedicated value yields the global default."""
        self._pin(monkeypatch, dev=bad)
        # Resolution must not raise; the expected result is agent_timeout_seconds.
        assert AgentLauncher._resolve_timeout("developer") == 1800
# --------------------------------------------------------------------------- #
# TC-04 / TC-05 — launch-time model stamp in _spawn (FR-1, AC-1 + NFR-2)
# --------------------------------------------------------------------------- #
class TestLaunchModelStamp:
    """_spawn is expected to write the resolved model (and effort) onto the run row at launch."""

    def _seed_task(self, repo="orchestrator", branch="feature/ORCH-109-x", wid="ORCH-109"):
        """Insert one task in the ``development`` stage and return its row id."""
        insert_sql = (
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
            "VALUES (?,?,?,?,?,?)"
        )
        db = get_db()
        task_id = db.execute(
            insert_sql, ("p1", wid, repo, branch, "development", "t")
        ).lastrowid
        db.commit()
        db.close()
        return task_id

    def _fake_spawn_env(self, tmp_path, monkeypatch, repo="orchestrator"):
        """Stub out worktree, process, thread and notification side effects of _spawn.

        Returns the launcher module so callers can patch its settings further.
        """
        import src.agents.launcher as L

        repo_dir = tmp_path / repo
        repo_dir.mkdir()
        monkeypatch.setattr(L.settings, "repos_dir", str(tmp_path), raising=False)
        monkeypatch.setattr(L.settings, "runs_dir", str(tmp_path / "runs"), raising=False)
        monkeypatch.setattr(L, "ensure_worktree", lambda r, b: str(repo_dir))
        monkeypatch.setattr("src.projects.get_project_by_repo", lambda r: None)

        class _FakeProcess:
            # Only the pid attribute is provided by this stand-in.
            pid = 4242

        class _InertThread:
            # Accepts any constructor arguments and never runs anything.
            def __init__(self, *a, **k):
                pass

            def start(self):
                pass

        monkeypatch.setattr(L.subprocess, "Popen", lambda *a, **k: _FakeProcess())
        monkeypatch.setattr(L.threading, "Thread", _InertThread)
        monkeypatch.setattr(L, "notify_agent_started", lambda *a, **k: None)
        return L

    def test_tc04_spawn_stamps_model_and_effort(self, tmp_path, monkeypatch):
        """TC-04/AC-1: once _spawn returns, the run row holds the resolved model and effort."""
        launcher_mod = self._fake_spawn_env(tmp_path, monkeypatch)
        # Deterministic resolve: developer -> claude-opus-4-8 (default) / xhigh (floor).
        deterministic = (
            ("agent_model_developer", ""),
            ("agent_model_default", "claude-opus-4-8"),
            ("agent_effort_developer", ""),
            ("agent_effort_default", ""),
        )
        for attr, value in deterministic:
            monkeypatch.setattr(launcher_mod.settings, attr, value, raising=False)

        task_id = self._seed_task()
        run_id = launcher_mod.AgentLauncher()._spawn(
            "developer", "orchestrator", task_content=None, task_id=task_id
        )

        db = get_db()
        stamped = db.execute(
            "SELECT model, effort FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
        db.close()

        assert stamped["model"] == "claude-opus-4-8"
        assert stamped["effort"] == "xhigh"
        # The stored value must agree with the resolver's answer.
        assert stamped["model"] == resolve_agent_model("developer", None)

    def test_tc05_stamp_failure_is_isolated(self, tmp_path, monkeypatch):
        """TC-05/NFR-2: an exception from the model/effort stamp must not escape _spawn."""
        launcher_mod = self._fake_spawn_env(tmp_path, monkeypatch)
        real_get_db = db_module.get_db

        class _StampFailingConn:
            """Wraps a real connection; only the launch-stamp UPDATE raises."""

            def __init__(self, real):
                self._real = real

            def execute(self, sql, *a, **k):
                if "SET model=?, effort=?" not in sql:
                    return self._real.execute(sql, *a, **k)
                raise sqlite3.OperationalError("simulated stamp failure")

            def commit(self):
                return self._real.commit()

            def close(self):
                return self._real.close()

            def __getattr__(self, name):
                # Anything not overridden above is served by the wrapped connection.
                return getattr(self._real, name)

        monkeypatch.setattr(
            launcher_mod, "get_db", lambda: _StampFailingConn(real_get_db())
        )
        task_id = self._seed_task()

        # The call must return normally even though the stamp UPDATE raises.
        run_id = launcher_mod.AgentLauncher()._spawn(
            "developer", "orchestrator", task_content=None, task_id=task_id
        )
        assert run_id is not None

        # The run row is there and its model is still NULL because the stamp failed.
        db = real_get_db()
        stored = db.execute(
            "SELECT id, model FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
        db.close()
        assert stored is not None
        assert stored["model"] is None
# --------------------------------------------------------------------------- #
# TC-06 / TC-07 — post-hoc enrich preserves / refines the launch stamp (FR-2)
# --------------------------------------------------------------------------- #
class TestRecordUsagePreservesStamp:
    """record_usage (model=COALESCE(?, model)) must not wipe a launch-stamped model."""

    def _run_with_model(self, model="claude-opus-4-8", agent="developer"):
        """Insert an agent_runs row carrying ``model`` and return its id."""
        db = get_db()
        run_id = db.execute(
            "INSERT INTO agent_runs (task_id, agent, model) VALUES (?,?,?)",
            (1, agent, model),
        ).lastrowid
        db.commit()
        db.close()
        return run_id

    def _model_of(self, rid):
        """Read back the ``model`` column of run ``rid``."""
        db = get_db()
        stored = db.execute(
            "SELECT model FROM agent_runs WHERE id=?", (rid,)
        ).fetchone()
        db.close()
        return stored["model"]

    def test_tc06_record_usage_none_preserves_model(self):
        """TC-06/AC-2: with usage=None (no final JSON, e.g. a timeout) the stamp survives."""
        run_id = self._run_with_model()
        U.record_usage(run_id, None)  # the call itself must not raise
        assert self._model_of(run_id) == "claude-opus-4-8"

    def test_tc06_record_usage_model_none_preserves_model(self):
        """TC-06/AC-2: a usage payload whose model is None leaves the stamp in place."""
        run_id = self._run_with_model()
        payload = {"input_tokens": 10, "output_tokens": 5, "model": None}
        U.record_usage(run_id, payload)
        assert self._model_of(run_id) == "claude-opus-4-8"

    def test_tc07_record_usage_nonempty_model_enriches_blank(self):
        """TC-07/AC-2: a model in the payload fills in a row that had no stamp."""
        run_id = self._run_with_model(model=None)
        payload = {"input_tokens": 1, "output_tokens": 1, "model": "claude-opus-4-8"}
        U.record_usage(run_id, payload)
        assert self._model_of(run_id) == "claude-opus-4-8"

    def test_tc07_record_usage_refines_existing_model(self):
        """TC-07/AC-2: a provider-prefixed id from the payload replaces a bare stamp."""
        run_id = self._run_with_model(model="claude-opus-4-8")
        payload = {
            "input_tokens": 1,
            "output_tokens": 1,
            "model": "tokenator/claude-opus-4-8",
        }
        U.record_usage(run_id, payload)
        assert self._model_of(run_id) == "tokenator/claude-opus-4-8"
# --------------------------------------------------------------------------- #
# TC-08 — reaper cross-invariant (NFR-4, AC-5)
# --------------------------------------------------------------------------- #
class TestReaperInvariant:
    """reaper_max_running_s has to exceed the largest timeout plus the kill grace (ORCH-065)."""

    def test_tc08_shipped_defaults_satisfy_invariant(self, monkeypatch):
        """TC-08/AC-5: a Settings() built without the related env vars holds the invariant."""
        env_names = (
            "ORCH_AGENT_TIMEOUT_SECONDS",
            "ORCH_AGENT_KILL_GRACE_SECONDS",
            "ORCH_AGENT_TIMEOUT_OVERRIDES_JSON",
            "ORCH_AGENT_TIMEOUT_DEVELOPER_S",
            "ORCH_AGENT_TIMEOUT_REVIEWER_S",
            "ORCH_REAPER_MAX_RUNNING_S",
        )
        # Clear any environment overrides before constructing the settings object.
        for env_name in env_names:
            monkeypatch.delenv(env_name, raising=False)

        s = Settings()
        budgets = (
            s.agent_timeout_seconds,
            s.agent_timeout_developer_s,
            s.agent_timeout_reviewer_s,
        )
        max_budget = max(budgets)
        assert s.reaper_max_running_s > max_budget + s.agent_kill_grace_seconds

        # Concrete shipped numbers (ADR-001 D4): 5400 > 3600 + 20 = 3620.
        observed = (max_budget, s.agent_kill_grace_seconds, s.reaper_max_running_s)
        assert observed == (3600, 20, 5400)

    def test_tc08_resolved_max_is_developer(self, monkeypatch):
        """TC-08/AC-5: across all roles the largest resolved budget is the developer's."""
        pinned = {
            "agent_timeout_seconds": 1800,
            "agent_timeout_overrides_json": "",
            "agent_timeout_developer_s": 3600,
            "agent_timeout_reviewer_s": 3000,
            "agent_kill_grace_seconds": 20,
            "reaper_max_running_s": 5400,
        }
        for attr, value in pinned.items():
            monkeypatch.setattr(settings, attr, value)

        roles = ["analyst", "architect", "developer", "reviewer", "tester", "deployer"]
        max_timeout = max(map(AgentLauncher._resolve_timeout, roles))
        assert max_timeout == 3600
        assert settings.reaper_max_running_s > max_timeout + settings.agent_kill_grace_seconds
# --------------------------------------------------------------------------- #
# TC-09 — tracker stage line shows model+effort on a timeout-killed run (FR-4)
# --------------------------------------------------------------------------- #
class TestTrackerTimeoutVisibility:
    """A -9 run still renders '· <model> · <effort>' because both are launch-stamped.

    The scenarios below rely on the stage line taking its model/effort from the
    LAST run of the agent. When that last run is a timeout-kill (-9), its
    launch-stamped values are what the operator sees. Without the stamp the -9
    row would carry model=NULL and the line would drop the model suffix (the
    AC-6 FAIL condition).
    """

    # Prefix of the rendered development stage line the tests look for.
    _STAGE_PREFIX = "✅ Разработка"

    def _mk_task(self, stage="done", wid="ORCH-109"):
        """Insert one task row in ``stage`` and return its id.

        The connection is closed even when the INSERT raises.
        """
        conn = get_db()
        try:
            cur = conn.execute(
                "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
                "VALUES (?,?,?,?,?,?)",
                ("p1", wid, "orchestrator", "feature/ORCH-109-x", stage, "t"),
            )
            tid = cur.lastrowid
            conn.commit()
            return tid
        finally:
            conn.close()

    def _add_run(self, tid, *, exit_code, model, effort, started, finished):
        """Insert one finished developer run for task ``tid``.

        Token counts and cost are fixed filler values (10 / 5 / 0.0). The
        connection is closed even when the INSERT raises.
        """
        conn = get_db()
        try:
            conn.execute(
                "INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
                "exit_code, input_tokens, output_tokens, cost_usd, model, effort) "
                "VALUES (?,?,?,?,?,?,?,?,?,?)",
                (tid, "developer", started, finished, exit_code, 10, 5, 0.0, model, effort),
            )
            conn.commit()
        finally:
            conn.close()

    def _development_line(self, tid):
        """Render the tracker for ``tid`` and return its development stage line.

        Fails with the full rendered text when no such line exists, instead of
        an opaque IndexError from indexing an empty list.
        """
        text = N.render_task_tracker(tid)
        matches = [ln for ln in text.splitlines() if ln.startswith(self._STAGE_PREFIX)]
        assert matches, f"no development stage line in rendered tracker:\n{text}"
        return matches[0]

    def test_tc09_killed_run_renders_model_effort(self):
        """TC-09/AC-6: the -9 (last) developer run's launch-stamped model+effort show."""
        tid = self._mk_task(stage="done")
        # run 1: succeeded (opens the ✅ stage line) — DIFFERENT model so we can prove
        # the displayed value comes from the killed run, not this one.
        self._add_run(
            tid,
            exit_code=0,
            model="claude-sonnet-4-6",
            effort="high",
            started="2026-06-14 09:00:00",
            finished="2026-06-14 09:20:00",
        )
        # run 2: timeout-killed (-9), inserted last so it is the most recent run.
        self._add_run(
            tid,
            exit_code=-9,
            model="tokenator/claude-opus-4-8",
            effort="xhigh",
            started="2026-06-14 09:25:00",
            finished="2026-06-14 09:55:00",
        )
        line = self._development_line(tid)
        # model NOT null: the killed run's launch-stamped opus-4-8 · xhigh is shown.
        assert line.rstrip().endswith("opus-4-8 · xhigh")
        assert "sonnet" not in line  # the displayed value is the -9 run's, not run 1's

    def test_tc09_unstamped_killed_run_drops_model_suffix(self):
        """AC-6 FAIL-guard: a -9 run with model=NULL would omit the suffix (negative)."""
        tid = self._mk_task(stage="done")
        self._add_run(
            tid,
            exit_code=0,
            model="tokenator/claude-opus-4-8",
            effort="xhigh",
            started="2026-06-14 09:00:00",
            finished="2026-06-14 09:20:00",
        )
        # killed run WITHOUT a launch stamp (the pre-ORCH-109 bug): model+effort NULL.
        self._add_run(
            tid,
            exit_code=-9,
            model=None,
            effort=None,
            started="2026-06-14 09:25:00",
            finished="2026-06-14 09:55:00",
        )
        line = self._development_line(tid)
        # No launch stamp -> the model/effort suffix is absent from the line.
        assert "opus-4-8" not in line
        assert "xhigh" not in line
# --------------------------------------------------------------------------- #
# TC-10 — in-flight model visibility via get_running_agents (NFR-6)
# --------------------------------------------------------------------------- #
class TestInflightModelVisibility:
    """get_running_agents must surface the launch-stamped model of a job that is still running."""

    def test_tc10_running_job_exposes_model(self):
        """TC-10/AC-7: model and effort are readable before the run has finished."""
        run_sql = "INSERT INTO agent_runs (task_id, agent, model, effort) VALUES (?,?,?,?)"
        job_sql = (
            "INSERT INTO jobs (agent, repo, status, run_id, started_at) "
            "VALUES (?,?,?,?,datetime('now'))"
        )

        db = get_db()
        run_id = db.execute(run_sql, (1, "developer", "claude-opus-4-8", "xhigh")).lastrowid
        # Link a job in the "running" state to the run just inserted.
        db.execute(job_sql, ("developer", "orchestrator", "running", run_id))
        db.commit()
        db.close()

        running = get_running_agents()
        assert len(running) == 1
        in_flight = running[0]
        assert in_flight["model"] == "claude-opus-4-8"  # non-null while in flight
        assert in_flight["effort"] == "xhigh"
# --------------------------------------------------------------------------- #
# TC-11 — anti-salvage: a timeout-killed run does NOT advance the stage (FR-5)
# --------------------------------------------------------------------------- #
class TestAntiSalvage:
    """Only a clean exit may advance the stage; a -9 run goes to the finalizer instead."""

    class _Proc:
        """Process stand-in whose wait() hands back a preset exit code."""

        def __init__(self, code):
            self._code = code

        def wait(self):
            return self._code

    def _seed_run(self, agent="developer"):
        """Insert a bare agent_runs row for ``agent`` and return its id."""
        db = get_db()
        run_id = db.execute(
            "INSERT INTO agent_runs (task_id, agent) VALUES (?,?)", (1, agent)
        ).lastrowid
        db.commit()
        db.close()
        return run_id

    def _drive(self, monkeypatch, exit_code, agent="developer", job_id=7):
        """Run _monitor_agent against a fake process and report which hooks fired.

        Returns a dict with two lists, ``advance`` and ``finalize``, each holding
        the positional arguments of every call to the corresponding stubbed method.
        """
        import src.agents.launcher as L

        calls = {"advance": [], "finalize": []}

        def _recorder(bucket):
            # Build a replacement method that logs its positional args under ``bucket``.
            return lambda self, *a, **k: calls[bucket].append(a)

        monkeypatch.setattr(L.AgentLauncher, "_try_advance_stage", _recorder("advance"))
        monkeypatch.setattr(L.AgentLauncher, "_finalize_job", _recorder("finalize"))
        monkeypatch.setattr(
            L.AgentLauncher, "_post_usage_comments", lambda self, *a, **k: None
        )
        monkeypatch.setattr(L, "notify_agent_finished", lambda *a, **k: None)
        monkeypatch.setattr(L, "get_worktree_path", lambda r, b: "/nonexistent/path")

        class _CleanGitResult:
            # git status returns "no changes" so the commit/push branch is skipped.
            stdout = ""
            stderr = ""
            returncode = 0

        monkeypatch.setattr(L.subprocess, "run", lambda *a, **k: _CleanGitResult())

        run_id = self._seed_run(agent)
        fake_process = self._Proc(exit_code)
        L.AgentLauncher()._monitor_agent(
            fake_process,
            run_id,
            agent,
            "orchestrator",
            "feature/ORCH-109-x",
            output_path=None,
            log_fh=None,
            job_id=job_id,
        )
        return calls

    def test_tc11_killed_developer_run_does_not_advance(self, monkeypatch):
        """TC-11/AC-8: a developer run that exits -9 must not auto-advance the stage."""
        recorded = self._drive(monkeypatch, exit_code=-9, agent="developer")
        assert recorded["advance"] == []  # no auto-advance on -9
        assert len(recorded["finalize"]) == 1  # handed to the finalizer exactly once

    def test_tc11_killed_reviewer_run_does_not_advance(self, monkeypatch):
        """TC-11/AC-8: the same guard applies to the reviewer role."""
        recorded = self._drive(monkeypatch, exit_code=-9, agent="reviewer")
        assert recorded["advance"] == []

    def test_tc11_clean_exit_advances(self, monkeypatch):
        """Positive control: exit code 0 does reach _try_advance_stage."""
        recorded = self._drive(monkeypatch, exit_code=0, agent="developer")
        assert len(recorded["advance"]) == 1
# --------------------------------------------------------------------------- #
# TC-12 — contracts & schema untouched (NFR-1 / NFR-3, AC-9)
# --------------------------------------------------------------------------- #
class TestContractsUnchanged:
    """Guards that the stage machine, QG registry and agent_runs schema keep their shape."""

    def test_tc12_stage_transitions_unchanged(self):
        """AC-9: the set of stage keys is exactly the known ten."""
        from src.stages import STAGE_TRANSITIONS

        expected_stages = {
            "created",
            "analysis",
            "architecture",
            "development",
            "review",
            "testing",
            "deploy-staging",
            "deploy",
            "done",
            "cancelled",
        }
        assert set(STAGE_TRANSITIONS) == expected_stages

    def test_tc12_agent_runs_model_effort_columns_preexist(self):
        """AC-9: a freshly initialised agent_runs table already has model and effort columns."""
        db = get_db()
        table_info = db.execute("PRAGMA table_info(agent_runs)").fetchall()
        db.close()
        # Index 1 of each PRAGMA table_info row is the column name.
        column_names = {info[1] for info in table_info}
        assert "model" in column_names
        assert "effort" in column_names

    def test_tc12_qg_checks_registry_present(self):
        """AC-9: the two named checks are still registered in QG_CHECKS."""
        from src.qg.checks import QG_CHECKS

        for check_name in ("check_ci_green", "check_reviewer_verdict"):
            assert check_name in QG_CHECKS