"""Plane API sync — update issue state and add comments.""" import logging import httpx from .config import settings logger = logging.getLogger("orchestrator.plane_sync") # L-3: emoji literals used in Plane comment bodies, named for readability. # Message text stays byte-for-byte identical to the previous output. EMOJI_STAGE = "\U0001F504" # stage transition EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure EMOJI_DONE = "\u2705" # task completed PLANE_BASE = f"{settings.plane_api_url}/api/v1" PLANE_HEADERS = {"X-API-Key": settings.plane_api_token} WORKSPACE = settings.plane_workspace_slug # feat(plane): per-agent comment authorship. # Map an agent role -> its dedicated Plane bot token (read from config / env). # When the token is present, add_comment() POSTs under that bot so Plane shows # the real author. Empty/unknown role -> fallback to the shared orchestrator # token (PLANE_HEADERS), so commenting stays autonomous. PLANE_BOT_TOKENS = { "analyst": settings.plane_bot_analyst, "architect": settings.plane_bot_architect, "developer": settings.plane_bot_developer, "reviewer": settings.plane_bot_reviewer, "tester": settings.plane_bot_tester, "deployer": settings.plane_bot_deployer, "stream": settings.plane_bot_stream, } # Map a pipeline stage -> the agent role that owns work in that stage. Used to # pick an author for rollback/stage notifications targeting a specific stage. STAGE_AUTHORS = { "analysis": "analyst", "architecture": "architect", "development": "developer", "review": "reviewer", "testing": "tester", "deploy": "deployer", } def _headers_for(author: str | None) -> dict: """Return X-API-Key headers for the given agent role. Falls back to the shared orchestrator token (PLANE_HEADERS / settings.plane_api_token) when the role is None, unknown, or its bot token is not configured. This keeps comment posting autonomous: a comment is always written, just attributed to the orchestrator if no bot is set. """ tok = PLANE_BOT_TOKENS.get(author or "") if author else None return {"X-API-Key": tok} if tok else PLANE_HEADERS PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c" def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str: """ORCH-6: resolve the Plane project id for a sync call. Priority: 1. explicit project_id arg (caller already knows the project), 2. project derived from the task's repo in the DB (by work_item_id), 3. legacy default PROJECT_ID (enduro) for backward compatibility. """ if project_id: return project_id if work_item_id: try: from .db import get_db from .projects import get_project_by_repo conn = get_db() row = conn.execute( "SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1", (work_item_id,), ).fetchone() conn.close() if row and row[0]: proj = get_project_by_repo(row[0]) if proj: return proj.plane_project_id except Exception as e: logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}") return PROJECT_ID # Plane state IDs. # TODO(ORCH-10): these UUIDs are PER-PROJECT. The 6 stage-visibility / verdict # statuses below were created only in the enduro project (7a79f0a9-...). One # project is in prod today, so a single global dict is acceptable. When more # projects are onboarded these must be resolved per project (see ORCH-10 in # BACKLOG.md / the ORCH-6 project registry) — do NOT hardcode globally then. PLANE_STATES = { "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122", "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967", "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac", "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b", "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920", "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17", # Feature 3 (stage visibility) — per-stage statuses on the board. "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d", "development": "9920609b-f140-4e46-ab95-89acda8412c8", "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed", "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02", # Feature 2 (verdict statuses) — Approved / Rejected. "approved": "a519a341-dada-4a91-8910-7604f82b79c5", "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97", } # Feature 3: map an orchestrator stage -> the Plane status to show on the board # when the pipeline ENTERS that stage. analysis stays driven by the existing # in_progress/in_review/needs_input logic (no dedicated status). deploy keeps # in_progress until done. Needs Input / In Review / Blocked remain higher # priority and are set explicitly elsewhere — do NOT override them from here. STAGE_VISIBILITY_STATE = { "architecture": "architecture", "development": "development", "review": "review", "testing": "testing", } # Map orchestrator stages to Plane states (used by update_issue_state / # notify_stage_change). Feature 3: architecture/development/review/testing now # point at their dedicated board statuses so the task physically moves across # columns. analysis -> in_progress, deploy -> in_progress, done -> done. STAGE_TO_STATE = { "created": PLANE_STATES["todo"], "analysis": PLANE_STATES["in_progress"], "architecture": PLANE_STATES["architecture"], "development": PLANE_STATES["development"], "review": PLANE_STATES["review"], "testing": PLANE_STATES["testing"], "deploy": PLANE_STATES["in_progress"], "done": PLANE_STATES["done"], } def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None: """M-6: GET the Plane issue by UUID and return its sequence_id (the authoritative per-project number), or None if unavailable. Returns None on network error, non-2xx, or a missing field - never raises, so the webhook handler can fall back to DB increment and stay autonomous. """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() seq = resp.json().get("sequence_id") return int(seq) if seq is not None else None except Exception as e: logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}") return None import re as _re def _strip_html(html: str) -> str: """Crude HTML -> text: drop tags and collapse whitespace. Good enough to feed QG-0's length check when Plane only gives us description_html.""" if not html: return "" text = _re.sub(r"<[^>]+>", " ", html) return _re.sub(r"\s+", " ", text).strip() def fetch_issue_description(issue_id: str, project_id: str) -> str: """BUG 1: GET the Plane issue by UUID and return its description text. Plane's ``issue.updated`` webhook (e.g. a status change) only carries the CHANGED fields, so ``description``/``description_stripped`` are usually absent there. start_pipeline calls this to pull the full description from the issue detail endpoint so QG-0 does not blow up on an empty payload field. Reuses the exact GET issue detail endpoint / shared token already used by ``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers ``description_stripped``; falls back to stripping ``description_html``. Returns "" on network error, non-2xx, or a missing field - never raises, so a Plane outage degrades to the honest "empty description" QG-0 path instead of crashing the webhook. """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() body = resp.json() desc = body.get("description_stripped") if desc and desc.strip(): return desc return _strip_html(body.get("description_html") or "") except Exception as e: logger.warning(f"fetch_issue_description failed for {issue_id}: {e}") return "" def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]: """BUG B: GET the Plane issue by UUID ONCE and return (name, description). Plane's ``issue.updated`` webhook (e.g. a status change) only carries the CHANGED fields, so BOTH ``name`` and ``description`` are usually absent in the payload. start_pipeline needs the real title (for the branch slug) and the real description (for the analyst .task.md). To avoid issuing two separate issue-detail GETs (one for name, one for description), this single request returns both. Reuses the exact GET issue detail endpoint / shared token already used by ``fetch_issue_sequence_id`` / ``fetch_issue_description``. For the description it applies the same logic as ``fetch_issue_description`` (prefer ``description_stripped``, fall back to stripping ``description_html``). Returns ("", "") on network error, non-2xx, or missing body - never raises, so a Plane outage degrades gracefully (caller keeps its payload fallbacks). """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() body = resp.json() name = (body.get("name") or "").strip() desc = body.get("description_stripped") if desc and desc.strip(): description = desc else: description = _strip_html(body.get("description_html") or "") return name, description except Exception as e: logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}") return "", "" def find_issue_id(work_item_id: str, project_id: str = None) -> str | None: """Find Plane issue UUID by work_item_id (e.g. 'ET-002').""" project_id = _resolve_project_id(work_item_id, project_id) # Primary: lookup from DB (plane_issue_id column) try: from .db import get_db conn = get_db() row = conn.execute( "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL", (work_item_id,) ).fetchone() if row and row[0]: return row[0] except Exception as e: logger.debug(f"DB lookup failed for {work_item_id}: {e}") # Fallback: search via Plane API url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/" try: # First try search by work_item_id resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10) resp.raise_for_status() data = resp.json() results = data.get("results", data if isinstance(data, list) else []) # M-6: match by sequence_id directly (the authoritative per-project # number), parsed from the work_item_id suffix - no hardcoded prefix. try: target_num = int(work_item_id.rsplit("-", 1)[1]) except (IndexError, ValueError): target_num = None for issue in results: if target_num is not None and issue.get("sequence_id") == target_num: return issue["id"] if work_item_id in issue.get("name", ""): return issue["id"] # Fallback: get all issues and match by sequence_id number (any prefix) if target_num is not None: resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp2.raise_for_status() data2 = resp2.json() results2 = data2.get("results", data2 if isinstance(data2, list) else []) for issue in results2: if issue.get("sequence_id") == target_num: return issue["id"] except Exception as e: logger.error(f"Failed to find issue for {work_item_id}: {e}") return None def update_issue_state(work_item_id: str, stage: str, project_id: str = None): """Update Plane issue state based on orchestrator stage.""" state_id = STAGE_TO_STATE.get(stage) if not state_id: return project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) resp.raise_for_status() logger.info(f"Plane: {work_item_id} state -> {stage} ({state_id[:8]}...)") except Exception as e: logger.error(f"Failed to update Plane state for {work_item_id}: {e}") def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None): """Add a comment to a Plane issue. feat(plane): when ``author`` (an agent role) maps to a configured bot token, the comment is POSTed under that bot so Plane shows the real author. Otherwise it falls back to the shared orchestrator token (see ``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS. """ project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/" html = f"

{text}

" try: resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10) resp.raise_for_status() logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})") except Exception as e: logger.error(f"Failed to add comment to {work_item_id}: {e}") def set_issue_needs_input(work_item_id: str, project_id: str = None): """Set issue to 'Needs Input' state — waiting for stakeholder response.""" _set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"], project_id) def set_issue_in_review(work_item_id: str, project_id: str = None): """Set issue to 'In Review' state — waiting for :approved: or :rejected:.""" _set_issue_state_direct(work_item_id, PLANE_STATES["in_review"], project_id) def set_issue_blocked(work_item_id: str, project_id: str = None): """Set issue to 'Blocked' state — manual intervention needed.""" _set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id) def set_issue_in_progress(work_item_id: str, project_id: str = None): """Set issue to 'In Progress' state — agent working.""" _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id) def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None): """Feature 3: move the issue to the board status for a pipeline stage. Only the visible-stage statuses (architecture/development/review/testing) are driven here — stages without a dedicated status (analysis/deploy) are a no-op so the existing in_progress/in_review/needs_input logic stays in charge. By design this does NOT touch Needs Input / In Review / Blocked, which are higher priority and set explicitly by their own helpers. """ state_key = STAGE_VISIBILITY_STATE.get(stage) if not state_key: return _set_issue_state_direct(work_item_id, PLANE_STATES[state_key], project_id) def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None): """Set issue state directly by state_id.""" project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) resp.raise_for_status() logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...") except Exception as e: logger.error(f"Failed to update Plane state for {work_item_id}: {e}") def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None): """Notify Plane about stage transition with links.""" project_id = _resolve_project_id(work_item_id, project_id) update_issue_state(work_item_id, new_stage, project_id) msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}" if agent: msg += f" (launching {agent})" # Add relevant links gitea_base = "http://git.mva154.duckdns.org" try: from .db import get_db conn = get_db() row = conn.execute( "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,) ).fetchone() conn.close() if row: branch, repo = row msg += chr(10) + "📂 Branch: [" + branch + "](" + gitea_base + "/admin/" + repo + "/src/branch/" + branch + ")" if new_stage in ("review", "testing", "deploy"): import httpx as _httpx from .config import settings _headers = {"Authorization": f"token {settings.gitea_token}"} _resp = _httpx.get( f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls", params={"state": "open", "head": branch}, headers=_headers, timeout=5 ) if _resp.status_code == 200: _prs = _resp.json() if _prs: pr_num = _prs[0]["number"] msg += chr(10) + "🔗 PR: [#" + str(pr_num) + "](" + gitea_base + "/admin/" + repo + "/pulls/" + str(pr_num) + ")" except Exception: pass # Stage transition is the orchestrator's own voice -> attribute to stream. add_comment(work_item_id, msg, project_id, author="stream") def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None): """Notify Plane about QG failure.""" # QG failure belongs to the agent that owns the failing stage. add_comment( work_item_id, f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}", project_id, author=STAGE_AUTHORS.get(stage, "stream"), ) def notify_done(work_item_id: str, project_id: str = None): """Mark issue as Done in Plane.""" project_id = _resolve_project_id(work_item_id, project_id) update_issue_state(work_item_id, "done", project_id) # Deploy finished the task -> attribute the completion comment to Deployer. add_comment( work_item_id, f"{EMOJI_DONE} Task completed! PR merged and deployed.", project_id, author="deployer", )