Files
orchestrator/tests/test_launcher.py
Dev Agent c167c6930d test(launcher): watchdog graceful kill ordering + timeout config + M-4 removal
Cover M-2: SIGTERM-before-SIGKILL ordering, graceful exit within grace skips
SIGKILL, ProcessLookupError before SIGTERM is tolerated (no _record_kill), and
_resolve_timeout per-agent override / default / malformed-JSON fallback.
Cover M-4: _auto_merge_pr removed, _ensure_pr retained.
2026-06-03 08:28:09 +03:00

281 lines
12 KiB
Python

"""Tests for launcher critical functions and reviewer verdict parsing.
Covers the audit-2026-06-02 fixes:
- B-1: _write_task_file writes directly to /repos/<repo>/<task_file> (no docker),
and raises on write failure instead of failing silently.
- S-5: check_reviewer_verdict reads the machine-readable `verdict:` field from
the YAML frontmatter only (no fragile substring matching).
"""
import os
import signal
import tempfile
import pytest
# Override env before importing app modules (same convention as test_qg.py)
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_launcher.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
from src.agents.launcher import AgentLauncher
from src.qg.checks import check_reviewer_verdict
from src.config import settings
# ---------------------------------------------------------------------------
# B-1: _write_task_file
# ---------------------------------------------------------------------------
class TestWriteTaskFile:
"""B-1 fix preserved + ORCH-2/S-4: task file now lands in the per-branch worktree.
_write_task_file(repo, branch, task_file, content) writes to
<worktrees_dir>/<repo>/<safe-branch>/<task_file> with a plain open() (no docker).
"""
def _wt_dir(self, tmp_path, repo, branch):
from src.git_worktree import _safe
d = tmp_path / "_wt" / repo / _safe(branch)
d.mkdir(parents=True)
return d
def test_writes_to_worktree_path(self, tmp_path, monkeypatch):
"""Task file is written to the worktree path, content matches (B-1 + S-4)."""
monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path / "_wt"))
wt = self._wt_dir(tmp_path, "enduro-trails", "feature/ET-001-x")
launcher = AgentLauncher()
launcher._write_task_file("enduro-trails", "feature/ET-001-x", ".task-dev.md", "hello-content")
written = wt / ".task-dev.md"
assert written.is_file()
assert written.read_text() == "hello-content"
def test_does_not_use_docker(self, tmp_path, monkeypatch):
"""No subprocess/docker call: if subprocess.run were used it would error here."""
monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path / "_wt"))
self._wt_dir(tmp_path, "enduro-trails", "main")
called = {"run": False}
def _fail_run(*a, **k):
called["run"] = True
raise AssertionError("subprocess.run must not be called by _write_task_file")
monkeypatch.setattr("src.agents.launcher.subprocess.run", _fail_run)
launcher = AgentLauncher()
launcher._write_task_file("enduro-trails", "main", ".task.md", "x")
assert called["run"] is False
def test_raises_on_write_failure(self, tmp_path, monkeypatch):
"""If the target worktree dir does not exist, raise RuntimeError (no silent fail)."""
monkeypatch.setattr("src.git_worktree.settings.worktrees_dir", str(tmp_path / "_wt"))
# worktree dir intentionally NOT created -> open() raises OSError
launcher = AgentLauncher()
with pytest.raises(RuntimeError):
launcher._write_task_file("nonexistent-repo", "main", ".task.md", "x")
# ---------------------------------------------------------------------------
# S-5: check_reviewer_verdict (frontmatter-only)
# ---------------------------------------------------------------------------
@pytest.fixture
def review_repo(tmp_path, monkeypatch):
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / "ET-001"
wi_dir.mkdir(parents=True)
return wi_dir
def _write_review(wi_dir, text):
(wi_dir / "12-review.md").write_text(text)
class TestCheckReviewerVerdict:
def test_approved_in_frontmatter(self, review_repo):
_write_review(review_repo, "---\ntype: review\nverdict: APPROVED\n---\n# Review\nbody\n")
passed, reason = check_reviewer_verdict("enduro-trails", "ET-001")
assert passed is True
assert "APPROVED" in reason
def test_request_changes_in_frontmatter(self, review_repo):
_write_review(review_repo, "---\ntype: review\nverdict: REQUEST_CHANGES\n---\n# Review\n")
passed, reason = check_reviewer_verdict("enduro-trails", "ET-001")
assert passed is False
assert "REQUEST_CHANGES" in reason
def test_lowercase_verdict_normalized(self, review_repo):
_write_review(review_repo, "---\nverdict: approved\n---\nbody\n")
passed, _ = check_reviewer_verdict("enduro-trails", "ET-001")
assert passed is True
def test_no_verdict_field_is_not_approved(self, review_repo):
# Frontmatter present but no verdict -> must NOT approve.
_write_review(review_repo, "---\ntype: review\nstatus: done\n---\nbody\n")
passed, reason = check_reviewer_verdict("enduro-trails", "ET-001")
assert passed is False
assert "verdict" in reason.lower()
def test_no_frontmatter_is_not_approved(self, review_repo):
# APPROVED appears only in body/table text -> must NOT cause false positive (S-5).
_write_review(review_repo, "# Review\n| Finding | Status |\n|---|---|\n| F-01 | APPROVED |\n")
passed, _ = check_reviewer_verdict("enduro-trails", "ET-001")
assert passed is False
def test_request_changes_in_body_does_not_block_approved_frontmatter(self, review_repo):
# Body mentions REQUEST_CHANGES in a table, but frontmatter verdict is APPROVED.
_write_review(
review_repo,
"---\nverdict: APPROVED\n---\n# Review\n"
"| Item | Old verdict |\n|---|---|\n| x | REQUEST_CHANGES |\n",
)
passed, reason = check_reviewer_verdict("enduro-trails", "ET-001")
assert passed is True
assert "APPROVED" in reason
def test_missing_file(self, review_repo):
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
assert passed is False
assert "not found" in reason.lower()
# ---------------------------------------------------------------------------
# ORCH-7 (M-4): dead code removed
# ---------------------------------------------------------------------------
class TestDeadCodeRemoved:
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
removed. _ensure_pr (used by the auto-advance path) must stay."""
def test_auto_merge_pr_is_gone(self):
assert not hasattr(AgentLauncher, "_auto_merge_pr")
def test_ensure_pr_still_present(self):
assert hasattr(AgentLauncher, "_ensure_pr")
# ---------------------------------------------------------------------------
# ORCH-7 (M-2): configurable timeout + per-agent override
# ---------------------------------------------------------------------------
class TestResolveTimeout:
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
def test_default_when_no_override(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
assert AgentLauncher._resolve_timeout("developer") == 1800
assert AgentLauncher._resolve_timeout(None) == 1800
def test_override_for_specific_agent(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
)
assert AgentLauncher._resolve_timeout("reviewer") == 3600
assert AgentLauncher._resolve_timeout("architect") == 2700
# an agent not in the override map falls back to the default
assert AgentLauncher._resolve_timeout("developer") == 1800
def test_malformed_override_falls_back_to_default(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
# must not raise, must return the default
assert AgentLauncher._resolve_timeout("reviewer") == 1800
class TestWatchdogGracefulKill:
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
and ProcessLookupError tolerance. The OS process is fully faked: we record the
signals sent and decide liveness from a script, so no real process is touched."""
def _patch_db(self, monkeypatch):
"""Stub get_db so _record_kill does not need a real DB."""
class _Conn:
def execute(self, *a, **k):
return self
def commit(self):
pass
def close(self):
pass
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
sent.append(sig)
# signal 0 (liveness probe) -> always alive; never raise
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL in sent
# SIGTERM must come before SIGKILL
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
"""Process dies during the grace window -> SIGKILL is NOT sent."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
state = {"alive": True, "probes": 0}
def fake_kill(pid, sig):
if sig == 0:
state["probes"] += 1
# die on the 2nd liveness probe (within grace)
if state["probes"] >= 2:
raise ProcessLookupError
return None
sent.append(sig)
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL not in sent
def test_already_dead_before_sigterm(self, monkeypatch):
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
self._patch_db(monkeypatch)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
if sig == signal.SIGTERM:
raise ProcessLookupError
sent.append(sig)
return None
recorded = {"called": False}
monkeypatch.setattr(
AgentLauncher, "_record_kill",
staticmethod(lambda rid: recorded.__setitem__("called", True)),
)
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
# must not raise
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
assert signal.SIGKILL not in sent
assert recorded["called"] is False