"""Plane API sync — update issue state and add comments."""
import logging

import httpx

from .config import settings

logger = logging.getLogger("orchestrator.plane_sync")

# L-3: emoji literals used in Plane comment bodies, named for readability.
# Message text stays byte-for-byte identical to the previous output.
EMOJI_STAGE = "\U0001F504"      # stage transition
EMOJI_QG_FAIL = "\u26A0\uFE0F"  # quality-gate failure
EMOJI_DONE = "\u2705"           # task completed

PLANE_BASE = f"{settings.plane_api_url}/api/v1"
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
WORKSPACE = settings.plane_workspace_slug

# feat(plane): per-agent comment authorship.
# Map an agent role -> its dedicated Plane bot token (read from config / env).
# When the token is present, add_comment() POSTs under that bot so Plane shows
# the real author. Empty/unknown role -> fallback to the shared orchestrator
# token (PLANE_HEADERS), so commenting stays autonomous.
PLANE_BOT_TOKENS = {
    "analyst": settings.plane_bot_analyst,
    "architect": settings.plane_bot_architect,
    "developer": settings.plane_bot_developer,
    "reviewer": settings.plane_bot_reviewer,
    "tester": settings.plane_bot_tester,
    "deployer": settings.plane_bot_deployer,
    "stream": settings.plane_bot_stream,
}

# Map a pipeline stage -> the agent role that owns work in that stage. Used to
# pick an author for rollback/stage notifications targeting a specific stage.
STAGE_AUTHORS = {
    "analysis": "analyst",
    "architecture": "architect",
    "development": "developer",
    "review": "reviewer",
    "testing": "tester",
    "deploy": "deployer",
}


def _headers_for(author: str | None) -> dict:
    """Return X-API-Key headers for the given agent role.

    Falls back to the shared orchestrator token (PLANE_HEADERS /
    settings.plane_api_token) when the role is None, unknown, or its bot token
    is not configured. This keeps comment posting autonomous: a comment is
    always written, just attributed to the orchestrator if no bot is set.
    """
    tok = PLANE_BOT_TOKENS.get(author or "") if author else None
    return {"X-API-Key": tok} if tok else PLANE_HEADERS


PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"


def _resolve_project_id(
    work_item_id: str | None = None,
    project_id: str | None = None,
) -> str:
    """ORCH-6: resolve the Plane project id for a sync call.

    Priority:
      1. explicit project_id arg (caller already knows the project),
      2. project derived from the task's repo in the DB (by work_item_id),
      3. legacy default PROJECT_ID (enduro) for backward compatibility.

    Never raises: any DB / lookup error is logged at debug level and the
    legacy default is returned.
    """
    if project_id:
        return project_id
    if work_item_id:
        try:
            # Imported lazily, as in the rest of this module's DB lookups.
            from .db import get_db
            from .projects import get_project_by_repo

            conn = get_db()
            try:
                row = conn.execute(
                    "SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1",
                    (work_item_id,),
                ).fetchone()
            finally:
                # Close even when the query raises, so a failed lookup does
                # not leak the connection.
                conn.close()
            if row and row[0]:
                proj = get_project_by_repo(row[0])
                if proj:
                    return proj.plane_project_id
        except Exception as e:
            logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
    return PROJECT_ID


# ORCH-10: per-project state resolution.
#
# _DEFAULT_STATES keeps the original enduro-trails UUIDs as a safe fallback
# (used when the Plane API is unreachable and for backward compat).
# PLANE_STATES is preserved as an alias so existing call sites that reference
# it directly (QG-0 fast-path in webhooks/plane.py, tests) continue to work.
_DEFAULT_STATES = {
    "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
    "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
    "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
    "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
    "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
    "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
    "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
    "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
    # Feature 3 (stage visibility) — per-stage statuses on the board.
    "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
    "development": "9920609b-f140-4e46-ab95-89acda8412c8",
    "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
    "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
    # Feature 2 (verdict statuses) — Approved / Rejected.
    "approved": "a519a341-dada-4a91-8910-7604f82b79c5",
    "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
}

# Backward-compat alias — do NOT remove (tests + webhooks/plane.py import it).
PLANE_STATES = _DEFAULT_STATES

# Mapping: Plane state *name* (as returned by the API) -> logical key.
_PLANE_NAME_TO_KEY: dict[str, str] = {
    "Backlog": "backlog",
    "Todo": "todo",
    "In Progress": "in_progress",
    "Architecture": "architecture",
    "Development": "development",
    "Review": "review",
    "Testing": "testing",
    "Approved": "approved",
    "Rejected": "rejected",
    "Done": "done",
    "Cancelled": "cancelled",
    "Needs Input": "needs_input",
    "In Review": "in_review",
    "Blocked": "blocked",
}

# Per-project state cache: {project_id: {logical_key: state_uuid}}
_STATES_CACHE: dict[str, dict[str, str]] = {}


def get_project_states(project_id: str) -> dict[str, str]:
    """ORCH-10: resolve {logical_key -> state_uuid} for a specific Plane project.

    Source of truth: Plane API GET /projects/{project_id}/states/.
    Successful results are cached per project_id for the lifetime of the
    process (failed lookups are not cached, so the next call retries).

    Falls back to _DEFAULT_STATES (enduro-trails values) if:
      * project_id is empty/None,
      * the API call fails (network error, non-2xx),
      * the response contains no recognisable states.

    The enduro-trails project therefore returns the same UUIDs as before
    (backward compatible). The orchestrator project returns its own UUIDs,
    fixing the ORCH-10 blocker.
    """
    if not project_id:
        return _DEFAULT_STATES
    if project_id in _STATES_CACHE:
        return _STATES_CACHE[project_id]

    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/states/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        body = resp.json()
        # Plane returns {"results": [...]} or a bare list.
        items = body.get("results", body) if isinstance(body, dict) else body
        if not isinstance(items, list):
            raise ValueError(f"unexpected states response shape: {type(items)}")

        resolved: dict[str, str] = {}
        for item in items:
            name = item.get("name", "")
            uid = item.get("id", "")
            key = _PLANE_NAME_TO_KEY.get(name)
            if key and uid:
                resolved[key] = uid

        if not resolved:
            raise ValueError("no recognisable states in API response")

        # Fill any missing keys from _DEFAULT_STATES so callers always get a
        # complete mapping (defensive against partial Plane configs).
        for k, v in _DEFAULT_STATES.items():
            resolved.setdefault(k, v)

        _STATES_CACHE[project_id] = resolved
        logger.debug(
            f"get_project_states: cached {len(resolved)} states for project {project_id[:8]}..."
        )
        return resolved
    except Exception as e:
        logger.warning(
            f"get_project_states: API failed for project {project_id[:8]}..., "
            f"falling back to _DEFAULT_STATES. Error: {e}"
        )
        return _DEFAULT_STATES


def reload_project_states(project_id: str | None = None) -> None:
    """ORCH-10: clear the per-project states cache.

    If project_id is given, evict only that project. If None, flush the
    entire cache (useful in tests and after config reload).
    """
    global _STATES_CACHE
    if project_id is None:
        _STATES_CACHE = {}
        logger.debug("reload_project_states: full cache cleared")
    else:
        _STATES_CACHE.pop(project_id, None)
        logger.debug(f"reload_project_states: evicted project {project_id[:8]}...")


# Feature 3: map an orchestrator stage -> the Plane status to show on the board
# when the pipeline ENTERS that stage. analysis stays driven by the existing
# in_progress/in_review/needs_input logic (no dedicated status). deploy keeps
# in_progress until done. Needs Input / In Review / Blocked remain higher
# priority and are set explicitly elsewhere — do NOT override them from here.
STAGE_VISIBILITY_STATE = {
    "architecture": "architecture",
    "development": "development",
    "review": "review",
    "testing": "testing",
}

# STAGE_TO_STATE kept for backward compat (used by tests that patch it).
# update_issue_state now calls stage_to_state() instead of looking up here.
STAGE_TO_STATE = {
    "created": _DEFAULT_STATES["todo"],
    "analysis": _DEFAULT_STATES["in_progress"],
    "architecture": _DEFAULT_STATES["architecture"],
    "development": _DEFAULT_STATES["development"],
    "review": _DEFAULT_STATES["review"],
    "testing": _DEFAULT_STATES["testing"],
    "deploy": _DEFAULT_STATES["in_progress"],
    "done": _DEFAULT_STATES["done"],
}

# Map orchestrator stage -> logical state key (project-independent).
_STAGE_TO_STATE_KEY = {
    "created": "todo",
    "analysis": "in_progress",
    "architecture": "architecture",
    "development": "development",
    "review": "review",
    "testing": "testing",
    "deploy": "in_progress",
    "done": "done",
}


def stage_to_state(stage: str, project_id: str) -> str | None:
    """ORCH-10: return the Plane state UUID for a pipeline stage in a project.

    Resolves via get_project_states so the correct per-project UUID is used.
    Returns None for unknown stages (same behaviour as the old STAGE_TO_STATE
    dict lookup returning None).
    """
    key = _STAGE_TO_STATE_KEY.get(stage)
    if not key:
        return None
    return get_project_states(project_id).get(key)


def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
    """M-6: GET the Plane issue by UUID and return its sequence_id (the
    authoritative per-project number), or None if unavailable.

    Returns None on network error, non-2xx, or a missing field - never raises,
    so the webhook handler can fall back to DB increment and stay autonomous.
    """
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        seq = resp.json().get("sequence_id")
        return int(seq) if seq is not None else None
    except Exception as e:
        logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
        return None


def fetch_issue_state(issue_id: str, project_id: str) -> str | None:
    """ORCH-060 (F-1 Guard 2): GET the Plane issue and return its current state uuid.

    Used by the reconciler to honour an explicit human gate: an issue a person
    moved to **Blocked** / **Needs Input** must not be auto-unblocked by the
    sweeper. Reuses the exact GET issue-detail endpoint / shared token already
    used by ``fetch_issue_sequence_id`` / ``fetch_issue_fields``.

    Plane returns ``state`` as a bare uuid string; older shapes may nest it as
    a ``{"id": ...}`` dict — both are handled. Returns None on network error,
    non-2xx, or a missing field — never raises, so the caller can apply its
    conservative fallback (treat as "possibly blocked").
    """
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        state = resp.json().get("state")
        if isinstance(state, dict):
            state = state.get("id")
        return str(state) if state else None
    except Exception as e:
        logger.warning(f"fetch_issue_state failed for {issue_id}: {e}")
        return None


import re as _re


def _strip_html(html: str) -> str:
    """Crude HTML -> text: drop tags and collapse whitespace.

    Good enough to feed QG-0's length check when Plane only gives us
    description_html.
    """
    if not html:
        return ""
    text = _re.sub(r"<[^>]+>", " ", html)
    return _re.sub(r"\s+", " ", text).strip()


def _description_from_body(body: dict) -> str:
    """Pick the description text out of a Plane issue-detail response body.

    Prefers a non-blank ``description_stripped`` (returned as-is); otherwise
    falls back to stripping tags from ``description_html``.
    """
    desc = body.get("description_stripped")
    if desc and desc.strip():
        return desc
    return _strip_html(body.get("description_html") or "")


def fetch_issue_description(issue_id: str, project_id: str) -> str:
    """BUG 1: GET the Plane issue by UUID and return its description text.

    Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
    CHANGED fields, so ``description``/``description_stripped`` are usually
    absent there. start_pipeline calls this to pull the full description from
    the issue detail endpoint so QG-0 does not blow up on an empty payload
    field.

    Reuses the exact GET issue detail endpoint / shared token already used by
    ``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers
    ``description_stripped``; falls back to stripping ``description_html``.

    Returns "" on network error, non-2xx, or a missing field - never raises,
    so a Plane outage degrades to the honest "empty description" QG-0 path
    instead of crashing the webhook.
    """
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        return _description_from_body(resp.json())
    except Exception as e:
        logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
        return ""


def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
    """BUG B: GET the Plane issue by UUID ONCE and return (name, description).

    Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
    CHANGED fields, so BOTH ``name`` and ``description`` are usually absent in
    the payload. start_pipeline needs the real title (for the branch slug) and
    the real description (for the analyst .task.md). To avoid issuing two
    separate issue-detail GETs (one for name, one for description), this
    single request returns both.

    Reuses the exact GET issue detail endpoint / shared token already used by
    ``fetch_issue_sequence_id`` / ``fetch_issue_description``. For the
    description it applies the same logic as ``fetch_issue_description``
    (prefer ``description_stripped``, fall back to stripping
    ``description_html``).

    Returns ("", "") on network error, non-2xx, or missing body - never
    raises, so a Plane outage degrades gracefully (caller keeps its payload
    fallbacks).
    """
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        body = resp.json()
        name = (body.get("name") or "").strip()
        return name, _description_from_body(body)
    except Exception as e:
        logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}")
        return "", ""


def list_issues_by_state(project_id: str, state_uuids: list[str]) -> list[dict]:
    """ORCH-053 (F-2): list a project's issues whose state is in ``state_uuids``.

    GETs ``/workspaces/{ws}/projects/{pid}/issues/`` and walks ALL pages
    (Plane's cursor pagination: ``results`` + ``next_cursor`` /
    ``next_page_results``), keeping only issues whose state uuid is one of the
    requested ones. The filter is applied client-side on ``issue.state`` (a
    dict ``{id,...}`` or a bare uuid string) so it works regardless of whether
    Plane's query-param state filter is honoured. At most 100 pages are
    walked.

    Never raises: on any network / API / shape error it logs a warning and
    returns ``[]`` so a Plane outage degrades the F-2 tick softly instead of
    crashing it.
    """
    if not project_id or not state_uuids:
        return []
    wanted = set(state_uuids)
    out: list[dict] = []
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        cursor = None
        pages = 0
        while True:
            params: dict = {"per_page": 100}
            if cursor:
                params["cursor"] = cursor
            resp = httpx.get(url, headers=PLANE_HEADERS, params=params, timeout=10)
            resp.raise_for_status()
            body = resp.json()
            if isinstance(body, dict):
                items = body.get("results", [])
            else:
                items = body if isinstance(body, list) else []
            for issue in items:
                state = issue.get("state")
                sid = state.get("id") if isinstance(state, dict) else state
                if sid in wanted:
                    out.append(issue)
            # Pagination: continue only while Plane reports more pages.
            pages += 1
            if not isinstance(body, dict):
                break
            has_more = bool(body.get("next_page_results"))
            next_cursor = body.get("next_cursor")
            if not has_more or not next_cursor or pages >= 100:
                break
            cursor = next_cursor
        return out
    except Exception as e:
        logger.warning(
            f"list_issues_by_state: API failed for project {project_id[:8]}..., "
            f"returning []. Error: {e}"
        )
        return []


def _issue_results(data) -> list:
    """Return the issue list from a Plane list response.

    Accepts both the paginated ``{"results": [...]}`` shape and a bare list;
    any other shape yields ``[]``.
    """
    if isinstance(data, dict):
        return data.get("results", [])
    return data if isinstance(data, list) else []


def find_issue_id(work_item_id: str, project_id: str | None = None) -> str | None:
    """Find Plane issue UUID by work_item_id (e.g. 'ET-002').

    Lookup order:
      1. ``tasks.plane_issue_id`` in the local DB,
      2. Plane issue search by ``work_item_id`` (match on ``sequence_id`` or
         on the id appearing in the issue name),
      3. an unfiltered issue listing matched on ``sequence_id``.

    Returns None when nothing matches or the Plane API fails (logged at error
    level); never raises.
    """
    project_id = _resolve_project_id(work_item_id, project_id)

    # Primary: lookup from DB (plane_issue_id column)
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL",
                (work_item_id,),
            ).fetchone()
        finally:
            # Other DB lookups in this module close the connection they get
            # from get_db(); do the same here instead of leaking it.
            conn.close()
        if row and row[0]:
            return row[0]
    except Exception as e:
        logger.debug(f"DB lookup failed for {work_item_id}: {e}")

    # Fallback: search via Plane API
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        # First try search by work_item_id
        resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10)
        resp.raise_for_status()
        results = _issue_results(resp.json())

        # M-6: match by sequence_id directly (the authoritative per-project
        # number), parsed from the work_item_id suffix - no hardcoded prefix.
        try:
            target_num = int(work_item_id.rsplit("-", 1)[1])
        except (IndexError, ValueError):
            target_num = None

        for issue in results:
            if target_num is not None and issue.get("sequence_id") == target_num:
                return issue["id"]
            # ``name`` may be present but null; treat that as an empty title.
            if work_item_id in (issue.get("name") or ""):
                return issue["id"]

        # Fallback: get all issues and match by sequence_id number (any prefix)
        if target_num is not None:
            resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
            resp2.raise_for_status()
            for issue in _issue_results(resp2.json()):
                if issue.get("sequence_id") == target_num:
                    return issue["id"]
    except Exception as e:
        logger.error(f"Failed to find issue for {work_item_id}: {e}")
    return None


def update_issue_state(work_item_id: str, stage: str, project_id: str | None = None):
    """Update Plane issue state based on orchestrator stage.

    No-op when the stage has no mapped state or the issue cannot be found.
    API failures are logged, not raised.
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    # ORCH-10: resolve state UUID for this specific project (not global dict).
    state_id = stage_to_state(stage, project_id)
    if not state_id:
        return

    issue_id = find_issue_id(work_item_id, project_id)
    if not issue_id:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
        return

    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
        resp.raise_for_status()
        logger.info(f"Plane: {work_item_id} state -> {stage} ({state_id[:8]}...)")
    except Exception as e:
        logger.error(f"Failed to update Plane state for {work_item_id}: {e}")


def add_comment(
    work_item_id: str,
    text: str,
    project_id: str | None = None,
    author: str | None = None,
):
    """Add a comment to a Plane issue.

    feat(plane): when ``author`` (an agent role) maps to a configured bot
    token, the comment is POSTed under that bot so Plane shows the real
    author. Otherwise it falls back to the shared orchestrator token (see
    ``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS.

    Skips (with a warning) when the issue cannot be found; API failures are
    logged, not raised.
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    issue_id = find_issue_id(work_item_id, project_id)
    if not issue_id:
        logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment")
        return

    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
    # NOTE(review): the wrapper markup around {text} was lost in the copy of
    # this file that was reviewed; a single <p> element is assumed — confirm
    # against version control before merging.
    html = f"<p>{text}</p>"
    try:
        resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10)
        resp.raise_for_status()
        logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
    except Exception as e:
        logger.error(f"Failed to add comment to {work_item_id}: {e}")


def set_issue_needs_input(work_item_id: str, project_id: str | None = None):
    """Set issue to 'Needs Input' state — waiting for stakeholder response."""
    project_id = _resolve_project_id(work_item_id, project_id)
    state_id = get_project_states(project_id)["needs_input"]
    _set_issue_state_direct(work_item_id, state_id, project_id)


def set_issue_in_review(work_item_id: str, project_id: str | None = None):
    """Set issue to 'In Review' state — waiting for :approved: or :rejected:."""
    project_id = _resolve_project_id(work_item_id, project_id)
    state_id = get_project_states(project_id)["in_review"]
    _set_issue_state_direct(work_item_id, state_id, project_id)


def set_issue_blocked(work_item_id: str, project_id: str | None = None):
    """Set issue to 'Blocked' state — manual intervention needed."""
    project_id = _resolve_project_id(work_item_id, project_id)
    state_id = get_project_states(project_id)["blocked"]
    _set_issue_state_direct(work_item_id, state_id, project_id)


def set_issue_done(work_item_id: str, project_id: str | None = None):
    """Observability fix: force the issue into the TERMINAL Done state.

    Used by the deploy->done success path so a completed task always reaches
    the terminal Plane state (it used to stick on In Progress because the
    merge webhook bypassed the stage engine). Resolves per-project UUID via
    get_project_states (ORCH-10).
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    state_id = get_project_states(project_id)["done"]
    _set_issue_state_direct(work_item_id, state_id, project_id)


def set_issue_in_progress(work_item_id: str, project_id: str | None = None):
    """Set issue to 'In Progress' state — agent working."""
    project_id = _resolve_project_id(work_item_id, project_id)
    state_id = get_project_states(project_id)["in_progress"]
    _set_issue_state_direct(work_item_id, state_id, project_id)


def set_issue_stage_state(work_item_id: str, stage: str, project_id: str | None = None):
    """Feature 3: move the issue to the board status for a pipeline stage.

    Only the visible-stage statuses (architecture/development/review/testing)
    are driven here — stages without a dedicated status (analysis/deploy) are
    a no-op so the existing in_progress/in_review/needs_input logic stays in
    charge. By design this does NOT touch Needs Input / In Review / Blocked,
    which are higher priority and set explicitly by their own helpers.
    """
    state_key = STAGE_VISIBILITY_STATE.get(stage)
    if not state_key:
        return
    project_id = _resolve_project_id(work_item_id, project_id)
    # ORCH-10: resolve per-project UUID.
    state_id = get_project_states(project_id)[state_key]
    _set_issue_state_direct(work_item_id, state_id, project_id)


def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str | None = None):
    """Set issue state directly by state_id.

    Logs a warning and returns when the issue cannot be found; API failures
    are logged, not raised.
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    issue_id = find_issue_id(work_item_id, project_id)
    if not issue_id:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
        return

    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
        resp.raise_for_status()
        logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
    except Exception as e:
        logger.error(f"Failed to update Plane state for {work_item_id}: {e}")


def notify_stage_change(
    work_item_id: str,
    old_stage: str,
    new_stage: str,
    agent: str | None = None,
    project_id: str | None = None,
):
    """Notify Plane about stage transition with links.

    Updates the issue state for ``new_stage`` and posts a comment describing
    the transition. The branch / PR links are best-effort: any failure while
    building them is logged at debug level and the comment is still posted
    with whatever was assembled up to that point.
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    update_issue_state(work_item_id, new_stage, project_id)

    msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}"
    if agent:
        msg += f" (launching {agent})"

    # Add relevant links
    gitea_base = "http://git.mva154.duckdns.org"
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
            ).fetchone()
        finally:
            # Release the connection even if the query fails.
            conn.close()

        branch, repo = row if row else (None, None)
        # Both values are needed to build a meaningful link; skip otherwise.
        if branch and repo:
            msg += f"\n📂 Branch: [{branch}]({gitea_base}/admin/{repo}/src/branch/{branch})"
            if new_stage in ("review", "testing", "deploy"):
                import httpx as _httpx
                from .config import settings

                _headers = {"Authorization": f"token {settings.gitea_token}"}
                _resp = _httpx.get(
                    f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
                    params={"state": "open", "head": branch},
                    headers=_headers,
                    timeout=5,
                )
                if _resp.status_code == 200:
                    _prs = _resp.json()
                    if _prs:
                        pr_num = _prs[0]["number"]
                        msg += f"\n🔗 PR: [#{pr_num}]({gitea_base}/admin/{repo}/pulls/{pr_num})"
    except Exception as e:
        # Links are optional decoration; record why they were skipped instead
        # of swallowing the error silently.
        logger.debug(f"notify_stage_change: links skipped for {work_item_id}: {e}")

    # Stage transition is the orchestrator's own voice -> attribute to stream.
    add_comment(work_item_id, msg, project_id, author="stream")


def notify_qg_failure(
    work_item_id: str,
    stage: str,
    check: str,
    reason: str,
    project_id: str | None = None,
):
    """Notify Plane about QG failure."""
    # QG failure belongs to the agent that owns the failing stage.
    add_comment(
        work_item_id,
        f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}",
        project_id,
        author=STAGE_AUTHORS.get(stage, "stream"),
    )


def notify_done(work_item_id: str, project_id: str | None = None):
    """Mark issue as Done in Plane."""
    project_id = _resolve_project_id(work_item_id, project_id)
    update_issue_state(work_item_id, "done", project_id)
    # Deploy finished the task -> attribute the completion comment to Deployer.
    add_comment(
        work_item_id,
        f"{EMOJI_DONE} Task completed! PR merged and deployed.",
        project_id,
        author="deployer",
    )