From 6abdc220d2fab214d26295a50f6e694acc7025a6 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 08:56:25 +0300 Subject: [PATCH] test(stage): cover unified stage_engine + launcher/plane delegation 18 tests: happy-path advance per stage with correct agent (ORCH-4 fix), QG-fail no-advance, reviewer REQUEST_CHANGES rollback+retry/alert, tester FAIL rollback+retry/block, architect conflict rollback to analysis, analyst approved-flow no-advance, and launcher+plane both delegating to the engine. --- tests/test_stage_engine.py | 395 +++++++++++++++++++++++++++++++++++++ 1 file changed, 395 insertions(+) create mode 100644 tests/test_stage_engine.py diff --git a/tests/test_stage_engine.py b/tests/test_stage_engine.py new file mode 100644 index 0000000..47f7965 --- /dev/null +++ b/tests/test_stage_engine.py @@ -0,0 +1,395 @@ +"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage). + +These verify the MERGED behavior of what used to be two diverged +_try_advance_stage implementations (launcher sync + plane async): + + * happy-path advance for every stage launches the CORRECT agent + (the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage); + * a QG failure does not advance; + * reviewer REQUEST_CHANGES -> rollback to development + enqueue developer; + * developer retries > 3 -> telegram alert, no further enqueue; + * tester FAIL -> rollback to development + enqueue developer; + * architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst; + * launcher AND plane both delegate to the engine. + +Network/Plane/Telegram side effects are mocked at the src.stage_engine level so +the engine runs against a real isolated sqlite DB. +""" + +import os +import tempfile + +import pytest + +# Isolated test DB (same convention as the other suites). 
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

# NOTE: everything below is imported only after the ORCH_* environment has been
# prepared (hence the E402 suppressions) -- presumably src.* reads these
# settings at import time; confirm against src.db before reordering.
from contextlib import closing, suppress  # noqa: E402
from unittest.mock import MagicMock, patch  # noqa: E402

import src.db as _db  # noqa: E402
from src.db import init_db, get_db  # noqa: E402
from src import stage_engine  # noqa: E402
from src.stage_engine import advance_stage  # noqa: E402
from src.stages import get_agent_for_stage  # noqa: E402


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
    """Give every test a fresh, isolated database file.

    Points ``src.db.settings.db_path`` at the suite's temp file, removes any
    file left behind by a previous test, then calls ``init_db()``.
    """
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    # EAFP: a missing file is the normal case for the first test, and this
    # avoids the check-then-act gap of exists() + unlink().
    with suppress(FileNotFoundError):
        os.unlink(_test_db)
    init_db()
    yield


@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
    """Mock all Plane/Telegram/notification side effects in the engine.

    Everything imported into src.stage_engine that touches the network or sends
    a message becomes a no-op MagicMock so tests are deterministic and offline.
    The mocks are reachable as ``stage_engine.<name>`` for call assertions.
    """
    for name in (
        "notify_stage_change",
        "notify_qg_failure",
        "notify_approve_requested",
        "send_telegram",
        "plane_notify_stage",
        "plane_notify_qg",
        "plane_add_comment",
        "set_issue_in_review",
        "set_issue_needs_input",
        "set_issue_in_progress",
        "set_issue_blocked",
    ):
        monkeypatch.setattr(stage_engine, name, MagicMock())


# ---------------------------------------------------------------------------
# DB helpers
#
# Each helper wraps its connection in ``closing(...)`` so the handle is
# released even when a statement raises; otherwise a failing helper would leave
# an open handle on the very file that ``fresh_db`` deletes before the next test.
# ---------------------------------------------------------------------------
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
    """Insert a task row at *stage* and return its row id.

    ``plane_id`` is derived from the work item id as ``plane-<wi>``.
    """
    with closing(get_db()) as conn:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (f"plane-{wi}", wi, repo, branch, stage),
        )
        task_id = cur.lastrowid
        conn.commit()
    return task_id


def _stage(task_id):
    """Return the ``stage`` column currently stored for *task_id*."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT stage FROM tasks WHERE id=?", (task_id,)
        ).fetchone()
    return row[0]


def _jobs():
    """Return every row of ``jobs`` (agent, repo, task_id) as dicts, by id.

    Assumes ``get_db()`` yields mapping-like rows (e.g. ``sqlite3.Row``) so
    that ``dict(row)`` works -- TODO confirm against src.db.
    """
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT agent, repo, task_id FROM jobs ORDER BY id"
        ).fetchall()
    return [dict(r) for r in rows]


def _add_developer_runs(task_id, n):
    """Record *n* prior ``developer`` runs for *task_id* in ``agent_runs``."""
    with closing(get_db()) as conn:
        for _ in range(n):
            conn.execute(
                "INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
                (task_id,),
            )
        conn.commit()


def _pass(*a, **k):
    """Quality-gate stub that always passes, whatever it is called with."""
    return (True, "ok")


def _fail(reason):
    """Build a quality-gate stub that always fails with *reason*."""
    def _f(*a, **k):
        return (False, reason)
    return _f


def _force_all_qg_pass(monkeypatch):
    """Swap ``stage_engine.QG_CHECKS`` for a copy where every check passes."""
    monkeypatch.setattr(
        stage_engine, "QG_CHECKS",
        dict.fromkeys(stage_engine.QG_CHECKS, _pass),
    )


# ---------------------------------------------------------------------------
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
# ---------------------------------------------------------------------------
class TestHappyPathAgentSelection:
    """The fixed agent-selection: when advancing FROM current_stage, the engine
    must enqueue get_agent_for_stage(current_stage), NOT next_stage.
    """

    @pytest.mark.parametrize(
        "current_stage,expected_next,expected_agent",
        [
            ("architecture", "development", "developer"),
            ("development", "review", "reviewer"),
            ("review", "testing", "tester"),
            ("testing", "deploy", "deployer"),
        ],
    )
    def test_advance_launches_current_stage_agent(
        self, monkeypatch, current_stage, expected_next, expected_agent
    ):
        """With all gates green, the task moves on and exactly one job is queued."""
        # All QG checks pass for this happy-path suite.
        _force_all_qg_pass(monkeypatch)
        task_id = _make_task(current_stage)

        res = advance_stage(
            task_id, current_stage, "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )

        assert res.advanced is True
        assert res.to_stage == expected_next
        assert _stage(task_id) == expected_next
        # The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
        assert expected_agent == get_agent_for_stage(current_stage)
        assert res.enqueued_agent == expected_agent
        jobs = _jobs()
        assert len(jobs) == 1
        assert jobs[0]["agent"] == expected_agent

    def test_deploy_to_done_no_agent(self, monkeypatch):
        """deploy -> done advances but launches no agent (terminal-ish)."""
        _force_all_qg_pass(monkeypatch)
        task_id = _make_task("deploy")
        res = advance_stage(
            task_id, "deploy", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )
        assert res.advanced is True
        assert _stage(task_id) == "done"
        assert res.enqueued_agent is None
        assert _jobs() == []

    def test_done_is_terminal(self):
        """A task already at ``done`` is neither advanced nor changed."""
        task_id = _make_task("done")
        res = advance_stage(
            task_id, "done", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )
        assert res.advanced is False
        assert _stage(task_id) == "done"


# ---------------------------------------------------------------------------
# QG failure: do not advance
# ---------------------------------------------------------------------------
class TestQgFailureDoesNotAdvance:
    """A failing quality gate leaves the task on its current stage."""

    def test_qg_fail_keeps_stage(self, monkeypatch):
        """Failed architecture gate: no advance, stage untouched, nothing queued."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_architecture_done=_fail("not done"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("architecture")

        outcome = advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert outcome.advanced is False
        assert outcome.qg_passed is False
        assert _stage(tid) == "architecture"
        assert not _jobs()

    def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
        """finished_agent=None -> generic QG-failure notification fires (plane parity)."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_tests_local=_fail("ci red"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("development")

        advance_stage(
            tid, "development", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )

        stage_engine.notify_qg_failure.assert_called()
        stage_engine.plane_notify_qg.assert_called()

    def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
        """finished_agent set -> NO generic QG notification (launcher parity)."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_architecture_done=_fail("not done"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("architecture")

        advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        stage_engine.notify_qg_failure.assert_not_called()


# ---------------------------------------------------------------------------
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestReviewerRequestChanges:
    """Reviewer verdict gate fails with REQUEST_CHANGES while in ``review``."""

    def test_rollback_and_enqueue_developer(self, monkeypatch):
        """Task drops back to development and a single developer job is queued."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_reviewer_verdict=_fail("verdict: REQUEST_CHANGES"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("review")

        outcome = advance_stage(
            tid, "review", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="reviewer",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to == "development"
        assert _stage(tid) == "development"
        assert [job["agent"] for job in _jobs()] == ["developer"]

    def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
        """With three developer runs already recorded: alert, but queue nothing."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_reviewer_verdict=_fail("verdict: REQUEST_CHANGES"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("review")
        _add_developer_runs(tid, 3)  # already at the max

        outcome = advance_stage(
            tid, "review", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="reviewer",
        )

        assert outcome.rolled_back_to == "development"
        assert outcome.alerted is True
        stage_engine.send_telegram.assert_called()
        # No new developer job enqueued past the retry cap.
        assert not _jobs()


# ---------------------------------------------------------------------------
# Tester FAIL -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestTesterFail:
    """Tests-passed gate fails while the task is in ``testing``."""

    def test_rollback_and_enqueue_developer(self, monkeypatch):
        """Task drops back to development and a single developer job is queued."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_tests_passed=_fail("2 tests failed"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("testing")

        outcome = advance_stage(
            tid, "testing", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="tester",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to == "development"
        assert _stage(tid) == "development"
        assert [job["agent"] for job in _jobs()] == ["developer"]

    def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
        """With three developer runs already recorded: block the issue, queue nothing."""
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_tests_passed=_fail("still failing"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("testing")
        _add_developer_runs(tid, 3)

        outcome = advance_stage(
            tid, "testing", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="tester",
        )

        assert outcome.rolled_back_to == "development"
        assert outcome.alerted is True
        stage_engine.set_issue_blocked.assert_called()
        assert not _jobs()


# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
class TestArchitectConflict:
    """Architecture gate fails; the outcome depends on ``10-conflict.md``."""

    def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
        """Conflict file present: back to analysis with one analyst job."""
        # 10-conflict.md must exist in the worktree path the engine inspects.
        worktree = tmp_path / "wt"
        item_dir = worktree / "docs" / "work-items" / "ET-001"
        item_dir.mkdir(parents=True)
        conflict_file = item_dir / "10-conflict.md"
        conflict_file.write_text("conflict with TRZ")

        def _fake_worktree(repo, branch):
            return str(worktree)

        monkeypatch.setattr(stage_engine, "get_worktree_path", _fake_worktree)
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_architecture_done=_fail("conflict"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("architecture")

        outcome = advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to == "analysis"
        assert _stage(tid) == "analysis"
        assert [job["agent"] for job in _jobs()] == ["analyst"]

    def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
        """No conflict file: plain gate failure, no rollback, nothing queued."""
        worktree = tmp_path / "wt"
        docs_dir = worktree / "docs"
        docs_dir.mkdir(parents=True)

        def _fake_worktree(repo, branch):
            return str(worktree)

        monkeypatch.setattr(stage_engine, "get_worktree_path", _fake_worktree)
        failing_checks = dict(
            stage_engine.QG_CHECKS,
            check_architecture_done=_fail("incomplete"),
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", failing_checks)
        tid = _make_task("architecture")

        outcome = advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to is None
        assert _stage(tid) == "architecture"
        assert not _jobs()


# ---------------------------------------------------------------------------
# Analyst approved-flow (analysis gate): never auto-advances
# ---------------------------------------------------------------------------
class TestAnalysisApprovedFlow:
    """Even with a passing analysis gate the engine waits for approval."""

    def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
        """Gate passes: issue goes to review, approval is requested, stage stays."""
        passing_checks = dict(
            stage_engine.QG_CHECKS,
            check_analysis_complete=_pass,
        )
        monkeypatch.setattr(stage_engine, "QG_CHECKS", passing_checks)
        tid = _make_task("analysis")

        outcome = advance_stage(
            tid, "analysis", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="analyst",
        )

        assert outcome.advanced is False
        assert _stage(tid) == "analysis"
        stage_engine.set_issue_in_review.assert_called()
        stage_engine.notify_approve_requested.assert_called()
        assert not _jobs()


# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------
class TestDelegation:
    """Both former call sites now forward to ``src.stage_engine.advance_stage``."""

    def test_launcher_calls_engine(self):
        """The launcher forwards task id, stage and finished agent as keywords."""
        from src.agents.launcher import AgentLauncher

        tid = _make_task("development", branch="feature/ET-777-deleg")
        launcher_args = {
            "run_id": 1,
            "agent": "developer",
            "repo": "enduro-trails",
            "branch": "feature/ET-777-deleg",
        }
        with patch("src.stage_engine.advance_stage") as engine:
            AgentLauncher()._try_advance_stage(**launcher_args)

        engine.assert_called_once()
        passed = engine.call_args.kwargs
        assert passed["task_id"] == tid
        assert passed["current_stage"] == "development"
        assert passed["finished_agent"] == "developer"

    def test_plane_calls_engine(self):
        """The plane webhook forwards positionally, with finished_agent=None last."""
        import asyncio
        from src.webhooks import plane as plane_mod

        webhook_args = {
            "task_id": 5,
            "current_stage": "analysis",
            "repo": "enduro-trails",
            "work_item_id": "ET-001",
            "branch": "feature/ET-001-x",
        }
        with patch("src.stage_engine.advance_stage") as engine:
            asyncio.run(plane_mod._try_advance_stage(**webhook_args))

        engine.assert_called_once()
        # plane passes positional args; finished_agent (last positional) is None.
        positional = engine.call_args.args
        assert positional[0] == 5
        assert positional[1] == "analysis"
        assert positional[-1] is None