A task carrying the Plane `Bug` label takes a shortened route that skips the `architecture` stage (one opus architect run + ADR + check_architecture_done), replacing heavy analysis with a lite package (bug-report + mandatory regression test plan). EVERY Quality Gate / sub-gate runs UNCHANGED — the route is a scheduler property, not a gate (root invariant NFR-1): STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict keys are byte-for-byte preserved. - src/bug_fast_track.py: new leaf (never-raise) — bug_fast_track_applies (local, network-free, checked first), is_bug_task (labels.has_label, Plane API source), skips_architecture (pure DB-backed routing predicate), snapshot. - src/db.py: additive idempotent tasks.track column (TEXT DEFAULT 'full') + set_task_track / get_task_track helpers (missing/NULL -> 'full', fail-safe). - src/stage_engine.py: routing-override on the analysis-exit edge (track='bug' -> development/developer, skipping architect); brd-review-clock stamp extended to analysis->development. get_next_stage/get_agent_for_stage stay pure. - src/webhooks/plane.py: classify task as bug in start_pipeline (applies-first short-circuit; never-raise -> full cycle on any error). - src/main.py: additive bug_fast_track block in GET /queue + POST /bug-fast-track/escalate (reset 'bug'->'full' to return to the full cycle). - src/config.py: bug_fast_track_enabled / _label / _repos flags (empty CSV -> self-hosting only). - src/notifications.py: optional 🐞 marker on the bug-track card (never-raise). - Prompts: analyst.md (lite bug package + escalation), reviewer.md (regression- test axis) — 52d canon preserved. - Docs: CLAUDE.md, README.md (env + API + section), docs/architecture/README.md, CHANGELOG.md, .env.example. - Tests: tests/test_bug_fast_track*.py + test_db_migrations.py + queue block (TC-01..TC-15). Full regression green (1551 passed). Kill-switch ORCH_BUG_FAST_TRACK_ENABLED=false -> 1:1 pre-ORCH-019 (zero regression; residual track column harmless). 
Refs: ORCH-019 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
914 lines
41 KiB
Python
914 lines
41 KiB
Python
"""Plane webhook handlers — full implementation."""
|
||
|
||
import hmac
|
||
import hashlib
|
||
import re
|
||
import json
|
||
import logging
|
||
import httpx
|
||
from fastapi import APIRouter, Request, HTTPException
|
||
|
||
from ..config import settings
|
||
from ..db import (
|
||
get_db,
|
||
get_task_by_plane_id,
|
||
get_next_work_item_id,
|
||
ensure_unique_work_item_id,
|
||
update_task_stage,
|
||
enqueue_job,
|
||
insert_event_dedup,
|
||
create_task_atomic,
|
||
set_task_track,
|
||
)
|
||
from ._dedup import plane_delivery_id
|
||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||
from ..qg.checks import QG_CHECKS
|
||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||
from ..agents.launcher import launcher
|
||
from ..plane_sync import (
|
||
notify_stage_change as plane_notify_stage,
|
||
notify_qg_failure as plane_notify_qg,
|
||
notify_done as plane_notify_done,
|
||
)
|
||
from ..projects import (
|
||
get_project_by_plane_id,
|
||
get_project_by_repo,
|
||
known_plane_project_ids,
|
||
)
|
||
|
||
logger = logging.getLogger("orchestrator.webhooks.plane")
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
def verify_plane_signature(body: bytes, signature: str) -> bool:
    """Verify the HMAC-SHA256 signature of a Plane webhook body.

    Args:
        body: Raw request body exactly as received.
        signature: Hex digest supplied by the sender (the caller reads it from
            the ``X-Plane-Signature`` header).

    Returns:
        True when the supplied signature matches the expected digest, or when
        no webhook secret is configured (verification is then skipped);
        False otherwise. Never raises on a malformed signature value.
    """
    secret = settings.plane_webhook_secret
    if not secret:
        return True  # Skip verification if no secret configured

    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # Compare as bytes: hmac.compare_digest raises TypeError when a *str*
    # argument contains non-ASCII characters, which would turn a forged header
    # into an unhandled exception instead of a clean "invalid signature".
    supplied = (signature or "").encode("utf-8", errors="replace")
    return hmac.compare_digest(expected.encode("ascii"), supplied)
|
||
|
||
|
||
@router.post("/plane")
async def plane_webhook(request: Request):
    """Handle Plane webhook events.

    Flow: verify the HMAC signature, parse the JSON body, de-duplicate the
    delivery, filter by known Plane project, then dispatch on event/action.

    Returns:
        ``{"status": "duplicate"}`` for a repeated delivery,
        ``{"status": "ignored", "reason": "unknown project"}`` for an issue from
        an unconfigured project, otherwise ``{"status": "accepted"}``.

    Raises:
        HTTPException: 401 on an invalid signature; 400 when the body is not a
            JSON object.
    """
    body = await request.body()

    # Verify HMAC signature
    signature = request.headers.get("X-Plane-Signature", "")
    if not verify_plane_signature(body, signature):
        logger.warning("Plane webhook: invalid signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    # A malformed body is a client error, not a server crash. Both
    # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
    try:
        payload = json.loads(body)
    except ValueError as e:
        logger.warning(f"Plane webhook: malformed JSON body: {e}")
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from e
    if not isinstance(payload, dict):
        logger.warning("Plane webhook: JSON payload is not an object")
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
    # delivery_id falls back to sha256("plane" + body) (a retried identical body maps
    # to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
    # {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
    # project filter, so a repeat does no extra work; the FIRST delivery of an unknown
    # project still falls through to the filter below and returns {"status":"ignored"}.
    event_type = payload.get("event", "unknown")
    delivery_id = plane_delivery_id(request.headers, body)
    inserted = insert_event_dedup(
        "plane", event_type, body.decode("utf-8", errors="replace"), delivery_id
    )
    if not inserted:
        logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
        return {"status": "duplicate"}

    event = payload.get("event")
    action = payload.get("action", "")
    # "data" may be absent OR explicitly null / a non-object; normalise to a dict
    # so the .get() calls below (and in the handlers) cannot raise.
    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}

    # ORCH-6: filter by Plane project. Ignore issues from unknown/unconfigured
    # projects so a webhook on the whole workspace cannot funnel everything into
    # the default repo (root cause of the 2026-06-02 incident).
    project_id = data.get("project") or data.get("project_id") or ""
    known_projects = known_plane_project_ids()
    if project_id not in known_projects:
        logger.info(
            f"Plane webhook: ignoring event '{event}' from unknown project "
            f"'{project_id}' (known: {len(known_projects)})"
        )
        return {"status": "ignored", "reason": "unknown project"}

    if (event == "work_item.created") or (event == "issue" and action == "created"):
        # Feature 1: creation NO LONGER starts the pipeline. Slava keeps the
        # backlog until he moves an issue to In Progress. We only run a soft
        # QG-0 sanity log here (no branch, no analyst, no task row).
        await handle_work_item_created(data, project_id)
    elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
        # Status-only verdict model: status changes drive the pipeline.
        #   Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the
        #                                        stage agent if returned from
        #                                        Needs Input.
        #   -> Approved                        : advance to the next stage.
        #   -> Rejected                        : rollback (reason from latest comment).
        await handle_issue_updated(data, project_id)
    elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
        await handle_comment(data, project_id)

    return {"status": "accepted"}
|
||
|
||
|
||
def _state_id(data: dict) -> str:
|
||
"""Extract the new Plane state UUID from an 'issue updated' payload.
|
||
|
||
Real payload (verified from prod events): data.state is
|
||
{id, name, color, group}. Some payloads carry state as a bare UUID string.
|
||
"""
|
||
state = data.get("state")
|
||
if isinstance(state, dict):
|
||
return state.get("id", "") or ""
|
||
if isinstance(state, str):
|
||
return state
|
||
return ""
|
||
|
||
|
||
async def handle_issue_updated(data: dict, project_id: str = ""):
    """Dispatch a Plane issue status change to the matching pipeline handler.

    The NEW state UUID (data.state.id) is compared, in this order, against the
    project's own state UUIDs:

      1. stop            -> handle_stop (cancel the task)
      2. to_analyse      -> handle_status_start (start / resume)
      3. confirm_deploy  -> handle_confirm_deploy
      4. approved        -> handle_verdict(approved=True)
      5. rejected        -> handle_verdict(approved=False)

    Any other status (Needs Input, In Review, Blocked, Done, board stages, ...)
    is only logged — those are statuses the orchestrator itself sets.
    """
    from ..plane_sync import get_project_states

    issue_uuid = str(data.get("id") or "")
    target_state = _state_id(data)
    if not (issue_uuid and target_state):
        logger.info("issue updated without id/state, ignoring")
        return

    # ORCH-10: state UUIDs are resolved per the incoming issue's project, so
    # statuses of every configured project trigger the pipeline.
    states = get_project_states(project_id)

    # ORCH-059 / ORCH-090: optional statuses are looked up with .get — a board
    # without them resolves to None and the branch simply never activates.
    confirm_uuid = states.get("confirm_deploy")
    stop_uuid = states.get("stop")

    # STOP is checked FIRST so it can never be aliased by another gesture.
    if stop_uuid and target_state == stop_uuid:
        await handle_stop(data, project_id)
        return

    # ORCH-066: `To Analyse` is the human start/resume entry-point.
    if target_state == states["to_analyse"]:
        await handle_status_start(data, project_id)
        return

    # Checked before `approved` so the two gestures never alias.
    if confirm_uuid and target_state == confirm_uuid:
        await handle_confirm_deploy(data, project_id)
        return

    if target_state == states["approved"]:
        await handle_verdict(data, project_id, approved=True)
        return

    if target_state == states["rejected"]:
        await handle_verdict(data, project_id, approved=False)
        return

    logger.info(f"issue {issue_uuid} updated to state {target_state[:8]}..., no pipeline action")
|
||
|
||
|
||
async def handle_confirm_deploy(data: dict, project_id: str = ""):
    """ORCH-059: react to the dedicated "Confirm Deploy" status.

    A human flipping the issue to "Confirm Deploy" is the explicit trigger for
    the self-hosting prod deploy (Phase B). The gesture is honoured only while
    the task sits on the `deploy` stage; on any other stage it is logged and
    ignored, so a stray Confirm Deploy cannot perturb another gate.

    The task is advanced through ``_try_advance_stage`` with
    ``confirm_deploy=True`` so that ONLY this path initiates Phase B; a plain
    Approved on `deploy` stays a no-op (TRZ-3).
    """
    issue_uuid = str(data.get("id") or "")
    row = get_task_by_plane_id(issue_uuid)
    if not row:
        logger.warning(f"Confirm Deploy for {issue_uuid} but no task found, ignoring")
        return

    tid = row["id"]
    stage = row["stage"]
    repo_name = row["repo"]
    wi_id = row.get("work_item_id", "")
    branch_name = row.get("branch", "")

    if stage == "deploy":
        logger.info(
            f"Task {tid}: Confirm Deploy status on `deploy` -> initiate Phase B prod deploy"
        )
        await _try_advance_stage(
            tid, stage, repo_name, wi_id, branch_name, confirm_deploy=True
        )
        return

    # Wrong stage: no-op with a log line.
    logger.info(
        f"Confirm Deploy for {issue_uuid} but stage is '{stage}' "
        f"(not 'deploy'); no-op"
    )
|
||
|
||
|
||
async def handle_stop(data: dict, project_id: str = ""):
    """ORCH-090: react to the dedicated STOP status by cancelling the task.

    The task is looked up by plane_id and handed to the unified
    ``stage_engine.cancel_task``, executed via ``asyncio.to_thread`` because it
    is synchronous and may sleep during the graceful SIGTERM cascade.

    Guards:
      * no task for this issue                 -> logged no-op;
      * ``cancel.applies(repo)`` returns False -> logged no-op
        (kill-switch off / repo out of scope).

    Contract is never-raise (NFR-5): a failure inside the cancellation is
    logged and swallowed so the webhook flow never crashes.
    """
    import asyncio
    from .. import cancel
    from ..stage_engine import cancel_task

    issue_uuid = str(data.get("id") or "")
    row = get_task_by_plane_id(issue_uuid)
    if not row:
        logger.info(f"STOP for {issue_uuid} but no task found, ignoring (no-op)")
        return

    tid = row["id"]
    repo_name = row.get("repo", "")

    if cancel.applies(repo_name):
        logger.info(f"Task {tid}: STOP status -> cancelling (stop agent + full reset)")
        try:
            await asyncio.to_thread(
                cancel_task, tid, reason="Plane STOP status", source="stop"
            )
        except Exception as exc:  # never-raise: the webhook flow must not crash
            logger.error(f"STOP handling failed for task {tid}: {exc}")
        return

    logger.info(
        f"STOP for {issue_uuid} (task {tid}, repo={repo_name}) but cancellation is "
        f"not applicable (kill-switch off / out of scope); no-op"
    )
|
||
|
||
|
||
async def handle_status_start(data: dict, project_id: str = ""):
    """Start the pipeline, or resume the current stage's agent, for an issue.

    Invoked when an issue moves into the project's start/resume trigger status.
    Several log messages and the agent note below still say "In Progress"; the
    newer ones say "To Analyse". Both refer to this same trigger.

    Two cases under the status-only verdict model:

    1. No task yet for this plane_id -> START the pipeline (start_pipeline).

    2. A task already exists -> the stakeholder returned the issue after
       answering the agent's questions. We RELAUNCH the current stage's agent
       so it reads the fresh comments from Plane (the answer-to-questions flow
       used to live in handle_comment; it is now status-driven).

    KEY FORK — telling "answer to questions" apart from a plain duplicate
    webhook (the dedup-protection case):

    The tasks table stores no Plane status, and the issue.updated payload only
    carries the NEW state, so the previous status cannot be read from here.
    Instead we use the only reliable local signal: whether the stage's agent is
    currently in flight.

    - An existing task with NO active job means the agent is idle / waiting ->
      the status change is a genuine relaunch request -> enqueue the stage agent.
    - If a queued/running job already exists for the task, the agent is busy
      (or a duplicate webhook arrived) -> SKIP (no double launch). The events
      de-dup at the top of plane_webhook already absorbs identical webhook
      bodies; this job guard additionally covers distinct webhooks fired while
      a job is still pending/running.

    When ``settings.stop_status_enabled`` is truthy, the relaunch is further
    restricted to the `analysis` stage (ORCH-090); on any other stage only a
    log line and a best-effort Plane comment are produced.

    Args:
        data: The ``data`` object of the Plane webhook payload (needs ``id``).
        project_id: Plane project UUID of the issue; forwarded to start_pipeline.
    """
    from ..db import has_active_job_for_task

    plane_id = str(data.get("id") or "")
    existing = get_task_by_plane_id(plane_id)

    # Case 1: no task row yet -> this status change starts the pipeline.
    if not existing:
        logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
        await start_pipeline(data, project_id)
        return

    task_id = existing["id"]
    current_stage = existing["stage"]
    repo = existing["repo"]
    work_item_id = existing.get("work_item_id", "")
    branch = existing.get("branch", "")

    # Duplicate / busy guard: a job is already pending or running for this task.
    if has_active_job_for_task(task_id):
        logger.info(
            f"Status->In Progress for {plane_id}: task {task_id} already has an "
            f"active job (stage={current_stage}), not relaunching"
        )
        return

    # Agent is idle -> the stakeholder answered questions and returned the
    # issue. Resolve which agent owns the current stage; stages without an
    # entry in STAGE_AUTHORS are never relaunched.
    from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment
    stage_agent = STAGE_AUTHORS.get(current_stage)
    if not stage_agent:
        logger.info(
            f"Status->In Progress for {plane_id}: no agent for stage "
            f"'{current_stage}', not relaunching"
        )
        return

    # ORCH-090 (ADR-001 D6 / AC-5): close the relaunch hole. The legitimate "answer
    # to Needs Input" resume is owned ONLY by the analyst (ORCH-066 — the sole
    # Needs-Input setter). A manual move of an EXISTING task at any OTHER stage to
    # "To Analyse" must NOT silently relaunch the mid-pipeline agent on the old
    # branch (the incident pattern). Gate the relaunch to `analysis`; any other
    # stage -> no-op-with-log + a best-effort Plane hint to use STOP -> To Analyse
    # for a clean-slate restart. Under the kill-switch off this gate is inert
    # (behaviour 1:1 as before ORCH-090).
    from ..config import settings as _settings
    if getattr(_settings, "stop_status_enabled", False) and current_stage != "analysis":
        logger.info(
            f"Status->To Analyse for {plane_id}: existing task on stage "
            f"'{current_stage}' — NOT relaunching {stage_agent} (relaunch-hole closed, "
            f"ORCH-090). Use STOP then To Analyse to restart from scratch."
        )
        # Best-effort user hint; a failure to post it must not break the handler.
        try:
            _add_comment(
                work_item_id,
                "ℹ️ Перезапуск "
                "агента сменой "
                "рабочего статуса "
                "отключён (ORCH-090). Для "
                "перезапуска с нуля: "
                "STOP → To Analyse.",
                author=stage_agent,
            )
        except Exception as e:
            logger.error(f"Failed to post relaunch-hole comment for {work_item_id}: {e}")
        return

    # Case 2: relaunch the stage agent with a note pointing it at the new comments.
    task_desc = (
        f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
        f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
        f"Progress (answered your questions). Read the latest comments in Plane "
        f"and revise your artifacts."
    )
    job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id)
    logger.info(
        f"Task {task_id}: returned to To Analyse (Needs Input answered), "
        f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})"
    )
    # ORCH-066 (AC-3): a resume of the analyst (the only Needs-Input owner) is
    # re-indicated as `Analysis`; other stages keep their own indication.
    if current_stage == "analysis":
        from ..plane_sync import set_issue_analysis as _set_analysis
        # NOTE(review): unlike the comment post below, this call is not wrapped
        # in try/except — confirm set_issue_analysis never raises.
        _set_analysis(work_item_id)
    # Best-effort confirmation comment (text is a user-facing Russian message).
    try:
        _add_comment(
            work_item_id,
            "\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.",
            author=stage_agent,
        )
    except Exception as e:
        logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}")
|
||
|
||
|
||
async def handle_verdict(data: dict, project_id: str, approved: bool):
    """Apply a status-only verdict: advance on Approved, roll back on Rejected.

    Approved -> ``_try_advance_stage``. The issue status is deliberately NOT
    touched here: the advance path itself PATCHes the issue to the NEXT stage's
    status, and resetting it to In Progress first made the board flicker
    (part of bug 3).

    Rejected -> ``_rollback_stage`` with a reason pulled from the issue's latest
    comment (the stakeholder writes the reason in a comment before/with
    flipping the status to Rejected).

    A verdict for an issue that has no task row is logged and ignored.
    """
    issue_uuid = str(data.get("id") or "")
    row = get_task_by_plane_id(issue_uuid)
    if not row:
        logger.warning(f"Verdict status for {issue_uuid} but no task found, ignoring")
        return

    tid = row["id"]
    stage = row["stage"]
    repo_name = row["repo"]
    wi_id = row.get("work_item_id", "")
    branch_name = row.get("branch", "")

    if not approved:
        # Rejected: pull the rejection reason from the issue's latest comment.
        comment_issue = row.get("plane_issue_id") or row.get("plane_id") or issue_uuid
        why = _latest_comment_reason(comment_issue, repo_name, project_id)
        await _rollback_stage(tid, stage, repo_name, wi_id, branch_name, why)
        return

    # NOTE: no set_issue_in_progress here — _try_advance_stage sets the next
    # stage's status itself (advance_stage -> plane_notify_stage).
    logger.info(f"Task {tid}: Approved status -> advance from {stage}")
    await _try_advance_stage(tid, stage, repo_name, wi_id, branch_name)
|
||
|
||
|
||
def _latest_comment_reason(issue_id: str, repo: str, project_id: str = "") -> str:
    """Return the issue's most recent comment as plain text (the reject reason).

    The stakeholder writes the reason in a comment before/with flipping the
    status to Rejected. The newest comment (by ``created_at``) is taken, HTML
    tags are stripped, and the text is truncated to 300 characters.

    Args:
        issue_id: Plane issue UUID whose comments are fetched.
        repo: Repository of the task; used to resolve the Plane project.
        project_id: Plane project UUID used when the repo is not registered;
            falls back further to the default project id.

    Returns:
        The reason text, or a fixed fallback string when there is no usable
        comment, the response has an unexpected shape, or the API call fails.
    """
    import html

    from ..plane_sync import (
        PLANE_BASE,
        PLANE_HEADERS,
        WORKSPACE,
        PROJECT_ID as _DEFAULT_PROJECT_ID,
    )

    fallback = "Rejected via status, no reason comment"
    if not issue_id:
        return fallback

    _proj = get_project_by_repo(repo)
    pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
    url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/"
        f"{issue_id}/comments/"
    )
    # Best-effort boundary: whatever goes wrong, the rollback must still get a
    # reason string, so everything below stays inside one broad try.
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        if resp.status_code != 200:
            logger.warning(
                f"reject-reason: GET comments for {issue_id} returned "
                f"{resp.status_code}"
            )
            return fallback

        payload = resp.json()
        # The endpoint may answer with a paginated envelope or a bare list.
        comments = payload.get("results", payload) if isinstance(payload, dict) else payload
        if not isinstance(comments, list):
            logger.warning(
                f"reject-reason: unexpected comments payload for {issue_id} "
                f"({type(comments).__name__})"
            )
            return fallback
        comments = [c for c in comments if isinstance(c, dict)]
        if not comments:
            return fallback

        # str() keeps the key comparable even if created_at is not a string.
        latest = max(comments, key=lambda c: str(c.get("created_at") or ""))

        raw = latest.get("comment_stripped")
        from_markup = False
        if not raw:
            raw = latest.get("comment_html")
            from_markup = bool(raw)
        if not raw:
            raw = latest.get("comment") or ""
        if not isinstance(raw, str):
            return fallback

        text = re.sub(r"<[^>]+>", "", raw)
        if from_markup:
            # comment_html carries entity-escaped text (&amp;, &nbsp;, ...);
            # decode it so the reason reads as the author typed it.
            text = html.unescape(text)
        text = text.strip()
        return text[:300] if text else fallback
    except Exception as e:
        logger.error(f"reject-reason: failed to fetch comments for {issue_id}: {e}")
        return fallback
|
||
|
||
|
||
async def handle_work_item_created(data: dict, project_id: str = ""):
    """Feature 1: log a SOFT QG-0 check for a newly created issue.

    Creation does NOT start the pipeline: that happens later, when the issue is
    moved into the start status (handle_status_start -> start_pipeline). Here
    the title and description are only validated and the outcome is logged —
    NO branch, NO docs, NO analyst, NO task row — so the issue can sit in the
    backlog until the stakeholder is ready.
    """
    issue_uuid = data.get("id", "")
    title = data.get("name", "untitled")
    body_text = data.get("description_stripped", data.get("description", ""))

    problems = _qg0_errors(title, body_text)
    if not problems:
        logger.info(f"work_item.created {issue_uuid} ('{title}'): in backlog, awaiting In Progress")
        return
    logger.info(f"work_item.created {issue_uuid}: soft QG-0 warnings: {problems}")
|
||
|
||
|
||
def _qg0_errors(name: str, description: str) -> list:
|
||
"""QG-0 validation: returns a list of human-readable problems (empty = OK)."""
|
||
errors = []
|
||
if not name or len(name) < 5:
|
||
errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
||
if len(name) > settings.qg0_title_max:
|
||
errors.append(
|
||
f"Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u0434\u043b\u0438\u043d\u043d\u044b\u0439 "
|
||
f"(\u043c\u0430\u043a\u0441\u0438\u043c\u0443\u043c {settings.qg0_title_max} \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)"
|
||
)
|
||
if not description or len(description.strip()) < 20:
|
||
errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
||
|
||
return errors
|
||
|
||
|
||
async def start_pipeline(data: dict, project_id: str = ""):
|
||
"""Feature 1: start the pipeline for an issue (moved to In Progress).
|
||
|
||
This is the body extracted from the old handle_work_item_created: resolve the
|
||
project, run QG-0 (hard — blocks on failure), create the work item id +
|
||
branch + initial docs, insert the task row, and enqueue the analyst.
|
||
|
||
Callers (handle_status_start) already guarantee no existing task for this
|
||
plane_id, so this never duplicates.
|
||
"""
|
||
plane_id = data.get("id", "")
|
||
name = data.get("name", "untitled")
|
||
description = data.get("description_stripped", data.get("description", ""))
|
||
|
||
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
|
||
# the single hardcoded default_repo.
|
||
if not project_id:
|
||
project_id = data.get("project") or data.get("project_id") or ""
|
||
proj = get_project_by_plane_id(project_id)
|
||
if not proj:
|
||
logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}")
|
||
return
|
||
repo = proj.repo
|
||
plane_project_id = proj.plane_project_id
|
||
|
||
# BUG 1 + BUG B: Plane's issue.updated webhook (status change -> In Progress)
|
||
# sends only the CHANGED fields, so BOTH description / description_stripped
|
||
# AND name are usually empty here even though the issue HAS them. Pull the
|
||
# full title + description from the Plane issue detail API in a SINGLE GET
|
||
# (fetch_issue_fields: same endpoint + shared token already used by
|
||
# fetch_issue_sequence_id) before QG-0 and before the branch slug is built.
|
||
# If the API is also empty, QG-0 legitimately fails (truly empty ticket) and
|
||
# name falls back to "untitled".
|
||
name_missing = (not name) or name.strip().lower() == "untitled" or len(name.strip()) < 3
|
||
desc_missing = (not description) or len(description.strip()) < 20
|
||
if name_missing or desc_missing:
|
||
from ..plane_sync import fetch_issue_fields
|
||
fetched_name, fetched_desc = fetch_issue_fields(plane_id, plane_project_id)
|
||
if desc_missing and fetched_desc and len(fetched_desc.strip()) >= len(description.strip()):
|
||
description = fetched_desc
|
||
logger.info(
|
||
f"start_pipeline: pulled description from Plane API for {plane_id} "
|
||
f"({len(description.strip())} chars)"
|
||
)
|
||
if name_missing and fetched_name and len(fetched_name.strip()) >= 3:
|
||
name = fetched_name
|
||
logger.info(
|
||
f"start_pipeline: pulled name from Plane API for {plane_id} "
|
||
f"('{name}')"
|
||
)
|
||
# BUG B fallback: if name is still empty/blank after the API pull, keep the
|
||
# legacy "untitled" so the slug/branch build never crashes on an empty name.
|
||
if not name or not name.strip():
|
||
name = "untitled"
|
||
|
||
# QG-0 validation (hard gate on pipeline start)
|
||
errors = _qg0_errors(name, description)
|
||
if errors:
|
||
# QG-0 failed
|
||
error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
|
||
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, get_project_states
|
||
import httpx as _httpx
|
||
# Post comment (ORCH-6: route to the issue's own project)
|
||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/comments/"
|
||
try:
|
||
_httpx.post(url, headers=PLANE_HEADERS,
|
||
json={"comment_html": f"<p>{error_text}</p>"}, timeout=10)
|
||
except Exception:
|
||
pass
|
||
# Set blocked — ORCH-10: resolve per-project UUID.
|
||
url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/"
|
||
try:
|
||
_blocked = get_project_states(plane_project_id)["blocked"]
|
||
_httpx.patch(url2, headers=PLANE_HEADERS,
|
||
json={"state": _blocked}, timeout=10)
|
||
except Exception:
|
||
pass
|
||
logger.info(f"QG-0 failed for {plane_id}: {errors}")
|
||
return
|
||
|
||
# Generate work item ID.
|
||
# M-6: source of truth for the number is the Plane sequence_id. Fetch it by
|
||
# issue UUID; if Plane is unavailable, fall back to the DB increment so a
|
||
# Plane outage never blocks task creation (autonomy > exact numbering).
|
||
from ..plane_sync import fetch_issue_sequence_id
|
||
seq = fetch_issue_sequence_id(plane_id, plane_project_id)
|
||
if seq is not None:
|
||
work_item_id = f"{proj.work_item_prefix}-{seq:03d}"
|
||
else:
|
||
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
|
||
logger.warning(
|
||
f"Plane sequence_id unavailable for {plane_id}, "
|
||
f"fell back to DB increment: {work_item_id}"
|
||
)
|
||
|
||
# BUG 2a: uniqueness-guard LAYERED ON TOP of the M-6 derive above (the derive
|
||
# itself is untouched). If the derived ET-NNN is already taken by another
|
||
# task in this repo (collision -> two tasks would share branch/worktree, see
|
||
# ET-006), bump to the next free number.
|
||
_derived = work_item_id
|
||
work_item_id = ensure_unique_work_item_id(work_item_id, repo)
|
||
if work_item_id != _derived:
|
||
logger.warning(
|
||
f"work_item_id collision: derived {_derived} already in use for "
|
||
f"{repo}; reassigned {plane_id} -> {work_item_id}"
|
||
)
|
||
|
||
# Create slug from name
|
||
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
|
||
branch = f"feature/{work_item_id}-{slug}"
|
||
|
||
# BUG 2b (defense-in-depth): the worktree/path is keyed by BRANCH
|
||
# (git_worktree.get_worktree_path) and tasks are reverse-resolved by
|
||
# (repo, branch). With 2a the work_item_id is unique, so the branch prefix is
|
||
# too; but the slug could still collide (e.g. two issues with the same title
|
||
# under different ids -> fine) or, worse, an identical branch already exist.
|
||
# Guard physically: if this exact branch is already owned by another task in
|
||
# this repo, disambiguate with the (now unique) work_item_id so two tasks can
|
||
# never share a worktree.
|
||
_conn_b = get_db()
|
||
_branch_taken = _conn_b.execute(
|
||
"SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch)
|
||
).fetchone()
|
||
_conn_b.close()
|
||
if _branch_taken is not None:
|
||
branch = f"feature/{work_item_id}-{plane_id[:8]}"
|
||
logger.warning(
|
||
f"branch collision for {repo}; disambiguated to unique branch {branch}"
|
||
)
|
||
|
||
# Insert task into DB — ORCH-053 (AC-4): atomic anti-dup claim under a
|
||
# process-wide lock. If the F-2 reconciler and this live webhook race on the
|
||
# same plane_id, exactly one wins (created=True); the loser sees the existing
|
||
# task and returns WITHOUT creating a second branch / worktree / analyst job.
|
||
task_row, created = create_task_atomic(
|
||
plane_id, work_item_id, repo, branch, "analysis", name
|
||
)
|
||
if not created:
|
||
logger.info(
|
||
f"start_pipeline: task for plane_id={plane_id} already exists "
|
||
f"(id={task_row['id']}, work_item_id={task_row.get('work_item_id')}), "
|
||
f"skipping duplicate creation"
|
||
)
|
||
return
|
||
task_id = task_row["id"]
|
||
|
||
# ORCH-019 (FR-1/FR-2, ADR-001 D1/D2): classify the task as a bug-fix and put it
|
||
# on the cheaper bug-fast-track (skips the `architecture` stage downstream). The
|
||
# gate idiom is `applies(repo) and is_bug_task(...)`: the LOCAL, network-free
|
||
# `bug_fast_track_applies` is checked FIRST so a disabled kill-switch / out-of-scope
|
||
# repo costs ZERO network (no has_label call). The Plane `Bug` label is the source
|
||
# of truth (read here at start, NEVER in the hot claim_next_job — NFR-4); the type
|
||
# is persisted in tasks.track so advance_stage routes off the DB, not the network.
|
||
# never-raise / fail-safe: ANY error -> task stays track='full' (full cycle, AC-6).
|
||
try:
|
||
from .. import bug_fast_track
|
||
if bug_fast_track.bug_fast_track_applies(repo) and bug_fast_track.is_bug_task(
|
||
work_item_id, plane_project_id
|
||
):
|
||
set_task_track(task_id, "bug")
|
||
logger.info(
|
||
f"Task {work_item_id}: classified as BUG -> bug-fast-track "
|
||
f"(architecture stage will be skipped, ORCH-019)"
|
||
)
|
||
try:
|
||
from ..plane_sync import add_comment as _bug_comment
|
||
_bug_comment(
|
||
work_item_id,
|
||
"\U0001f41e Багфикс-трек: "
|
||
"упрощённый маршрут "
|
||
"(пропуск стадии architecture). "
|
||
"Все Quality Gate исполняются.",
|
||
author="analyst",
|
||
)
|
||
except Exception:
|
||
pass
|
||
except Exception as e:
|
||
logger.warning(
|
||
f"Task {work_item_id}: bug-fast-track classification skipped "
|
||
f"(fail-safe -> full cycle): {e}"
|
||
)
|
||
|
||
# ORCH-088 (FR-1/AC-6, ADR-001 D1): DEFER the branch cut for an applicable repo.
|
||
# Creating the Gitea branch here (T0, issue -> analysis) would cut it from `main`
|
||
# BEFORE the predecessor is merged -> stale base. When the serial gate applies we
|
||
# do NOT create the branch / initial docs now; the analyst-job sits in the queue
|
||
# (status='queued', no branch) and the gate keeps it there until the predecessor
|
||
# reaches stage='done'. The branch + docs are then materialised at claim time in
|
||
# launcher._spawn from a fresh origin/main (anti-stale-base). The task row already
|
||
# stores `branch` as a NAME (R-5) — only the git ref is deferred.
|
||
from .. import serial_gate
|
||
defer_branch = serial_gate.serial_gate_applies(repo)
|
||
if not defer_branch:
|
||
# Create branch in Gitea
|
||
try:
|
||
await _create_gitea_branch(repo, branch)
|
||
except Exception as e:
|
||
logger.error(f"Failed to create branch '{branch}': {e}")
|
||
# Task is created, branch creation failed — log but don't crash
|
||
notify_error(0, f"Branch creation failed: {e}")
|
||
return
|
||
|
||
# Create initial docs structure via Gitea API (create file)
|
||
try:
|
||
await _create_initial_docs(repo, branch, work_item_id, name)
|
||
except Exception as e:
|
||
logger.error(f"Failed to create initial docs: {e}")
|
||
else:
|
||
logger.info(
|
||
f"Task {work_item_id}: serial gate applies for {repo} -> deferring branch "
|
||
f"cut to analyst-job claim (anti-stale-base, ORCH-088)"
|
||
)
|
||
|
||
logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis")
|
||
|
||
# Launch analyst agent (task_id from the atomic create above).
|
||
try:
|
||
task_desc = (
|
||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
|
||
)
|
||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||
# ORCH-066 (AC-3): indicate the analysis stage with the dedicated
|
||
# `Analysis` status (degrades to In Progress where it is not created).
|
||
from ..plane_sync import set_issue_analysis as _set_analysis
|
||
_set_analysis(work_item_id, plane_project_id)
|
||
# Post start comment to Plane
|
||
from ..plane_sync import add_comment as _add_comment
|
||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
|
||
except Exception as e:
|
||
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
||
|
||
# ORCH-026 (B-1): import declared Plane `blocked-by` relations into job_deps
|
||
# (only for task_deps_source = plane|hybrid; default `db` -> no-op, no Plane
|
||
# call). Best-effort, never-raise: a Plane outage must not block the start.
|
||
try:
|
||
from .. import task_deps
|
||
n = task_deps.ingest_plane_relations(task_id, plane_id, plane_project_id)
|
||
if n:
|
||
logger.info(f"Task {task_id}: ingested {n} blocked-by dependency edge(s)")
|
||
except Exception as e:
|
||
logger.warning(f"Task {task_id}: dependency ingestion skipped: {e}")
|
||
|
||
|
||
async def handle_comment(data: dict, project_id: str = ""):
    """Log a Plane ``comment.created`` event without acting on it.

    Under the status-only verdict model comments never drive the pipeline.
    The former comment-based controls (``:approved:`` / ``:rejected:`` and the
    analysis answer-to-questions flow) were removed because of bug 3 (echo
    self-hit): the analyst's own "waiting for approval" comment was picked up
    by this handler, which then reverted In Review -> In Progress.

    The pipeline reacts to status changes only (handle_issue_updated):
      - Approved    -> advance
      - Rejected    -> rollback (reason pulled from the latest comment)
      - In Progress (returned from Needs Input) -> relaunch the stage agent

    Args:
        data: webhook payload; the issue id is read from the first non-empty
            of ``work_item_id``, ``issue_id``, ``issue``.
        project_id: accepted for signature compatibility; not used here.
    """
    # First truthy id wins; the generator keeps the lookup lazy, like an `or` chain.
    id_candidates = (data.get(key) for key in ("work_item_id", "issue_id", "issue"))
    plane_id = str(next((value for value in id_candidates if value), ""))

    # No status change, no enqueue, no side effect — just a trace in the log.
    logger.info(
        f"comment.created for {plane_id}: logged only, no pipeline action "
        f"(status-only verdict model)"
    )
|
||
|
||
|
||
async def _rollback_stage(
    task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
    reason: str,
):
    """Handle a status change to Rejected for a task.

    - At ``analysis`` there is nothing to roll back to: the analyst is simply
      re-enqueued with the rejection reason.
    - At any other stage the task is moved to the previous stage, the change is
      announced (notify + Plane board + Plane comment), and the agent that owns
      the previous stage is re-enqueued.

    Args:
        task_id: DB id of the task being rejected.
        current_stage: stage the task was in when it was rejected.
        repo: repository name, passed through to the enqueued job.
        work_item_id: work item identifier used for Plane calls and job text.
        branch: feature branch name, embedded in the job description.
        reason: rejection reason, embedded in the job text and Plane comment.
    """
    if current_stage == "analysis":
        from ..plane_sync import add_comment as _post_comment, set_issue_analysis

        # ORCH-066 (AC-3): show the dedicated `Analysis` status (degrades to
        # In Progress where that status is not created).
        set_issue_analysis(work_item_id)

        analyst_brief = "\n".join((
            f"Work item: {work_item_id}",
            f"Repo: {repo}",
            f"Branch: {branch}",
            "Stage: analysis",
            f"Note: Stakeholder REJECTED your artifacts. Reason: {reason}",
            "Revise and improve.",
        ))
        analyst_job = enqueue_job("analyst", repo, analyst_brief, task_id=task_id)
        _post_comment(
            work_item_id,
            f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}",
            author="analyst",
        )
        logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={analyst_job})")
        return

    # NOTE(review): for a task routed around `architecture` (bug-fast-track),
    # confirm get_previous_stage does not send `development` back into the
    # skipped stage — its implementation is not visible from this function.
    prev_stage = get_previous_stage(current_stage)
    if not prev_stage:
        logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
        return

    # Persist the rollback first, then announce it.
    update_task_stage(task_id, prev_stage)
    notify_stage_change(task_id, current_stage, prev_stage)
    # Feature 3: plane_notify_stage moves the board to the prev stage's status.
    plane_notify_stage(work_item_id, current_stage, prev_stage)

    from ..plane_sync import (
        STAGE_AUTHORS as _stage_owners,
        add_comment as _post_comment,
        set_issue_in_progress,
    )

    # Flip back to In Progress so the relaunched agent is visibly working.
    set_issue_in_progress(work_item_id)
    _post_comment(
        work_item_id,
        f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
        author=_stage_owners.get(prev_stage, "stream"),
    )

    # STAGE_AUTHORS maps a stage to the role that owns work in it
    # (analysis->analyst, architecture->architect, ...), i.e. the agent that
    # must re-run after a rollback into prev_stage.
    prev_agent = _stage_owners.get(prev_stage)
    if not prev_agent:
        logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
        return

    agent_brief = "\n".join((
        f"Work item: {work_item_id}",
        f"Repo: {repo}",
        f"Branch: {branch}",
        f"Stage: {prev_stage}",
        f"Note: Stakeholder REJECTED. Reason: {reason}",
        "Revise and improve.",
    ))
    relaunched_job = enqueue_job(prev_agent, repo, agent_brief, task_id=task_id)
    logger.info(
        f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, "
        f"enqueued {prev_agent} (job_id={relaunched_job})"
    )
|
||
|
||
|
||
async def _try_advance_stage(
    task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
    confirm_deploy: bool = False,
):
    """Run the unified stage engine for a webhook-driven advance (ORCH-4 / M-3).

    src/stage_engine.advance_stage() holds the QG dispatch (including the
    check_review_approved PR-by-branch logic) and the advance/launch logic. It
    is synchronous, so it is executed via asyncio.to_thread to keep it off the
    event loop while sharing one implementation with the launcher.

    The finished-agent argument is always None here: this path is triggered by
    a human status change, not by an agent finishing, so the engine's
    agent-specific rollback branches intentionally stay inactive and the call
    only runs the QG and either advances or reports the failure.

    Args:
        task_id: DB id of the task.
        current_stage: stage the task is currently in.
        repo: repository name.
        work_item_id: work item identifier.
        branch: feature branch name.
        confirm_deploy: ORCH-059 — forwarded as a keyword to advance_stage.
            True only on the "Confirm Deploy" path (handle_confirm_deploy),
            where it gates Phase B of the self-hosting prod deploy; the plain
            Approved path (handle_verdict) leaves the default False.
    """
    import asyncio

    from ..stage_engine import advance_stage

    # Sixth positional slot of advance_stage: no agent finished on this path.
    finished_agent = None
    engine_args = (task_id, current_stage, repo, work_item_id, branch, finished_agent)

    await asyncio.to_thread(advance_stage, *engine_args, confirm_deploy=confirm_deploy)
|
||
|
||
|
||
async def _create_gitea_branch(repo: str, branch: str):
    """Create ``branch`` from ``main`` in the Gitea repository ``repo``.

    Sends a POST to the repository's ``/branches`` endpoint under
    ``settings.gitea_owner``. A 409 response is logged as "already exists" and
    treated as success; any other error status is raised by
    ``resp.raise_for_status()``.

    Args:
        repo: repository name under the configured Gitea owner.
        branch: name of the branch to create.
    """
    owner = settings.gitea_owner
    endpoint = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches"
    auth_headers = {"Authorization": f"token {settings.gitea_token}"}
    body = {"new_branch_name": branch, "old_branch_name": "main"}

    async with httpx.AsyncClient() as client:
        resp = await client.post(endpoint, json=body, headers=auth_headers, timeout=10)
        if resp.status_code != 409:
            resp.raise_for_status()
            logger.info(f"Created branch '{branch}' in {owner}/{repo}")
        else:
            # Conflict: the branch is already there, nothing to do.
            logger.info(f"Branch '{branch}' already exists")
|
||
|
||
|
||
async def _create_initial_docs(repo: str, branch: str, work_item_id: str, name: str):
    """Create the initial business-request document on the feature branch.

    Posts ``docs/work-items/<work_item_id>/00-business-request.md`` to the
    Gitea contents endpoint with a base64-encoded Markdown stub. Responses 201
    and 422 are accepted silently (422 is treated as "already exists"); any
    other error status is raised by ``resp.raise_for_status()``.

    Args:
        repo: repository name under ``settings.gitea_owner``.
        branch: branch the file is committed to.
        work_item_id: work item identifier, used in the path, body and commit message.
        name: task title, used as the document heading.
    """
    import base64

    owner = settings.gitea_owner
    doc_path = f"docs/work-items/{work_item_id}/00-business-request.md"
    endpoint = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/contents/{doc_path}"
    auth_headers = {"Authorization": f"token {settings.gitea_token}"}

    # Markdown stub: sections are separated by blank lines, file ends with a newline.
    stub = "\n\n".join((
        f"# Business Request: {name}",
        f"Work Item ID: {work_item_id}",
        "## Description",
        "TBD\n",
    ))
    body = {
        "message": f"docs: init {work_item_id} business request",
        # The request carries the file body base64-encoded.
        "content": base64.b64encode(stub.encode()).decode(),
        "branch": branch,
    }

    async with httpx.AsyncClient() as client:
        resp = await client.post(endpoint, json=body, headers=auth_headers, timeout=10)
        if resp.status_code not in (201, 422):  # 422 = already exists
            resp.raise_for_status()
|