"""ORCH-109: timeout budgets + launch-time model telemetry for developer/reviewer.

Covers FR-1..FR-6 / AC-1..AC-10 through TC-01..TC-12 (04-test-plan.yaml). Fully
deterministic: an isolated temp SQLite DB + synthetic agent_runs / jobs rows; no
network, no Claude CLI subprocess. Settings are monkeypatched / overridden.

Two production changes under test (ADR-001):
  * D1 — launcher._spawn stamps the resolved model into agent_runs.model in the
    SAME UPDATE as the effort stamp, so the model is present from launch and
    survives a timeout-kill / is visible in-flight.
  * D3/D4 — launcher._resolve_timeout grows a dedicated per-role budget level
    (developer 3600 / reviewer 3000) between the JSON escape-hatch and the global
    default; reaper_max_running_s raised 3600 -> 5400 in lockstep (ORCH-065).

FR-2 (COALESCE preserve), FR-4/NFR-6 (kill / in-flight visibility) and FR-5
(anti-salvage) are STRUCTURAL guarantees already present in the code — pinned here
as regression tests, not new branches.
"""
import os
import sqlite3
import tempfile

# The environment is prepared BEFORE the project imports below (hence the
# `noqa: E402` markers on them); keep this ordering when editing.
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orch109_timeout_model.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_REPOS_DIR", tempfile.gettempdir())

import pytest  # noqa: E402

import src.db as db_module  # noqa: E402
from src.db import init_db, get_db, get_running_agents  # noqa: E402
from src.config import settings, Settings  # noqa: E402
from src.agents.launcher import AgentLauncher, resolve_agent_model  # noqa: E402
from src import usage as U  # noqa: E402
from src import notifications as N  # noqa: E402


@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a freshly initialised DB file and remove it afterwards.

    Runs automatically for each test (autouse). Any leftover DB file is deleted
    before ``init_db()`` is called, and the file is deleted again after the test.
    """
    # get_db() reads settings.db_path live; pin it to our isolated DB.
    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    # render-only tests: never consult the live Plane overlay (no network).
    monkeypatch.setattr(N._get_settings(), "tracker_live_status", False, raising=False)
    yield
    if os.path.exists(_test_db):
        os.unlink(_test_db)


# --------------------------------------------------------------------------- #
# TC-01..TC-03 — _resolve_timeout dedicated-budget ladder (FR-3, AC-3 / AC-4)
# --------------------------------------------------------------------------- #
class TestResolveTimeoutLadder:
    """The priority ladder: overrides_json > dedicated role key > global default."""

    def _pin(self, monkeypatch, *, dev=3600, rev=3000, default=1800, overrides=""):
        """Monkeypatch the four timeout settings the ladder tests depend on.

        ``default`` -> agent_timeout_seconds, ``overrides`` ->
        agent_timeout_overrides_json, ``dev`` / ``rev`` -> the dedicated
        developer / reviewer keys.
        """
        monkeypatch.setattr(settings, "agent_timeout_seconds", default)
        monkeypatch.setattr(settings, "agent_timeout_overrides_json", overrides)
        monkeypatch.setattr(settings, "agent_timeout_developer_s", dev)
        monkeypatch.setattr(settings, "agent_timeout_reviewer_s", rev)

    def test_tc01_developer_reviewer_raised(self, monkeypatch):
        """TC-01/AC-3: developer/reviewer resolve to their raised dedicated budget."""
        self._pin(monkeypatch)
        assert AgentLauncher._resolve_timeout("developer") == 3600
        assert AgentLauncher._resolve_timeout("reviewer") == 3000

    def test_tc01_dedicated_keys_are_configurable(self, monkeypatch):
        """TC-01/AC-3: the budgets are config-driven, not hardcoded."""
        self._pin(monkeypatch, dev=4200, rev=2400)
        assert AgentLauncher._resolve_timeout("developer") == 4200
        assert AgentLauncher._resolve_timeout("reviewer") == 2400

    def test_tc02_other_roles_use_global_default(self, monkeypatch):
        """TC-02/AC-3: roles without a dedicated key keep the global default (1800)."""
        self._pin(monkeypatch)
        for role in ("analyst", "architect", "tester", "deployer"):
            assert AgentLauncher._resolve_timeout(role) == 1800
        # unknown role / None also fall through to the global default.
        assert AgentLauncher._resolve_timeout("unknown-role") == 1800
        assert AgentLauncher._resolve_timeout(None) == 1800

    def test_tc01_overrides_json_wins_over_dedicated(self, monkeypatch):
        """AC-3: the operator JSON escape-hatch stays HIGHEST priority for ANY role."""
        self._pin(monkeypatch, overrides='{"developer": 1234, "reviewer": 999}')
        assert AgentLauncher._resolve_timeout("developer") == 1234
        assert AgentLauncher._resolve_timeout("reviewer") == 999

    def test_tc03_malformed_overrides_json_never_raises(self, monkeypatch):
        """TC-03/AC-4: malformed JSON is ignored; resolution still succeeds (never-break)."""
        self._pin(monkeypatch, overrides="{not-json")
        # malformed JSON ignored -> developer still resolves via its dedicated key.
        assert AgentLauncher._resolve_timeout("developer") == 3600
        # a role without a dedicated key falls through to the global default.
        assert AgentLauncher._resolve_timeout("analyst") == 1800

    @pytest.mark.parametrize("bad", [0, -5, "abc"])
    def test_tc03_non_positive_dedicated_falls_back(self, monkeypatch, bad):
        """TC-03/AC-4: an absurd/non-positive/non-int dedicated value -> global default."""
        self._pin(monkeypatch, dev=bad)
        # must NOT raise; falls back to agent_timeout_seconds + WARNING.
        assert AgentLauncher._resolve_timeout("developer") == 1800


# --------------------------------------------------------------------------- #
# TC-04 / TC-05 — launch-time model stamp in _spawn (FR-1, AC-1 + NFR-2)
# --------------------------------------------------------------------------- #
class TestLaunchModelStamp:
    """_spawn writes the resolved model to agent_runs.model at launch (next to effort)."""

    def _seed_task(self, repo="orchestrator", branch="feature/ORCH-109-x", wid="ORCH-109"):
        """Insert one ``tasks`` row in stage ``development`` and return its rowid."""
        conn = get_db()
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
            "VALUES (?,?,?,?,?,?)",
            ("p1", wid, repo, branch, "development", "t"),
        )
        tid = cur.lastrowid
        conn.commit()
        conn.close()
        return tid

    def _fake_spawn_env(self, tmp_path, monkeypatch, repo="orchestrator"):
        """Fake every OS/process side-effect so _spawn touches only the DB.

        Stubs repos_dir / runs_dir, ensure_worktree, get_project_by_repo,
        subprocess.Popen, threading.Thread and notify_agent_started, then returns
        the (patched) launcher module for the test to drive.
        """
        import src.agents.launcher as L

        (tmp_path / repo).mkdir()
        monkeypatch.setattr(L.settings, "repos_dir", str(tmp_path), raising=False)
        monkeypatch.setattr(L.settings, "runs_dir", str(tmp_path / "runs"), raising=False)
        monkeypatch.setattr(L, "ensure_worktree", lambda r, b: str(tmp_path / repo))
        monkeypatch.setattr("src.projects.get_project_by_repo", lambda r: None)

        class _Proc:
            # Stand-in for the Popen result; only a fixed pid is provided.
            pid = 4242

        monkeypatch.setattr(L.subprocess, "Popen", lambda *a, **k: _Proc())

        class _T:
            # Inert Thread replacement: accepts any arguments, start() does nothing.
            def __init__(self, *a, **k):
                pass

            def start(self):
                pass

        monkeypatch.setattr(L.threading, "Thread", _T)
        monkeypatch.setattr(L, "notify_agent_started", lambda *a, **k: None)
        return L

    def test_tc04_spawn_stamps_model_and_effort(self, tmp_path, monkeypatch):
        """TC-04/AC-1: after _spawn the run row carries the resolved model AND effort."""
        L = self._fake_spawn_env(tmp_path, monkeypatch)
        # Deterministic resolve: developer -> claude-opus-4-8 (default) / xhigh (floor).
        monkeypatch.setattr(L.settings, "agent_model_developer", "", raising=False)
        monkeypatch.setattr(L.settings, "agent_model_default", "claude-opus-4-8", raising=False)
        monkeypatch.setattr(L.settings, "agent_effort_developer", "", raising=False)
        monkeypatch.setattr(L.settings, "agent_effort_default", "", raising=False)
        tid = self._seed_task()
        run_id = L.AgentLauncher()._spawn(
            "developer", "orchestrator", task_content=None, task_id=tid
        )
        conn = get_db()
        row = conn.execute(
            "SELECT model, effort FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
        conn.close()
        assert row["model"] == "claude-opus-4-8"
        assert row["effort"] == "xhigh"
        # The stamp matches the resolver — single source of truth.
        assert row["model"] == resolve_agent_model("developer", None)

    def test_tc05_stamp_failure_is_isolated(self, tmp_path, monkeypatch):
        """TC-05/NFR-2: a failing model/effort stamp does NOT propagate out of _spawn."""
        L = self._fake_spawn_env(tmp_path, monkeypatch)
        # Captured before patching so the final verification bypasses the wrapper.
        real_get_db = db_module.get_db

        class _RaisingConn:
            """Delegates to a real conn but raises on the launch stamp UPDATE only."""

            def __init__(self, real):
                self._real = real

            def execute(self, sql, *a, **k):
                # The stamp statement is recognised by this exact SQL fragment.
                if "SET model=?, effort=?" in sql:
                    raise sqlite3.OperationalError("simulated stamp failure")
                return self._real.execute(sql, *a, **k)

            def commit(self):
                return self._real.commit()

            def close(self):
                return self._real.close()

            def __getattr__(self, name):
                # Anything not overridden above is forwarded to the real connection.
                return getattr(self._real, name)

        monkeypatch.setattr(L, "get_db", lambda: _RaisingConn(real_get_db()))
        tid = self._seed_task()
        # Must NOT raise even though the stamp UPDATE blows up.
        run_id = L.AgentLauncher()._spawn(
            "developer", "orchestrator", task_content=None, task_id=tid
        )
        assert run_id is not None
        # The run row exists; model stayed NULL (stamp failed) — launch unharmed.
        conn = real_get_db()
        row = conn.execute(
            "SELECT id, model FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
        conn.close()
        assert row is not None
        assert row["model"] is None


# --------------------------------------------------------------------------- #
# TC-06 / TC-07 — post-hoc enrich preserves / refines the launch stamp (FR-2)
# --------------------------------------------------------------------------- #
class TestRecordUsagePreservesStamp:
    """record_usage (model=COALESCE(?, model)) never clobbers a launch-stamped model."""

    def _run_with_model(self, model="claude-opus-4-8", agent="developer"):
        """Insert an ``agent_runs`` row (task_id=1) with ``model``; return its rowid."""
        conn = get_db()
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent, model) VALUES (?,?,?)",
            (1, agent, model),
        )
        rid = cur.lastrowid
        conn.commit()
        conn.close()
        return rid

    def _model_of(self, rid):
        """Return the ``model`` column currently stored for run ``rid``."""
        conn = get_db()
        row = conn.execute("SELECT model FROM agent_runs WHERE id=?", (rid,)).fetchone()
        conn.close()
        return row["model"]

    def test_tc06_record_usage_none_preserves_model(self):
        """TC-06/AC-2: usage=None (no final JSON, e.g. timeout) keeps the launch stamp."""
        rid = self._run_with_model()
        U.record_usage(rid, None)  # must not raise
        assert self._model_of(rid) == "claude-opus-4-8"

    def test_tc06_record_usage_model_none_preserves_model(self):
        """TC-06/AC-2: a usage JSON with model=None keeps the launch stamp (COALESCE)."""
        rid = self._run_with_model()
        U.record_usage(rid, {"input_tokens": 10, "output_tokens": 5, "model": None})
        assert self._model_of(rid) == "claude-opus-4-8"

    def test_tc07_record_usage_nonempty_model_enriches_blank(self):
        """TC-07/AC-2: a non-empty model in the JSON sets a blank (CLI-default) stamp."""
        rid = self._run_with_model(model=None)
        U.record_usage(
            rid, {"input_tokens": 1, "output_tokens": 1, "model": "claude-opus-4-8"}
        )
        assert self._model_of(rid) == "claude-opus-4-8"

    def test_tc07_record_usage_refines_existing_model(self):
        """TC-07/AC-2: a fuller provider-prefixed id refines a bare launch stamp."""
        rid = self._run_with_model(model="claude-opus-4-8")
        U.record_usage(
            rid,
            {"input_tokens": 1, "output_tokens": 1, "model": "tokenator/claude-opus-4-8"},
        )
        assert self._model_of(rid) == "tokenator/claude-opus-4-8"


# --------------------------------------------------------------------------- #
# TC-08 — reaper cross-invariant (NFR-4, AC-5)
# --------------------------------------------------------------------------- #
class TestReaperInvariant:
    """reaper_max_running_s MUST stay > max(resolved timeout) + grace (ORCH-065)."""

    def test_tc08_shipped_defaults_satisfy_invariant(self, monkeypatch):
        """TC-08/AC-5: the canonical shipped defaults hold the invariant."""
        # Drop any environment overrides so Settings() is not built from them.
        for name in (
            "ORCH_AGENT_TIMEOUT_SECONDS",
            "ORCH_AGENT_KILL_GRACE_SECONDS",
            "ORCH_AGENT_TIMEOUT_OVERRIDES_JSON",
            "ORCH_AGENT_TIMEOUT_DEVELOPER_S",
            "ORCH_AGENT_TIMEOUT_REVIEWER_S",
            "ORCH_REAPER_MAX_RUNNING_S",
        ):
            monkeypatch.delenv(name, raising=False)
        s = Settings()
        max_budget = max(
            s.agent_timeout_seconds,
            s.agent_timeout_developer_s,
            s.agent_timeout_reviewer_s,
        )
        assert s.reaper_max_running_s > max_budget + s.agent_kill_grace_seconds
        # Concrete shipped numbers (ADR-001 D4): 5400 > 3600 + 20 = 3620.
        assert (max_budget, s.agent_kill_grace_seconds, s.reaper_max_running_s) == (
            3600,
            20,
            5400,
        )

    def test_tc08_resolved_max_is_developer(self, monkeypatch):
        """TC-08/AC-5: the max resolved per-role budget is the developer budget."""
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
        monkeypatch.setattr(settings, "agent_timeout_developer_s", 3600)
        monkeypatch.setattr(settings, "agent_timeout_reviewer_s", 3000)
        monkeypatch.setattr(settings, "agent_kill_grace_seconds", 20)
        monkeypatch.setattr(settings, "reaper_max_running_s", 5400)
        roles = ["analyst", "architect", "developer", "reviewer", "tester", "deployer"]
        max_timeout = max(AgentLauncher._resolve_timeout(r) for r in roles)
        assert max_timeout == 3600
        assert settings.reaper_max_running_s > max_timeout + settings.agent_kill_grace_seconds


# --------------------------------------------------------------------------- #
# TC-09 — tracker stage line shows model+effort on a timeout-killed run (FR-4)
# --------------------------------------------------------------------------- #
class TestTrackerTimeoutVisibility:
    """A -9 run still renders its 'model · effort' suffix because both are launch-stamped.

    The stage line takes its model/effort from the LAST run of the agent
    (stage_runs[-1] in _stage_line). When that last run is a timeout-kill (-9),
    its launch-stamped values are exactly what the operator sees — the whole point
    of stamping at launch. Without the stamp the -9 row would carry model=NULL and
    the line would drop the model suffix (the AC-6 FAIL condition).
    """

    def _mk_task(self, stage="done", wid="ORCH-109"):
        """Insert one ``tasks`` row in the given ``stage`` and return its rowid."""
        conn = get_db()
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
            "VALUES (?,?,?,?,?,?)",
            ("p1", wid, "orchestrator", "feature/ORCH-109-x", stage, "t"),
        )
        tid = cur.lastrowid
        conn.commit()
        conn.close()
        return tid

    def _add_run(self, tid, *, exit_code, model, effort, started, finished):
        """Insert a finished ``developer`` run for task ``tid``.

        Token counts and cost are fixed (10 / 5 / 0.0); the caller controls the
        exit code, model, effort and the two timestamps.
        """
        conn = get_db()
        conn.execute(
            "INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
            "exit_code, input_tokens, output_tokens, cost_usd, model, effort) "
            "VALUES (?,?,?,?,?,?,?,?,?,?)",
            (tid, "developer", started, finished, exit_code, 10, 5, 0.0, model, effort),
        )
        conn.commit()
        conn.close()

    def test_tc09_killed_run_renders_model_effort(self):
        """TC-09/AC-6: the -9 (last) developer run's launch-stamped model+effort show."""
        tid = self._mk_task(stage="done")
        # run 1: succeeded (opens the ✅ stage line) — DIFFERENT model so we can prove
        # the displayed value comes from the killed run, not this one.
        self._add_run(
            tid,
            exit_code=0,
            model="claude-sonnet-4-6",
            effort="high",
            started="2026-06-14 09:00:00",
            finished="2026-06-14 09:20:00",
        )
        # run 2: timeout-killed (-9), the LAST run -> _stage_line reads its row.
        self._add_run(
            tid,
            exit_code=-9,
            model="tokenator/claude-opus-4-8",
            effort="xhigh",
            started="2026-06-14 09:25:00",
            finished="2026-06-14 09:55:00",
        )
        text = N.render_task_tracker(tid)
        # Pick the rendered development-stage line by its exact (Russian) prefix.
        line = [ln for ln in text.splitlines() if ln.startswith("✅ Разработка")][0]
        # model NOT null: the killed run's launch-stamped opus-4-8 · xhigh is shown.
        assert line.rstrip().endswith("opus-4-8 · xhigh")
        assert "sonnet" not in line  # the displayed value is the -9 run's, not run 1's

    def test_tc09_unstamped_killed_run_drops_model_suffix(self):
        """AC-6 FAIL-guard: a -9 run with model=NULL would omit the suffix (negative)."""
        tid = self._mk_task(stage="done")
        self._add_run(
            tid,
            exit_code=0,
            model="tokenator/claude-opus-4-8",
            effort="xhigh",
            started="2026-06-14 09:00:00",
            finished="2026-06-14 09:20:00",
        )
        # killed run WITHOUT a launch stamp (the pre-ORCH-109 bug): model+effort NULL.
        self._add_run(
            tid,
            exit_code=-9,
            model=None,
            effort=None,
            started="2026-06-14 09:25:00",
            finished="2026-06-14 09:55:00",
        )
        text = N.render_task_tracker(tid)
        line = [ln for ln in text.splitlines() if ln.startswith("✅ Разработка")][0]
        # No launch stamp -> the model/effort suffix is dropped (cost shown without model).
        assert "opus-4-8" not in line
        assert "xhigh" not in line


# --------------------------------------------------------------------------- #
# TC-10 — in-flight model visibility via get_running_agents (NFR-6)
# --------------------------------------------------------------------------- #
class TestInflightModelVisibility:
    """get_running_agents exposes the launch-stamped model for a RUNNING job."""

    def test_tc10_running_job_exposes_model(self):
        """TC-10/AC-7: /metrics & /queue see the model before the run finishes."""
        conn = get_db()
        # A run row that already carries model/effort, linked to a 'running' job.
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent, model, effort) VALUES (?,?,?,?)",
            (1, "developer", "claude-opus-4-8", "xhigh"),
        )
        rid = cur.lastrowid
        conn.execute(
            "INSERT INTO jobs (agent, repo, status, run_id, started_at) "
            "VALUES (?,?,?,?,datetime('now'))",
            ("developer", "orchestrator", "running", rid),
        )
        conn.commit()
        conn.close()
        rows = get_running_agents()
        assert len(rows) == 1
        assert rows[0]["model"] == "claude-opus-4-8"  # non-null in-flight
        assert rows[0]["effort"] == "xhigh"


# --------------------------------------------------------------------------- #
# TC-11 — anti-salvage: a timeout-killed run does NOT advance the stage (FR-5)
# --------------------------------------------------------------------------- #
class TestAntiSalvage:
    """Advancement is gated by `if exit_code == 0`; a -9 run is routed to retry/fail."""

    class _Proc:
        """Process stand-in whose ``wait()`` returns a preset exit code."""

        def __init__(self, code):
            self._code = code

        def wait(self):
            return self._code

    def _seed_run(self, agent="developer"):
        """Insert a bare ``agent_runs`` row (task_id=1) and return its rowid."""
        conn = get_db()
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent) VALUES (?,?)", (1, agent)
        )
        rid = cur.lastrowid
        conn.commit()
        conn.close()
        return rid

    def _drive(self, monkeypatch, exit_code, agent="developer", job_id=7):
        """Run ``_monitor_agent`` for a fake process that exits with ``exit_code``.

        ``_try_advance_stage`` and ``_finalize_job`` are replaced by recorders;
        usage comments, notifications, worktree lookup and ``subprocess.run`` are
        stubbed out. Returns ``{"advance": [...], "finalize": [...]}`` holding the
        positional arguments of each recorded call.
        """
        import src.agents.launcher as L

        calls = {"advance": [], "finalize": []}
        monkeypatch.setattr(
            L.AgentLauncher,
            "_try_advance_stage",
            lambda self, *a, **k: calls["advance"].append(a),
        )
        monkeypatch.setattr(
            L.AgentLauncher,
            "_finalize_job",
            lambda self, *a, **k: calls["finalize"].append(a),
        )
        monkeypatch.setattr(
            L.AgentLauncher, "_post_usage_comments", lambda self, *a, **k: None
        )
        monkeypatch.setattr(L, "notify_agent_finished", lambda *a, **k: None)
        monkeypatch.setattr(L, "get_worktree_path", lambda r, b: "/nonexistent/path")

        # git status returns "no changes" so the commit/push branch is skipped.
        class _R:
            stdout = ""
            stderr = ""
            returncode = 0

        monkeypatch.setattr(L.subprocess, "run", lambda *a, **k: _R())
        rid = self._seed_run(agent)
        L.AgentLauncher()._monitor_agent(
            self._Proc(exit_code),
            rid,
            agent,
            "orchestrator",
            "feature/ORCH-109-x",
            output_path=None,
            log_fh=None,
            job_id=job_id,
        )
        return calls

    def test_tc11_killed_developer_run_does_not_advance(self, monkeypatch):
        """TC-11/AC-8: a developer run killed (-9) does not auto-advance the stage."""
        calls = self._drive(monkeypatch, exit_code=-9, agent="developer")
        assert calls["advance"] == []  # NO auto-advance on -9
        assert len(calls["finalize"]) == 1  # routed to retry/fail finalizer instead

    def test_tc11_killed_reviewer_run_does_not_advance(self, monkeypatch):
        """TC-11/AC-8: same guard for the reviewer role (review -> testing)."""
        calls = self._drive(monkeypatch, exit_code=-9, agent="reviewer")
        assert calls["advance"] == []

    def test_tc11_clean_exit_advances(self, monkeypatch):
        """Positive control: a clean exit (0) DOES reach _try_advance_stage."""
        calls = self._drive(monkeypatch, exit_code=0, agent="developer")
        assert len(calls["advance"]) == 1


# --------------------------------------------------------------------------- #
# TC-12 — contracts & schema untouched (NFR-1 / NFR-3, AC-9)
# --------------------------------------------------------------------------- #
class TestContractsUnchanged:
    """ORCH-109 lives entirely outside the stage-machine / QG / schema layers."""

    def test_tc12_stage_transitions_unchanged(self):
        """AC-9: no new edge / sink introduced."""
        from src.stages import STAGE_TRANSITIONS

        # Compares the set of keys only; the transition targets are not checked here.
        assert set(STAGE_TRANSITIONS) == {
            "created",
            "analysis",
            "architecture",
            "development",
            "review",
            "testing",
            "deploy-staging",
            "deploy",
            "done",
            "cancelled",
        }

    def test_tc12_agent_runs_model_effort_columns_preexist(self):
        """AC-9: model/effort are PRE-EXISTING columns; ORCH-109 adds no migration."""
        conn = get_db()
        # PRAGMA table_info rows: index 1 is the column name.
        cols = [r[1] for r in conn.execute("PRAGMA table_info(agent_runs)").fetchall()]
        conn.close()
        assert "model" in cols
        assert "effort" in cols

    def test_tc12_qg_checks_registry_present(self):
        """AC-9: the QG registry is untouched (timeout/telemetry is not a gate)."""
        from src.qg.checks import QG_CHECKS

        assert "check_ci_green" in QG_CHECKS
        assert "check_reviewer_verdict" in QG_CHECKS