Files
orchestrator/tests/test_launcher.py
claude-bot 81fc2df8a8 fix(launcher): runs log dir from settings, not hardcoded /app (CI fix)
test_spawn_stamps_resolved_effort упал в CI с PermissionError на '/app':
launcher._spawn хардкодил output_path='/app/data/runs/{run_id}.log' и
os.makedirs('/app/data/runs'). В контейнере /app есть, на CI-хосте
(act_runner hostexecutor) — нет, makedirs бросает -> красный CI.

Фикс корня (не только теста): базовый каталог per-run логов вынесен в
Settings.runs_dir (env ORCH_RUNS_DIR, дефолт '/app/data/runs' = прод 1:1).
Новый хелпер _run_log_path(run_id) — единый источник пути, использован в
_spawn + три прежних inline-строки логов/алертов. Тест monkeypatch-ит
settings.runs_dir на tmp_path -> окружение-независим (проверено прогоном
с принудительно недоступным /app). pytest tests/ -q: 1090 passed.

STAGE_TRANSITIONS/QG_CHECKS/схема БД не тронуты. Docs: README env-таблица,
CHANGELOG.

Refs: ORCH-087
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 10:06:17 +03:00

410 lines
18 KiB
Python

"""Tests for launcher critical functions and reviewer verdict parsing.
Covers the audit-2026-06-02 fixes:
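- B-1: _write_task_file writes the task file directly with a plain open() (no docker)
and raises on write failure instead of failing silently. The target was originally
/repos/<repo>/<task_file>; since ORCH-2/S-4 it is the per-branch worktree,
<worktrees_dir>/<repo>/<safe-branch>/<task_file>.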
- S-5: check_reviewer_verdict reads the machine-readable `verdict:` field from
the YAML frontmatter only (no fragile substring matching).
"""
import os
import signal
import tempfile
import pytest
# Override env before importing app modules (same convention as test_qg.py)
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_launcher.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
from src.agents.launcher import AgentLauncher
from src.qg.checks import check_reviewer_verdict
from src.config import settings
# ---------------------------------------------------------------------------
# B-1: _write_task_file
# ---------------------------------------------------------------------------
class TestWriteTaskFile:
    """Exercise AgentLauncher._write_task_file (B-1 fix kept, plus ORCH-2/S-4).

    _write_task_file(repo, branch, task_file, content) is expected to write with
    a plain open() -- never through docker -- into the per-branch worktree:
    <worktrees_dir>/<repo>/<safe-branch>/<task_file>.
    """

    def _wt_dir(self, tmp_path, repo, branch):
        """Create and return the fake worktree directory for (repo, branch)."""
        from src.git_worktree import _safe

        worktree = tmp_path / "_wt" / repo / _safe(branch)
        worktree.mkdir(parents=True)
        return worktree

    def test_writes_to_worktree_path(self, tmp_path, monkeypatch):
        """The file appears inside the worktree with exactly the given content (B-1 + S-4)."""
        monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path / "_wt"))
        repo, branch = "enduro-trails", "feature/ET-001-x"
        worktree = self._wt_dir(tmp_path, repo, branch)

        launcher = AgentLauncher()
        launcher._write_task_file(repo, branch, ".task-dev.md", "hello-content")

        task_path = worktree / ".task-dev.md"
        assert task_path.is_file()
        assert task_path.read_text() == "hello-content"

    def test_does_not_use_docker(self, tmp_path, monkeypatch):
        """subprocess.run is booby-trapped; writing the task file must not trip it."""
        monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path / "_wt"))
        self._wt_dir(tmp_path, "enduro-trails", "main")
        calls = []

        def _forbidden_run(*args, **kwargs):
            # Record the call first so the final assertion still catches it even
            # if the code under test swallows the AssertionError.
            calls.append((args, kwargs))
            raise AssertionError("subprocess.run must not be called by _write_task_file")

        monkeypatch.setattr("src.agents.launcher.subprocess.run", _forbidden_run)

        launcher = AgentLauncher()
        launcher._write_task_file("enduro-trails", "main", ".task.md", "x")

        assert not calls

    def test_raises_on_write_failure(self, tmp_path, monkeypatch):
        """A missing worktree directory surfaces as RuntimeError, not a silent no-op."""
        monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path / "_wt"))
        # The worktree directory is deliberately left uncreated so the write fails.
        launcher = AgentLauncher()
        with pytest.raises(RuntimeError):
            launcher._write_task_file("nonexistent-repo", "main", ".task.md", "x")
# ---------------------------------------------------------------------------
# S-5: check_reviewer_verdict (frontmatter-only)
# ---------------------------------------------------------------------------
@pytest.fixture
def review_repo(tmp_path, monkeypatch):
    """Point the QG checks' repos_dir at tmp_path and return the ET-001 work-item dir."""
    monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
    work_item_dir = tmp_path.joinpath("enduro-trails", "docs", "work-items", "ET-001")
    work_item_dir.mkdir(parents=True)
    return work_item_dir
def _write_review(wi_dir, text):
    """Write *text* to the review document ``12-review.md`` inside *wi_dir*.

    Args:
        wi_dir: Path-like object supporting ``/`` (e.g. ``pathlib.Path``) that
            points at an existing work-item directory.
        text: Full content of the review file, frontmatter included.

    The encoding is pinned to UTF-8 so the fixture produces the same bytes on
    every host instead of depending on the locale's default encoding.
    """
    (wi_dir / "12-review.md").write_text(text, encoding="utf-8")
class TestCheckReviewerVerdict:
    """S-5: check_reviewer_verdict trusts only the frontmatter ``verdict:`` field."""

    # Repo / work item that the review_repo fixture prepares on disk.
    REPO = "enduro-trails"
    WORK_ITEM = "ET-001"

    def test_approved_in_frontmatter(self, review_repo):
        """An APPROVED verdict in the frontmatter passes and is echoed in the reason."""
        _write_review(review_repo, "---\ntype: review\nverdict: APPROVED\n---\n# Review\nbody\n")
        ok, why = check_reviewer_verdict(self.REPO, self.WORK_ITEM)
        assert ok is True
        assert "APPROVED" in why

    def test_request_changes_in_frontmatter(self, review_repo):
        """A REQUEST_CHANGES verdict in the frontmatter fails the check."""
        _write_review(review_repo, "---\ntype: review\nverdict: REQUEST_CHANGES\n---\n# Review\n")
        ok, why = check_reviewer_verdict(self.REPO, self.WORK_ITEM)
        assert ok is False
        assert "REQUEST_CHANGES" in why

    def test_lowercase_verdict_normalized(self, review_repo):
        """A lowercase ``approved`` is accepted as well."""
        _write_review(review_repo, "---\nverdict: approved\n---\nbody\n")
        ok, _why = check_reviewer_verdict(self.REPO, self.WORK_ITEM)
        assert ok is True

    def test_no_verdict_field_is_not_approved(self, review_repo):
        """Frontmatter without a verdict key must never count as approval."""
        _write_review(review_repo, "---\ntype: review\nstatus: done\n---\nbody\n")
        ok, why = check_reviewer_verdict(self.REPO, self.WORK_ITEM)
        assert ok is False
        assert "verdict" in why.lower()

    def test_no_frontmatter_is_not_approved(self, review_repo):
        """APPROVED appearing only in body/table text must not be a false positive (S-5)."""
        _write_review(review_repo, "# Review\n| Finding | Status |\n|---|---|\n| F-01 | APPROVED |\n")
        ok, _why = check_reviewer_verdict(self.REPO, self.WORK_ITEM)
        assert ok is False

    def test_request_changes_in_body_does_not_block_approved_frontmatter(self, review_repo):
        """REQUEST_CHANGES in a body table does not override an APPROVED frontmatter."""
        frontmatter = "---\nverdict: APPROVED\n---\n# Review\n"
        body_table = "| Item | Old verdict |\n|---|---|\n| x | REQUEST_CHANGES |\n"
        _write_review(review_repo, frontmatter + body_table)
        ok, why = check_reviewer_verdict(self.REPO, self.WORK_ITEM)
        assert ok is True
        assert "APPROVED" in why

    def test_missing_file(self, review_repo):
        """A work item with no review file fails with a 'not found' reason."""
        ok, why = check_reviewer_verdict(self.REPO, "ET-999")
        assert ok is False
        assert "not found" in why.lower()
# ---------------------------------------------------------------------------
# ORCH-7 (M-4): dead code removed
# ---------------------------------------------------------------------------
class TestDeadCodeRemoved:
    """M-4: guard the removal of the never-called _auto_merge_pr.

    Merging is the deployer's job, so _auto_merge_pr was dropped; _ensure_pr is
    still used by the auto-advance path and has to remain on the launcher.
    """

    def test_auto_merge_pr_is_gone(self):
        """The dead helper must no longer exist on AgentLauncher."""
        assert hasattr(AgentLauncher, "_auto_merge_pr") is False

    def test_ensure_pr_still_present(self):
        """The live helper must not have been removed along with the dead one."""
        assert hasattr(AgentLauncher, "_ensure_pr") is True
# ---------------------------------------------------------------------------
# ORCH-7 (M-2): configurable timeout + per-agent override
# ---------------------------------------------------------------------------
class TestResolveTimeout:
    """M-2: _resolve_timeout prefers a per-agent JSON override over the default."""

    def test_default_when_no_override(self, monkeypatch):
        """With an empty override string every agent (even None) gets the default."""
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
        for agent in ("developer", None):
            assert AgentLauncher._resolve_timeout(agent) == 1800

    def test_override_for_specific_agent(self, monkeypatch):
        """Agents listed in the override map get their own timeout; others the default."""
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(
            settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
        )
        expected_timeouts = (
            ("reviewer", 3600),
            ("architect", 2700),
            # "developer" is absent from the override map, so it uses the default.
            ("developer", 1800),
        )
        for agent, expected in expected_timeouts:
            assert AgentLauncher._resolve_timeout(agent) == expected

    def test_malformed_override_falls_back_to_default(self, monkeypatch):
        """Unparseable override JSON must not raise; the default is returned."""
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
        resolved = AgentLauncher._resolve_timeout("reviewer")
        assert resolved == 1800
class TestWatchdogGracefulKill:
    """M-2: watchdog kill sequence -- SIGTERM, grace window, then SIGKILL.

    Also covers the short-circuit when the process exits during the grace window
    and tolerance of ProcessLookupError. os.kill is replaced by a scripted fake
    that records the signals sent, so no real process is ever touched.
    """

    def _patch_db(self, monkeypatch):
        """Replace get_db with a do-nothing connection so no real DB is needed."""

        class _NullConn:
            def execute(self, *args, **kwargs):
                # Returns itself so chained calls on the result keep working.
                return self

            def commit(self):
                pass

            def close(self):
                pass

        monkeypatch.setattr("src.agents.launcher.get_db", _NullConn)

    def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
        """A process that never dies receives SIGTERM first and SIGKILL afterwards."""
        self._patch_db(monkeypatch)
        monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
        monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
        delivered = []

        def fake_kill(pid, sig):
            # Never raises, so every liveness probe (signal 0) reports "alive".
            delivered.append(sig)

        monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)

        AgentLauncher()._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")

        assert signal.SIGTERM in delivered
        assert signal.SIGKILL in delivered
        # Ordering matters: the polite signal has to precede the forced one.
        term_at = delivered.index(signal.SIGTERM)
        kill_at = delivered.index(signal.SIGKILL)
        assert term_at < kill_at

    def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
        """A process that exits inside the grace window is never sent SIGKILL."""
        self._patch_db(monkeypatch)
        monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
        monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
        delivered = []
        probe_count = 0

        def fake_kill(pid, sig):
            nonlocal probe_count
            if sig != 0:
                delivered.append(sig)
                return None
            # Liveness probe: report the process as gone from the 2nd probe on.
            probe_count += 1
            if probe_count >= 2:
                raise ProcessLookupError
            return None

        monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)

        AgentLauncher()._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")

        assert signal.SIGTERM in delivered
        assert signal.SIGKILL not in delivered

    def test_already_dead_before_sigterm(self, monkeypatch):
        """A process already gone at SIGTERM: no error, no SIGKILL, no _record_kill.

        ProcessLookupError on SIGTERM must be tolerated, and _record_kill must
        not run (the monitor's proc.wait owns the exit).
        """
        self._patch_db(monkeypatch)
        monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
        delivered = []
        recorded_runs = []

        def fake_kill(pid, sig):
            if sig == signal.SIGTERM:
                raise ProcessLookupError
            delivered.append(sig)
            return None

        monkeypatch.setattr(
            AgentLauncher, "_record_kill",
            staticmethod(lambda rid: recorded_runs.append(rid)),
        )
        monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)

        # The call itself must not raise.
        AgentLauncher()._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")

        assert signal.SIGKILL not in delivered
        assert not recorded_runs
# ---------------------------------------------------------------------------
# ORCH-061 / TC-06 + TC-07: "no changes to commit" on an action stage is EXPECTED,
# not under-delivery (FR-3 / AC-4). action_stage_no_changes_note is the PURE
# observability decision used by the post-run no-changes branch: it returns an
# explicit note for self-deploy action stages (deploy-staging/deploy) and None
# everywhere else. It NEVER signals a rollback — advancement is decided by the
# exit-code + gate verdict, never by a commit existing.
# ---------------------------------------------------------------------------
from src.agents.launcher import action_stage_no_changes_note # noqa: E402
class TestActionStageNoChangesNote:
    """ORCH-061: action_stage_no_changes_note returns a note only for self-deploy action stages."""

    def test_tc06_deploy_staging_self_deploy_returns_note(self):
        """TC-06 / AC-4: deploy-staging on the self-hosting repo yields an explicit
        "expected on action stage" note (no rollback signal)."""
        message = action_stage_no_changes_note("deploy-staging", "orchestrator")
        assert message is not None
        for fragment in ("deploy-staging", "expected on action stage"):
            assert fragment in message

    def test_tc06_deploy_self_deploy_returns_note(self):
        """The `deploy` stage counts as an action stage for self-deploy too."""
        message = action_stage_no_changes_note("deploy", "orchestrator")
        assert message is not None
        assert "deploy: no code changes" in message

    def test_tc07_development_stage_returns_none(self):
        """TC-07 / AC-4 regression guard: a CODE stage (development) gets no note,
        so the action-stage allowance does not leak into it."""
        result = action_stage_no_changes_note("development", "orchestrator")
        assert result is None

    def test_tc06_non_self_repo_returns_none(self):
        """Conditionality (FR-5): the allowance is self-deploy only, so a non-self
        repo on deploy-staging gets no special note."""
        result = action_stage_no_changes_note("deploy-staging", "enduro-trails")
        assert result is None

    def test_review_stage_returns_none(self):
        """Defensive: any non-action stage yields None (only deploy stages qualify)."""
        result = action_stage_no_changes_note("review", "orchestrator")
        assert result is None

    def test_never_raises_on_bad_input(self):
        """never-raise: a None stage and None repo degrade to None."""
        result = action_stage_no_changes_note(None, None)
        assert result is None
# ---------------------------------------------------------------------------
# ORCH-087 (BR-EFF): agent_runs.effort migration + launch-time stamp
# ---------------------------------------------------------------------------
class TestEffortStamp:
    """TC-09/TC-10: the effort column is idempotent and stamped at launch."""

    def _fresh_db(self, monkeypatch):
        """Start from an empty database: drop the old file, repoint settings, init.

        Deletes the module-level test DB file if it exists, points
        ``src.db.settings.db_path`` at it and runs ``init_db()`` once.
        """
        import src.db as db_module

        # EAFP: a missing file simply means there is nothing to clean up.
        try:
            os.unlink(_test_db)
        except FileNotFoundError:
            pass
        monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
        from src.db import init_db

        init_db()

    def test_effort_migration_idempotent(self, monkeypatch):
        """TC-09/AC-E.1: running init_db a second time raises nothing; column present."""
        from contextlib import closing

        self._fresh_db(monkeypatch)
        from src.db import init_db, get_db

        init_db()  # second call must be a no-op
        # closing() guarantees the connection is released even if the PRAGMA fails.
        with closing(get_db()) as conn:
            columns = [info[1] for info in conn.execute("PRAGMA table_info(agent_runs)").fetchall()]
        assert "effort" in columns

    def test_spawn_stamps_resolved_effort(self, tmp_path, monkeypatch):
        """TC-10/AC-E.1: _spawn writes the REAL resolved --effort to agent_runs.

        developer resolves to xhigh (ORCH-081 floor); the stamp must match that.
        All OS/process side-effects are faked so nothing is actually launched.
        """
        from contextlib import closing

        self._fresh_db(monkeypatch)
        from src.db import get_db
        import src.agents.launcher as launcher_mod

        # A real repo dir so the isdir() guard passes; worktree is faked.
        repo = "orchestrator"
        (tmp_path / repo).mkdir()
        monkeypatch.setattr(launcher_mod.settings, "repos_dir", str(tmp_path), raising=False)
        # ORCH-087: per-run log dir must be writable on a non-container host (CI runs
        # as a plain user where '/app' is denied). Point it at tmp_path so _spawn's
        # makedirs/open never touch the hardcoded '/app/data/runs'.
        monkeypatch.setattr(
            launcher_mod.settings, "runs_dir", str(tmp_path / "runs"), raising=False
        )
        monkeypatch.setattr(launcher_mod, "ensure_worktree", lambda r, b: str(tmp_path / repo))
        monkeypatch.setattr("src.projects.get_project_by_repo", lambda r: None)
        # No --effort env overrides -> developer falls to its xhigh floor.
        monkeypatch.setattr(launcher_mod.settings, "agent_effort_developer", "", raising=False)
        monkeypatch.setattr(launcher_mod.settings, "agent_effort_default", "", raising=False)

        # Fake the process + threads so nothing real runs.
        class _Proc:
            pid = 4242

        class _Thread:
            def __init__(self, *args, **kwargs):
                pass

            def start(self):
                pass

        monkeypatch.setattr(launcher_mod.subprocess, "Popen", lambda *a, **k: _Proc())
        monkeypatch.setattr(launcher_mod.threading, "Thread", _Thread)
        monkeypatch.setattr(launcher_mod, "notify_agent_started", lambda *a, **k: None)

        # Seed a task row so _spawn can resolve the branch.
        with closing(get_db()) as conn:
            cursor = conn.execute(
                "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
                "VALUES (?,?,?,?,?,?)",
                ("p1", "ORCH-087", repo, "feature/ORCH-087-x", "development", "t"),
            )
            task_id = cursor.lastrowid
            conn.commit()

        launcher = launcher_mod.AgentLauncher()
        run_id = launcher._spawn("developer", repo, task_content=None, task_id=task_id)

        with closing(get_db()) as conn:
            row = conn.execute(
                "SELECT effort FROM agent_runs WHERE id=?", (run_id,)
            ).fetchone()

        # Fail with a readable message instead of a TypeError on row[0] when
        # _spawn did not persist the run at all.
        assert row is not None, f"no agent_runs row for run_id={run_id!r}"
        assert row[0] == "xhigh"