"""Plane API sync — update issue state and add comments.""" import logging import time import httpx from .config import settings logger = logging.getLogger("orchestrator.plane_sync") # L-3: emoji literals used in Plane comment bodies, named for readability. # Message text stays byte-for-byte identical to the previous output. EMOJI_STAGE = "\U0001F504" # stage transition EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure EMOJI_DONE = "\u2705" # task completed PLANE_BASE = f"{settings.plane_api_url}/api/v1" PLANE_HEADERS = {"X-API-Key": settings.plane_api_token} WORKSPACE = settings.plane_workspace_slug # feat(plane): per-agent comment authorship. # Map an agent role -> its dedicated Plane bot token (read from config / env). # When the token is present, add_comment() POSTs under that bot so Plane shows # the real author. Empty/unknown role -> fallback to the shared orchestrator # token (PLANE_HEADERS), so commenting stays autonomous. PLANE_BOT_TOKENS = { "analyst": settings.plane_bot_analyst, "architect": settings.plane_bot_architect, "developer": settings.plane_bot_developer, "reviewer": settings.plane_bot_reviewer, "tester": settings.plane_bot_tester, "deployer": settings.plane_bot_deployer, "stream": settings.plane_bot_stream, } # Map a pipeline stage -> the agent role that owns work in that stage. Used to # pick an author for rollback/stage notifications targeting a specific stage. STAGE_AUTHORS = { "analysis": "analyst", "architecture": "architect", "development": "developer", "review": "reviewer", "testing": "tester", "deploy": "deployer", } def _headers_for(author: str | None) -> dict: """Return X-API-Key headers for the given agent role. Falls back to the shared orchestrator token (PLANE_HEADERS / settings.plane_api_token) when the role is None, unknown, or its bot token is not configured. This keeps comment posting autonomous: a comment is always written, just attributed to the orchestrator if no bot is set. """ tok = PLANE_BOT_TOKENS.get(author or "") if author else None return {"X-API-Key": tok} if tok else PLANE_HEADERS PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c" def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str: """ORCH-6: resolve the Plane project id for a sync call. Priority: 1. explicit project_id arg (caller already knows the project), 2. project derived from the task's repo in the DB (by work_item_id), 3. legacy default PROJECT_ID (enduro) for backward compatibility. """ if project_id: return project_id if work_item_id: try: from .db import get_db from .projects import get_project_by_repo conn = get_db() row = conn.execute( "SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1", (work_item_id,), ).fetchone() conn.close() if row and row[0]: proj = get_project_by_repo(row[0]) if proj: return proj.plane_project_id except Exception as e: logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}") return PROJECT_ID # ORCH-10: per-project state resolution. # # _DEFAULT_STATES keeps the original enduro-trails UUIDs as a safe fallback # (used when the Plane API is unreachable and for backward compat). # PLANE_STATES is preserved as an alias so existing call sites that reference # it directly (QG-0 fast-path in webhooks/plane.py, tests) continue to work. _DEFAULT_STATES = { "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122", "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967", "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac", "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b", "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920", "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17", # Feature 3 (stage visibility) — per-stage statuses on the board. "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d", "development": "9920609b-f140-4e46-ab95-89acda8412c8", "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed", "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02", # Feature 2 (verdict statuses) — Approved / Rejected. "approved": "a519a341-dada-4a91-8910-7604f82b79c5", "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97", # ORCH-066 (meaningful Plane status model, layer B): six new logical keys. # Their _DEFAULT_STATES values alias the enduro-trails UUID of their BASE key # (see _STATE_ALIAS_FALLBACK) so a project without these statuses created # (enduro / Plane down / partial config) degrades to the current behaviour # instead of producing an invalid PATCH state. The project-relative # alias-fallback in get_project_states() overrides these with the *project's # own* base UUID on the success path; these defaults are the last resort. "to_analyse": "b873d9eb-993c-48cd-97ac-99a9b1623967", # = in_progress "analysis": "b873d9eb-993c-48cd-97ac-99a9b1623967", # = in_progress "code_review": "ba0d802c-5218-41d4-ab43-978b0ea123ed", # = review "awaiting_deploy": "38fb1f64-aa1e-48a3-92e0-0b109679046b", # = in_review "deploying": "b873d9eb-993c-48cd-97ac-99a9b1623967", # = in_progress "monitoring": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", # = done } # Backward-compat alias — do NOT remove (tests + webhooks/plane.py import it). PLANE_STATES = _DEFAULT_STATES # Mapping: Plane state *name* (as returned by the API) -> logical key. _PLANE_NAME_TO_KEY: dict[str, str] = { "Backlog": "backlog", "Todo": "todo", "In Progress": "in_progress", "Architecture": "architecture", "Development": "development", "Review": "review", "Testing": "testing", "Approved": "approved", "Rejected": "rejected", "Done": "done", "Cancelled": "cancelled", "Needs Input": "needs_input", "In Review": "in_review", "Blocked": "blocked", # ORCH-059: dedicated prod-deploy trigger status, distinct from the # human-gate "Approved". Resolved from the live Plane API for the ORCH # project; intentionally ABSENT from _DEFAULT_STATES so environments without # this board status (enduro / API fallback) fail-closed — no UUID, no # confirm-deploy branch, no KeyError (accessed via .get). "Confirm Deploy": "confirm_deploy", # ORCH-090: dedicated operator "STOP" status — the cancel trigger. Like # ORCH-059's Confirm Deploy it is INTENTIONALLY ABSENT from _DEFAULT_STATES # (fail-closed): environments without the status (enduro / API fallback) # resolve `stop` to None via .get -> the cancel branch simply never activates # (no UUID, no KeyError, no blind cancel). Create a STOP status with the # `cancelled` group on the board to enable it (07-infra-requirements.md). "STOP": "stop", # ORCH-066: meaningful per-stage / human-input statuses (layer B). "To Analyse": "to_analyse", "Analysis": "analysis", "Code-Review": "code_review", "Awaiting Deploy": "awaiting_deploy", "Deploying": "deploying", "Monitoring after Deploy": "monitoring", } # ORCH-066 (BR-12): project-relative alias-fallback for the new logical keys. # After resolving states by name from the Plane API, any NEW key the project did # not define degrades to the UUID of its BASE key **from the same project** — so # the indication falls back to the current status and the PATCH stays valid even # for a partially-configured project. Enduro (none of the new statuses created) # collapses every new key onto its base, i.e. strictly the pre-ORCH-066 # behaviour. Strengthened ORCH-059 AC-7 pattern. _STATE_ALIAS_FALLBACK: dict[str, str] = { "to_analyse": "in_progress", "analysis": "in_progress", "code_review": "review", "awaiting_deploy": "in_review", "deploying": "in_progress", "monitoring": "done", } # Per-project state cache (ORCH-10 + ORCH-068). # # Each entry is a RECORD, not a bare mapping: # {"states": {logical_key: state_uuid}, # the ORCH-10 mapping (unchanged shape) # "groups": {state_uuid: group}, # ORCH-068 D1: {uuid -> Plane state.group} # "ts": monotonic timestamp} # ORCH-068 TR-4: for TTL self-heal # get_project_states() still RETURNS the bare {logical_key: state_uuid} mapping # (backward compatible — AC-13); the richer record is internal. _STATES_CACHE: dict[str, dict] = {} def _cache_record_fresh(record: dict) -> bool: """ORCH-068 (TR-4): is a cache record still within its TTL? ``plane_states_ttl_s <= 0`` disables the TTL -> a record never expires (strictly the previous lifetime-cache behaviour, back-compat escape hatch). """ ttl = settings.plane_states_ttl_s if ttl <= 0: return True ts = record.get("ts", 0.0) return (time.monotonic() - ts) <= ttl def get_project_states(project_id: str) -> dict[str, str]: """ORCH-10: resolve {logical_key -> state_uuid} for a specific Plane project. Source of truth: Plane API GET /projects//states/. Results are cached per project_id. ORCH-068 (TR-4): a cached entry is re-fetched once it is older than ``plane_states_ttl_s`` (default 300s) so a status added to Plane after start self-heals without a process restart; ``plane_states_ttl_s = 0`` keeps the previous lifetime cache. Falls back to _DEFAULT_STATES (enduro-trails values) if: * project_id is empty/None, * the API call fails (network error, non-2xx) AND nothing is cached, * the response contains no recognisable states. The enduro-trails project therefore returns the same UUIDs as before (backward compatible). The orchestrator project returns its own UUIDs, fixing the ORCH-10 blocker. """ if not project_id: return _DEFAULT_STATES cached = _STATES_CACHE.get(project_id) if cached is not None and _cache_record_fresh(cached): return cached["states"] url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/states/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() body = resp.json() # Plane returns {"results": [...]} or a bare list. items = body.get("results", body) if isinstance(body, dict) else body if not isinstance(items, list): raise ValueError(f"unexpected states response shape: {type(items)}") resolved: dict[str, str] = {} groups: dict[str, str] = {} for item in items: name = item.get("name", "") uid = item.get("id", "") key = _PLANE_NAME_TO_KEY.get(name) if key and uid: resolved[key] = uid # ORCH-068 D1: capture {uuid -> group} for terminal-state detection # (a single API fetch — no extra network cost). The group is the # authoritative, project-independent discriminator of terminal # (completed/cancelled) vs review/work statuses, robust to UUID # aliasing after status renames (ORCH-066). grp = item.get("group", "") if uid and grp: groups[uid] = grp if not resolved: raise ValueError("no recognisable states in API response") # ORCH-066 (BR-12): project-relative alias-fallback. For each NEW key the # project did not define, reuse the UUID of its BASE key FROM THIS SAME # PROJECT (never a foreign/enduro UUID — that would yield an invalid PATCH # state on a partially-configured orchestrator project). Runs BEFORE the # _DEFAULT_STATES.setdefault below so a project's own base UUID wins over # the static enduro default. for new_key, base_key in _STATE_ALIAS_FALLBACK.items(): if new_key not in resolved and resolved.get(base_key): resolved[new_key] = resolved[base_key] # Fill any missing keys from _DEFAULT_STATES so callers always get a # complete mapping (defensive against partial Plane configs). for k, v in _DEFAULT_STATES.items(): resolved.setdefault(k, v) _STATES_CACHE[project_id] = { "states": resolved, "groups": groups, "ts": time.monotonic(), } logger.debug( f"get_project_states: cached {len(resolved)} states / " f"{len(groups)} groups for project {project_id[:8]}..." ) return resolved except Exception as e: # On a transient API failure keep serving the stale (but project-correct) # set if we have one — far safer than reverting to enduro defaults. if cached is not None: logger.warning( f"get_project_states: API refresh failed for project " f"{project_id[:8]}..., serving stale cached set. Error: {e}" ) return cached["states"] logger.warning( f"get_project_states: API failed for project {project_id[:8]}..., " f"falling back to _DEFAULT_STATES. Error: {e}" ) return _DEFAULT_STATES def get_project_state_groups(project_id: str) -> dict[str, str]: """ORCH-068 (D1): return {state_uuid -> group} for a Plane project. Reads the SAME cache record populated by ``get_project_states`` (no extra network call). Call ``get_project_states(project_id)`` first to ensure the record is fresh/populated. Returns ``{}`` when nothing is cached (e.g. the API was unreachable and the caller fell back to ``_DEFAULT_STATES``); the reconciler then falls back to logical terminal keys. """ record = _STATES_CACHE.get(project_id) if isinstance(record, dict): groups = record.get("groups") if isinstance(groups, dict): return groups return {} def reload_project_states(project_id: str = None) -> None: """ORCH-10: clear the per-project states cache. If project_id is given, evict only that project. If None, flush the entire cache (useful in tests and after config reload). """ global _STATES_CACHE if project_id is None: _STATES_CACHE = {} logger.debug("reload_project_states: full cache cleared") else: _STATES_CACHE.pop(project_id, None) logger.debug(f"reload_project_states: evicted project {project_id[:8]}...") # --------------------------------------------------------------------------- # ORCH-089: label reading (auto-mode by Plane labels) + Approved setter. # # Source of truth for an issue's labels is the Plane API, NOT the webhook payload # (both auto-mode insertion points are launcher-path events where the payload is # absent; src/webhooks/plane.py does not carry `labels`). All three helpers honour # a never-raise contract: a failure degrades to "no label" / "no-op", so the # auto-mode falls back to the manual gate (fail-safe, BR-6/AC-6). # --------------------------------------------------------------------------- # Per-project label-map cache (mirrors _STATES_CACHE / ORCH-068 TTL self-heal). # Each entry: {"map": {normalized_name -> uuid}, "ts": monotonic timestamp}. _LABELS_CACHE: dict[str, dict] = {} def _normalize_label(name: str) -> str: """Normalize a label name for matching (case/whitespace-insensitive).""" return (name or "").strip().casefold() def _labels_record_fresh(record: dict) -> bool: """ORCH-089: is a label-map cache record still within its TTL? ``auto_label_states_ttl_s <= 0`` disables the TTL (lifetime cache, escape hatch mirroring ``_cache_record_fresh`` / ``plane_states_ttl_s``). """ try: ttl = settings.auto_label_states_ttl_s except Exception: # noqa: BLE001 ttl = 0 if ttl <= 0: return True ts = record.get("ts", 0.0) return (time.monotonic() - ts) <= ttl def reload_project_labels(project_id: str = None) -> None: """ORCH-089: clear the per-project label-map cache (tests / config reload).""" global _LABELS_CACHE if project_id is None: _LABELS_CACHE = {} else: _LABELS_CACHE.pop(project_id, None) def get_project_labels(project_id: str) -> dict[str, str]: """ORCH-089: resolve {normalized_label_name -> uuid} for a Plane project. Source of truth: GET /projects//labels/. Cached per project_id with a TTL (``auto_label_states_ttl_s``, default 300s) mirroring ``get_project_states`` so we do not hit the API on every gate. On a transient API failure a stale-but-correct cached map is served (safer-than-empty); with nothing cached -> ``{}`` (caller resolves to "no label" -> manual gate). Ambiguity guard (D1.4): if two distinct project labels normalise to the SAME name, that name is mapped to a sentinel so ``has_label`` treats it as "no match" (fail-safe) instead of silently picking one uuid. never-raise -> ``{}``. """ if not project_id: return {} cached = _LABELS_CACHE.get(project_id) if cached is not None and _labels_record_fresh(cached): return cached["map"] url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/labels/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() body = resp.json() items = body.get("results", body) if isinstance(body, dict) else body if not isinstance(items, list): raise ValueError(f"unexpected labels response shape: {type(items)}") name_map: dict[str, str] = {} ambiguous: set[str] = set() for item in items: uid = item.get("id", "") norm = _normalize_label(item.get("name", "")) if not (uid and norm): continue if norm in name_map and name_map[norm] != uid: # Two distinct labels collide on the normalized name -> ambiguous. ambiguous.add(norm) name_map[norm] = uid for norm in ambiguous: # AMBIGUOUS sentinel: never equals a real issue-label uuid, so # has_label's membership test is False -> fail-safe to the manual gate. name_map[norm] = "__AMBIGUOUS__" logger.warning( "get_project_labels: ambiguous label name %r in project %s " "-> treated as no-match (fail-safe)", norm, project_id[:8], ) _LABELS_CACHE[project_id] = {"map": name_map, "ts": time.monotonic()} logger.debug( "get_project_labels: cached %d labels for project %s...", len(name_map), project_id[:8], ) return name_map except Exception as e: # noqa: BLE001 - never-raise if cached is not None: logger.warning( "get_project_labels: API refresh failed for project %s..., " "serving stale cached map. Error: %s", project_id[:8], e, ) return cached["map"] logger.warning( "get_project_labels: API failed for project %s..., no cache -> {}. " "Error: %s", project_id[:8], e, ) return {} def fetch_issue_labels(work_item_id: str, project_id: str = None) -> list[str] | None: """ORCH-089: GET the issue and return its ``labels`` (a list of label uuids). Returns ``None`` on any error / issue-not-found (DISTINCT from ``[]`` = "the issue has no labels") so the caller can distinguish "could not read" (fail-safe to manual) from "definitely no labels". never-raise. """ project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.debug("fetch_issue_labels: issue not found for %s", work_item_id) return None url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() labels = resp.json().get("labels", []) if not isinstance(labels, list): return None return [str(x) for x in labels] except Exception as e: # noqa: BLE001 - never-raise logger.warning("fetch_issue_labels failed for %s: %s", work_item_id, e) return None def set_issue_approved(work_item_id: str, project_id: str = None): """ORCH-089: set issue to 'Approved' — indication of an auto-approved BRD. 1:1 mirror of ``set_issue_in_review``: resolve the per-project Approved UUID (``get_project_states(pid)["approved"]`` — the key already exists in ``_DEFAULT_STATES`` / ``_PLANE_NAME_TO_KEY``) and PATCH the issue. never-raise (via ``_set_issue_state_direct``). The status is transient — the immediately following advance to ``architecture`` overrides it; durable transparency is carried by the log + Telegram + Plane comment (AC-7). """ project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["approved"] _set_issue_state_direct(work_item_id, state_id, project_id) # Feature 3: map an orchestrator stage -> the Plane status to show on the board # when the pipeline ENTERS that stage. ORCH-066: analysis -> Analysis and # review -> Code-Review now have dedicated statuses. deploy keeps in_progress # until its own Phase A/B/C statuses drive it. Needs Input / In Review / Blocked # remain higher priority and are set explicitly elsewhere — do NOT override them # from here. STAGE_VISIBILITY_STATE = { "analysis": "analysis", # ORCH-066: analysis stage -> Analysis status "architecture": "architecture", "development": "development", "review": "code_review", # ORCH-066: review stage -> Code-Review status "testing": "testing", } # STAGE_TO_STATE kept for backward compat (used by tests that patch it). # update_issue_state now calls stage_to_state() instead of looking up here. STAGE_TO_STATE = { "created": _DEFAULT_STATES["todo"], # ORCH-066: analysis -> Analysis, review -> Code-Review. The new keys alias # the same in_progress / review UUIDs in _DEFAULT_STATES, so legacy callers / # tests that compare against concrete UUIDs see byte-identical values. "analysis": _DEFAULT_STATES["analysis"], "architecture": _DEFAULT_STATES["architecture"], "development": _DEFAULT_STATES["development"], "review": _DEFAULT_STATES["code_review"], "testing": _DEFAULT_STATES["testing"], "deploy": _DEFAULT_STATES["in_progress"], "done": _DEFAULT_STATES["done"], } # Map orchestrator stage -> logical state key (project-independent). # ORCH-066: analysis -> analysis, review -> code_review (was in_progress/review). # deploy stays in_progress (Phase A/B/C drive it directly, not update_issue_state). _STAGE_TO_STATE_KEY = { "created": "todo", "analysis": "analysis", "architecture": "architecture", "development": "development", "review": "code_review", "testing": "testing", "deploy": "in_progress", "done": "done", } def stage_to_state(stage: str, project_id: str) -> str | None: """ORCH-10: return the Plane state UUID for a pipeline stage in a project. Resolves via get_project_states so the correct per-project UUID is used. Returns None for unknown stages (same behaviour as the old STAGE_TO_STATE dict lookup returning None). """ key = _STAGE_TO_STATE_KEY.get(stage) if not key: return None return get_project_states(project_id).get(key) def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None: """M-6: GET the Plane issue by UUID and return its sequence_id (the authoritative per-project number), or None if unavailable. Returns None on network error, non-2xx, or a missing field - never raises, so the webhook handler can fall back to DB increment and stay autonomous. """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() seq = resp.json().get("sequence_id") return int(seq) if seq is not None else None except Exception as e: logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}") return None def fetch_issue_state(issue_id: str, project_id: str, timeout: int = 10) -> str | None: """ORCH-060 (F-1 Guard 2): GET the Plane issue and return its current state uuid. Used by the reconciler to honour an explicit human gate: an issue a person moved to **Blocked** / **Needs Input** must not be auto-unblocked by the sweeper. Reuses the exact GET issue-detail endpoint / shared token already used by ``fetch_issue_sequence_id`` / ``fetch_issue_fields``. Plane returns ``state`` as a bare uuid string; older shapes may nest it as a ``{"id": ...}`` dict — both are handled. ORCH-067 (Р-4): ``timeout`` is optional (default 10s — unchanged for the reconciler) so the tracker live-overlay can read with a SHORT timeout (settings.tracker_live_status_timeout_s) on the synchronous render path. Returns None on network error, non-2xx, or a missing field — never raises, so the caller can apply its conservative fallback (treat as "possibly blocked"). """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=timeout) resp.raise_for_status() state = resp.json().get("state") if isinstance(state, dict): state = state.get("id") return str(state) if state else None except Exception as e: logger.warning(f"fetch_issue_state failed for {issue_id}: {e}") return None def fetch_blocked_by_issue_ids(issue_id: str, project_id: str, timeout: int = 10) -> list[str]: """ORCH-026 (B-1): list the Plane issue UUIDs that ``issue_id`` is BLOCKED-BY. Reads the Plane issue-relation endpoint and returns the related issue UUIDs declared as ``blocked_by`` (i.e. the predecessors A that this task B waits for). Plane's relation payload shape has varied across versions, so the parse is defensive: it accepts either a grouped object (``{"blocked_by": [...]}``) or a flat list of ``{"relation_type": ..., "related_issue": ...}`` rows, and pulls a uuid from ``related_issue`` / ``issue`` / ``id`` (bare uuid or nested ``{"id": ...}``). never-raise (AC-G1, self-hosting): a Plane outage / non-2xx / unexpected shape -> ``[]`` (no edge declared), so the ingestion degrades conservatively and the pipeline never stalls on the network. """ if not issue_id or not project_id: return [] url = ( f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}" f"/issues/{issue_id}/issue-relation/" ) try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=timeout) resp.raise_for_status() body = resp.json() except Exception as e: logger.warning(f"fetch_blocked_by_issue_ids failed for {issue_id}: {e}") return [] def _uuid_of(row) -> str | None: if isinstance(row, str): return row if isinstance(row, dict): for key in ("related_issue", "issue", "id"): v = row.get(key) if isinstance(v, dict): v = v.get("id") if v: return str(v) return None out: list[str] = [] try: rows = [] if isinstance(body, dict): # Grouped shape: {"blocked_by": [...], "blocking": [...], ...} if "blocked_by" in body and isinstance(body["blocked_by"], list): rows = body["blocked_by"] else: # Flat shape nested under common envelope keys. rows = body.get("results") or body.get("relations") or [] elif isinstance(body, list): rows = body for row in rows: # In the flat shape, keep only blocked_by rows. if isinstance(row, dict) and row.get("relation_type") not in (None, "blocked_by"): continue uid = _uuid_of(row) if uid and uid != issue_id: out.append(uid) except Exception as e: logger.warning(f"fetch_blocked_by_issue_ids parse error for {issue_id}: {e}") return [] return out import re as _re def _strip_html(html: str) -> str: """Crude HTML -> text: drop tags and collapse whitespace. Good enough to feed QG-0's length check when Plane only gives us description_html.""" if not html: return "" text = _re.sub(r"<[^>]+>", " ", html) return _re.sub(r"\s+", " ", text).strip() def fetch_issue_description(issue_id: str, project_id: str) -> str: """BUG 1: GET the Plane issue by UUID and return its description text. Plane's ``issue.updated`` webhook (e.g. a status change) only carries the CHANGED fields, so ``description``/``description_stripped`` are usually absent there. start_pipeline calls this to pull the full description from the issue detail endpoint so QG-0 does not blow up on an empty payload field. Reuses the exact GET issue detail endpoint / shared token already used by ``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers ``description_stripped``; falls back to stripping ``description_html``. Returns "" on network error, non-2xx, or a missing field - never raises, so a Plane outage degrades to the honest "empty description" QG-0 path instead of crashing the webhook. """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() body = resp.json() desc = body.get("description_stripped") if desc and desc.strip(): return desc return _strip_html(body.get("description_html") or "") except Exception as e: logger.warning(f"fetch_issue_description failed for {issue_id}: {e}") return "" def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]: """BUG B: GET the Plane issue by UUID ONCE and return (name, description). Plane's ``issue.updated`` webhook (e.g. a status change) only carries the CHANGED fields, so BOTH ``name`` and ``description`` are usually absent in the payload. start_pipeline needs the real title (for the branch slug) and the real description (for the analyst .task.md). To avoid issuing two separate issue-detail GETs (one for name, one for description), this single request returns both. Reuses the exact GET issue detail endpoint / shared token already used by ``fetch_issue_sequence_id`` / ``fetch_issue_description``. For the description it applies the same logic as ``fetch_issue_description`` (prefer ``description_stripped``, fall back to stripping ``description_html``). Returns ("", "") on network error, non-2xx, or missing body - never raises, so a Plane outage degrades gracefully (caller keeps its payload fallbacks). """ url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() body = resp.json() name = (body.get("name") or "").strip() desc = body.get("description_stripped") if desc and desc.strip(): description = desc else: description = _strip_html(body.get("description_html") or "") return name, description except Exception as e: logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}") return "", "" def list_issues_by_state(project_id: str, state_uuids: list[str]) -> list[dict]: """ORCH-053 (F-2): list a project's issues whose state is in ``state_uuids``. GETs ``/workspaces/{ws}/projects/{pid}/issues/`` and walks ALL pages (Plane's cursor pagination: ``results`` + ``next_cursor`` / ``next_page_results``), keeping only issues whose state uuid is one of the requested ones. The filter is applied client-side on ``issue.state`` (a dict ``{id,...}`` or a bare uuid string) so it works regardless of whether Plane's query-param state filter is honoured. Never raises: on any network / API / shape error it logs a warning and returns ``[]`` so a Plane outage degrades the F-2 tick softly instead of crashing it. """ if not project_id or not state_uuids: return [] wanted = set(state_uuids) out: list[dict] = [] url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/" try: cursor = None pages = 0 while True: params: dict = {"per_page": 100} if cursor: params["cursor"] = cursor resp = httpx.get(url, headers=PLANE_HEADERS, params=params, timeout=10) resp.raise_for_status() body = resp.json() if isinstance(body, dict): items = body.get("results", []) else: items = body if isinstance(body, list) else [] for issue in items: state = issue.get("state") sid = state.get("id") if isinstance(state, dict) else state if sid in wanted: out.append(issue) # Pagination: continue only while Plane reports more pages. pages += 1 if not isinstance(body, dict): break has_more = bool(body.get("next_page_results")) next_cursor = body.get("next_cursor") if not has_more or not next_cursor or pages >= 100: break cursor = next_cursor return out except Exception as e: logger.warning( f"list_issues_by_state: API failed for project {project_id[:8]}..., " f"returning []. Error: {e}" ) return [] def find_issue_id(work_item_id: str, project_id: str = None) -> str | None: """Find Plane issue UUID by work_item_id (e.g. 'ET-002').""" project_id = _resolve_project_id(work_item_id, project_id) # Primary: lookup from DB (plane_issue_id column) try: from .db import get_db conn = get_db() row = conn.execute( "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL", (work_item_id,) ).fetchone() if row and row[0]: return row[0] except Exception as e: logger.debug(f"DB lookup failed for {work_item_id}: {e}") # Fallback: search via Plane API url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/" try: # First try search by work_item_id resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10) resp.raise_for_status() data = resp.json() results = data.get("results", data if isinstance(data, list) else []) # M-6: match by sequence_id directly (the authoritative per-project # number), parsed from the work_item_id suffix - no hardcoded prefix. try: target_num = int(work_item_id.rsplit("-", 1)[1]) except (IndexError, ValueError): target_num = None for issue in results: if target_num is not None and issue.get("sequence_id") == target_num: return issue["id"] if work_item_id in issue.get("name", ""): return issue["id"] # Fallback: get all issues and match by sequence_id number (any prefix) if target_num is not None: resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp2.raise_for_status() data2 = resp2.json() results2 = data2.get("results", data2 if isinstance(data2, list) else []) for issue in results2: if issue.get("sequence_id") == target_num: return issue["id"] except Exception as e: logger.error(f"Failed to find issue for {work_item_id}: {e}") return None def update_issue_state(work_item_id: str, stage: str, project_id: str = None): """Update Plane issue state based on orchestrator stage.""" project_id = _resolve_project_id(work_item_id, project_id) # ORCH-10: resolve state UUID for this specific project (not global dict). state_id = stage_to_state(stage, project_id) if not state_id: return issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) resp.raise_for_status() logger.info(f"Plane: {work_item_id} state -> {stage} ({state_id[:8]}...)") except Exception as e: logger.error(f"Failed to update Plane state for {work_item_id}: {e}") def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None): """Add a comment to a Plane issue. feat(plane): when ``author`` (an agent role) maps to a configured bot token, the comment is POSTed under that bot so Plane shows the real author. Otherwise it falls back to the shared orchestrator token (see ``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS. """ project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/" html = f"

{text}

" try: resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10) resp.raise_for_status() logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})") except Exception as e: logger.error(f"Failed to add comment to {work_item_id}: {e}") def set_issue_needs_input(work_item_id: str, project_id: str = None): """Set issue to 'Needs Input' state — waiting for stakeholder response.""" project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["needs_input"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_in_review(work_item_id: str, project_id: str = None): """Set issue to 'In Review' state — waiting for :approved: or :rejected:.""" project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["in_review"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_blocked(work_item_id: str, project_id: str = None): """Set issue to 'Blocked' state — manual intervention needed.""" project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["blocked"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_done(work_item_id: str, project_id: str = None): """Observability fix: force the issue into the TERMINAL Done state. Used by the deploy->done success path so a completed task always reaches the terminal Plane state (it used to stick on In Progress because the merge webhook bypassed the stage engine). Resolves per-project UUID via get_project_states (ORCH-10). """ project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["done"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_in_progress(work_item_id: str, project_id: str = None): """Set issue to 'In Progress' state — agent working.""" project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["in_progress"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_analysis(work_item_id: str, project_id: str = None): """ORCH-066: set issue to 'Analysis' — analyst is working (start / resume). Degrades to the project's In Progress UUID when the 'Analysis' status is not created (alias-fallback). never-raise (via _set_issue_state_direct). """ project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["analysis"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_code_review(work_item_id: str, project_id: str = None): """ORCH-066: set issue to 'Code-Review' — review stage indication. Degrades to the project's Review UUID when 'Code-Review' is not created. """ project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["code_review"] _set_issue_state_direct(work_item_id, state_id, project_id) def _deploy_status_guarded(work_item_id: str, target: str, reason: str | None) -> bool: """ORCH-094: apply the terminal-window-aware guard for a deploy-phase setter. Returns True iff the caller should PROCEED with the normal PATCH (verdict ALLOW). On CONVERGE_DONE it drives the task to terminal ``Done`` here (the idempotent convergence target) and returns False; on SUPPRESS it does nothing and returns False. never-raise: any error degrades to ALLOW (proceed), keeping behaviour 1:1 with pre-ORCH-094 (the guard leaf itself fails safe to ALLOW). """ try: from . import deploy_status_guard verdict = deploy_status_guard.decide(work_item_id, target, reason=reason) if verdict == deploy_status_guard.CONVERGE_DONE: set_issue_done(work_item_id) return False if verdict == deploy_status_guard.SUPPRESS: return False return True except Exception as e: # noqa: BLE001 - never-raise; proceed (1:1) on doubt logger.warning(f"deploy_status_guard wrapper error for {work_item_id}: {e}") return True def set_issue_awaiting_deploy(work_item_id: str, project_id: str = None, reason: str = None): """ORCH-066: set issue to 'Awaiting Deploy' — self-deploy Phase A approval-pending. Degrades to the project's In Review UUID when 'Awaiting Deploy' is not created. ORCH-094: terminal-window-aware — a task whose DB stage is terminal converges to Done instead of stamping a spurious deploy status (``reason`` = caller, FR-4). """ if not _deploy_status_guarded(work_item_id, "awaiting", reason): return project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["awaiting_deploy"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_deploying(work_item_id: str, project_id: str = None, reason: str = None): """ORCH-066: set issue to 'Deploying' — self-deploy Phase B prod deploy in flight. Degrades to the project's In Progress UUID when 'Deploying' is not created. ORCH-094: terminal-window-aware (see :func:`set_issue_awaiting_deploy`). """ if not _deploy_status_guarded(work_item_id, "deploying", reason): return project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["deploying"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_monitoring(work_item_id: str, project_id: str = None, reason: str = None): """ORCH-066: set issue to 'Monitoring after Deploy' — post-deploy window open. Degrades to the project's Done UUID when 'Monitoring after Deploy' is not created (so the board shows Done, exactly as before ORCH-066). ORCH-094: terminal-window-aware — the LEGITIMATE first Monitoring (DB already ``done`` by the time line 404 runs, but the post-deploy window is active) is allowed; a stale Monitoring after the window has closed converges to Done. """ if not _deploy_status_guarded(work_item_id, "monitoring", reason): return project_id = _resolve_project_id(work_item_id, project_id) state_id = get_project_states(project_id)["monitoring"] _set_issue_state_direct(work_item_id, state_id, project_id) def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None): """Feature 3: move the issue to the board status for a pipeline stage. Only the visible-stage statuses (architecture/development/review/testing) are driven here — stages without a dedicated status (analysis/deploy) are a no-op so the existing in_progress/in_review/needs_input logic stays in charge. By design this does NOT touch Needs Input / In Review / Blocked, which are higher priority and set explicitly by their own helpers. """ state_key = STAGE_VISIBILITY_STATE.get(stage) if not state_key: return project_id = _resolve_project_id(work_item_id, project_id) # ORCH-10: resolve per-project UUID. state_id = get_project_states(project_id)[state_key] _set_issue_state_direct(work_item_id, state_id, project_id) def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None): """Set issue state directly by state_id.""" project_id = _resolve_project_id(work_item_id, project_id) issue_id = find_issue_id(work_item_id, project_id) if not issue_id: logger.warning(f"Issue not found in Plane for {work_item_id}") return url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/" try: resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) resp.raise_for_status() logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...") except Exception as e: logger.error(f"Failed to update Plane state for {work_item_id}: {e}") def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None): """Notify Plane about stage transition with links.""" project_id = _resolve_project_id(work_item_id, project_id) update_issue_state(work_item_id, new_stage, project_id) msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}" if agent: msg += f" (launching {agent})" # Add relevant links. # ORCH-101 (A1): the link base and owner come from Settings — base is # gitea_public_url with a gitea_url fallback (the exact semantics of the # existing consumers notifications._build_brd_link / usage.py), owner is # gitea_owner. No hardcoded host/owner in the executable path. gitea_base = ( getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "") ).rstrip("/") gitea_owner = getattr(settings, "gitea_owner", "") try: from .db import get_db conn = get_db() row = conn.execute( "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,) ).fetchone() conn.close() if row: branch, repo = row msg += chr(10) + "📂 Branch: [" + branch + "](" + gitea_base + "/" + gitea_owner + "/" + repo + "/src/branch/" + branch + ")" if new_stage in ("review", "testing", "deploy"): import httpx as _httpx _headers = {"Authorization": f"token {settings.gitea_token}"} _resp = _httpx.get( f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls", params={"state": "open", "head": branch}, headers=_headers, timeout=5 ) if _resp.status_code == 200: _prs = _resp.json() if _prs: pr_num = _prs[0]["number"] msg += chr(10) + "🔗 PR: [#" + str(pr_num) + "](" + gitea_base + "/" + gitea_owner + "/" + repo + "/pulls/" + str(pr_num) + ")" except Exception: pass # Stage transition is the orchestrator's own voice -> attribute to stream. add_comment(work_item_id, msg, project_id, author="stream") def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None): """Notify Plane about QG failure.""" # QG failure belongs to the agent that owns the failing stage. add_comment( work_item_id, f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}", project_id, author=STAGE_AUTHORS.get(stage, "stream"), ) def notify_done(work_item_id: str, project_id: str = None): """Mark issue as Done in Plane.""" project_id = _resolve_project_id(work_item_id, project_id) update_issue_state(work_item_id, "done", project_id) # Deploy finished the task -> attribute the completion comment to Deployer. add_comment( work_item_id, f"{EMOJI_DONE} Task completed! PR merged and deployed.", project_id, author="deployer", )