test(stage): cover unified stage_engine + launcher/plane delegation

18 tests: happy-path advance per stage with correct agent (ORCH-4 fix),
QG-fail no-advance, reviewer REQUEST_CHANGES rollback+retry/alert, tester FAIL
rollback+retry/block, architect conflict rollback to analysis, analyst
approved-flow no-advance, and launcher+plane both delegating to the engine.
This commit is contained in:
Dev Agent
2026-06-03 08:56:25 +03:00
parent 51401a3ba9
commit 6abdc220d2

395
tests/test_stage_engine.py Normal file
View File

@@ -0,0 +1,395 @@
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
These verify the MERGED behavior of what used to be two diverged
_try_advance_stage implementations (launcher sync + plane async):
* happy-path advance for every stage launches the CORRECT agent
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
* a QG failure does not advance;
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
* developer retries > 3 -> telegram alert, no further enqueue;
* tester FAIL -> rollback to development + enqueue developer;
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
* launcher AND plane both delegate to the engine.
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
the engine runs against a real isolated sqlite DB.
"""
import os
import tempfile
import pytest
# Isolated test DB (same convention as the other suites).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock, patch # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
from src.stages import get_agent_for_stage # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
"""Fresh isolated DB per test."""
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
"""Mock all Plane/Telegram/notification side effects in the engine.
Everything imported into src.stage_engine that touches the network or sends
a message becomes a no-op MagicMock so tests are deterministic and offline.
"""
for name in (
"notify_stage_change",
"notify_qg_failure",
"notify_approve_requested",
"send_telegram",
"plane_notify_stage",
"plane_notify_qg",
"plane_add_comment",
"set_issue_in_review",
"set_issue_needs_input",
"set_issue_in_progress",
"set_issue_blocked",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _add_developer_runs(task_id, n):
conn = get_db()
for _ in range(n):
conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
(task_id,),
)
conn.commit()
conn.close()
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
# ---------------------------------------------------------------------------
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
# ---------------------------------------------------------------------------
class TestHappyPathAgentSelection:
"""The fixed agent-selection: when advancing FROM current_stage, the engine
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
"""
@pytest.mark.parametrize(
"current_stage,expected_next,expected_agent",
[
("architecture", "development", "developer"),
("development", "review", "reviewer"),
("review", "testing", "tester"),
("testing", "deploy", "deployer"),
],
)
def test_advance_launches_current_stage_agent(
self, monkeypatch, current_stage, expected_next, expected_agent
):
# All QG checks pass for this happy-path suite.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task(current_stage)
res = advance_stage(
task_id, current_stage, "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == expected_next
assert _stage(task_id) == expected_next
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
assert expected_agent == get_agent_for_stage(current_stage)
assert res.enqueued_agent == expected_agent
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == expected_agent
def test_deploy_to_done_no_agent(self, monkeypatch):
"""deploy -> done advances but launches no agent (terminal-ish)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is True
assert _stage(task_id) == "done"
assert res.enqueued_agent is None
assert _jobs() == []
def test_done_is_terminal(self):
task_id = _make_task("done")
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is False
assert _stage(task_id) == "done"
# ---------------------------------------------------------------------------
# QG failure: do not advance
# ---------------------------------------------------------------------------
class TestQgFailureDoesNotAdvance:
def test_qg_fail_keeps_stage(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.qg_passed is False
assert _stage(task_id) == "architecture"
assert _jobs() == []
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
)
task_id = _make_task("development")
advance_stage(task_id, "development", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert stage_engine.notify_qg_failure.called
assert stage_engine.plane_notify_qg.called
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
"""finished_agent set -> NO generic QG notification (launcher parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert not stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestReviewerRequestChanges:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
_add_developer_runs(task_id, 3) # already at the max
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.send_telegram.called
# No new developer job enqueued past the retry cap.
assert _jobs() == []
# ---------------------------------------------------------------------------
# Tester FAIL -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestTesterFail:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
)
task_id = _make_task("testing")
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
)
task_id = _make_task("testing")
_add_developer_runs(task_id, 3)
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
class TestArchitectConflict:
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
# 10-conflict.md must exist in the worktree path the engine inspects.
wt = tmp_path / "wt"
conflict_dir = wt / "docs" / "work-items" / "ET-001"
conflict_dir.mkdir(parents=True)
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to == "analysis"
assert _stage(task_id) == "analysis"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "analyst"
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
wt = tmp_path / "wt"
(wt / "docs").mkdir(parents=True)
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to is None
assert _stage(task_id) == "architecture"
assert _jobs() == []
# ---------------------------------------------------------------------------
# Analyst approved-flow (analysis gate): never auto-advances
# ---------------------------------------------------------------------------
class TestAnalysisApprovedFlow:
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
)
task_id = _make_task("analysis")
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="analyst")
assert res.advanced is False
assert _stage(task_id) == "analysis"
assert stage_engine.set_issue_in_review.called
assert stage_engine.notify_approve_requested.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------
class TestDelegation:
def test_launcher_calls_engine(self):
from src.agents.launcher import AgentLauncher
task_id = _make_task("development", branch="feature/ET-777-deleg")
with patch("src.stage_engine.advance_stage") as m:
AgentLauncher()._try_advance_stage(
run_id=1, agent="developer", repo="enduro-trails",
branch="feature/ET-777-deleg",
)
m.assert_called_once()
kwargs = m.call_args.kwargs
assert kwargs["task_id"] == task_id
assert kwargs["current_stage"] == "development"
assert kwargs["finished_agent"] == "developer"
def test_plane_calls_engine(self):
import asyncio
from src.webhooks import plane as plane_mod
with patch("src.stage_engine.advance_stage") as m:
asyncio.run(
plane_mod._try_advance_stage(
task_id=5, current_stage="analysis", repo="enduro-trails",
work_item_id="ET-001", branch="feature/ET-001-x",
)
)
m.assert_called_once()
# plane passes positional args; finished_agent (last positional) is None.
args = m.call_args.args
assert args[0] == 5
assert args[1] == "analysis"
assert args[-1] is None