"""Plane API sync — update issue state and add comments.""" import logging import httpx from .config import settings logger = logging.getLogger("orchestrator.plane_sync") # L-3: emoji literals used in Plane comment bodies, named for readability. # Message text stays byte-for-byte identical to the previous output. EMOJI_STAGE = "\U0001F504" # stage transition EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure EMOJI_DONE = "\u2705" # task completed PLANE_BASE = f"{settings.plane_api_url}/api/v1" PLANE_HEADERS = {"X-API-Key": settings.plane_api_token} WORKSPACE = settings.plane_workspace_slug PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c" def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str: """ORCH-6: resolve the Plane project id for a sync call. Priority: 1. explicit project_id arg (caller already knows the project), 2. project derived from the task's repo in the DB (by work_item_id), 3. legacy default PROJECT_ID (enduro) for backward compatibility. """ if project_id: return project_id if work_item_id: try: from .db import get_db from .projects import get_project_by_repo conn = get_db() row = conn.execute( "SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1", (work_item_id,), ).fetchone() conn.close() if row and row[0]: proj = get_project_by_repo(row[0]) if proj: return proj.plane_project_id except Exception as e: logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}") return PROJECT_ID # Plane state IDs PLANE_STATES = { "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122", "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967", "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac", "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b", "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920", "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17", } # Map orchestrator stages to Plane states STAGE_TO_STATE = { "created": PLANE_STATES["todo"], "analysis": PLANE_STATES["in_progress"], "architecture": PLANE_STATES["in_progress"], "development": PLANE_STATES["in_progress"], "review": PLANE_STATES["in_progress"], "testing": PLANE_STATES["in_progress"], "deploy": PLANE_STATES["in_progress"], "done": PLANE_STATES["done"], } def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None: """M-6: GET the Plane issue by UUID and return its sequence_id (the authoritative per-project number), or None if unavailable. Returns None on network error, non-2xx, or a missing field - never raises, so the webhook handler can fall back to DB increment and stay autonomous. """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() seq = resp.json().get("sequence_id") return int(seq) if seq is not None else None except Exception as e: logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}") return None def find_issue_id(work_item_id: str, project_id: str = None) -> str | None: """Find Plane issue UUID by work_item_id (e.g. 'ET-002').""" project_id = _resolve_project_id(work_item_id, project_id) # Primary: lookup from DB (plane_issue_id column) try: from .db import get_db conn = get_db() row = conn.execute( "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL", (work_item_id,) ).fetchone() if row and row[0]: return row[0] except Exception as e: logger.debug(f"DB lookup failed for {work_item_id}: {e}") # Fallback: search via Plane API url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/" try: # First try search by work_item_id resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10) resp.raise_for_status() data = resp.json() results = data.get("results", data if isinstance(data, list) else []) # M-6: match by sequence_id directly (the authoritative per-project # number), parsed from the work_item_id suffix - no hardcoded prefix. try: target_num = int(work_item_id.rsplit("-", 1)[1]) except (IndexError, ValueError): target_num = None for issue in results: if target_num is not None and issue.get("sequence_id") == target_num: return issue["id"] if work_item_id in issue.get("name", ""): return issue["id"] # Fallback: get all issues and match by sequence_id number (any prefix) if target_num is not None: resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp2.raise_for_status() data2 = resp2.json() results2 = data2.get("results", data2 if isinstance(data2, list) else []) for issue in results2: if issue.get("sequence_id") == target_num: return issue["id"] except Exception as e: logger.error(f"Failed to find issue for {work_item_id}: {e}") return None def update_issue_state(work_item_id: str, stage: str, project_id: str = None): """Update Plane issue state based on orchestrator stage.""" state_id = STAGE_TO_STATE.get(stage) if not state_id: return project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) resp.raise_for_status() logger.info(f"Plane: {work_item_id} state -> {stage} ({state_id[:8]}...)") except Exception as e: logger.error(f"Failed to update Plane state for {work_item_id}: {e}") def add_comment(work_item_id: str, text: str, project_id: str = None): """Add a comment to Plane issue.""" project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/" html = f"
{text}
" try: resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10) resp.raise_for_status() logger.info(f"Plane: comment added to {work_item_id}") except Exception as e: logger.error(f"Failed to add comment to {work_item_id}: {e}") def set_issue_needs_input(work_item_id: str, project_id: str = None): """Set issue to 'Needs Input' state — waiting for stakeholder response.""" _set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"], project_id) def set_issue_in_review(work_item_id: str, project_id: str = None): """Set issue to 'In Review' state — waiting for :approved: or :rejected:.""" _set_issue_state_direct(work_item_id, PLANE_STATES["in_review"], project_id) def set_issue_blocked(work_item_id: str, project_id: str = None): """Set issue to 'Blocked' state — manual intervention needed.""" _set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id) def set_issue_in_progress(work_item_id: str, project_id: str = None): """Set issue to 'In Progress' state — agent working.""" _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id) def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None): """Set issue state directly by state_id.""" project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) resp.raise_for_status() logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...") except Exception as e: logger.error(f"Failed to update Plane state for {work_item_id}: {e}") def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None): """Notify Plane about stage transition with links.""" project_id = _resolve_project_id(work_item_id, project_id) update_issue_state(work_item_id, new_stage, project_id) msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}" if agent: msg += f" (launching {agent})" # Add relevant links gitea_base = "http://git.mva154.duckdns.org" try: from .db import get_db conn = get_db() row = conn.execute( "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,) ).fetchone() conn.close() if row: branch, repo = row msg += chr(10) + "📂 Branch: [" + branch + "](" + gitea_base + "/admin/" + repo + "/src/branch/" + branch + ")" if new_stage in ("review", "testing", "deploy"): import httpx as _httpx from .config import settings _headers = {"Authorization": f"token {settings.gitea_token}"} _resp = _httpx.get( f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls", params={"state": "open", "head": branch}, headers=_headers, timeout=5 ) if _resp.status_code == 200: _prs = _resp.json() if _prs: pr_num = _prs[0]["number"] msg += chr(10) + "🔗 PR: [#" + str(pr_num) + "](" + gitea_base + "/admin/" + repo + "/pulls/" + str(pr_num) + ")" except Exception: pass add_comment(work_item_id, msg, project_id) def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None): """Notify Plane about QG failure.""" add_comment(work_item_id, f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}", project_id) def notify_done(work_item_id: str, project_id: str = None): """Mark issue as Done in Plane.""" project_id = _resolve_project_id(work_item_id, project_id) update_issue_state(work_item_id, "done", project_id) add_comment(work_item_id, f"{EMOJI_DONE} Task completed! PR merged and deployed.", project_id)