feat(bug-fast-track): cheaper/shorter pipeline route for bug-fix tasks (ORCH-019)

A task carrying the Plane `Bug` label takes a shortened route that skips the
`architecture` stage (one opus architect run + ADR + check_architecture_done),
replacing heavy analysis with a lite package (bug-report + mandatory regression
test plan). EVERY Quality Gate / sub-gate runs UNCHANGED — the route is a
scheduler property, not a gate (root invariant NFR-1): STAGE_TRANSITIONS /
QG_CHECKS / check_* / machine-verdict keys are byte-for-byte preserved.

- src/bug_fast_track.py: new leaf (never-raise) — bug_fast_track_applies (local,
  network-free, checked first), is_bug_task (labels.has_label, Plane API source),
  skips_architecture (pure DB-backed routing predicate), snapshot.
- src/db.py: additive idempotent tasks.track column (TEXT DEFAULT 'full') +
  set_task_track / get_task_track helpers (missing/NULL -> 'full', fail-safe).
- src/stage_engine.py: routing-override on the analysis-exit edge (track='bug' ->
  development/developer, skipping architect); brd-review-clock stamp extended to
  analysis->development. get_next_stage/get_agent_for_stage stay pure.
- src/webhooks/plane.py: classify task as bug in start_pipeline (applies-first
  short-circuit; never-raise -> full cycle on any error).
- src/main.py: additive bug_fast_track block in GET /queue + POST
  /bug-fast-track/escalate (reset 'bug'->'full' to return to the full cycle).
- src/config.py: bug_fast_track_enabled / _label / _repos flags (empty CSV ->
  self-hosting only).
- src/notifications.py: optional 🐞 marker on the bug-track card (never-raise).
- Prompts: analyst.md (lite bug package + escalation), reviewer.md (regression-
  test axis) — 52d canon preserved.
- Docs: CLAUDE.md, README.md (env + API + section), docs/architecture/README.md,
  CHANGELOG.md, .env.example.
- Tests: tests/test_bug_fast_track*.py + test_db_migrations.py + queue block
  (TC-01..TC-15). Full regression green (1551 passed).

Kill-switch ORCH_BUG_FAST_TRACK_ENABLED=false -> 1:1 pre-ORCH-019 (zero
regression; residual track column harmless).

Refs: ORCH-019

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 03:47:49 +03:00
committed by orchestrator-deployer
parent bc04186b93
commit 50bcae765a
22 changed files with 1392 additions and 5 deletions

166
src/bug_fast_track.py Normal file
View File

@@ -0,0 +1,166 @@
"""ORCH-019: bug-fast-track — a cheaper/shorter pipeline route for bug-fix tasks.
Leaf module — pure, unit-testable logic over the config flags + the proven Plane
label apparatus (``labels.has_label`` -> ``plane_sync``, ORCH-089). Mirrors the
leaf pattern of ``src/labels.py`` / ``src/serial_gate.py``: imports only
``config`` (and lazily ``labels`` / ``db`` / ``qg.checks``), never
``stage_engine`` / ``launcher``.
What it decides (ADR-001):
* Whether the bug-fast-track is in scope for a repo (``bug_fast_track_applies``)
— a LOCAL, network-free check evaluated FIRST.
* Whether a given Plane issue carries the ``Bug`` label (``is_bug_task``) — the
only network call, made ONLY after ``applies()`` is True, so a disabled
kill-switch costs zero network and yields zero regression (AC-6).
* Whether a task's stored track skips the ``architecture`` stage
(``skips_architecture``) — a pure predicate over the DB-stored ``track``,
read in the hot ``advance_stage`` path WITHOUT any network call (NFR-4).
never-raise contract (BR-6/AC-6, fail-safe to the FULL cycle): every public
function degrades to "full cycle" on ANY error / ambiguity / Plane
unavailability / disabled flag. There is NO fail-open here — the conservative
default is always the full pipeline (with ``architecture``), so an error can
never silently skip a stage.
"""
from __future__ import annotations
import logging
from .config import settings
logger = logging.getLogger("orchestrator.bug_fast_track")
# ---------------------------------------------------------------------------
# Scope / kill-switch (mirrors _auto_label_applies / serial_gate_applies)
# ---------------------------------------------------------------------------
def bug_fast_track_applies(repo: str) -> bool:
    """Report whether the bug-fast-track is in scope for ``repo``.

    Decision order (all local, no network):

    * ``settings.bug_fast_track_enabled`` falsy -> ``False`` (kill-switch).
    * ``settings.bug_fast_track_repos`` is a non-blank CSV -> ``True`` only when
      ``repo`` (trimmed, case-insensitive) equals one of the non-empty entries.
    * blank CSV -> delegate to ``qg.checks.is_self_hosting_repo(repo)``.

    Never raises: any exception is logged as a warning and reported as
    ``False``, i.e. the caller falls back to the full cycle.
    """
    try:
        feature_on = getattr(settings, "bug_fast_track_enabled", False)
        if feature_on:
            csv_scope = getattr(settings, "bug_fast_track_repos", "")
            csv_scope = (csv_scope or "").strip()
            if not csv_scope:
                # Imported lazily so that loading this module does not pull in qg.
                from .qg.checks import is_self_hosting_repo

                return is_self_hosting_repo(repo)
            entries = [item.strip() for item in csv_scope.split(",")]
            wanted = (repo or "").strip().lower()
            # Blank entries (e.g. from a trailing comma) never match anything.
            return any(entry and entry.lower() == wanted for entry in entries)
        return False
    except Exception as exc:  # noqa: BLE001 - never-raise -> full cycle
        logger.warning("bug_fast_track_applies error for %s: %s", repo, exc)
        return False
# ---------------------------------------------------------------------------
# Classification (the ONLY network call; ADR-001 D1)
# ---------------------------------------------------------------------------
def is_bug_task(work_item_id: str, project_id: str | None = None) -> bool:
    """Report whether the issue carries the configured bug label.

    The label name comes from ``settings.bug_fast_track_label`` (trimmed); a
    blank name yields ``False`` without consulting anything else. Otherwise the
    answer is ``labels.has_label(work_item_id, label, project_id)`` coerced to
    ``bool``.

    This function does not check the kill-switch itself; per the module
    contract, callers evaluate ``bug_fast_track_applies`` first.

    Never raises: any exception is logged as a warning and reported as
    ``False`` (fail-safe -> full cycle).
    """
    try:
        configured = getattr(settings, "bug_fast_track_label", "")
        label_name = (configured or "").strip()
        if label_name:
            # Lazy import keeps module load free of the labels dependency.
            from . import labels as label_api

            found = label_api.has_label(work_item_id, label_name, project_id)
            return bool(found)
        return False
    except Exception as exc:  # noqa: BLE001 - never-raise -> full cycle
        logger.warning(
            "is_bug_task error for %s -> fail-safe (full cycle): %s", work_item_id, exc
        )
        return False
# ---------------------------------------------------------------------------
# Routing predicate (pure, DB-backed; hot path — NO network, NFR-4) — ADR-001 D3
# ---------------------------------------------------------------------------
def skips_architecture(track: str | None) -> bool:
    """Report whether a task with the stored ``track`` skips ``architecture``.

    Returns ``True`` only when ``settings.bug_fast_track_enabled`` is truthy
    (the feature is switched on) AND ``track``, trimmed and lower-cased, equals
    ``"bug"``. With the feature switched off the answer is ``False`` and
    ``track`` is not inspected at all.

    Performs no I/O. Never raises: any exception is logged as a warning and
    reported as ``False`` (fail-safe -> full cycle).
    """
    try:
        feature_on = getattr(settings, "bug_fast_track_enabled", False)
        if feature_on:
            normalized = (track or "").strip().lower()
            return normalized == "bug"
        return False
    except Exception as exc:  # noqa: BLE001 - never-raise -> full cycle
        logger.warning("skips_architecture error for track=%r: %s", track, exc)
        return False
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (ADR-001 D7)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
    """Build the read-only bug-fast-track summary block for GET /queue.

    Returned keys:

    * ``enabled`` / ``label`` / ``repos`` - the three config flags, each read
      independently so one bad attribute cannot hide the others.
    * ``total_bug_tasks`` - number of ``tasks`` rows with ``track = 'bug'``.
    * ``active_bug_tasks`` - those of them whose stage is neither ``done`` nor
      ``cancelled``.
    * ``est_saved_architecture_runs`` - same number as ``total_bug_tasks``.

    Never raises: a failed flag read falls back to its default, and a failed
    DB count is logged as a warning and leaves the counters at zero.
    """

    def _read_flag(name, fallback, coerce):
        # The coercion happens inside the try so that a misbehaving value
        # degrades to the fallback as well.
        try:
            return coerce(getattr(settings, name, fallback))
        except Exception:  # noqa: BLE001
            return fallback

    enabled = _read_flag("bug_fast_track_enabled", False, bool)
    label = _read_flag("bug_fast_track_label", "Bug", lambda value: value or "Bug")
    repos_cfg = _read_flag("bug_fast_track_repos", "", lambda value: value or "")

    total_bug_tasks = 0
    active_bug_tasks = 0
    try:
        from . import db

        # ORCH-090 terminal set {done,cancelled}: "active" = not terminal.
        count_query = (
            "SELECT "
            " COUNT(*) AS total, "
            " SUM(CASE WHEN stage NOT IN ('done','cancelled') THEN 1 ELSE 0 END) AS active "
            "FROM tasks WHERE track = 'bug'"
        )
        conn = db.get_db()
        try:
            counts = conn.execute(count_query).fetchone()
            if counts:
                # SUM() is NULL when no row matches, hence the `or 0`.
                total_bug_tasks = int(counts["total"] or 0)
                active_bug_tasks = int(counts["active"] or 0)
        finally:
            conn.close()
    except Exception as exc:  # noqa: BLE001
        logger.warning("bug_fast_track snapshot count error: %s", exc)

    summary = {
        "enabled": enabled,
        "label": label,
        "repos": repos_cfg,
        "active_bug_tasks": active_bug_tasks,
        "total_bug_tasks": total_bug_tasks,
    }
    # One skipped `architecture` stage (architect run + ADR) is assumed per bug
    # task; this is the structural savings estimate (FR-7 / AC-7).
    summary["est_saved_architecture_runs"] = total_bug_tasks
    return summary

View File

@@ -794,6 +794,34 @@ class Settings(BaseSettings):
auto_label_repos: str = ""
auto_label_states_ttl_s: int = 300
# ORCH-019: bug-fast-track — a cheaper/shorter pipeline route for bug-fix tasks.
# A task carrying the Plane label `bug_fast_track_label` (default `Bug`) skips
# the whole `architecture` stage (one opus `architect` run + ADR + the
# check_architecture_done exit-gate): the routing-override in advance_stage maps
# the analysis -> architecture edge to analysis -> development for a task whose
# tasks.track == 'bug'. EVERY Quality Gate / sub-gate (CI/review/tester/staging/
# deploy + security/merge/coverage/image-freshness/merge-verify) runs UNCHANGED
# — the route is a scheduler property, NOT a gate (root invariant NFR-1).
# Recognition reuses the proven ORCH-089 label apparatus (labels.has_label ->
# plane_sync), read ONLY in start_pipeline (never in the hot claim_next_job).
# Additive leaf (src/bug_fast_track.py, never-raise) + an additive idempotent
# tasks.track column; STAGE_TRANSITIONS / QG_CHECKS / check_* / verdict-keys are
# NOT touched. fail-safe -> full cycle on any error/ambiguity/disabled flag. See
# docs/work-items/ORCH-019/06-adr/ADR-001-bug-fast-track.md and the cross-cutting
# docs/architecture/adr/adr-0032-bug-fast-track.md.
# bug_fast_track_enabled -> kill-switch (env ORCH_BUG_FAST_TRACK_ENABLED).
# False -> start_pipeline AND advance_stage are 1:1 as
# before ORCH-019 (skips_architecture always False,
# has_label never consulted) — zero regression (AC-6).
# bug_fast_track_label -> Plane label name that activates the track (env
# ORCH_BUG_FAST_TRACK_LABEL; default `Bug`).
# bug_fast_track_repos -> CSV scope (env ORCH_BUG_FAST_TRACK_REPOS). Empty ->
# self-hosting only (orchestrator), the safe default
# (D6); non-empty -> only the listed repos.
bug_fast_track_enabled: bool = True
bug_fast_track_label: str = "Bug"
bug_fast_track_repos: str = ""
# Telegram notifications
telegram_bot_token: str = ""
telegram_chat_id: str = ""

View File

@@ -140,6 +140,13 @@ def init_db():
# irreversible step finishes honestly, then applied.
_ensure_column(conn, "tasks", "cancelled_at", "TEXT")
_ensure_column(conn, "tasks", "cancel_requested_at", "TEXT")
# ORCH-019 (08-data-requirements.md): bug-fast-track task type. Additive,
# idempotent (_ensure_column is a no-op once present) -> safe on the live shared
# prod DB (enduro untouched). Values: 'full' (DEFAULT — ALL existing and non-bug
# tasks) | 'bug' (a task carrying the Plane `Bug` label, set in start_pipeline
# after a successful atomic create). Read in advance_stage for the routing-override
# (skips architecture) — from the DB, NEVER from the network (NFR-4).
_ensure_column(conn, "tasks", "track", "TEXT DEFAULT 'full'")
# ORCH-026 (Level B): declarative task dependencies. job_deps stores the
# directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The
# scheduler gate in claim_next_job keeps B queued until every A reaches
@@ -487,6 +494,48 @@ def update_task_stage(task_id: int, stage: str):
conn.close()
# ---------------------------------------------------------------------------
# ORCH-019: bug-fast-track task type (tasks.track) helpers
# ---------------------------------------------------------------------------
def set_task_track(task_id: int, track: str) -> None:
    """ORCH-019: persist the task's pipeline track ('full' | 'bug').

    Idempotent overwrite of ``tasks.track`` for the row with ``id = task_id``.

    The value is canonicalised before it is written (trimmed and lower-cased;
    an empty or ``None`` value becomes ``'full'``). Readers of this column do
    not agree on matching rules: some normalise with strip/lower, others
    compare the stored text to ``'bug'`` exactly (for example the SQL filter
    ``WHERE track = 'bug'``). Storing the canonical form keeps them consistent.

    Args:
        task_id: primary key of the task row.
        track: desired track name; ``'bug'`` and ``'full'`` are the documented
            values.

    Raises:
        Whatever ``get_db()`` / the connection raises; this writer is not
        fail-safe, callers wrap it where they need a never-raise contract.
    """
    canonical = str(track or "").strip().lower() or "full"
    conn = get_db()
    try:
        conn.execute(
            "UPDATE tasks SET track = ? WHERE id = ?", (canonical, task_id)
        )
        conn.commit()
    finally:
        # Always release the connection, even when the UPDATE fails.
        conn.close()
def get_task_track(task_id: int) -> str:
    """ORCH-019: return the stored pipeline track of a task, defaulting to 'full'.

    Looks up ``tasks.track`` for ``id = task_id``. A missing row, a NULL/empty
    value, or any exception raised while reading all yield ``'full'``, so a
    read problem can never put a task on the bug route (fail-safe -> full
    cycle).
    """
    try:
        conn = get_db()
        try:
            cursor = conn.execute(
                "SELECT track FROM tasks WHERE id = ?", (task_id,)
            )
            record = cursor.fetchone()
        finally:
            # The fetched row stays usable after the connection is closed.
            conn.close()
        return (record["track"] or "full") if record else "full"
    except Exception:  # noqa: BLE001 - fail-safe -> full cycle
        return "full"
# ---------------------------------------------------------------------------
# Telegram live tracker helpers (feat/telegram-live-tracker)
# ---------------------------------------------------------------------------

View File

@@ -212,6 +212,7 @@ async def queue():
from . import fs_normalize
from . import labels
from . import cancel
from . import bug_fast_track
from .disk_watchdog import disk_watchdog
from .build_cache_pruner import build_cache_pruner
return {
@@ -243,6 +244,10 @@ async def queue():
# repo scope, cancelled/deferred counts, recent cancellations. Additive block;
# never-raise.
"stop": cancel.snapshot(),
# ORCH-019 (FR-7 / AC-7): bug-fast-track observability (read-only) —
# kill-switch, label, scope, bug-task counts + the structural savings metric
# (architecture stages skipped). Additive block; never-raise.
"bug_fast_track": bug_fast_track.snapshot(),
# ORCH-063 (FR-6 / AC-7): disk-watchdog observability (read-only) —
# enabled, threshold, interval, last measurement per host-path. Additive
# block; never-raise (status() returns {"enabled": ...} minimum on error).
@@ -343,3 +348,45 @@ async def coverage_set_baseline(repo: str = "", value: float | None = None):
repo = repo.strip()
ok = db.set_coverage_baseline(repo, value, sha="manual-override")
return {"ok": ok, "repo": repo, "baseline": db.get_coverage_baseline(repo)}
@app.post("/bug-fast-track/escalate")
async def bug_fast_track_escalate(work_item: str = ""):
    """ORCH-019 (FR-5 / AC-5, ADR-001 D5): escalate a bug-fast-track task to the
    full cycle (return it to the route WITH `architecture`).

    Operator path for a bug that turned out to be complex / architectural /
    visual (needs an ADR or a mock): resets ``tasks.track`` to 'full'. Apply it
    while the task is still in `analysis` (before its exit) so that the next
    advance_stage routes analysis -> architecture normally. Modeled after
    ``POST /serial-gate/unfreeze`` / ``POST /coverage/baseline``.

    Args:
        work_item: work-item identifier of the task; surrounding whitespace is
            ignored.

    Returns:
        ``{"ok": True, "work_item", "track": "full", "was": <previous track>}``
        on success, or ``{"ok": False, "error": ..., "work_item": ...}`` when
        the identifier is missing/unknown or a DB operation fails.

    Never raises for DB or notification failures: they are logged and, for DB
    failures, reported through ``ok: False``. The Telegram message and the
    Plane comment are best-effort and are only sent when the task really was on
    the bug track.
    """
    import logging

    from . import db

    log = logging.getLogger(__name__)

    if not (work_item or "").strip():
        return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
    work_item = work_item.strip()

    try:
        task = db.get_task_by_work_item_id(work_item)
    except Exception as exc:  # noqa: BLE001 - honour the never-raise contract
        log.warning("bug-fast-track escalate: lookup failed for %s: %s", work_item, exc)
        return {"ok": False, "error": "task lookup failed", "work_item": work_item}
    if not task:
        return {"ok": False, "error": "unknown work_item", "work_item": work_item}

    # Subscript access works for both a plain dict and a sqlite3.Row (the
    # latter has no .get()); a row without the column degrades to 'full'.
    try:
        prev_track = task["track"] or "full"
    except (KeyError, IndexError, TypeError):
        prev_track = "full"
    # Compare the same way the routing predicate does (trimmed, case-insensitive).
    was_bug = str(prev_track).strip().lower() == "bug"

    try:
        # Unconditional, idempotent write: also repairs a row whose previous
        # value could not be read above.
        db.set_task_track(task["id"], "full")
    except Exception as exc:  # noqa: BLE001 - honour the never-raise contract
        log.warning("bug-fast-track escalate: reset failed for %s: %s", work_item, exc)
        return {
            "ok": False,
            "error": "track reset failed",
            "work_item": work_item,
            "was": prev_track,
        }

    if was_bug:
        try:
            from .notifications import send_telegram

            send_telegram(
                f"🐞➡️ {work_item}: эскалация в ПОЛНЫЙ цикл "
                f"(багфикс-трек снят, стадия architecture восстановлена)."
            )
        except Exception as exc:  # noqa: BLE001 - best-effort notification
            log.warning("bug-fast-track escalate: telegram notify failed: %s", exc)
        try:
            from .plane_sync import add_comment

            add_comment(
                work_item,
                "🐞➡️ Эскалация: задача возвращена в полный цикл "
                "(багфикс-трек снят, стадия architecture восстановлена).",
                author="analyst",
            )
        except Exception as exc:  # noqa: BLE001 - best-effort notification
            log.warning("bug-fast-track escalate: plane comment failed: %s", exc)

    return {"ok": True, "work_item": work_item, "track": "full", "was": prev_track}

View File

@@ -452,10 +452,18 @@ def render_task_tracker(task_id: int) -> str:
task_repo = _row_get(task, "repo")
task_issue_id = _row_get(task, "plane_issue_id")
num_html = plane_issue_link(work_item_id, plane_issue_id=task_issue_id, repo=task_repo)
# ORCH-019 (D7): mark a bug-fast-track task with a bug emoji (U+1F41E) in the header.
# Optional, never-raise: any error simply omits the marker (the card always renders).
bug_marker = ""
try:
if (_row_get(task, "track") or "").strip().lower() == "bug":
bug_marker = "\U0001f41e "
except Exception:
bug_marker = ""
header = (
f"\U0001f389 {num_html} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
f"\U0001f389 {bug_marker}{num_html} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
if done
else f"\U0001f6e0\ufe0f {num_html} \u00b7 {esc_title}"
else f"\U0001f6e0\ufe0f {bug_marker}{num_html} \u00b7 {esc_title}"
)
bar = "\u2501" * 22
# ORCH-067 (req 2): a Plane-status line (model ORCH-066) under the header.

View File

@@ -30,7 +30,7 @@ import os
import time
from dataclasses import dataclass, field
from .db import get_db, update_task_stage, enqueue_job
from .db import get_db, update_task_stage, enqueue_job, get_task_track
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .review_parse import extract_review_findings, extract_test_failures
@@ -40,6 +40,7 @@ from . import merge_gate
from . import self_deploy
from . import post_deploy
from . import labels
from . import bug_fast_track
from .notifications import (
notify_stage_change,
notify_qg_failure,
@@ -212,6 +213,25 @@ def advance_stage(
try:
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
# --- ORCH-019 bug-fast-track routing-override (ADR-001 D3) ------------
# A task carrying the Plane `Bug` label is stored as tasks.track='bug' in
# start_pipeline. On the analysis-EXIT edge we map analysis -> architecture
# to analysis -> development, so a bug skips the whole `architecture` stage
# (one opus architect run + ADR + check_architecture_done). This is a pure
# routing-override: STAGE_TRANSITIONS / get_next_stage / get_agent_for_stage
# stay 1:1, and the track is read from the DB (no network in this hot path,
# NFR-4). For a non-bug task (track='full', the DEFAULT) the route is
# byte-for-byte unchanged. The `track` is reused below for the next-agent
# override and the brd-review-clock stamp.
track = get_task_track(task_id)
if current_stage == "analysis" and bug_fast_track.skips_architecture(track):
next_stage = "development"
logger.info(
f"Task {task_id}: bug-fast-track -> analysis -> development "
f"(skipping architecture, ORCH-019)"
)
result.qg_name = qg_name
result.to_stage = next_stage
@@ -383,7 +403,11 @@ def advance_stage(
# Telegram live tracker: the analysis->architecture advance is the human
# Approved gate clearing -> stamp the END of "Ревью БРД" (the only
# human time). Idempotent: only the first stamp counts.
if current_stage == "analysis" and next_stage == "architecture":
# ORCH-019 (ADR-001 D3): for a bug-fast-track task the analysis-exit edge
# lands on `development` (not `architecture`), so the brd-review-clock end
# stamp must trigger on BOTH targets — otherwise "твоё время" (ORCH-087)
# would never close on the bug track. This does not touch any gate.
if current_stage == "analysis" and next_stage in ("architecture", "development"):
try:
from .db import mark_brd_review_ended
mark_brd_review_ended(task_id)
@@ -462,6 +486,12 @@ def advance_stage(
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
next_agent = get_agent_for_stage(current_stage)
# ORCH-019 (ADR-001 D3): get_agent_for_stage('analysis') is 'architect'; for a
# bug-fast-track task we skip the architect run entirely and launch the
# developer directly (mirrors the next_stage override above). get_agent_for_stage
# stays pure (1:1) — the override lives here, NOT in stages.py.
if current_stage == "analysis" and next_stage == "development":
next_agent = "developer"
if next_agent:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\n"

View File

@@ -18,6 +18,7 @@ from ..db import (
enqueue_job,
insert_event_dedup,
create_task_atomic,
set_task_track,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
@@ -648,6 +649,42 @@ async def start_pipeline(data: dict, project_id: str = ""):
return
task_id = task_row["id"]
# ORCH-019 (FR-1/FR-2, ADR-001 D1/D2): classify the task as a bug-fix and put it
# on the cheaper bug-fast-track (skips the `architecture` stage downstream). The
# gate idiom is `applies(repo) and is_bug_task(...)`: the LOCAL, network-free
# `bug_fast_track_applies` is checked FIRST so a disabled kill-switch / out-of-scope
# repo costs ZERO network (no has_label call). The Plane `Bug` label is the source
# of truth (read here at start, NEVER in the hot claim_next_job — NFR-4); the type
# is persisted in tasks.track so advance_stage routes off the DB, not the network.
# never-raise / fail-safe: ANY error -> task stays track='full' (full cycle, AC-6).
try:
from .. import bug_fast_track
if bug_fast_track.bug_fast_track_applies(repo) and bug_fast_track.is_bug_task(
work_item_id, plane_project_id
):
set_task_track(task_id, "bug")
logger.info(
f"Task {work_item_id}: classified as BUG -> bug-fast-track "
f"(architecture stage will be skipped, ORCH-019)"
)
try:
from ..plane_sync import add_comment as _bug_comment
_bug_comment(
work_item_id,
"\U0001f41e Багфикс-трек: "
"упрощённый маршрут "
"(пропуск стадии architecture). "
"Все Quality Gate исполняются.",
author="analyst",
)
except Exception:
pass
except Exception as e:
logger.warning(
f"Task {work_item_id}: bug-fast-track classification skipped "
f"(fail-safe -> full cycle): {e}"
)
# ORCH-088 (FR-1/AC-6, ADR-001 D1): DEFER the branch cut for an applicable repo.
# Creating the Gitea branch here (T0, issue -> analysis) would cut it from `main`
# BEFORE the predecessor is merged -> stale base. When the serial gate applies we