Files
orchestrator/tests/test_bug_fast_track_e2e.py
claude-bot 50bcae765a feat(bug-fast-track): cheaper/shorter pipeline route for bug-fix tasks (ORCH-019)
A task carrying the Plane `Bug` label takes a shortened route that skips the
`architecture` stage (one opus architect run + ADR + check_architecture_done),
replacing heavy analysis with a lite package (bug-report + mandatory regression
test plan). EVERY Quality Gate / sub-gate runs UNCHANGED — the route is a
scheduler property, not a gate (root invariant NFR-1): STAGE_TRANSITIONS /
QG_CHECKS / check_* / machine-verdict keys are byte-for-byte preserved.

- src/bug_fast_track.py: new leaf (never-raise) — bug_fast_track_applies (local,
  network-free, checked first), is_bug_task (labels.has_label, Plane API source),
  skips_architecture (pure DB-backed routing predicate), snapshot.
- src/db.py: additive idempotent tasks.track column (TEXT DEFAULT 'full') +
  set_task_track / get_task_track helpers (missing/NULL -> 'full', fail-safe).
- src/stage_engine.py: routing-override on the analysis-exit edge (track='bug' ->
  development/developer, skipping architect); brd-review-clock stamp extended to
  analysis->development. get_next_stage/get_agent_for_stage stay pure.
- src/webhooks/plane.py: classify task as bug in start_pipeline (applies-first
  short-circuit; never-raise -> full cycle on any error).
- src/main.py: additive bug_fast_track block in GET /queue + POST
  /bug-fast-track/escalate (reset 'bug'->'full' to return to the full cycle).
- src/config.py: bug_fast_track_enabled / _label / _repos flags (empty CSV ->
  self-hosting only).
- src/notifications.py: optional 🐞 marker on the bug-track card (never-raise).
- Prompts: analyst.md (lite bug package + escalation), reviewer.md (regression-
  test axis) — 52d canon preserved.
- Docs: CLAUDE.md, README.md (env + API + section), docs/architecture/README.md,
  CHANGELOG.md, .env.example.
- Tests: tests/test_bug_fast_track*.py + test_db_migrations.py + queue block
  (TC-01..TC-15). Full regression green (1551 passed).

Kill-switch ORCH_BUG_FAST_TRACK_ENABLED=false -> 1:1 pre-ORCH-019 (zero
regression; residual track column harmless).

Refs: ORCH-019

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 03:58:15 +03:00

185 lines
7.6 KiB
Python

"""ORCH-019 — bug-fast-track end-to-end / start_pipeline integration.
Covers (04-test-plan.yaml):
TC-08 E2E: a bug task walks development -> review -> testing -> deploy-staging ->
deploy -> done with EVERY edge gate executed, NEVER entering architecture.
TC-09 start_pipeline: an issue with the `Bug` label (flag on, repo in scope) is
created on the bug-fast-track (tasks.track='bug'); an issue without it is
created on the full cycle (track='full').
TC-10 Fail-safe: with bug_fast_track_enabled=False a `Bug`-labelled issue is
created on the full cycle (track='full'), is_bug_task never consulted.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_bug_fast_track_e2e.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine, config as cfg # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
dbfile = tmp_path / "e2e.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "bug_fast_track_repos", "", raising=False)
# Keep the edge sub-gates + self-deploy + serial gate inert so the PLAIN advance
# path runs deterministically and offline (we assert routing + gate execution,
# not the self-hosting deploy mechanics — those have their own suites).
for flag in (
"self_deploy_enabled", "security_gate_enabled", "merge_gate_enabled",
"coverage_gate_enabled", "image_freshness_enabled",
"post_deploy_monitor_enabled", "serial_gate_enabled",
):
monkeypatch.setattr(cfg.settings, flag, False, raising=False)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done", "set_issue_analysis",
"set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_monitoring",
"set_issue_approved",
):
monkeypatch.setattr(stage_engine, name, lambda *a, **k: None, raising=False)
yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator", track="full"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id, track),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
# --- TC-08: E2E walk, architecture skipped, every gate executed ------------
def test_tc08_bug_task_full_walk_skips_architecture(monkeypatch):
tid = _make_task("ORCH-e2e", stage="analysis", track="bug")
invoked = []
# Record + pass every registered edge gate. check_analysis_approved is NOT in
# this map: with finished_agent=None it is satisfied as approved-via-status
# (no call). check_architecture_done MUST never be invoked.
def _passing(name):
def _fn(*a, **k):
invoked.append(name)
return (True, f"{name} ok")
return _fn
for gate in (
"check_ci_green", "check_reviewer_verdict", "check_tests_passed",
"check_staging_status", "check_deploy_status", "check_architecture_done",
):
monkeypatch.setitem(stage_engine.QG_CHECKS, gate, _passing(gate))
visited = ["analysis"]
wi, repo, branch = "ORCH-e2e", "orchestrator", "feature/ORCH-e2e"
for _ in range(10):
row = db.get_task_by_work_item_id(wi)
cur = row["stage"]
if cur in ("done", "cancelled"):
break
res = advance_stage(tid, cur, repo, wi, branch, finished_agent=None)
if not res.advanced:
break
visited.append(res.to_stage)
assert "architecture" not in visited, f"bug task must skip architecture: {visited}"
assert visited[:3] == ["analysis", "development", "review"]
assert visited[-1] == "done", f"task should reach done: {visited}"
# Every downstream edge gate ran; the architecture gate never did.
for gate in ("check_ci_green", "check_reviewer_verdict", "check_tests_passed",
"check_staging_status", "check_deploy_status"):
assert gate in invoked, f"gate {gate} must execute on the bug track"
assert "check_architecture_done" not in invoked
# --- TC-09 / TC-10: start_pipeline classification --------------------------
async def _drive_start_pipeline(monkeypatch, *, is_bug: bool, enabled: bool):
from src.webhooks import plane
from src import plane_sync, bug_fast_track
from src.projects import ProjectConfig
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", enabled, raising=False)
proj = ProjectConfig(
plane_project_id="proj-uuid", repo="orchestrator",
work_item_prefix="ORCH", name="orch",
)
monkeypatch.setattr(plane, "get_project_by_plane_id", lambda pid: proj)
monkeypatch.setattr(plane, "_qg0_errors", lambda name, desc: [])
monkeypatch.setattr(plane, "ensure_unique_work_item_id", lambda wid, repo: wid)
monkeypatch.setattr(plane_sync, "fetch_issue_sequence_id", lambda *a, **k: 777)
monkeypatch.setattr(plane_sync, "set_issue_analysis", lambda *a, **k: None)
monkeypatch.setattr(plane_sync, "add_comment", lambda *a, **k: None)
monkeypatch.setattr(plane, "enqueue_job", lambda *a, **k: 1)
async def _noop(*a, **k):
return None
monkeypatch.setattr(plane, "_create_gitea_branch", _noop)
monkeypatch.setattr(plane, "_create_initial_docs", _noop)
# Spy is_bug_task so we can assert it is/ isn't consulted; applies() stays REAL
# (flag + self-hosting scope), so TC-10 proves the local short-circuit.
seen = {"is_bug_task": 0}
def _is_bug(wi, pid=None):
seen["is_bug_task"] += 1
return is_bug
monkeypatch.setattr(bug_fast_track, "is_bug_task", _is_bug)
data = {
"id": "issue-uuid-1",
"name": "Fix the crash on submit",
"description_stripped": "A sufficiently long description for QG-0 to pass.",
"project": "proj-uuid",
}
await plane.start_pipeline(data, project_id="proj-uuid")
return seen
def test_tc09_bug_label_creates_bug_track(monkeypatch):
import asyncio
seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=True, enabled=True))
assert seen["is_bug_task"] == 1 # applies() True -> classification consulted
row = db.get_task_by_work_item_id("ORCH-777")
assert row is not None
assert row["track"] == "bug"
def test_tc09_no_label_creates_full_track(monkeypatch):
import asyncio
seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=False, enabled=True))
assert seen["is_bug_task"] == 1
row = db.get_task_by_work_item_id("ORCH-777")
assert row["track"] == "full"
def test_tc10_killswitch_off_bug_label_full_cycle(monkeypatch):
import asyncio
seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=True, enabled=False))
# applies() is False (kill-switch) -> is_bug_task short-circuited (zero network).
assert seen["is_bug_task"] == 0
row = db.get_task_by_work_item_id("ORCH-777")
assert row["track"] == "full"