"""Plane webhook handlers — full implementation.""" import hmac import hashlib import re import json import logging import httpx from fastapi import APIRouter, Request, HTTPException from ..config import settings from ..db import ( get_db, get_task_by_plane_id, get_next_work_item_id, update_task_stage, enqueue_job, ) from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage from ..qg.checks import QG_CHECKS from ..notifications import notify_stage_change, notify_qg_failure, notify_error from ..agents.launcher import launcher from ..plane_sync import ( notify_stage_change as plane_notify_stage, notify_qg_failure as plane_notify_qg, notify_done as plane_notify_done, ) from ..projects import ( get_project_by_plane_id, get_project_by_repo, known_plane_project_ids, ) logger = logging.getLogger("orchestrator.webhooks.plane") router = APIRouter() def verify_plane_signature(body: bytes, signature: str) -> bool: """Verify Plane webhook HMAC-SHA256 signature.""" if not settings.plane_webhook_secret: return True # Skip verification if no secret configured expected = hmac.new( settings.plane_webhook_secret.encode(), body, hashlib.sha256, ).hexdigest() return hmac.compare_digest(expected, signature) @router.post("/plane") async def plane_webhook(request: Request): """Handle Plane webhook events.""" body = await request.body() # Verify HMAC signature signature = request.headers.get("X-Plane-Signature", "") if not verify_plane_signature(body, signature): logger.warning("Plane webhook: invalid signature") raise HTTPException(status_code=401, detail="Invalid signature") payload = json.loads(body) # Log event conn = get_db() conn.execute( "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", ("plane", payload.get("event", "unknown"), body.decode()), ) conn.commit() conn.close() event = payload.get("event") action = payload.get("action", "") data = payload.get("data", {}) # ORCH-6: filter by Plane project. Ignore issues from unknown/unconfigured # projects so a webhook on the whole workspace cannot funnel everything into # the default repo (root cause of the 2026-06-02 incident). project_id = data.get("project") or data.get("project_id") or "" if project_id not in known_plane_project_ids(): logger.info( f"Plane webhook: ignoring event '{event}' from unknown project " f"'{project_id}' (known: {len(known_plane_project_ids())})" ) return {"status": "ignored", "reason": "unknown project"} if (event == "work_item.created") or (event == "issue" and action == "created"): await handle_work_item_created(data, project_id) elif (event == "comment.created") or (event == "issue_comment" and action == "created"): await handle_comment(data, project_id) return {"status": "accepted"} async def handle_work_item_created(data: dict, project_id: str = ""): """ New work item created in Plane. QG-0: validate title, description, priority. If valid: create branch, init docs, launch analyst. If invalid: comment with what's missing, set Blocked. """ plane_id = data.get("id", "") name = data.get("name", "untitled") description = data.get("description_stripped", data.get("description", "")) priority = data.get("priority", {}) priority_name = priority if isinstance(priority, str) else priority.get("name", "") # ORCH-6: resolve repo / prefix / Plane project from the registry instead of # the single hardcoded default_repo. if not project_id: project_id = data.get("project") or data.get("project_id") or "" proj = get_project_by_plane_id(project_id) if not proj: logger.warning(f"handle_work_item_created: unknown project '{project_id}', ignoring {plane_id}") return repo = proj.repo plane_project_id = proj.plane_project_id # QG-0 validation errors = [] if not name or len(name) < 5: errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") if len(name) > 80: errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u0434\u043b\u0438\u043d\u043d\u044b\u0439 (\u043c\u0430\u043a\u0441\u0438\u043c\u0443\u043c 80 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") if not description or len(description.strip()) < 20: errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") if errors: # QG-0 failed error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors) from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PLANE_STATES import httpx as _httpx # Post comment (ORCH-6: route to the issue's own project) url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/comments/" try: _httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": f"

{error_text}

"}, timeout=10) except Exception: pass # Set blocked url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/" try: _httpx.patch(url2, headers=PLANE_HEADERS, json={"state": PLANE_STATES["blocked"]}, timeout=10) except Exception: pass logger.info(f"QG-0 failed for {plane_id}: {errors}") return # Generate work item ID work_item_id = get_next_work_item_id(repo, proj.work_item_prefix) # Create slug from name slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30] branch = f"feature/{work_item_id}-{slug}" # Insert task into DB conn = get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)", (plane_id, work_item_id, repo, branch, "analysis", plane_id), ) conn.commit() conn.close() # Create branch in Gitea try: await _create_gitea_branch(repo, branch) except Exception as e: logger.error(f"Failed to create branch '{branch}': {e}") # Task is created, branch creation failed — log but don't crash notify_error(0, f"Branch creation failed: {e}") return # Create initial docs structure via Gitea API (create file) try: await _create_initial_docs(repo, branch, work_item_id, name) except Exception as e: logger.error(f"Failed to create initial docs: {e}") logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis") # Launch analyst agent try: task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone() if task_row: task_id = task_row[0] task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}" job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id) logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})") # Post start comment to Plane from ..plane_sync import add_comment as _add_comment _add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).") except Exception as e: logger.error(f"Failed to launch analyst for {work_item_id}: {e}") async def handle_comment(data: dict, project_id: str = ""): """ Handle comment event — check for :approved: or :rejected:. Advance or rollback stage accordingly. """ comment_body = data.get("comment_stripped", data.get("comment", data.get("body", data.get("comment_html", "")))) plane_id = str(data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "") if not plane_id: logger.warning("Comment event without work_item_id, skipping") return task = get_task_by_plane_id(plane_id) if not task: logger.warning(f"No task found for plane_id={plane_id}") return task_id = task["id"] current_stage = task["stage"] repo = task["repo"] work_item_id = task.get("work_item_id", "") branch = task.get("branch", "") if ":rejected:" in comment_body: # Extract reason (text after :rejected:) reason = comment_body.split(":rejected:", 1)[-1].strip()[:300] if current_stage == "analysis": # Already in analysis — just relaunch analyst with rejection reason from ..plane_sync import set_issue_in_progress set_issue_in_progress(work_item_id) task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. " f"Reason: {reason}\nRevise and improve." ) new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _plane_comment _plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}") logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})") else: # Rollback to previous stage prev_stage = get_previous_stage(current_stage) if prev_stage: update_task_stage(task_id, prev_stage) from ..plane_sync import set_issue_in_progress set_issue_in_progress(work_item_id) notify_stage_change(task_id, current_stage, prev_stage) plane_notify_stage(work_item_id, current_stage, prev_stage) from ..plane_sync import add_comment as _plane_comment _plane_comment(work_item_id, f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}") logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}") return if ":approved:" in comment_body: from ..plane_sync import set_issue_in_progress set_issue_in_progress(work_item_id) # Try to advance stage await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch) return # Task 3: If neither :approved: nor :rejected: — check if this is an answer to questions if current_stage == "analysis": from ..plane_sync import PLANE_STATES, set_issue_in_progress issue_id = task.get("plane_issue_id") or task.get("plane_id") if not issue_id: issue_id = plane_id if issue_id: from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID # ORCH-6: route to this task's own Plane project (resolved from repo). _proj = get_project_by_repo(repo) _pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID) import httpx as _httpx try: _resp = _httpx.get( f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/", headers=PLANE_HEADERS, timeout=10 ) if _resp.status_code == 200: issue_data = _resp.json() if issue_data.get("state") == PLANE_STATES["needs_input"]: # Task 11: Check analyst retry count (max 3 question rounds) conn3 = get_db() analyst_runs = conn3.execute( "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'", (task_id,) ).fetchone()[0] conn3.close() if analyst_runs >= 4: # initial + 3 retries from ..plane_sync import set_issue_blocked, add_comment as _pc set_issue_blocked(work_item_id) _pc( work_item_id, "\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. " "\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430." ) from ..notifications import send_telegram send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.") return # This is an answer to analyst's questions — relaunch set_issue_in_progress(work_item_id) task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: analysis\nNote: Stakeholder answered your questions. " f"Read the latest comment in Plane and revise your artifacts.\n" f"Answer: {comment_body[:500]}" ) new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _pc2 _pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.") logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})") return except Exception as e: logger.error(f"Failed to check issue state: {e}") async def _try_advance_stage( task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str ): """Thin async wrapper over the unified stage engine (ORCH-4 / M-3). The QG dispatch (including the check_review_approved PR-by-branch logic) and the advance/launch logic now live in src/stage_engine.advance_stage(), which is synchronous. We run it off the event loop via asyncio.to_thread so there is exactly one implementation shared with the launcher. finished_agent is None on this webhook path (a human :approved: comment, not a finished agent), so the agent-specific rollback branches inside the engine intentionally do not trigger — identical to the old plane behavior, which only ran the QG and either advanced or reported the failure. """ import asyncio from ..stage_engine import advance_stage await asyncio.to_thread( advance_stage, task_id, current_stage, repo, work_item_id, branch, None, ) async def _create_gitea_branch(repo: str, branch: str): """Create a new branch in Gitea from main.""" owner = settings.gitea_owner url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches" headers = {"Authorization": f"token {settings.gitea_token}"} payload = {"new_branch_name": branch, "old_branch_name": "main"} async with httpx.AsyncClient() as client: resp = await client.post(url, json=payload, headers=headers, timeout=10) if resp.status_code == 409: logger.info(f"Branch '{branch}' already exists") return resp.raise_for_status() logger.info(f"Created branch '{branch}' in {owner}/{repo}") async def _create_initial_docs(repo: str, branch: str, work_item_id: str, name: str): """Create initial business request doc in the feature branch.""" owner = settings.gitea_owner file_path = f"docs/work-items/{work_item_id}/00-business-request.md" url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}" headers = {"Authorization": f"token {settings.gitea_token}"} import base64 content = f"# Business Request: {name}\n\nWork Item ID: {work_item_id}\n\n## Description\n\nTBD\n" encoded = base64.b64encode(content.encode()).decode() payload = { "message": f"docs: init {work_item_id} business request", "content": encoded, "branch": branch, } async with httpx.AsyncClient() as client: resp = await client.post(url, json=payload, headers=headers, timeout=10) if resp.status_code in (201, 422): # 422 = already exists return resp.raise_for_status()