"""ORCH-019 — bug-fast-track end-to-end / start_pipeline integration.

Covers (04-test-plan.yaml):
  TC-08  E2E: a bug task walks development -> review -> testing ->
         deploy-staging -> deploy -> done with EVERY edge gate executed,
         NEVER entering architecture.
  TC-09  start_pipeline: an issue with the `Bug` label (flag on, repo in scope)
         is created on the bug-fast-track (tasks.track='bug'); an issue without
         it is created on the full cycle (track='full').
  TC-10  Fail-safe: with bug_fast_track_enabled=False a `Bug`-labelled issue is
         created on the full cycle (track='full'), is_bug_task never consulted.
"""
import asyncio
import os
import tempfile

import pytest

# The environment is populated BEFORE the src.* imports below (hence the E402
# suppressions); presumably the settings objects read it at import time.
_test_db = os.path.join(tempfile.gettempdir(), "test_bug_fast_track_e2e.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

import src.db as db  # noqa: E402
from src.db import init_db, get_db  # noqa: E402
from src import stage_engine, config as cfg  # noqa: E402
from src.stage_engine import advance_stage  # noqa: E402


# Settings flags forced to False for every test in this module.
_INERT_FLAGS = (
    "self_deploy_enabled",
    "security_gate_enabled",
    "merge_gate_enabled",
    "coverage_gate_enabled",
    "image_freshness_enabled",
    "post_deploy_monitor_enabled",
    "serial_gate_enabled",
)

# stage_engine attributes replaced with no-ops for every test in this module.
_SILENCED_HOOKS = (
    "notify_stage_change",
    "notify_qg_failure",
    "notify_approve_requested",
    "send_telegram",
    "plane_notify_stage",
    "plane_notify_qg",
    "plane_add_comment",
    "set_issue_in_review",
    "set_issue_needs_input",
    "set_issue_in_progress",
    "set_issue_blocked",
    "set_issue_done",
    "set_issue_analysis",
    "set_issue_awaiting_deploy",
    "set_issue_deploying",
    "set_issue_monitoring",
    "set_issue_approved",
)

# Edge gates that MUST execute on the bug track (TC-08). Kept in one place so
# the set that is patched and the set that is asserted cannot drift apart.
_DOWNSTREAM_GATES = (
    "check_ci_green",
    "check_reviewer_verdict",
    "check_tests_passed",
    "check_staging_status",
    "check_deploy_status",
)


@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
    """Give each test its own DB file under tmp_path and a known flag baseline.

    Enables bug_fast_track_enabled, clears bug_fast_track_repos, switches off
    every flag in _INERT_FLAGS, then calls init_db().
    """
    dbfile = tmp_path / "e2e.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
    monkeypatch.setattr(cfg.settings, "bug_fast_track_repos", "", raising=False)
    # Keep the edge sub-gates + self-deploy + serial gate inert so the PLAIN advance
    # path runs deterministically and offline (we assert routing + gate execution,
    # not the self-hosting deploy mechanics — those have their own suites).
    for flag in _INERT_FLAGS:
        monkeypatch.setattr(cfg.settings, flag, False, raising=False)
    init_db()
    yield


@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
    """Replace every stage_engine hook in _SILENCED_HOOKS with a no-op.

    raising=False tolerates names that stage_engine does not define.
    """
    for name in _SILENCED_HOOKS:
        monkeypatch.setattr(stage_engine, name, lambda *a, **k: None, raising=False)
    yield


def _make_task(work_item_id, stage="analysis", repo="orchestrator", track="full"):
    """Insert one row into `tasks` and return its row id.

    plane_id, work_item_id and title are all set to *work_item_id*; the branch
    is ``feature/<work_item_id>``.
    """
    conn = get_db()
    try:
        cursor = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (
                work_item_id,
                work_item_id,
                repo,
                f"feature/{work_item_id}",
                stage,
                work_item_id,
                track,
            ),
        )
        task_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close even when the INSERT/commit fails so the connection never leaks.
        conn.close()
    return task_id


def _created_track(work_item_id):
    """Return the `track` of the task row for *work_item_id*, asserting the row exists."""
    row = db.get_task_by_work_item_id(work_item_id)
    assert row is not None, f"no task row was created for {work_item_id}"
    return row["track"]


# --- TC-08: E2E walk, architecture skipped, every gate executed ------------

def test_tc08_bug_task_full_walk_skips_architecture(monkeypatch):
    """Advance a track='bug' task from analysis until it stops; check route and gates."""
    tid = _make_task("ORCH-e2e", stage="analysis", track="bug")

    invoked = []

    # Record + pass every registered edge gate. check_analysis_approved is NOT in
    # this map: with finished_agent=None it is satisfied as approved-via-status
    # (no call). check_architecture_done MUST never be invoked.
    def _passing(name):
        def _gate(*a, **k):
            invoked.append(name)
            return (True, f"{name} ok")
        return _gate

    # The architecture gate is patched too, purely so a stray call is recorded.
    for gate in _DOWNSTREAM_GATES + ("check_architecture_done",):
        monkeypatch.setitem(stage_engine.QG_CHECKS, gate, _passing(gate))

    visited = ["analysis"]
    wi, repo, branch = "ORCH-e2e", "orchestrator", "feature/ORCH-e2e"
    # Bounded walk: 10 iterations caps the loop if the task never reaches a
    # terminal stage.
    for _ in range(10):
        row = db.get_task_by_work_item_id(wi)
        current = row["stage"]
        if current in ("done", "cancelled"):
            break
        res = advance_stage(tid, current, repo, wi, branch, finished_agent=None)
        if not res.advanced:
            break
        visited.append(res.to_stage)

    assert "architecture" not in visited, f"bug task must skip architecture: {visited}"
    assert visited[:3] == ["analysis", "development", "review"]
    assert visited[-1] == "done", f"task should reach done: {visited}"
    # Every downstream edge gate ran; the architecture gate never did.
    for gate in _DOWNSTREAM_GATES:
        assert gate in invoked, f"gate {gate} must execute on the bug track"
    assert "check_architecture_done" not in invoked


# --- TC-09 / TC-10: start_pipeline classification --------------------------

async def _drive_start_pipeline(monkeypatch, *, is_bug: bool, enabled: bool):
    """Run plane.start_pipeline for one synthetic issue with collaborators stubbed.

    Args:
        monkeypatch: pytest's monkeypatch fixture, used for all stubbing.
        is_bug: value the spied bug_fast_track.is_bug_task returns.
        enabled: value assigned to settings.bug_fast_track_enabled.

    Returns:
        dict ``{"is_bug_task": <number of times the spy was called>}``.
    """
    from src.webhooks import plane
    from src import plane_sync, bug_fast_track
    from src.projects import ProjectConfig

    monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", enabled, raising=False)

    proj = ProjectConfig(
        plane_project_id="proj-uuid",
        repo="orchestrator",
        work_item_prefix="ORCH",
        name="orch",
    )
    monkeypatch.setattr(plane, "get_project_by_plane_id", lambda pid: proj)
    monkeypatch.setattr(plane, "_qg0_errors", lambda name, desc: [])
    monkeypatch.setattr(plane, "ensure_unique_work_item_id", lambda wid, repo: wid)
    # Sequence id 777 is what makes the tests look the task up as "ORCH-777".
    monkeypatch.setattr(plane_sync, "fetch_issue_sequence_id", lambda *a, **k: 777)
    monkeypatch.setattr(plane_sync, "set_issue_analysis", lambda *a, **k: None)
    monkeypatch.setattr(plane_sync, "add_comment", lambda *a, **k: None)
    monkeypatch.setattr(plane, "enqueue_job", lambda *a, **k: 1)

    async def _noop(*a, **k):
        return None

    monkeypatch.setattr(plane, "_create_gitea_branch", _noop)
    monkeypatch.setattr(plane, "_create_initial_docs", _noop)

    # Spy is_bug_task so we can assert it is / isn't consulted; applies() stays REAL
    # (flag + self-hosting scope), so TC-10 proves the local short-circuit.
    seen = {"is_bug_task": 0}

    def _is_bug(wi, pid=None):
        seen["is_bug_task"] += 1
        return is_bug

    monkeypatch.setattr(bug_fast_track, "is_bug_task", _is_bug)

    data = {
        "id": "issue-uuid-1",
        "name": "Fix the crash on submit",
        "description_stripped": "A sufficiently long description for QG-0 to pass.",
        "project": "proj-uuid",
    }
    await plane.start_pipeline(data, project_id="proj-uuid")
    return seen


def test_tc09_bug_label_creates_bug_track(monkeypatch):
    """Flag on + issue classified as bug -> the task is stored with track='bug'."""
    seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=True, enabled=True))
    assert seen["is_bug_task"] == 1  # applies() True -> classification consulted
    assert _created_track("ORCH-777") == "bug"


def test_tc09_no_label_creates_full_track(monkeypatch):
    """Flag on + issue not classified as bug -> the task is stored with track='full'."""
    seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=False, enabled=True))
    assert seen["is_bug_task"] == 1
    assert _created_track("ORCH-777") == "full"


def test_tc10_killswitch_off_bug_label_full_cycle(monkeypatch):
    """Flag off -> track='full' and is_bug_task is never called, even for a bug."""
    seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=True, enabled=False))
    # applies() is False (kill-switch) -> is_bug_task short-circuited (zero network).
    assert seen["is_bug_task"] == 0
    assert _created_track("ORCH-777") == "full"