ORCH-10 root cause: PLANE_STATES was a global dict that hardcoded the
enduro-trails (ET) state UUIDs. The webhook comparison therefore matched only
the ET in_progress UUID (b873d9eb) and silently ignored the ORCH in_progress
UUID (e331bfb3), blocking pipeline start for all orchestrator-project tasks.
Changes:
- src/plane_sync.py:
* Rename PLANE_STATES -> _DEFAULT_STATES (enduro UUIDs kept as safe fallback).
* PLANE_STATES preserved as alias to _DEFAULT_STATES (backward compat).
* Add get_project_states(project_id) -> {logical_key: state_uuid}:
fetches Plane API GET /projects/<id>/states/, maps by state name,
caches per project_id, falls back to _DEFAULT_STATES on API failure.
* Add _STATES_CACHE: dict, reload_project_states(project_id=None).
* Add _PLANE_NAME_TO_KEY mapping and _STAGE_TO_STATE_KEY for clean lookup.
* Add stage_to_state(stage, project_id) using get_project_states().
* update_issue_state() uses stage_to_state() instead of STAGE_TO_STATE dict.
* set_issue_{needs_input,in_review,blocked,done,in_progress,stage_state}()
all resolve state UUID via get_project_states(project_id) instead of
the global PLANE_STATES dict.
- src/webhooks/plane.py:
* handle_issue_updated: import get_project_states, resolve proj_states per
incoming project_id, compare new_state against proj_states["in_progress"],
proj_states["approved"], proj_states["rejected"].
* start_pipeline QG-0 blocked path: use get_project_states(plane_project_id)
instead of PLANE_STATES["blocked"].
- tests/test_orch10_states.py: 23 new tests covering:
* get_project_states returns correct UUIDs for both ET and ORCH projects.
* API failure / empty response / None project_id -> _DEFAULT_STATES fallback.
* Caching and reload_project_states (per-project and full flush).
* stage_to_state() per-project resolution.
* Webhook in_progress triggers pipeline for BOTH b873d9eb (ET) and e331bfb3 (ORCH).
* Webhook approved/rejected routes correctly per project.
* PLANE_STATES alias and _DEFAULT_STATES backward compat.
698 lines
31 KiB
Python
698 lines
31 KiB
Python
"""Plane webhook handlers — full implementation."""
|
|
|
|
import hmac
|
|
import hashlib
|
|
import re
|
|
import json
|
|
import logging
|
|
import httpx
|
|
from fastapi import APIRouter, Request, HTTPException
|
|
|
|
from ..config import settings
|
|
from ..db import (
|
|
get_db,
|
|
get_task_by_plane_id,
|
|
get_next_work_item_id,
|
|
ensure_unique_work_item_id,
|
|
update_task_stage,
|
|
enqueue_job,
|
|
insert_event_dedup,
|
|
)
|
|
from ._dedup import plane_delivery_id
|
|
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
|
from ..qg.checks import QG_CHECKS
|
|
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
|
from ..agents.launcher import launcher
|
|
from ..plane_sync import (
|
|
notify_stage_change as plane_notify_stage,
|
|
notify_qg_failure as plane_notify_qg,
|
|
notify_done as plane_notify_done,
|
|
)
|
|
from ..projects import (
|
|
get_project_by_plane_id,
|
|
get_project_by_repo,
|
|
known_plane_project_ids,
|
|
)
|
|
|
|
logger = logging.getLogger("orchestrator.webhooks.plane")
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def verify_plane_signature(body: bytes, signature: str) -> bool:
    """Verify the HMAC-SHA256 signature of a Plane webhook body.

    Args:
        body: Raw request body, exactly as received.
        signature: Hex digest supplied by the sender.

    Returns:
        True when the signature matches the digest computed with
        ``settings.plane_webhook_secret``, or when no secret is configured
        (verification is deliberately skipped in that case); False otherwise.
    """
    secret = settings.plane_webhook_secret
    if not secret:
        return True  # Skip verification if no secret configured
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # Compare as bytes: hmac.compare_digest raises TypeError for str arguments
    # that contain non-ASCII characters, which would turn a garbage signature
    # header into an unhandled error instead of a clean "invalid signature".
    supplied = (signature or "").encode("utf-8", "replace")
    return hmac.compare_digest(expected.encode("ascii"), supplied)
|
|
|
|
|
|
@router.post("/plane")
async def plane_webhook(request: Request):
    """Entry point for Plane webhook deliveries.

    Order of operations: HMAC check, JSON parse, idempotency (dedup) insert,
    project filter, then dispatch by event type.

    Returns:
        ``{"status": "duplicate"}`` for a repeated delivery,
        ``{"status": "ignored", "reason": "unknown project"}`` when the issue's
        project is not in the registry, otherwise ``{"status": "accepted"}``.

    Raises:
        HTTPException: 401 on a bad signature; 400 when the body is not a
            JSON object.
    """
    body = await request.body()

    # Verify HMAC signature
    signature = request.headers.get("X-Plane-Signature", "")
    if not verify_plane_signature(body, signature):
        logger.warning("Plane webhook: invalid signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    # A malformed body is a client error, not a server crash. JSONDecodeError
    # and UnicodeDecodeError are both ValueError subclasses.
    try:
        payload = json.loads(body)
    except ValueError:
        logger.warning("Plane webhook: body is not valid JSON")
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(payload, dict):
        logger.warning("Plane webhook: JSON body is not an object")
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    # ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
    # delivery_id falls back to sha256("plane" + body) (a retried identical body maps
    # to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
    # {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
    # project filter, so a repeat does no extra work; the FIRST delivery of an unknown
    # project still falls through to the filter below and returns {"status":"ignored"}.
    event_type = payload.get("event", "unknown")
    delivery_id = plane_delivery_id(request.headers, body)
    inserted = insert_event_dedup(
        "plane", event_type, body.decode("utf-8", "replace"), delivery_id
    )
    if not inserted:
        logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
        return {"status": "duplicate"}

    event = payload.get("event")
    action = payload.get("action", "")
    # "data": null (or any non-object) is treated as an empty payload so it is
    # rejected by the project filter below instead of raising AttributeError.
    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}

    # ORCH-6: filter by Plane project. Ignore issues from unknown/unconfigured
    # projects so a webhook on the whole workspace cannot funnel everything into
    # the default repo (root cause of the 2026-06-02 incident).
    project_id = data.get("project") or data.get("project_id") or ""
    known_projects = known_plane_project_ids()
    if project_id not in known_projects:
        logger.info(
            f"Plane webhook: ignoring event '{event}' from unknown project "
            f"'{project_id}' (known: {len(known_projects)})"
        )
        return {"status": "ignored", "reason": "unknown project"}

    if (event == "work_item.created") or (event == "issue" and action == "created"):
        # Feature 1: creation NO LONGER starts the pipeline. Slava keeps the
        # backlog until he moves an issue to In Progress. We only run a soft
        # QG-0 sanity log here (no branch, no analyst, no task row).
        await handle_work_item_created(data, project_id)
    elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
        # Status-only verdict model: status changes drive the pipeline.
        #   Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the
        #                                        stage agent if returned from
        #                                        Needs Input.
        #   -> Approved                        : advance to the next stage.
        #   -> Rejected                        : rollback (reason from latest comment).
        await handle_issue_updated(data, project_id)
    elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
        await handle_comment(data, project_id)

    return {"status": "accepted"}
|
|
|
|
|
|
def _state_id(data: dict) -> str:
|
|
"""Extract the new Plane state UUID from an 'issue updated' payload.
|
|
|
|
Real payload (verified from prod events): data.state is
|
|
{id, name, color, group}. Some payloads carry state as a bare UUID string.
|
|
"""
|
|
state = data.get("state")
|
|
if isinstance(state, dict):
|
|
return state.get("id", "") or ""
|
|
if isinstance(state, str):
|
|
return state
|
|
return ""
|
|
|
|
|
|
async def handle_issue_updated(data: dict, project_id: str = ""):
    """Route a Plane issue status change to the matching pipeline action.

    The new state UUID (see ``_state_id``) is compared against the state
    UUIDs resolved for ``project_id``:

    - ``in_progress`` -> ``handle_status_start`` (start or relaunch);
    - ``approved``    -> ``handle_verdict(..., approved=True)``;
    - ``rejected``    -> ``handle_verdict(..., approved=False)``.

    Every other state is only logged. A payload without an issue id or a
    state is ignored.
    """
    from ..plane_sync import get_project_states

    issue_uuid = str(data.get("id") or "")
    target_state = _state_id(data)
    if not (issue_uuid and target_state):
        logger.info("issue updated without id/state, ignoring")
        return

    # ORCH-10: the expected UUIDs are looked up per project rather than taken
    # from one global table, so each project's own "In Progress" state
    # triggers the pipeline.
    states = get_project_states(project_id)

    if target_state == states["in_progress"]:
        await handle_status_start(data, project_id)
        return
    if target_state == states["approved"]:
        await handle_verdict(data, project_id, approved=True)
        return
    if target_state == states["rejected"]:
        await handle_verdict(data, project_id, approved=False)
        return

    logger.info(f"issue {issue_uuid} updated to state {target_state[:8]}..., no pipeline action")
|
|
|
|
|
|
async def handle_status_start(data: dict, project_id: str = ""):
    """Handle an issue that has just been moved into In Progress.

    Three outcomes:

    1. No task exists for the issue -> start the pipeline (``start_pipeline``).
    2. A task exists and already has an active job -> do nothing. This covers
       both a busy agent and a repeated webhook, so an agent is never
       launched twice.
    3. A task exists with no active job -> treat the move as the stakeholder
       returning the issue after answering questions: enqueue the agent that
       owns the current stage (``STAGE_AUTHORS``) and post a notice to Plane.

    The "active job" test is used because the payload carries only the new
    state, so the previous status cannot be read here.
    """
    from ..db import has_active_job_for_task

    plane_id = str(data.get("id") or "")
    task = get_task_by_plane_id(plane_id)

    # Case 1: first time we see this issue in In Progress.
    if not task:
        logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
        await start_pipeline(data, project_id)
        return

    task_id, current_stage, repo = task["id"], task["stage"], task["repo"]
    work_item_id = task.get("work_item_id", "")
    branch = task.get("branch", "")

    # Case 2: a job is already pending or running for this task.
    if has_active_job_for_task(task_id):
        logger.info(
            f"Status->In Progress for {plane_id}: task {task_id} already has an "
            f"active job (stage={current_stage}), not relaunching"
        )
        return

    # Case 3: the agent is idle; relaunch the owner of the current stage.
    from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment

    stage_agent = STAGE_AUTHORS.get(current_stage)
    if not stage_agent:
        logger.info(
            f"Status->In Progress for {plane_id}: no agent for stage "
            f"'{current_stage}', not relaunching"
        )
        return

    relaunch_brief = (
        f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
        f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
        f"Progress (answered your questions). Read the latest comments in Plane "
        f"and revise your artifacts."
    )
    job_id = enqueue_job(stage_agent, repo, relaunch_brief, task_id=task_id)
    logger.info(
        f"Task {task_id}: returned to In Progress (Needs Input answered), "
        f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})"
    )

    # The Plane notice is best-effort: a failure is logged, the job stays queued.
    try:
        _add_comment(
            work_item_id,
            "\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.",
            author=stage_agent,
        )
    except Exception as e:
        logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}")
|
|
|
|
|
|
async def handle_verdict(data: dict, project_id: str, approved: bool):
    """Apply a status-driven verdict (Approved / Rejected) to the issue's task.

    Approved: call ``_try_advance_stage``. The issue status is intentionally
    not touched here; per the original design notes the advance path moves
    the issue to the next stage's status itself, and resetting it to In
    Progress first made the board flicker.

    Rejected: roll back via ``_rollback_stage``, using the text of the
    issue's latest comment as the rejection reason.

    Args:
        data: ``data`` object of the webhook payload; only ``id`` is read.
        project_id: Plane project UUID, used as a fallback when fetching the
            rejection comment.
        approved: True for an Approved status, False for Rejected.
    """
    import asyncio

    plane_id = str(data.get("id") or "")
    task = get_task_by_plane_id(plane_id)
    if not task:
        logger.warning(f"Verdict status for {plane_id} but no task found, ignoring")
        return

    task_id = task["id"]
    current_stage = task["stage"]
    repo = task["repo"]
    work_item_id = task.get("work_item_id", "")
    branch = task.get("branch", "")

    if approved:
        # NOTE: no set_issue_in_progress here — _try_advance_stage sets the next
        # stage's status itself (advance_stage -> plane_notify_stage).
        logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
        await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
        return

    # Rejected: pull the rejection reason from the issue's latest comment.
    issue_id = task.get("plane_issue_id") or task.get("plane_id") or plane_id
    # _latest_comment_reason does a synchronous HTTP GET (10 s timeout). Run it
    # in a worker thread so the event loop keeps serving other webhooks,
    # matching how _try_advance_stage offloads its synchronous work.
    reason = await asyncio.to_thread(
        _latest_comment_reason, issue_id, repo, project_id
    )
    await _rollback_stage(
        task_id, current_stage, repo, work_item_id, branch, reason
    )
|
|
|
|
|
|
def _latest_comment_reason(issue_id: str, repo: str, project_id: str = "") -> str:
    """Return the text of the issue's most recent comment as the reject reason.

    The comment with the greatest ``created_at`` is chosen; its text is taken
    from ``comment_stripped``, else ``comment_html``, else ``comment``. HTML
    tags are removed, entities are decoded, and the result is truncated to
    300 characters.

    Args:
        issue_id: Plane issue UUID.
        repo: Repository name; used to resolve the Plane project first.
        project_id: Plane project UUID used when ``repo`` is not in the
            registry (falls back further to the default project id).

    Returns:
        The cleaned comment text, or a fixed fallback string when there is no
        usable comment or the API call fails. Never raises for API errors.
    """
    import html

    from ..plane_sync import (
        PLANE_BASE,
        PLANE_HEADERS,
        WORKSPACE,
        PROJECT_ID as _DEFAULT_PROJECT_ID,
    )

    fallback = "Rejected via status, no reason comment"
    if not issue_id:
        return fallback

    _proj = get_project_by_repo(repo)
    pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
    url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/"
        f"{issue_id}/comments/"
    )
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        if resp.status_code != 200:
            logger.warning(
                f"reject-reason: GET comments for {issue_id} returned "
                f"{resp.status_code}"
            )
            return fallback
        payload = resp.json()
        # The response is either a paginated object with "results" or a bare list.
        comments = payload.get("results", payload) if isinstance(payload, dict) else payload
        # Keep only well-formed comment objects; an unexpected shape (e.g. an
        # error object without "results") means "no comments", not an exception.
        if isinstance(comments, list):
            comments = [c for c in comments if isinstance(c, dict)]
        else:
            comments = []
        if not comments:
            return fallback
        latest = max(comments, key=lambda c: c.get("created_at") or "")
        raw = (
            latest.get("comment_stripped")
            or latest.get("comment_html")
            or latest.get("comment")
            or ""
        )
        # Strip tags first, then decode entities, so text the author typed as
        # "&lt;x&gt;" survives as "<x>" and "&amp;"/"&nbsp;" do not leak into
        # the reason shown to agents and in Plane.
        text = html.unescape(re.sub(r"<[^>]+>", "", raw)).strip()
        return text[:300] if text else fallback
    except Exception as e:
        # Best-effort by design: any API/parse problem degrades to the fallback.
        logger.error(f"reject-reason: failed to fetch comments for {issue_id}: {e}")
        return fallback
|
|
|
|
|
|
async def handle_work_item_created(data: dict, project_id: str = ""):
    """Log a soft QG-0 check for a newly created issue; start nothing.

    Creation does not start the pipeline: that happens when the issue is
    moved to In Progress (``handle_status_start`` -> ``start_pipeline``).
    Here the title and description are only validated with ``_qg0_errors``
    and the outcome is logged; no branch, docs, task row or agent is created.
    """
    plane_id = data.get("id", "")
    title = data.get("name", "untitled")
    body_text = data.get("description_stripped", data.get("description", ""))

    problems = _qg0_errors(title, body_text)
    if not problems:
        logger.info(f"work_item.created {plane_id} ('{title}'): in backlog, awaiting In Progress")
        return
    logger.info(f"work_item.created {plane_id}: soft QG-0 warnings: {problems}")
|
|
|
|
|
|
def _qg0_errors(name: str, description: str) -> list:
|
|
"""QG-0 validation: returns a list of human-readable problems (empty = OK)."""
|
|
errors = []
|
|
if not name or len(name) < 5:
|
|
errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
|
if len(name) > 80:
|
|
errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u0434\u043b\u0438\u043d\u043d\u044b\u0439 (\u043c\u0430\u043a\u0441\u0438\u043c\u0443\u043c 80 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
|
if not description or len(description.strip()) < 20:
|
|
errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
|
|
|
return errors
|
|
|
|
|
|
async def start_pipeline(data: dict, project_id: str = ""):
    """Start the pipeline for an issue that was moved to In Progress.

    Steps, in order: resolve the project from the registry; backfill a
    missing title/description from the Plane API; run QG-0 as a hard gate
    (on failure: comment on the issue, set it to the project's "blocked"
    state, and stop); derive a unique work item id and branch; insert the
    task row; create the Gitea branch and the initial doc; enqueue the
    analyst and post a start comment.

    Args:
        data: ``data`` object of the Plane webhook payload. ``id`` is
            required; ``name`` and the description fields may be missing,
            empty or null.
        project_id: Plane project UUID; read from ``data`` when empty.

    The caller (``handle_status_start``) only invokes this when no task
    exists yet for the issue, so no duplicate check is done here.
    """
    plane_id = data.get("id", "")
    # Plane may send explicit nulls for these fields; normalise to "" so the
    # .strip()/len() calls below cannot fail on None.
    name = data.get("name", "untitled") or ""
    description = data.get("description_stripped", data.get("description", "")) or ""

    # ORCH-6: resolve repo / prefix / Plane project from the registry instead of
    # the single hardcoded default_repo.
    if not project_id:
        project_id = data.get("project") or data.get("project_id") or ""
    proj = get_project_by_plane_id(project_id)
    if not proj:
        logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}")
        return
    repo = proj.repo
    plane_project_id = proj.plane_project_id

    # BUG 1 + BUG B: Plane's issue.updated webhook (status change -> In Progress)
    # sends only the CHANGED fields, so BOTH description / description_stripped
    # AND name are usually empty here even though the issue HAS them. Pull the
    # full title + description from the Plane issue detail API in a SINGLE GET
    # before QG-0 and before the branch slug is built. If the API is also
    # empty, QG-0 legitimately fails (truly empty ticket) and name falls back
    # to "untitled".
    name_missing = (not name) or name.strip().lower() == "untitled" or len(name.strip()) < 3
    desc_missing = (not description) or len(description.strip()) < 20
    if name_missing or desc_missing:
        from ..plane_sync import fetch_issue_fields
        fetched_name, fetched_desc = fetch_issue_fields(plane_id, plane_project_id)
        if desc_missing and fetched_desc and len(fetched_desc.strip()) >= len(description.strip()):
            description = fetched_desc
            logger.info(
                f"start_pipeline: pulled description from Plane API for {plane_id} "
                f"({len(description.strip())} chars)"
            )
        if name_missing and fetched_name and len(fetched_name.strip()) >= 3:
            name = fetched_name
            logger.info(
                f"start_pipeline: pulled name from Plane API for {plane_id} "
                f"('{name}')"
            )
    # BUG B fallback: if name is still empty/blank after the API pull, keep the
    # legacy "untitled" so the slug/branch build never crashes on an empty name.
    if not name or not name.strip():
        name = "untitled"

    # QG-0 validation (hard gate on pipeline start)
    errors = _qg0_errors(name, description)
    if errors:
        error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
        from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, get_project_states
        import httpx as _httpx
        # Post comment (ORCH-6: route to the issue's own project). Best-effort:
        # a Plane failure is logged rather than raised.
        url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/comments/"
        try:
            _httpx.post(url, headers=PLANE_HEADERS,
                        json={"comment_html": f"<p>{error_text}</p>"}, timeout=10)
        except Exception as exc:
            logger.warning(f"QG-0: failed to post failure comment for {plane_id}: {exc}")
        # Set blocked — ORCH-10: resolve per-project UUID. Also best-effort.
        url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/"
        try:
            _blocked = get_project_states(plane_project_id)["blocked"]
            _httpx.patch(url2, headers=PLANE_HEADERS,
                         json={"state": _blocked}, timeout=10)
        except Exception as exc:
            logger.warning(f"QG-0: failed to set blocked state for {plane_id}: {exc}")
        logger.info(f"QG-0 failed for {plane_id}: {errors}")
        return

    # Generate work item ID.
    # M-6: source of truth for the number is the Plane sequence_id. Fetch it by
    # issue UUID; if Plane is unavailable, fall back to the DB increment so a
    # Plane outage never blocks task creation (autonomy > exact numbering).
    from ..plane_sync import fetch_issue_sequence_id
    seq = fetch_issue_sequence_id(plane_id, plane_project_id)
    if seq is not None:
        work_item_id = f"{proj.work_item_prefix}-{seq:03d}"
    else:
        work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
        logger.warning(
            f"Plane sequence_id unavailable for {plane_id}, "
            f"fell back to DB increment: {work_item_id}"
        )

    # BUG 2a: uniqueness guard layered on top of the M-6 derive above. If the
    # derived id is already taken by another task in this repo (two tasks
    # would share a branch/worktree), bump to the next free number.
    _derived = work_item_id
    work_item_id = ensure_unique_work_item_id(work_item_id, repo)
    if work_item_id != _derived:
        logger.warning(
            f"work_item_id collision: derived {_derived} already in use for "
            f"{repo}; reassigned {plane_id} -> {work_item_id}"
        )

    # Create slug from name
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
    branch = f"feature/{work_item_id}-{slug}"

    # BUG 2b (defense-in-depth): worktrees are keyed by branch and tasks are
    # reverse-resolved by (repo, branch). If this exact branch is already owned
    # by another task in this repo, disambiguate with the issue UUID prefix so
    # two tasks can never share a worktree.
    _conn_b = get_db()
    try:
        _branch_taken = _conn_b.execute(
            "SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch)
        ).fetchone()
    finally:
        # Release the connection even if the query raises.
        _conn_b.close()
    if _branch_taken is not None:
        branch = f"feature/{work_item_id}-{plane_id[:8]}"
        logger.warning(
            f"branch collision for {repo}; disambiguated to unique branch {branch}"
        )

    # Insert task into DB
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
        )
        conn.commit()
    finally:
        conn.close()

    # Create branch in Gitea
    try:
        await _create_gitea_branch(repo, branch)
    except Exception as e:
        logger.error(f"Failed to create branch '{branch}': {e}")
        # Task is created, branch creation failed — log but don't crash
        notify_error(0, f"Branch creation failed: {e}")
        return

    # Create initial docs structure via Gitea API (create file)
    try:
        await _create_initial_docs(repo, branch, work_item_id, name)
    except Exception as e:
        logger.error(f"Failed to create initial docs: {e}")

    logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis")

    # Launch analyst agent
    try:
        # The original never closed this lookup connection; close it explicitly.
        _conn_t = get_db()
        try:
            task_row = _conn_t.execute(
                "SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)
            ).fetchone()
        finally:
            _conn_t.close()
        if task_row:
            task_id = task_row[0]
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
            )
            job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
            logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
            # Post start comment to Plane
            from ..plane_sync import add_comment as _add_comment
            _add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
    except Exception as e:
        logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
|
|
|
|
|
async def handle_comment(data: dict, project_id: str = ""):
    """Log a Plane comment event; comments never drive the pipeline.

    Under the status-only verdict model the pipeline reacts to status changes
    only (see ``handle_issue_updated``). Comment-based control was removed
    because an agent's own comment could be picked up as a command and revert
    the issue status. This handler therefore changes no status and enqueues
    nothing.
    """
    issue_ref = ""
    # First non-empty identifier wins, in this order.
    for key in ("work_item_id", "issue_id", "issue"):
        value = data.get(key)
        if value:
            issue_ref = value
            break
    plane_id = str(issue_ref)

    logger.info(
        f"comment.created for {plane_id}: logged only, no pipeline action "
        f"(status-only verdict model)"
    )
|
|
|
|
|
|
def _post_rollback_comment(work_item_id: str, text: str, author: str) -> None:
    """Post a rollback notice to Plane; log instead of raising on failure."""
    from ..plane_sync import add_comment as _plane_comment

    try:
        _plane_comment(work_item_id, text, author=author)
    except Exception as e:
        logger.error(f"Failed to post rollback comment for {work_item_id}: {e}")


async def _rollback_stage(
    task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
    reason: str,
):
    """Roll a task back after its issue was set to Rejected.

    - At ``analysis`` there is nothing to roll back to: the issue is set to
      In Progress and the analyst is re-enqueued with the rejection reason.
    - At any later stage: the task moves to the previous stage, both notifiers
      are told, the issue is set to In Progress, and the agent that owns the
      previous stage (``STAGE_AUTHORS``) is re-enqueued.

    The Plane comment is best-effort in both paths, so a failed comment does
    not stop the agent relaunch. This matches how the relaunch and start
    comments are guarded in ``handle_status_start`` and ``start_pipeline``.

    Args:
        task_id: Primary key of the task row.
        current_stage: Stage the task was in when it was rejected.
        repo: Repository the task belongs to.
        work_item_id: Work item identifier used for Plane calls.
        branch: Feature branch of the task.
        reason: Rejection reason passed on to the agent and the comment.
    """
    from ..plane_sync import STAGE_AUTHORS, set_issue_in_progress

    if current_stage == "analysis":
        # Already in analysis — just relaunch analyst with rejection reason
        set_issue_in_progress(work_item_id)
        task_desc = (
            f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
            f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
            f"Reason: {reason}\nRevise and improve."
        )
        new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
        _post_rollback_comment(
            work_item_id,
            f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}",
            "analyst",
        )
        logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
        return

    # Rollback to previous stage
    prev_stage = get_previous_stage(current_stage)
    if not prev_stage:
        logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
        return
    update_task_stage(task_id, prev_stage)
    notify_stage_change(task_id, current_stage, prev_stage)
    # Feature 3: plane_notify_stage moves the board to the prev stage's status.
    plane_notify_stage(work_item_id, current_stage, prev_stage)
    # Then put it back to In Progress so the relaunched agent is clearly working.
    set_issue_in_progress(work_item_id)
    _post_rollback_comment(
        work_item_id,
        f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
        STAGE_AUTHORS.get(prev_stage, "stream"),
    )

    # Relaunch the previous stage's agent so the rollback actually re-runs work.
    # STAGE_AUTHORS maps a stage to the role that owns work in it, which is
    # the agent to re-run after rolling back into prev_stage.
    prev_agent = STAGE_AUTHORS.get(prev_stage)
    if prev_agent:
        task_desc = (
            f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
            f"Stage: {prev_stage}\nNote: Stakeholder REJECTED. Reason: {reason}\n"
            f"Revise and improve."
        )
        new_job = enqueue_job(prev_agent, repo, task_desc, task_id=task_id)
        logger.info(
            f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, "
            f"enqueued {prev_agent} (job_id={new_job})"
        )
    else:
        logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
|
|
|
|
|
|
async def _try_advance_stage(
    task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
):
    """Run the shared stage engine for a human "Approved" status change.

    ``stage_engine.advance_stage`` is synchronous, so it is executed in a
    worker thread via ``asyncio.to_thread`` to keep the event loop free. The
    final positional argument is ``None`` because this path is triggered by a
    status change rather than by a finished agent.
    """
    import asyncio
    from ..stage_engine import advance_stage

    finished_agent = None  # webhook path: no agent has just finished
    engine_args = (task_id, current_stage, repo, work_item_id, branch, finished_agent)
    await asyncio.to_thread(advance_stage, *engine_args)
|
|
|
|
|
|
async def _create_gitea_branch(repo: str, branch: str):
    """Create ``branch`` from ``main`` in the Gitea repository ``repo``.

    A 409 response is treated as "branch already exists" and is only logged.
    Any other error status raises via ``raise_for_status``.
    """
    owner = settings.gitea_owner
    endpoint = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches"
    auth_headers = {"Authorization": f"token {settings.gitea_token}"}
    request_body = {"new_branch_name": branch, "old_branch_name": "main"}

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            endpoint, json=request_body, headers=auth_headers, timeout=10
        )
        if resp.status_code == 409:
            logger.info(f"Branch '{branch}' already exists")
        else:
            resp.raise_for_status()
            logger.info(f"Created branch '{branch}' in {owner}/{repo}")
|
|
|
|
|
|
async def _create_initial_docs(repo: str, branch: str, work_item_id: str, name: str):
    """Commit the initial business-request document to the feature branch.

    Creates ``docs/work-items/<work_item_id>/00-business-request.md`` through
    the Gitea contents API. Status 201 (created) and 422 (treated as "already
    exists") both count as success; any other error status raises via
    ``raise_for_status``.
    """
    import base64

    owner = settings.gitea_owner
    file_path = f"docs/work-items/{work_item_id}/00-business-request.md"
    endpoint = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}"
    auth_headers = {"Authorization": f"token {settings.gitea_token}"}

    # The contents API takes the file body base64-encoded.
    markdown = f"# Business Request: {name}\n\nWork Item ID: {work_item_id}\n\n## Description\n\nTBD\n"
    request_body = {
        "message": f"docs: init {work_item_id} business request",
        "content": base64.b64encode(markdown.encode()).decode(),
        "branch": branch,
    }

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            endpoint, json=request_body, headers=auth_headers, timeout=10
        )
        if resp.status_code not in (201, 422):  # 422 = already exists
            resp.raise_for_status()
|