"""Plane webhook handlers — full implementation.""" import hmac import hashlib import re import json import logging import httpx from fastapi import APIRouter, Request, HTTPException from ..config import settings from ..db import ( get_db, get_task_by_plane_id, get_next_work_item_id, ensure_unique_work_item_id, update_task_stage, enqueue_job, insert_event_dedup, ) from ._dedup import plane_delivery_id from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage from ..qg.checks import QG_CHECKS from ..notifications import notify_stage_change, notify_qg_failure, notify_error from ..agents.launcher import launcher from ..plane_sync import ( notify_stage_change as plane_notify_stage, notify_qg_failure as plane_notify_qg, notify_done as plane_notify_done, ) from ..projects import ( get_project_by_plane_id, get_project_by_repo, known_plane_project_ids, ) logger = logging.getLogger("orchestrator.webhooks.plane") router = APIRouter() def verify_plane_signature(body: bytes, signature: str) -> bool: """Verify Plane webhook HMAC-SHA256 signature.""" if not settings.plane_webhook_secret: return True # Skip verification if no secret configured expected = hmac.new( settings.plane_webhook_secret.encode(), body, hashlib.sha256, ).hexdigest() return hmac.compare_digest(expected, signature) @router.post("/plane") async def plane_webhook(request: Request): """Handle Plane webhook events.""" body = await request.body() # Verify HMAC signature signature = request.headers.get("X-Plane-Signature", "") if not verify_plane_signature(body, signature): logger.warning("Plane webhook: invalid signature") raise HTTPException(status_code=401, detail="Invalid signature") payload = json.loads(body) # ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the # delivery_id falls back to sha256("plane" + body) (a retried identical body maps # to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return # {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6 # project filter, so a repeat does no extra work; the FIRST delivery of an unknown # project still falls through to the filter below and returns {"status":"ignored"}. event_type = payload.get("event", "unknown") delivery_id = plane_delivery_id(request.headers, body) inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id) if not inserted: logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch") return {"status": "duplicate"} event = payload.get("event") action = payload.get("action", "") data = payload.get("data", {}) # ORCH-6: filter by Plane project. Ignore issues from unknown/unconfigured # projects so a webhook on the whole workspace cannot funnel everything into # the default repo (root cause of the 2026-06-02 incident). project_id = data.get("project") or data.get("project_id") or "" if project_id not in known_plane_project_ids(): logger.info( f"Plane webhook: ignoring event '{event}' from unknown project " f"'{project_id}' (known: {len(known_plane_project_ids())})" ) return {"status": "ignored", "reason": "unknown project"} if (event == "work_item.created") or (event == "issue" and action == "created"): # Feature 1: creation NO LONGER starts the pipeline. Slava keeps the # backlog until he moves an issue to In Progress. We only run a soft # QG-0 sanity log here (no branch, no analyst, no task row). await handle_work_item_created(data, project_id) elif (event == "work_item.updated") or (event == "issue" and action == "updated"): # Status-only verdict model: status changes drive the pipeline. # Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the # stage agent if returned from # Needs Input. # -> Approved : advance to the next stage. # -> Rejected : rollback (reason from latest comment). await handle_issue_updated(data, project_id) elif (event == "comment.created") or (event == "issue_comment" and action == "created"): await handle_comment(data, project_id) return {"status": "accepted"} def _state_id(data: dict) -> str: """Extract the new Plane state UUID from an 'issue updated' payload. Real payload (verified from prod events): data.state is {id, name, color, group}. Some payloads carry state as a bare UUID string. """ state = data.get("state") if isinstance(state, dict): return state.get("id", "") or "" if isinstance(state, str): return state return "" async def handle_issue_updated(data: dict, project_id: str = ""): """Feature 1 & 2: react to a Plane issue status change. Routes the NEW state UUID (data.state.id) to: - in_progress : start the pipeline if this issue has no task yet; if a task already exists and the stage agent is idle (returned from Needs Input), relaunch the stage agent so it reads Slava's fresh comments. - approved : advance to the next stage. - rejected : rollback to the previous stage (reason from latest comment). Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.) is ignored here — those are statuses the orchestrator itself sets. """ from ..plane_sync import PLANE_STATES plane_id = str(data.get("id") or "") new_state = _state_id(data) if not plane_id or not new_state: logger.info("issue updated without id/state, ignoring") return if new_state == PLANE_STATES["in_progress"]: await handle_status_start(data, project_id) elif new_state == PLANE_STATES["approved"]: await handle_verdict(data, project_id, approved=True) elif new_state == PLANE_STATES["rejected"]: await handle_verdict(data, project_id, approved=False) else: logger.info(f"issue {plane_id} updated to state {new_state[:8]}..., no pipeline action") async def handle_status_start(data: dict, project_id: str = ""): """An issue moved into In Progress. Two cases under the status-only verdict model: 1. No task yet for this plane_id -> START the pipeline (start_pipeline). 2. A task already exists -> this is Slava returning the issue from Needs Input to In Progress after answering the analyst's questions. We must RELAUNCH the current stage's agent so it reads the fresh comments from Plane (the answer-to-questions flow used to live in handle_comment; it is now status-driven). KEY FORK — telling "answer to questions" apart from a plain duplicate In Progress webhook (the dedup-protection case): The tasks table stores no Plane status, and the issue.updated payload only carries the NEW state (In Progress), so we cannot read the previous status from here. Instead we use the only reliable local signal: whether the stage's agent is currently in flight. - The orchestrator sets In Progress itself while an agent runs. When the agent FINISHES it leaves the issue in Needs Input or In Review and has NO queued/running job. So: an existing task with NO active job means the agent is idle / waiting -> a return to In Progress is a genuine relaunch request -> enqueue the stage agent. - If a queued/running job already exists for the task, the agent is busy (or a duplicate webhook arrived) -> SKIP (no double launch). The events de-dup at the top of plane_webhook already absorbs identical webhook bodies; this job guard additionally covers distinct webhooks fired while a job is still pending/running. """ from ..db import has_active_job_for_task plane_id = str(data.get("id") or "") existing = get_task_by_plane_id(plane_id) if not existing: logger.info(f"Status->In Progress for {plane_id}: starting pipeline") await start_pipeline(data, project_id) return task_id = existing["id"] current_stage = existing["stage"] repo = existing["repo"] work_item_id = existing.get("work_item_id", "") branch = existing.get("branch", "") # Duplicate / busy guard: a job is already pending or running for this task. if has_active_job_for_task(task_id): logger.info( f"Status->In Progress for {plane_id}: task {task_id} already has an " f"active job (stage={current_stage}), not relaunching" ) return # Agent is idle -> Slava answered questions and returned the issue to In # Progress. Relaunch the current stage's agent to read the fresh comments. from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment stage_agent = STAGE_AUTHORS.get(current_stage) if not stage_agent: logger.info( f"Status->In Progress for {plane_id}: no agent for stage " f"'{current_stage}', not relaunching" ) return task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In " f"Progress (answered your questions). Read the latest comments in Plane " f"and revise your artifacts." ) job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id) logger.info( f"Task {task_id}: returned to In Progress (Needs Input answered), " f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})" ) try: _add_comment( work_item_id, "\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author=stage_agent, ) except Exception as e: logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}") async def handle_verdict(data: dict, project_id: str, approved: bool): """Status-only verdict: a Plane status change drives advance / rollback. Approved status -> _try_advance_stage. We do NOT touch the issue status here: _try_advance_stage -> advance_stage -> plane_notify_stage already PATCHes the issue to the NEXT stage's status. The old set_issue_in_progress call reset the status to In Progress first, which made the board flicker In Progress before the next stage (part of bug 3); it is removed. Rejected status -> rollback to the previous stage. The reason is pulled from the issue's latest comment (Slava writes the reason in a comment before/with flipping the status to Rejected). """ plane_id = str(data.get("id") or "") task = get_task_by_plane_id(plane_id) if not task: logger.warning(f"Verdict status for {plane_id} but no task found, ignoring") return task_id = task["id"] current_stage = task["stage"] repo = task["repo"] work_item_id = task.get("work_item_id", "") branch = task.get("branch", "") if approved: # NOTE: no set_issue_in_progress here — _try_advance_stage sets the next # stage's status itself (advance_stage -> plane_notify_stage). logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}") await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch) return # Rejected: pull the rejection reason from the issue's latest comment. issue_id = task.get("plane_issue_id") or task.get("plane_id") or plane_id reason = _latest_comment_reason(issue_id, repo, project_id) await _rollback_stage( task_id, current_stage, repo, work_item_id, branch, reason ) def _latest_comment_reason(issue_id: str, repo: str, project_id: str = "") -> str: """Fetch the issue's most recent comment text (HTML stripped) as the reject reason. Slava writes the reason in a comment before/with flipping the status to Rejected. Returns a fixed fallback when there is no comment / the API call fails. """ from ..plane_sync import ( PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID as _DEFAULT_PROJECT_ID, ) fallback = "Rejected via status, no reason comment" if not issue_id: return fallback _proj = get_project_by_repo(repo) pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID) url = ( f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/" f"{issue_id}/comments/" ) try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) if resp.status_code != 200: logger.warning( f"reject-reason: GET comments for {issue_id} returned " f"{resp.status_code}" ) return fallback payload = resp.json() comments = payload.get("results", payload) if isinstance(payload, dict) else payload if not comments: return fallback latest = max(comments, key=lambda c: c.get("created_at", "") or "") raw = ( latest.get("comment_stripped") or latest.get("comment_html") or latest.get("comment") or "" ) text = re.sub(r"<[^>]+>", "", raw).strip() return text[:300] if text else fallback except Exception as e: logger.error(f"reject-reason: failed to fetch comments for {issue_id}: {e}") return fallback async def handle_work_item_created(data: dict, project_id: str = ""): """Feature 1: creation does NOT start the pipeline anymore. The pipeline is started when Slava moves the issue into In Progress (handle_status_start -> start_pipeline). On creation we only run a SOFT QG-0 sanity check and log the result — NO branch, NO docs, NO analyst, NO task row — so the issue can sit in the backlog until Slava is ready. """ plane_id = data.get("id", "") name = data.get("name", "untitled") description = data.get("description_stripped", data.get("description", "")) errors = _qg0_errors(name, description) if errors: logger.info(f"work_item.created {plane_id}: soft QG-0 warnings: {errors}") else: logger.info(f"work_item.created {plane_id} ('{name}'): in backlog, awaiting In Progress") def _qg0_errors(name: str, description: str) -> list: """QG-0 validation: returns a list of human-readable problems (empty = OK).""" errors = [] if not name or len(name) < 5: errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") if len(name) > 80: errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u0434\u043b\u0438\u043d\u043d\u044b\u0439 (\u043c\u0430\u043a\u0441\u0438\u043c\u0443\u043c 80 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") if not description or len(description.strip()) < 20: errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") return errors async def start_pipeline(data: dict, project_id: str = ""): """Feature 1: start the pipeline for an issue (moved to In Progress). This is the body extracted from the old handle_work_item_created: resolve the project, run QG-0 (hard — blocks on failure), create the work item id + branch + initial docs, insert the task row, and enqueue the analyst. Callers (handle_status_start) already guarantee no existing task for this plane_id, so this never duplicates. """ plane_id = data.get("id", "") name = data.get("name", "untitled") description = data.get("description_stripped", data.get("description", "")) # ORCH-6: resolve repo / prefix / Plane project from the registry instead of # the single hardcoded default_repo. if not project_id: project_id = data.get("project") or data.get("project_id") or "" proj = get_project_by_plane_id(project_id) if not proj: logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}") return repo = proj.repo plane_project_id = proj.plane_project_id # BUG 1 + BUG B: Plane's issue.updated webhook (status change -> In Progress) # sends only the CHANGED fields, so BOTH description / description_stripped # AND name are usually empty here even though the issue HAS them. Pull the # full title + description from the Plane issue detail API in a SINGLE GET # (fetch_issue_fields: same endpoint + shared token already used by # fetch_issue_sequence_id) before QG-0 and before the branch slug is built. # If the API is also empty, QG-0 legitimately fails (truly empty ticket) and # name falls back to "untitled". name_missing = (not name) or name.strip().lower() == "untitled" or len(name.strip()) < 3 desc_missing = (not description) or len(description.strip()) < 20 if name_missing or desc_missing: from ..plane_sync import fetch_issue_fields fetched_name, fetched_desc = fetch_issue_fields(plane_id, plane_project_id) if desc_missing and fetched_desc and len(fetched_desc.strip()) >= len(description.strip()): description = fetched_desc logger.info( f"start_pipeline: pulled description from Plane API for {plane_id} " f"({len(description.strip())} chars)" ) if name_missing and fetched_name and len(fetched_name.strip()) >= 3: name = fetched_name logger.info( f"start_pipeline: pulled name from Plane API for {plane_id} " f"('{name}')" ) # BUG B fallback: if name is still empty/blank after the API pull, keep the # legacy "untitled" so the slug/branch build never crashes on an empty name. if not name or not name.strip(): name = "untitled" # QG-0 validation (hard gate on pipeline start) errors = _qg0_errors(name, description) if errors: # QG-0 failed error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors) from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PLANE_STATES import httpx as _httpx # Post comment (ORCH-6: route to the issue's own project) url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/comments/" try: _httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": f"
{error_text}
"}, timeout=10) except Exception: pass # Set blocked url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/" try: _httpx.patch(url2, headers=PLANE_HEADERS, json={"state": PLANE_STATES["blocked"]}, timeout=10) except Exception: pass logger.info(f"QG-0 failed for {plane_id}: {errors}") return # Generate work item ID. # M-6: source of truth for the number is the Plane sequence_id. Fetch it by # issue UUID; if Plane is unavailable, fall back to the DB increment so a # Plane outage never blocks task creation (autonomy > exact numbering). from ..plane_sync import fetch_issue_sequence_id seq = fetch_issue_sequence_id(plane_id, plane_project_id) if seq is not None: work_item_id = f"{proj.work_item_prefix}-{seq:03d}" else: work_item_id = get_next_work_item_id(repo, proj.work_item_prefix) logger.warning( f"Plane sequence_id unavailable for {plane_id}, " f"fell back to DB increment: {work_item_id}" ) # BUG 2a: uniqueness-guard LAYERED ON TOP of the M-6 derive above (the derive # itself is untouched). If the derived ET-NNN is already taken by another # task in this repo (collision -> two tasks would share branch/worktree, see # ET-006), bump to the next free number. _derived = work_item_id work_item_id = ensure_unique_work_item_id(work_item_id, repo) if work_item_id != _derived: logger.warning( f"work_item_id collision: derived {_derived} already in use for " f"{repo}; reassigned {plane_id} -> {work_item_id}" ) # Create slug from name slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30] branch = f"feature/{work_item_id}-{slug}" # BUG 2b (defense-in-depth): the worktree/path is keyed by BRANCH # (git_worktree.get_worktree_path) and tasks are reverse-resolved by # (repo, branch). With 2a the work_item_id is unique, so the branch prefix is # too; but the slug could still collide (e.g. two issues with the same title # under different ids -> fine) or, worse, an identical branch already exist. # Guard physically: if this exact branch is already owned by another task in # this repo, disambiguate with the (now unique) work_item_id so two tasks can # never share a worktree. _conn_b = get_db() _branch_taken = _conn_b.execute( "SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch) ).fetchone() _conn_b.close() if _branch_taken is not None: branch = f"feature/{work_item_id}-{plane_id[:8]}" logger.warning( f"branch collision for {repo}; disambiguated to unique branch {branch}" ) # Insert task into DB conn = get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)", (plane_id, work_item_id, repo, branch, "analysis", plane_id), ) conn.commit() conn.close() # Create branch in Gitea try: await _create_gitea_branch(repo, branch) except Exception as e: logger.error(f"Failed to create branch '{branch}': {e}") # Task is created, branch creation failed — log but don't crash notify_error(0, f"Branch creation failed: {e}") return # Create initial docs structure via Gitea API (create file) try: await _create_initial_docs(repo, branch, work_item_id, name) except Exception as e: logger.error(f"Failed to create initial docs: {e}") logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis") # Launch analyst agent try: task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone() if task_row: task_id = task_row[0] task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}" ) job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id) logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})") # Post start comment to Plane from ..plane_sync import add_comment as _add_comment _add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst") except Exception as e: logger.error(f"Failed to launch analyst for {work_item_id}: {e}") async def handle_comment(data: dict, project_id: str = ""): """Status-only verdict model: comments NEVER drive the pipeline. The whole comment-based control mechanism (``:approved:`` / ``:rejected:`` and the analysis answer-to-questions flow) was removed. It caused bug 3 (echo self-hit): the analyst posts its own "waiting for approval" comment, handle_comment catches its own comment and reverts In Review -> In Progress. Comments are now logged only — no status change, no enqueue, no side effect. The pipeline is driven solely by status changes (handle_issue_updated): - Approved -> advance - Rejected -> rollback (reason pulled from the latest comment) - In Progress (returned from Needs Input) -> relaunch the stage agent """ plane_id = str( data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "" ) logger.info( f"comment.created for {plane_id}: logged only, no pipeline action " f"(status-only verdict model)" ) async def _rollback_stage( task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str, reason: str, ): """Rollback triggered by a status change to Rejected. - at analysis: relaunch the analyst with the rejection reason; - otherwise: roll back to the previous stage and relaunch its agent (via the existing rollback notify + an enqueue of the prev-stage agent). """ if current_stage == "analysis": # Already in analysis — just relaunch analyst with rejection reason from ..plane_sync import set_issue_in_progress set_issue_in_progress(work_item_id) task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. " f"Reason: {reason}\nRevise and improve." ) new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _plane_comment _plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst") logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})") return # Rollback to previous stage prev_stage = get_previous_stage(current_stage) if not prev_stage: logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage") return update_task_stage(task_id, prev_stage) notify_stage_change(task_id, current_stage, prev_stage) # Feature 3: plane_notify_stage moves the board to the prev stage's status. plane_notify_stage(work_item_id, current_stage, prev_stage) # Then put it back to In Progress so the relaunched agent is clearly working. from ..plane_sync import set_issue_in_progress set_issue_in_progress(work_item_id) from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS _plane_comment( work_item_id, f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}", author=STAGE_AUTHORS.get(prev_stage, "stream"), ) # Relaunch the previous stage's agent so the rollback actually re-runs work. # STAGE_AUTHORS maps a stage directly to the role that OWNS work in it # (analysis->analyst, architecture->architect, ...), which is exactly the # agent we must re-run on a rollback into prev_stage. from ..plane_sync import STAGE_AUTHORS as _STAGE_AUTHORS prev_agent = _STAGE_AUTHORS.get(prev_stage) if prev_agent: task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: {prev_stage}\nNote: Stakeholder REJECTED. Reason: {reason}\n" f"Revise and improve." ) new_job = enqueue_job(prev_agent, repo, task_desc, task_id=task_id) logger.info( f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, " f"enqueued {prev_agent} (job_id={new_job})" ) else: logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}") async def _try_advance_stage( task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str ): """Thin async wrapper over the unified stage engine (ORCH-4 / M-3). The QG dispatch (including the check_review_approved PR-by-branch logic) and the advance/launch logic now live in src/stage_engine.advance_stage(), which is synchronous. We run it off the event loop via asyncio.to_thread so there is exactly one implementation shared with the launcher. finished_agent is None on this webhook path (a human Approved status change, not a finished agent), so the agent-specific rollback branches inside the engine intentionally do not trigger — the webhook path only runs the QG and either advances or reports the failure. """ import asyncio from ..stage_engine import advance_stage await asyncio.to_thread( advance_stage, task_id, current_stage, repo, work_item_id, branch, None, ) async def _create_gitea_branch(repo: str, branch: str): """Create a new branch in Gitea from main.""" owner = settings.gitea_owner url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches" headers = {"Authorization": f"token {settings.gitea_token}"} payload = {"new_branch_name": branch, "old_branch_name": "main"} async with httpx.AsyncClient() as client: resp = await client.post(url, json=payload, headers=headers, timeout=10) if resp.status_code == 409: logger.info(f"Branch '{branch}' already exists") return resp.raise_for_status() logger.info(f"Created branch '{branch}' in {owner}/{repo}") async def _create_initial_docs(repo: str, branch: str, work_item_id: str, name: str): """Create initial business request doc in the feature branch.""" owner = settings.gitea_owner file_path = f"docs/work-items/{work_item_id}/00-business-request.md" url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}" headers = {"Authorization": f"token {settings.gitea_token}"} import base64 content = f"# Business Request: {name}\n\nWork Item ID: {work_item_id}\n\n## Description\n\nTBD\n" encoded = base64.b64encode(content.encode()).decode() payload = { "message": f"docs: init {work_item_id} business request", "content": encoded, "branch": branch, } async with httpx.AsyncClient() as client: resp = await client.post(url, json=payload, headers=headers, timeout=10) if resp.status_code in (201, 422): # 422 = already exists return resp.raise_for_status()