feat(bug-fast-track): cheaper/shorter pipeline route for bug-fix tasks (ORCH-019)
All checks were successful
CI / test (push) Successful in 46s
CI / test (pull_request) Successful in 46s

A task carrying the Plane `Bug` label takes a shortened route that skips the
`architecture` stage (one opus architect run + ADR + check_architecture_done),
replacing heavy analysis with a lite package (bug-report + mandatory regression
test plan). EVERY Quality Gate / sub-gate runs UNCHANGED — the route is a
scheduler property, not a gate (root invariant NFR-1): STAGE_TRANSITIONS /
QG_CHECKS / check_* / machine-verdict keys are byte-for-byte preserved.

- src/bug_fast_track.py: new leaf (never-raise) — bug_fast_track_applies (local,
  network-free, checked first), is_bug_task (labels.has_label, Plane API source),
  skips_architecture (pure DB-backed routing predicate), snapshot.
- src/db.py: additive idempotent tasks.track column (TEXT DEFAULT 'full') +
  set_task_track / get_task_track helpers (missing/NULL -> 'full', fail-safe).
- src/stage_engine.py: routing-override on the analysis-exit edge (track='bug' ->
  development/developer, skipping architect); brd-review-clock stamp extended to
  analysis->development. get_next_stage/get_agent_for_stage stay pure.
- src/webhooks/plane.py: classify task as bug in start_pipeline (applies-first
  short-circuit; never-raise -> full cycle on any error).
- src/main.py: additive bug_fast_track block in GET /queue + POST
  /bug-fast-track/escalate (reset 'bug'->'full' to return to the full cycle).
- src/config.py: bug_fast_track_enabled / _label / _repos flags (empty CSV ->
  self-hosting only).
- src/notifications.py: optional 🐞 marker on the bug-track card (never-raise).
- Prompts: analyst.md (lite bug package + escalation), reviewer.md (regression-
  test axis) — 52d canon preserved.
- Docs: CLAUDE.md, README.md (env + API + section), docs/architecture/README.md,
  CHANGELOG.md, .env.example.
- Tests: tests/test_bug_fast_track*.py + test_db_migrations.py + queue block
  (TC-01..TC-15). Full regression green (1551 passed).

Kill-switch ORCH_BUG_FAST_TRACK_ENABLED=false -> 1:1 pre-ORCH-019 (zero
regression; residual track column harmless).

Refs: ORCH-019

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 03:47:49 +03:00
parent 62a63ba0c6
commit d7abbc79b3
22 changed files with 1392 additions and 5 deletions

View File

@@ -0,0 +1,168 @@
"""ORCH-019 — src/bug_fast_track.py: bug-fast-track pure logic (never-raise, fail-safe).
Covers (04-test-plan.yaml):
TC-01 is_bug_task() True for an issue carrying the `Bug` label (label read from
the Plane API via labels.has_label, NOT the webhook payload).
TC-02 is_bug_task() False on missing/ambiguous label or labels=None (fail-safe).
TC-03 bug_fast_track_applies(): the LOCAL scope (enabled + CSV repos) is checked
FIRST, before any network; disabled flag -> False without has_label.
TC-04 never-raise: an exception in the label apparatus degrades is_bug_task to
False (full cycle), never propagates.
"""
import os
import tempfile
import pytest
os.environ.setdefault(
"ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_bug_fast_track.db")
)
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from src import bug_fast_track # noqa: E402
from src import plane_sync # noqa: E402
from src import config as cfg # noqa: E402
@pytest.fixture(autouse=True)
def enabled_self_hosting(monkeypatch):
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "bug_fast_track_label", "Bug", raising=False)
monkeypatch.setattr(cfg.settings, "bug_fast_track_repos", "", raising=False)
# Keep _resolve_project_id offline-deterministic (mirrors test_labels.py).
monkeypatch.setattr(plane_sync, "_resolve_project_id", lambda w=None, p=None: "proj-1")
yield
# --- TC-01: classification True --------------------------------------------
def test_tc01_is_bug_task_true(monkeypatch):
monkeypatch.setattr(plane_sync, "fetch_issue_labels", lambda w, p=None: ["uuid-BUG"])
monkeypatch.setattr(plane_sync, "get_project_labels", lambda pid: {"bug": "uuid-BUG"})
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is True
def test_tc01_label_from_plane_api_not_payload(monkeypatch):
"""The decision comes from labels.has_label (Plane API), independent of any
webhook payload field — a payload `type` is irrelevant."""
seen = {"fetch": 0}
def fetch(w, p=None):
seen["fetch"] += 1
return ["uuid-BUG"]
monkeypatch.setattr(plane_sync, "fetch_issue_labels", fetch)
monkeypatch.setattr(plane_sync, "get_project_labels", lambda pid: {"bug": "uuid-BUG"})
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is True
assert seen["fetch"] == 1 # the Plane API WAS consulted
# --- TC-02: fail-safe on absent / ambiguous / None -------------------------
def test_tc02_label_absent(monkeypatch):
monkeypatch.setattr(plane_sync, "fetch_issue_labels", lambda w, p=None: ["uuid-OTHER"])
monkeypatch.setattr(plane_sync, "get_project_labels", lambda pid: {"bug": "uuid-BUG"})
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is False
def test_tc02_labels_none(monkeypatch):
monkeypatch.setattr(plane_sync, "fetch_issue_labels", lambda w, p=None: None)
monkeypatch.setattr(plane_sync, "get_project_labels", lambda pid: {"bug": "uuid-BUG"})
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is False
def test_tc02_label_ambiguous(monkeypatch):
monkeypatch.setattr(plane_sync, "fetch_issue_labels", lambda w, p=None: ["uuid-BUG"])
monkeypatch.setattr(
plane_sync, "get_project_labels", lambda pid: {"bug": "__AMBIGUOUS__"}
)
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is False
def test_tc02_empty_label_config(monkeypatch):
monkeypatch.setattr(cfg.settings, "bug_fast_track_label", "", raising=False)
monkeypatch.setattr(plane_sync, "fetch_issue_labels", lambda w, p=None: ["uuid-BUG"])
monkeypatch.setattr(plane_sync, "get_project_labels", lambda pid: {"bug": "uuid-BUG"})
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is False
# --- TC-03: local scope first (CSV + self-hosting + kill-switch) ------------
def test_tc03_empty_csv_self_hosting_only(monkeypatch):
monkeypatch.setattr(cfg.settings, "bug_fast_track_repos", "", raising=False)
assert bug_fast_track.bug_fast_track_applies("orchestrator") is True
assert bug_fast_track.bug_fast_track_applies("enduro-trails") is False
def test_tc03_csv_membership(monkeypatch):
monkeypatch.setattr(cfg.settings, "bug_fast_track_repos", "enduro-trails, foo", raising=False)
assert bug_fast_track.bug_fast_track_applies("enduro-trails") is True
assert bug_fast_track.bug_fast_track_applies("foo") is True
# orchestrator is NOT in the explicit CSV -> out of scope.
assert bug_fast_track.bug_fast_track_applies("orchestrator") is False
def test_tc03_killswitch_off_no_network(monkeypatch):
"""The gate idiom `applies(repo) and is_bug_task(...)` short-circuits before any
network call when the kill-switch is off (AC-6)."""
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", False, raising=False)
called = {"fetch": 0}
def spy(*a, **k):
called["fetch"] += 1
return ["uuid-BUG"]
monkeypatch.setattr(plane_sync, "fetch_issue_labels", spy)
repo = "orchestrator"
fired = bug_fast_track.bug_fast_track_applies(repo) and bug_fast_track.is_bug_task(
"ORCH-1", "proj-1"
)
assert fired is False
assert called["fetch"] == 0 # is_bug_task never reached -> zero network
# --- TC-04: never-raise -----------------------------------------------------
def test_tc04_is_bug_task_never_raises(monkeypatch):
def boom(*a, **k):
raise RuntimeError("plane down")
monkeypatch.setattr(plane_sync, "fetch_issue_labels", boom)
monkeypatch.setattr(plane_sync, "get_project_labels", lambda pid: {"bug": "uuid-BUG"})
# Degrades to False (full cycle), no exception.
assert bug_fast_track.is_bug_task("ORCH-1", "proj-1") is False
def test_tc04_applies_never_raises(monkeypatch):
# A repos config whose access explodes still yields False, not a crash.
class _Poisoned:
bug_fast_track_enabled = True
@property
def bug_fast_track_repos(self):
raise RuntimeError("boom")
monkeypatch.setattr(bug_fast_track, "settings", _Poisoned(), raising=False)
assert bug_fast_track.bug_fast_track_applies("orchestrator") is False
# --- skips_architecture predicate ------------------------------------------
def test_skips_architecture_bug(monkeypatch):
assert bug_fast_track.skips_architecture("bug") is True
assert bug_fast_track.skips_architecture("BUG") is True
def test_skips_architecture_full(monkeypatch):
assert bug_fast_track.skips_architecture("full") is False
assert bug_fast_track.skips_architecture(None) is False
assert bug_fast_track.skips_architecture("") is False
def test_skips_architecture_killswitch_off(monkeypatch):
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", False, raising=False)
# Even a stored 'bug' track is inert when the kill-switch is off (1:1 routing).
assert bug_fast_track.skips_architecture("bug") is False
# --- snapshot ---------------------------------------------------------------
def test_snapshot_never_raises():
snap = bug_fast_track.snapshot()
assert set(snap) >= {
"enabled", "label", "repos",
"active_bug_tasks", "total_bug_tasks", "est_saved_architecture_runs",
}

View File

@@ -0,0 +1,87 @@
"""ORCH-019 — composition with ORCH-088 serial-gate / ORCH-089 auto-label (AC-9).
Covers (04-test-plan.yaml):
TC-14 A bug-fast-track task is an ORDINARY repo task for the serial gate
(ORCH-088): it counts as an active task and is gated like any other — it
does NOT bypass serialisation. autoApprove/autoDeploy (ORCH-089) apply on
the bug track (scope is repo-based, track-agnostic).
"""
import os
import tempfile
import pytest
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_bft_composition.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import serial_gate, labels, config as cfg # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "comp.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", False, raising=False)
monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "auto_label_enabled", True, raising=False)
init_db()
yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator", track="full"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id, track),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def test_tc14_bug_task_counts_as_active_in_serial_gate():
# An EARLIER bug task A (unfinished) must gate a later task B's analyst-job —
# a bug task does NOT bypass the serial gate.
_make_task("ORCH-301", stage="development", track="bug") # active bug predecessor
b = _make_task("ORCH-302", stage="analysis", track="full") # new task
enqueue_job("analyst", "orchestrator", "B", task_id=b)
assert claim_next_job() is None, "a bug task must gate a later analyst-job (no bypass)"
# The bug task is the active task in the snapshot.
per = serial_gate.snapshot()["per_repo"]["orchestrator"]
assert per["active_task"]["work_item_id"] == "ORCH-301"
def test_tc14_bug_task_itself_gated_behind_predecessor():
# The bug task is also HELD behind an earlier non-bug task (symmetry).
_make_task("ORCH-310", stage="development", track="full") # active predecessor
b = _make_task("ORCH-311", stage="analysis", track="bug") # new BUG task
enqueue_job("analyst", "orchestrator", "bug-B", task_id=b)
assert claim_next_job() is None, "a bug task is itself serialised behind the predecessor"
def test_tc14_bug_task_claimable_once_predecessor_done():
a = _make_task("ORCH-320", stage="development", track="full")
b = _make_task("ORCH-321", stage="analysis", track="bug")
jid = enqueue_job("analyst", "orchestrator", "bug-B", task_id=b)
assert claim_next_job() is None
# Finish A -> the bug task's analyst-job is now claimable.
conn = get_db()
conn.execute("UPDATE tasks SET stage='done' WHERE id=?", (a,))
conn.commit()
conn.close()
claimed = claim_next_job()
assert claimed is not None and claimed["id"] == jid
def test_tc14_auto_label_applies_track_agnostic(monkeypatch):
# autoApprove/autoDeploy scope is repo-based, independent of the bug track.
assert labels.auto_approve_applies("orchestrator") is True
assert labels.auto_deploy_applies("orchestrator") is True

View File

@@ -0,0 +1,184 @@
"""ORCH-019 — bug-fast-track end-to-end / start_pipeline integration.
Covers (04-test-plan.yaml):
TC-08 E2E: a bug task walks development -> review -> testing -> deploy-staging ->
deploy -> done with EVERY edge gate executed, NEVER entering architecture.
TC-09 start_pipeline: an issue with the `Bug` label (flag on, repo in scope) is
created on the bug-fast-track (tasks.track='bug'); an issue without it is
created on the full cycle (track='full').
TC-10 Fail-safe: with bug_fast_track_enabled=False a `Bug`-labelled issue is
created on the full cycle (track='full'), is_bug_task never consulted.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_bug_fast_track_e2e.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine, config as cfg # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
dbfile = tmp_path / "e2e.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "bug_fast_track_repos", "", raising=False)
# Keep the edge sub-gates + self-deploy + serial gate inert so the PLAIN advance
# path runs deterministically and offline (we assert routing + gate execution,
# not the self-hosting deploy mechanics — those have their own suites).
for flag in (
"self_deploy_enabled", "security_gate_enabled", "merge_gate_enabled",
"coverage_gate_enabled", "image_freshness_enabled",
"post_deploy_monitor_enabled", "serial_gate_enabled",
):
monkeypatch.setattr(cfg.settings, flag, False, raising=False)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done", "set_issue_analysis",
"set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_monitoring",
"set_issue_approved",
):
monkeypatch.setattr(stage_engine, name, lambda *a, **k: None, raising=False)
yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator", track="full"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id, track),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
# --- TC-08: E2E walk, architecture skipped, every gate executed ------------
def test_tc08_bug_task_full_walk_skips_architecture(monkeypatch):
tid = _make_task("ORCH-e2e", stage="analysis", track="bug")
invoked = []
# Record + pass every registered edge gate. check_analysis_approved is NOT in
# this map: with finished_agent=None it is satisfied as approved-via-status
# (no call). check_architecture_done MUST never be invoked.
def _passing(name):
def _fn(*a, **k):
invoked.append(name)
return (True, f"{name} ok")
return _fn
for gate in (
"check_ci_green", "check_reviewer_verdict", "check_tests_passed",
"check_staging_status", "check_deploy_status", "check_architecture_done",
):
monkeypatch.setitem(stage_engine.QG_CHECKS, gate, _passing(gate))
visited = ["analysis"]
wi, repo, branch = "ORCH-e2e", "orchestrator", "feature/ORCH-e2e"
for _ in range(10):
row = db.get_task_by_work_item_id(wi)
cur = row["stage"]
if cur in ("done", "cancelled"):
break
res = advance_stage(tid, cur, repo, wi, branch, finished_agent=None)
if not res.advanced:
break
visited.append(res.to_stage)
assert "architecture" not in visited, f"bug task must skip architecture: {visited}"
assert visited[:3] == ["analysis", "development", "review"]
assert visited[-1] == "done", f"task should reach done: {visited}"
# Every downstream edge gate ran; the architecture gate never did.
for gate in ("check_ci_green", "check_reviewer_verdict", "check_tests_passed",
"check_staging_status", "check_deploy_status"):
assert gate in invoked, f"gate {gate} must execute on the bug track"
assert "check_architecture_done" not in invoked
# --- TC-09 / TC-10: start_pipeline classification --------------------------
async def _drive_start_pipeline(monkeypatch, *, is_bug: bool, enabled: bool):
from src.webhooks import plane
from src import plane_sync, bug_fast_track
from src.projects import ProjectConfig
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", enabled, raising=False)
proj = ProjectConfig(
plane_project_id="proj-uuid", repo="orchestrator",
work_item_prefix="ORCH", name="orch",
)
monkeypatch.setattr(plane, "get_project_by_plane_id", lambda pid: proj)
monkeypatch.setattr(plane, "_qg0_errors", lambda name, desc: [])
monkeypatch.setattr(plane, "ensure_unique_work_item_id", lambda wid, repo: wid)
monkeypatch.setattr(plane_sync, "fetch_issue_sequence_id", lambda *a, **k: 777)
monkeypatch.setattr(plane_sync, "set_issue_analysis", lambda *a, **k: None)
monkeypatch.setattr(plane_sync, "add_comment", lambda *a, **k: None)
monkeypatch.setattr(plane, "enqueue_job", lambda *a, **k: 1)
async def _noop(*a, **k):
return None
monkeypatch.setattr(plane, "_create_gitea_branch", _noop)
monkeypatch.setattr(plane, "_create_initial_docs", _noop)
# Spy is_bug_task so we can assert it is/ isn't consulted; applies() stays REAL
# (flag + self-hosting scope), so TC-10 proves the local short-circuit.
seen = {"is_bug_task": 0}
def _is_bug(wi, pid=None):
seen["is_bug_task"] += 1
return is_bug
monkeypatch.setattr(bug_fast_track, "is_bug_task", _is_bug)
data = {
"id": "issue-uuid-1",
"name": "Fix the crash on submit",
"description_stripped": "A sufficiently long description for QG-0 to pass.",
"project": "proj-uuid",
}
await plane.start_pipeline(data, project_id="proj-uuid")
return seen
def test_tc09_bug_label_creates_bug_track(monkeypatch):
import asyncio
seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=True, enabled=True))
assert seen["is_bug_task"] == 1 # applies() True -> classification consulted
row = db.get_task_by_work_item_id("ORCH-777")
assert row is not None
assert row["track"] == "bug"
def test_tc09_no_label_creates_full_track(monkeypatch):
import asyncio
seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=False, enabled=True))
assert seen["is_bug_task"] == 1
row = db.get_task_by_work_item_id("ORCH-777")
assert row["track"] == "full"
def test_tc10_killswitch_off_bug_label_full_cycle(monkeypatch):
import asyncio
seen = asyncio.run(_drive_start_pipeline(monkeypatch, is_bug=True, enabled=False))
# applies() is False (kill-switch) -> is_bug_task short-circuited (zero network).
assert seen["is_bug_task"] == 0
row = db.get_task_by_work_item_id("ORCH-777")
assert row["track"] == "full"

View File

@@ -0,0 +1,105 @@
"""ORCH-019 — escalation of a complex bug to the full cycle (FR-5 / AC-5, D5).
Covers (04-test-plan.yaml):
TC-11 After the escalate endpoint resets track 'bug' -> 'full' (while the task
is still in `analysis`), the next advance routes analysis -> architecture
(return to the full cycle with the architect run).
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_bug_fast_track_escalation.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine, config as cfg # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
dbfile = tmp_path / "esc.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done", "set_issue_analysis",
"set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_monitoring",
"set_issue_approved",
):
monkeypatch.setattr(stage_engine, name, lambda *a, **k: None, raising=False)
yield
def _make_task(work_item_id, stage="analysis", track="bug"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(work_item_id, work_item_id, "orchestrator", f"feature/{work_item_id}",
stage, work_item_id, track),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def test_tc11_escalate_returns_to_full_cycle(monkeypatch):
import asyncio
from src import main
tid = _make_task("ORCH-cmplx", stage="analysis", track="bug")
# Operator escalates while the task is still in analysis.
out = asyncio.run(main.bug_fast_track_escalate(work_item="ORCH-cmplx"))
assert out["ok"] is True
assert out["track"] == "full"
assert out["was"] == "bug"
assert db.get_task_track(tid) == "full"
# The next advance now routes back through architecture (full cycle).
res = advance_stage(
tid, "analysis", "orchestrator", "ORCH-cmplx", "feature/ORCH-cmplx",
finished_agent=None,
)
assert res.to_stage == "architecture"
assert res.enqueued_agent == "architect"
def test_tc11_escalate_unknown_work_item():
import asyncio
from src import main
out = asyncio.run(main.bug_fast_track_escalate(work_item="ORCH-nope"))
assert out["ok"] is False
def test_tc11_escalate_missing_arg():
import asyncio
from src import main
out = asyncio.run(main.bug_fast_track_escalate(work_item=""))
assert out["ok"] is False
def test_tc11_escalate_idempotent_on_full(monkeypatch):
import asyncio
from src import main
tid = _make_task("ORCH-already", stage="analysis", track="full")
out = asyncio.run(main.bug_fast_track_escalate(work_item="ORCH-already"))
assert out["ok"] is True
assert out["was"] == "full"
assert db.get_task_track(tid) == "full"

View File

@@ -0,0 +1,97 @@
"""ORCH-019 — Quality-Gate invariants on the bug-fast-track (root invariant NFR-1).
Covers (04-test-plan.yaml):
TC-07 The QG_CHECKS registry + the check_* signatures are NOT changed by the
bug-fast-track; the machine verdict-keys (verdict / result / deploy_status /
staging_status / security_status / coverage_status) are preserved by name
and case.
TC-12 check_analysis_complete does NOT special-case the bug track (ADR-001 D4):
a bug lite-package that still emits all 4 analysis files passes; the same
requirement holds for a non-bug task (no false block, no weakening).
"""
import os
import tempfile
os.environ.setdefault(
"ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_bft_gates.db")
)
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from src.qg.checks import QG_CHECKS, check_analysis_complete # noqa: E402
# --- TC-07: registry + verdict-keys unchanged ------------------------------
def test_tc07_qg_checks_registry_unchanged():
# The exact registered gate set — a bug-fast-track must add/remove NOTHING.
expected = {
"check_analysis_complete",
"check_analysis_approved",
"check_architecture_done",
"check_ci_green",
"check_review_approved",
"check_reviewer_verdict",
"check_tests_local",
"check_tests_passed",
"check_staging_status",
"check_staging_image_fresh",
"check_deploy_status",
"check_branch_mergeable",
"check_security_gate",
"check_coverage_gate",
}
assert set(QG_CHECKS.keys()) == expected
def test_tc07_verdict_keys_preserved():
"""The frontmatter machine verdict-keys are parsed by exact name/case. ORCH-019
touches none of the parsers, so the literal keys must still be present."""
import inspect
from src.qg import checks as checks_mod
src = inspect.getsource(checks_mod)
for key in ("verdict:", "result:", "deploy_status:", "staging_status:"):
assert key in src, f"verdict key '{key}' must be preserved in qg.checks"
# security_status / coverage_status live in their own leaves but are read via
# the same unified frontmatter contract — assert they survive there.
import inspect as _i
from src import security_gate, coverage_gate
assert "security_status" in _i.getsource(security_gate)
assert "coverage_status" in _i.getsource(coverage_gate)
# --- TC-12: analysis gate not weakened, no false block ---------------------
def _seed_analysis_docs(repo_root, work_item_id, files):
d = os.path.join(repo_root, "docs", "work-items", work_item_id)
os.makedirs(d, exist_ok=True)
for fn in files:
with open(os.path.join(d, fn), "w") as fh:
fh.write("stub\n")
def test_tc12_bug_lite_package_with_all_four_passes(monkeypatch, tmp_path):
from src.qg import checks as checks_mod
monkeypatch.setattr(checks_mod, "_repo_path", lambda repo, branch=None: str(tmp_path))
_seed_analysis_docs(
str(tmp_path), "ORCH-bug",
["01-brd.md", "02-trz.md", "03-acceptance-criteria.md", "04-test-plan.yaml"],
)
ok, reason = check_analysis_complete("orchestrator", "ORCH-bug", "feature/x")
assert ok is True, reason
def test_tc12_missing_file_still_fails_for_any_track(monkeypatch, tmp_path):
"""The gate is NOT weakened for bugs: a package missing 02/03 still fails —
exactly as for a non-bug task (the gate never reads tasks.track)."""
from src.qg import checks as checks_mod
monkeypatch.setattr(checks_mod, "_repo_path", lambda repo, branch=None: str(tmp_path))
_seed_analysis_docs(str(tmp_path), "ORCH-bug", ["01-brd.md", "04-test-plan.yaml"])
ok, reason = check_analysis_complete("orchestrator", "ORCH-bug", "feature/x")
assert ok is False
assert "02-trz.md" in reason and "03-acceptance-criteria.md" in reason
def test_tc12_signature_has_no_track_param():
import inspect
params = list(inspect.signature(check_analysis_complete).parameters)
# byte-for-byte signature: (repo, work_item_id, branch=None) — no track-awareness.
assert params == ["repo", "work_item_id", "branch"]

View File

@@ -0,0 +1,147 @@
"""ORCH-019 — advance_stage routing-override (ADR-001 D3).
Covers (04-test-plan.yaml):
TC-05 bug task: analysis -> development (architecture skipped, developer
enqueued); non-bug task: analysis -> architecture (architect enqueued).
TC-06 STAGE_TRANSITIONS is structurally unchanged (set of stages + edges +
agents + qg byte-for-byte) — the override does NOT mutate the table.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_bug_fast_track_routing.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, set_task_track # noqa: E402
from src import stage_engine # noqa: E402
from src import config as cfg # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
dbfile = tmp_path / "r.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done", "set_issue_analysis",
"set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_monitoring",
"set_issue_approved",
):
monkeypatch.setattr(stage_engine, name, lambda *a, **k: None, raising=False)
yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
"VALUES (?, ?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
# --- TC-05 -----------------------------------------------------------------
def test_tc05_bug_task_skips_architecture():
tid = _make_task("ORCH-bug", stage="analysis")
set_task_track(tid, "bug")
# agent=None -> the webhook Approved-via-status path (gate satisfied, advance).
res = advance_stage(
tid, "analysis", "orchestrator", "ORCH-bug", "feature/ORCH-bug",
finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == "development"
assert res.enqueued_agent == "developer"
# DB stage actually advanced past architecture.
row = db.get_task_by_work_item_id("ORCH-bug")
assert row["stage"] == "development"
def test_tc05_full_task_keeps_architecture():
tid = _make_task("ORCH-full", stage="analysis")
# track defaults to 'full' (no set_task_track call).
res = advance_stage(
tid, "analysis", "orchestrator", "ORCH-full", "feature/ORCH-full",
finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == "architecture"
assert res.enqueued_agent == "architect"
def test_tc05_killswitch_off_bug_keeps_architecture(monkeypatch):
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", False, raising=False)
tid = _make_task("ORCH-bugoff", stage="analysis")
set_task_track(tid, "bug") # stored, but the flag is off -> inert
res = advance_stage(
tid, "analysis", "orchestrator", "ORCH-bugoff", "feature/ORCH-bugoff",
finished_agent=None,
)
assert res.to_stage == "architecture"
assert res.enqueued_agent == "architect"
def test_tc05_bug_only_affects_analysis_edge():
"""The override is scoped to the analysis-exit edge only — a bug task on
`development` still routes development -> review (no spurious skips)."""
tid = _make_task("ORCH-bugdev", stage="development")
set_task_track(tid, "bug")
# Make check_ci_green pass deterministically (we only assert routing, not CI).
import src.stage_engine as se
orig = se.QG_CHECKS.get("check_ci_green")
se.QG_CHECKS["check_ci_green"] = lambda *a, **k: (True, "ok")
try:
res = advance_stage(
tid, "development", "orchestrator", "ORCH-bugdev", "feature/ORCH-bugdev",
finished_agent=None,
)
finally:
if orig is not None:
se.QG_CHECKS["check_ci_green"] = orig
assert res.to_stage == "review"
# --- TC-06: STAGE_TRANSITIONS structurally unchanged -----------------------
def test_tc06_stage_transitions_unchanged():
from src.stages import STAGE_TRANSITIONS
expected = {
"created": {"next": "analysis", "agent": "analyst", "qg": None},
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
"testing": {"next": "deploy-staging", "agent": "deployer", "qg": "check_tests_passed"},
"deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"},
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
"done": {"next": None, "agent": None, "qg": None},
"cancelled": {"next": None, "agent": None, "qg": None},
}
assert STAGE_TRANSITIONS == expected
def test_tc06_get_next_stage_pure():
"""get_next_stage / get_agent_for_stage stay PURE (no track arg) — the override
lives in advance_stage, not in stages.py."""
from src.stages import get_next_stage, get_agent_for_stage
assert get_next_stage("analysis") == "architecture"
assert get_agent_for_stage("analysis") == "architect"

View File

@@ -0,0 +1,79 @@
"""ORCH-019 (TC-15) — additive, idempotent tasks.track migration.
The bug-fast-track stores the task type in an additive ``tasks.track`` column
(``TEXT DEFAULT 'full'``) created via ``_ensure_column`` (idempotent). A repeated
``init_db`` must not crash, existing rows must default to ``'full'``, and the
helpers must round-trip.
"""
import os
import tempfile
import pytest
os.environ.setdefault(
"ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_db_migrations.db")
)
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, set_task_track, get_task_track # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "m.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
yield
def _columns(table):
conn = get_db()
try:
return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
finally:
conn.close()
def test_tc15_track_column_present_with_default():
assert "track" in _columns("tasks")
# A row inserted WITHOUT track gets the DEFAULT 'full'.
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
"VALUES ('p','ORCH-1','orchestrator','feature/x','analysis','t')"
)
conn.commit()
row = conn.execute("SELECT track FROM tasks WHERE work_item_id='ORCH-1'").fetchone()
conn.close()
assert row["track"] == "full"
def test_tc15_init_db_idempotent():
# Running init_db again is a no-op on the existing column (no crash).
init_db()
init_db()
assert "track" in _columns("tasks")
def test_tc15_helpers_round_trip():
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
"VALUES ('p2','ORCH-2','orchestrator','feature/y','analysis','t')"
)
tid = cur.lastrowid
conn.commit()
conn.close()
assert get_task_track(tid) == "full" # default
set_task_track(tid, "bug")
assert get_task_track(tid) == "bug"
set_task_track(tid, "full")
assert get_task_track(tid) == "full"
def test_tc15_get_task_track_missing_row_failsafe():
# Unknown task id -> 'full' (fail-safe -> full cycle), never raises.
assert get_task_track(999999) == "full"

View File

@@ -59,3 +59,50 @@ def test_queue_serial_gate_reflects_freeze():
assert "orchestrator" in per
assert per["orchestrator"]["frozen"] is True
assert per["orchestrator"]["frozen_reason"] == "DEGRADED"
# --- ORCH-019 (TC-13): additive bug_fast_track block -----------------------
def test_queue_has_bug_fast_track_block_and_keeps_existing_keys(monkeypatch):
import asyncio
from src import main
monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False)
payload = asyncio.run(main.queue())
# Pre-existing keys are all still present (no contract break).
for key in ("counts", "serial_gate", "coverage", "auto_labels", "stop", "recent"):
assert key in payload, f"existing /queue key '{key}' must be preserved"
assert "bug_fast_track" in payload
bft = payload["bug_fast_track"]
assert bft["enabled"] is True
assert set(bft) >= {
"enabled", "label", "repos",
"active_bug_tasks", "total_bug_tasks", "est_saved_architecture_runs",
}
def test_queue_bug_fast_track_counts_bug_tasks():
import asyncio
from src import main
conn = db.get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES ('p1','ORCH-401','orchestrator','feature/x','development','t','bug')"
)
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES ('p2','ORCH-402','orchestrator','feature/y','done','t','bug')"
)
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) "
"VALUES ('p3','ORCH-403','orchestrator','feature/z','development','t','full')"
)
conn.commit()
conn.close()
bft = asyncio.run(main.queue())["bug_fast_track"]
assert bft["total_bug_tasks"] == 2 # two bug tasks total
assert bft["active_bug_tasks"] == 1 # one non-terminal bug task
assert bft["est_saved_architecture_runs"] == 2