Files
orchestrator/src/plane_sync.py

856 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Plane API sync — update issue state and add comments."""
import logging
import time
import httpx
from .config import settings
logger = logging.getLogger("orchestrator.plane_sync")
# L-3: emoji literals used in Plane comment bodies, named for readability.
# Message text stays byte-for-byte identical to the previous output.
EMOJI_STAGE = "\U0001F504"  # stage transition
EMOJI_QG_FAIL = "\u26A0\uFE0F"  # quality-gate failure
EMOJI_DONE = "\u2705"  # task completed

# Connection constants, all read from settings once at import time.
# PLANE_BASE is the v1 API root; PLANE_HEADERS carries the shared orchestrator
# token and is the default credential for every request in this module.
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
WORKSPACE = settings.plane_workspace_slug

# feat(plane): per-agent comment authorship.
# Map an agent role -> its dedicated Plane bot token (read from config / env).
# When the token is present, add_comment() POSTs under that bot so Plane shows
# the real author. Empty/unknown role -> fallback to the shared orchestrator
# token (PLANE_HEADERS), so commenting stays autonomous.
PLANE_BOT_TOKENS = {
    "analyst": settings.plane_bot_analyst,
    "architect": settings.plane_bot_architect,
    "developer": settings.plane_bot_developer,
    "reviewer": settings.plane_bot_reviewer,
    "tester": settings.plane_bot_tester,
    "deployer": settings.plane_bot_deployer,
    "stream": settings.plane_bot_stream,
}

# Map a pipeline stage -> the agent role that owns work in that stage. Used to
# pick an author for rollback/stage notifications targeting a specific stage.
# Every value here is also a key of PLANE_BOT_TOKENS.
STAGE_AUTHORS = {
    "analysis": "analyst",
    "architecture": "architect",
    "development": "developer",
    "review": "reviewer",
    "testing": "tester",
    "deploy": "deployer",
}
def _headers_for(author: str | None) -> dict:
    """Pick the ``X-API-Key`` headers to use for a given agent role.

    Args:
        author: Agent role name (a key of ``PLANE_BOT_TOKENS``) or ``None``.

    Returns:
        A fresh ``{"X-API-Key": <bot token>}`` dict when the role has a
        non-empty bot token configured; otherwise the shared ``PLANE_HEADERS``
        (orchestrator token). A comment can therefore always be posted — it is
        simply attributed to the orchestrator when no bot is available.
    """
    # No role supplied -> shared orchestrator credentials.
    if not author:
        return PLANE_HEADERS
    bot_token = PLANE_BOT_TOKENS.get(author)
    # Unknown role or empty/unset token -> shared orchestrator credentials.
    if not bot_token:
        return PLANE_HEADERS
    return {"X-API-Key": bot_token}
# Default Plane project id: the configured value, or a hard-coded legacy UUID
# when settings.plane_project_id is empty/unset.
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str:
"""ORCH-6: resolve the Plane project id for a sync call.
Priority:
1. explicit project_id arg (caller already knows the project),
2. project derived from the task's repo in the DB (by work_item_id),
3. legacy default PROJECT_ID (enduro) for backward compatibility.
"""
if project_id:
return project_id
if work_item_id:
try:
from .db import get_db
from .projects import get_project_by_repo
conn = get_db()
row = conn.execute(
"SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1",
(work_item_id,),
).fetchone()
conn.close()
if row and row[0]:
proj = get_project_by_repo(row[0])
if proj:
return proj.plane_project_id
except Exception as e:
logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
return PROJECT_ID
# ORCH-10: per-project state resolution.
#
# _DEFAULT_STATES keeps the original enduro-trails UUIDs as a safe fallback
# (used when the Plane API is unreachable and for backward compat).
# PLANE_STATES is preserved as an alias so existing call sites that reference
# it directly (QG-0 fast-path in webhooks/plane.py, tests) continue to work.
#
# Shape: {logical state key -> Plane state UUID}.
_DEFAULT_STATES = {
    "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
    "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
    "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
    "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
    "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
    "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
    "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
    "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
    # Feature 3 (stage visibility) — per-stage statuses on the board.
    "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
    "development": "9920609b-f140-4e46-ab95-89acda8412c8",
    "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
    "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
    # Feature 2 (verdict statuses) — Approved / Rejected.
    "approved": "a519a341-dada-4a91-8910-7604f82b79c5",
    "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
    # ORCH-066 (meaningful Plane status model, layer B): six new logical keys.
    # Their _DEFAULT_STATES values alias the enduro-trails UUID of their BASE key
    # (see _STATE_ALIAS_FALLBACK) so a project without these statuses created
    # (enduro / Plane down / partial config) degrades to the current behaviour
    # instead of producing an invalid PATCH state. The project-relative
    # alias-fallback in get_project_states() overrides these with the *project's
    # own* base UUID on the success path; these defaults are the last resort.
    "to_analyse": "b873d9eb-993c-48cd-97ac-99a9b1623967",  # = in_progress
    "analysis": "b873d9eb-993c-48cd-97ac-99a9b1623967",  # = in_progress
    "code_review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",  # = review
    "awaiting_deploy": "38fb1f64-aa1e-48a3-92e0-0b109679046b",  # = in_review
    "deploying": "b873d9eb-993c-48cd-97ac-99a9b1623967",  # = in_progress
    "monitoring": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",  # = done
}

# Backward-compat alias — do NOT remove (tests + webhooks/plane.py import it).
# This binds the SAME dict object, not a copy: a mutation through either name
# is visible through the other.
PLANE_STATES = _DEFAULT_STATES
# Mapping: Plane state *name* (as returned by the API) -> logical key.
# Lookup is an exact, case-sensitive dict match on the name; states whose name
# is not listed here are ignored when resolving a project's states.
_PLANE_NAME_TO_KEY: dict[str, str] = {
    "Backlog": "backlog",
    "Todo": "todo",
    "In Progress": "in_progress",
    "Architecture": "architecture",
    "Development": "development",
    "Review": "review",
    "Testing": "testing",
    "Approved": "approved",
    "Rejected": "rejected",
    "Done": "done",
    "Cancelled": "cancelled",
    "Needs Input": "needs_input",
    "In Review": "in_review",
    "Blocked": "blocked",
    # ORCH-059: dedicated prod-deploy trigger status, distinct from the
    # human-gate "Approved". Resolved from the live Plane API for the ORCH
    # project; intentionally ABSENT from _DEFAULT_STATES so environments without
    # this board status (enduro / API fallback) fail-closed — no UUID, no
    # confirm-deploy branch, no KeyError (accessed via .get).
    "Confirm Deploy": "confirm_deploy",
    # ORCH-066: meaningful per-stage / human-input statuses (layer B).
    "To Analyse": "to_analyse",
    "Analysis": "analysis",
    "Code-Review": "code_review",
    "Awaiting Deploy": "awaiting_deploy",
    "Deploying": "deploying",
    "Monitoring after Deploy": "monitoring",
}
# ORCH-066 (BR-12): project-relative alias-fallback for the new logical keys.
# After resolving states by name from the Plane API, any NEW key the project did
# not define degrades to the UUID of its BASE key **from the same project** — so
# the indication falls back to the current status and the PATCH stays valid even
# for a partially-configured project. Enduro (none of the new statuses created)
# collapses every new key onto its base, i.e. strictly the pre-ORCH-066
# behaviour. Strengthened ORCH-059 AC-7 pattern.
#
# Shape: {new logical key -> base logical key}.
_STATE_ALIAS_FALLBACK: dict[str, str] = {
    "to_analyse": "in_progress",
    "analysis": "in_progress",
    "code_review": "review",
    "awaiting_deploy": "in_review",
    "deploying": "in_progress",
    "monitoring": "done",
}
# Per-project state cache (ORCH-10 + ORCH-068), keyed by Plane project id.
#
# Each entry is a RECORD, not a bare mapping:
# {"states": {logical_key: state_uuid}, # the ORCH-10 mapping (unchanged shape)
# "groups": {state_uuid: group}, # ORCH-068 D1: {uuid -> Plane state.group}
# "ts": monotonic timestamp} # ORCH-068 TR-4: for TTL self-heal
# get_project_states() still RETURNS the bare {logical_key: state_uuid} mapping
# (backward compatible — AC-13); the richer record is internal.
_STATES_CACHE: dict[str, dict] = {}
def _cache_record_fresh(record: dict) -> bool:
    """ORCH-068 (TR-4): tell whether a cache record is still inside its TTL.

    Args:
        record: A ``_STATES_CACHE`` entry; its ``"ts"`` value (a
            ``time.monotonic()`` reading, treated as ``0.0`` when absent) is
            compared against the current monotonic clock.

    Returns:
        ``True`` when ``settings.plane_states_ttl_s <= 0`` (TTL disabled — the
        record never expires, i.e. the former lifetime-cache behaviour), or
        when the record's age does not exceed the TTL; ``False`` otherwise.
    """
    max_age = settings.plane_states_ttl_s
    if max_age <= 0:
        return True
    age = time.monotonic() - record.get("ts", 0.0)
    return age <= max_age
def get_project_states(project_id: str) -> dict[str, str]:
    """ORCH-10: resolve ``{logical_key -> state_uuid}`` for one Plane project.

    The mapping is built from ``GET /projects/<project_id>/states/`` and cached
    per project in ``_STATES_CACHE``. ORCH-068 (TR-4): a cached record older
    than ``plane_states_ttl_s`` is re-fetched, so a status created in Plane
    after start-up is picked up without a restart; a TTL of ``0`` (or less)
    keeps records forever.

    Fallbacks (never raises):
      * empty/None ``project_id``            -> ``_DEFAULT_STATES``;
      * fetch/parse failure with a stale record cached -> the stale mapping;
      * fetch/parse failure with nothing cached        -> ``_DEFAULT_STATES``;
      * a response with no recognisable state names counts as a failure.

    Args:
        project_id: Plane project UUID.

    Returns:
        The logical-key -> state-UUID mapping. On success it always contains
        every key of ``_DEFAULT_STATES``.
    """
    if not project_id:
        return _DEFAULT_STATES

    record = _STATES_CACHE.get(project_id)
    if record is not None and _cache_record_fresh(record):
        return record["states"]

    states_url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/states/"
    try:
        response = httpx.get(states_url, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()

        # Plane returns {"results": [...]} or a bare list.
        if isinstance(payload, dict):
            entries = payload.get("results", payload)
        else:
            entries = payload
        if not isinstance(entries, list):
            raise ValueError(f"unexpected states response shape: {type(entries)}")

        by_key: dict[str, str] = {}
        group_of: dict[str, str] = {}
        for entry in entries:
            state_name = entry.get("name", "")
            state_uuid = entry.get("id", "")
            logical_key = _PLANE_NAME_TO_KEY.get(state_name)
            if logical_key and state_uuid:
                by_key[logical_key] = state_uuid
            # ORCH-068 D1: remember {uuid -> group} from the same response so
            # terminal-state detection needs no second request. Recorded for
            # every state that has both an id and a group, recognised or not.
            state_group = entry.get("group", "")
            if state_uuid and state_group:
                group_of[state_uuid] = state_group

        if not by_key:
            raise ValueError("no recognisable states in API response")

        # ORCH-066 (BR-12): project-relative alias-fallback. A NEW key the
        # project did not define borrows the UUID of its BASE key from THIS
        # project. Must run BEFORE the defaults are merged below, so that the
        # project's own base UUID wins over the static default.
        for alias_key, base_key in _STATE_ALIAS_FALLBACK.items():
            if alias_key in by_key:
                continue
            base_uuid = by_key.get(base_key)
            if base_uuid:
                by_key[alias_key] = base_uuid

        # Complete the mapping from _DEFAULT_STATES so callers can index any
        # default key without a KeyError (defensive against partial configs).
        for default_key, default_uuid in _DEFAULT_STATES.items():
            if default_key not in by_key:
                by_key[default_key] = default_uuid

        _STATES_CACHE[project_id] = {
            "states": by_key,
            "groups": group_of,
            "ts": time.monotonic(),
        }
        logger.debug(
            f"get_project_states: cached {len(by_key)} states / "
            f"{len(group_of)} groups for project {project_id[:8]}..."
        )
        return by_key
    except Exception as exc:
        # Prefer a stale but project-correct mapping over the static defaults
        # when the refresh fails.
        if record is not None:
            logger.warning(
                f"get_project_states: API refresh failed for project "
                f"{project_id[:8]}..., serving stale cached set. Error: {exc}"
            )
            return record["states"]
        logger.warning(
            f"get_project_states: API failed for project {project_id[:8]}..., "
            f"falling back to _DEFAULT_STATES. Error: {exc}"
        )
        return _DEFAULT_STATES
def get_project_state_groups(project_id: str) -> dict[str, str]:
    """ORCH-068 (D1): return the cached ``{state_uuid -> group}`` for a project.

    This only reads ``_STATES_CACHE`` — it performs no network call and does
    not check the record's age. Call ``get_project_states(project_id)`` first
    if a populated/fresh record is required.

    Args:
        project_id: Plane project UUID.

    Returns:
        The cached groups dict, or ``{}`` when the project has no cache record
        (or the record has no dict under ``"groups"``).
    """
    record = _STATES_CACHE.get(project_id)
    groups = record.get("groups") if isinstance(record, dict) else None
    return groups if isinstance(groups, dict) else {}
def reload_project_states(project_id: str = None) -> None:
    """ORCH-10: drop cached Plane states so the next lookup re-fetches them.

    Args:
        project_id: When given, only that project's record is evicted (a
            missing record is not an error). When ``None``, the whole cache is
            replaced with a new empty dict.
    """
    global _STATES_CACHE
    if project_id is not None:
        _STATES_CACHE.pop(project_id, None)
        logger.debug(f"reload_project_states: evicted project {project_id[:8]}...")
        return
    # Rebind (rather than clear in place) to a brand-new dict.
    _STATES_CACHE = {}
    logger.debug("reload_project_states: full cache cleared")
# Feature 3: map an orchestrator stage -> the Plane status to show on the board
# when the pipeline ENTERS that stage. ORCH-066: analysis -> Analysis and
# review -> Code-Review now have dedicated statuses. deploy keeps in_progress
# until its own Phase A/B/C statuses drive it. Needs Input / In Review / Blocked
# remain higher priority and are set explicitly elsewhere — do NOT override them
# from here.
#
# Shape: {stage name -> logical state key}. Stages absent from this dict
# (e.g. "deploy") have no board status driven from this mapping.
STAGE_VISIBILITY_STATE = {
    "analysis": "analysis",  # ORCH-066: analysis stage -> Analysis status
    "architecture": "architecture",
    "development": "development",
    "review": "code_review",  # ORCH-066: review stage -> Code-Review status
    "testing": "testing",
}
# STAGE_TO_STATE kept for backward compat (used by tests that patch it).
# update_issue_state now calls stage_to_state() instead of looking up here.
#
# Shape: {stage name -> state UUID}, with the UUIDs copied from
# _DEFAULT_STATES when this module is imported.
STAGE_TO_STATE = {
    "created": _DEFAULT_STATES["todo"],
    # ORCH-066: analysis -> Analysis, review -> Code-Review. The new keys alias
    # the same in_progress / review UUIDs in _DEFAULT_STATES, so legacy callers /
    # tests that compare against concrete UUIDs see byte-identical values.
    "analysis": _DEFAULT_STATES["analysis"],
    "architecture": _DEFAULT_STATES["architecture"],
    "development": _DEFAULT_STATES["development"],
    "review": _DEFAULT_STATES["code_review"],
    "testing": _DEFAULT_STATES["testing"],
    "deploy": _DEFAULT_STATES["in_progress"],
    "done": _DEFAULT_STATES["done"],
}
# Map orchestrator stage -> logical state key (project-independent).
# ORCH-066: analysis -> analysis, review -> code_review (was in_progress/review).
# deploy stays in_progress (Phase A/B/C drive it directly, not update_issue_state).
# stage_to_state() turns the logical key into a per-project UUID.
_STAGE_TO_STATE_KEY = {
    "created": "todo",
    "analysis": "analysis",
    "architecture": "architecture",
    "development": "development",
    "review": "code_review",
    "testing": "testing",
    "deploy": "in_progress",
    "done": "done",
}
def stage_to_state(stage: str, project_id: str) -> str | None:
    """ORCH-10: translate a pipeline stage into a Plane state UUID.

    The stage is first mapped to a logical key via ``_STAGE_TO_STATE_KEY`` and
    the key is then looked up in ``get_project_states(project_id)``, so the
    UUID belongs to the given project.

    Args:
        stage: Orchestrator stage name.
        project_id: Plane project UUID.

    Returns:
        The state UUID, or ``None`` when the stage is unknown (in which case
        the project states are not fetched) or the key is not in the mapping.
    """
    logical_key = _STAGE_TO_STATE_KEY.get(stage)
    if logical_key:
        return get_project_states(project_id).get(logical_key)
    return None
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
    """M-6: fetch a Plane issue by UUID and return its ``sequence_id``.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project UUID.

    Returns:
        ``sequence_id`` converted to ``int``, or ``None`` when the field is
        missing/null or anything fails (network error, non-2xx status, bad
        JSON, non-numeric value). Never raises; failures are logged as
        warnings so the caller can use its own fallback.
    """
    issue_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{project_id}/issues/{issue_id}/"
    )
    try:
        response = httpx.get(issue_url, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        sequence = response.json().get("sequence_id")
        if sequence is None:
            return None
        return int(sequence)
    except Exception as exc:
        logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {exc}")
        return None
def fetch_issue_state(issue_id: str, project_id: str, timeout: int = 10) -> str | None:
    """ORCH-060 (F-1 Guard 2): fetch a Plane issue and return its state UUID.

    The ``state`` field is accepted either as a bare value or nested as a dict
    with an ``"id"`` key; both shapes yield the UUID as a string.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project UUID.
        timeout: HTTP timeout in seconds (ORCH-067: callers on a synchronous
            render path may pass a shorter value than the 10 s default).

    Returns:
        The state UUID as ``str``, or ``None`` when the field is missing/empty
        or anything fails (network error, non-2xx status, bad JSON). Never
        raises; failures are logged as warnings so the caller can apply its
        own conservative fallback.
    """
    issue_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{project_id}/issues/{issue_id}/"
    )
    try:
        response = httpx.get(issue_url, headers=PLANE_HEADERS, timeout=timeout)
        response.raise_for_status()
        raw_state = response.json().get("state")
        # Unwrap the nested {"id": ...} shape when present.
        if isinstance(raw_state, dict):
            raw_state = raw_state.get("id")
        if not raw_state:
            return None
        return str(raw_state)
    except Exception as exc:
        logger.warning(f"fetch_issue_state failed for {issue_id}: {exc}")
        return None
import re as _re


def _strip_html(html: str) -> str:
    """Crudely convert HTML to plain text.

    Every ``<...>`` tag is replaced by a space, runs of whitespace are
    collapsed to a single space and the result is trimmed. HTML entities are
    left as-is. Intended only as a cheap approximation (e.g. for a length
    check when only ``description_html`` is available).

    Args:
        html: HTML fragment; any falsy value (``""``, ``None``) yields ``""``.

    Returns:
        The tag-free, whitespace-normalised text.
    """
    if not html:
        return ""
    collapsed = _re.sub(r"\s+", " ", _re.sub(r"<[^>]+>", " ", html))
    return collapsed.strip()
def fetch_issue_description(issue_id: str, project_id: str) -> str:
    """BUG 1: fetch a Plane issue by UUID and return its description text.

    Webhook payloads for ``issue.updated`` often omit the description, so the
    full text is pulled from the issue-detail endpoint instead.

    ``description_stripped`` is returned unchanged when it contains any
    non-whitespace; otherwise ``description_html`` is reduced to text with
    ``_strip_html``.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project UUID.

    Returns:
        The description text, or ``""`` when both fields are empty/missing or
        anything fails (network error, non-2xx status, bad JSON). Never raises;
        failures are logged as warnings.
    """
    issue_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{project_id}/issues/{issue_id}/"
    )
    try:
        response = httpx.get(issue_url, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()
        plain = payload.get("description_stripped")
        if not (plain and plain.strip()):
            # No usable plain text: derive it from the HTML variant.
            return _strip_html(payload.get("description_html") or "")
        return plain
    except Exception as exc:
        logger.warning(f"fetch_issue_description failed for {issue_id}: {exc}")
        return ""
def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
    """BUG B: fetch a Plane issue ONCE and return ``(name, description)``.

    Webhook payloads for ``issue.updated`` often omit both the title and the
    description; this performs a single issue-detail GET and returns both so
    callers do not need two round-trips.

    The name is whitespace-trimmed. The description follows the same rule as
    ``fetch_issue_description``: ``description_stripped`` when it contains any
    non-whitespace, else ``description_html`` reduced via ``_strip_html``.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project UUID.

    Returns:
        ``(name, description)``; ``("", "")`` when anything fails (network
        error, non-2xx status, bad JSON). Never raises; failures are logged as
        warnings.
    """
    issue_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{project_id}/issues/{issue_id}/"
    )
    try:
        response = httpx.get(issue_url, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()
        title = (payload.get("name") or "").strip()
        plain = payload.get("description_stripped")
        if not (plain and plain.strip()):
            # No usable plain text: derive it from the HTML variant.
            return title, _strip_html(payload.get("description_html") or "")
        return title, plain
    except Exception as exc:
        logger.warning(f"fetch_issue_fields failed for {issue_id}: {exc}")
        return "", ""
def list_issues_by_state(project_id: str, state_uuids: list[str]) -> list[dict]:
    """ORCH-053 (F-2): list a project's issues whose state is in ``state_uuids``.

    Walks the project's issue list page by page (``per_page=100``, following
    ``next_cursor`` while ``next_page_results`` is truthy, capped at 100
    pages) and filters client-side on each issue's ``state`` — accepted either
    as a dict with an ``"id"`` key or as a bare value.

    Args:
        project_id: Plane project UUID.
        state_uuids: State UUIDs to keep.

    Returns:
        The matching issue dicts in API order. ``[]`` when either argument is
        empty, or when ANY page fails (network / API / shape error) — partial
        results are discarded in that case. Never raises; failures are logged
        as warnings.
    """
    if not (project_id and state_uuids):
        return []

    wanted = set(state_uuids)
    matches: list[dict] = []
    issues_url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        cursor = None
        # Hard cap of 100 requests guards against a cursor that never ends.
        for _ in range(100):
            query: dict = {"per_page": 100}
            if cursor:
                query["cursor"] = cursor
            response = httpx.get(issues_url, headers=PLANE_HEADERS, params=query, timeout=10)
            response.raise_for_status()
            payload = response.json()

            paginated = isinstance(payload, dict)
            if paginated:
                page_items = payload.get("results", [])
            elif isinstance(payload, list):
                page_items = payload
            else:
                page_items = []

            for issue in page_items:
                raw_state = issue.get("state")
                state_id = raw_state.get("id") if isinstance(raw_state, dict) else raw_state
                if state_id in wanted:
                    matches.append(issue)

            # A bare list carries no pagination info: it is the only page.
            if not paginated:
                break
            following = payload.get("next_cursor")
            if not (payload.get("next_page_results") and following):
                break
            cursor = following
        return matches
    except Exception as exc:
        logger.warning(
            f"list_issues_by_state: API failed for project {project_id[:8]}..., "
            f"returning []. Error: {exc}"
        )
        return []
def _extract_issue_list(payload) -> list:
    """Return the issue list from a Plane list response (dict or bare list)."""
    if isinstance(payload, dict):
        results = payload.get("results", [])
        return results if isinstance(results, list) else []
    return payload if isinstance(payload, list) else []


def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
    """Find the Plane issue UUID for a work item id (e.g. ``'ET-002'``).

    Lookup order:
      1. ``tasks.plane_issue_id`` in the local DB;
      2. Plane issue search (``?search=<work_item_id>``), matching either the
         issue's ``sequence_id`` against the numeric suffix of the work item
         id (M-6: no hard-coded prefix) or the work item id inside the name;
      3. an unfiltered issue listing matched by ``sequence_id`` only. This
         reads a single response — pages beyond the first are not followed.

    Args:
        work_item_id: Orchestrator work item id.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.

    Returns:
        The issue UUID, or ``None`` when nothing matches or the API fails.
        Never raises: DB errors are logged at debug level (and the API
        fallback is tried), API errors at error level.
    """
    project_id = _resolve_project_id(work_item_id, project_id)

    # Primary: lookup from DB (plane_issue_id column).
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL",
                (work_item_id,),
            ).fetchone()
        finally:
            # Release the connection on every path (it used to be leaked).
            conn.close()
        if row and row[0]:
            return row[0]
    except Exception as e:
        logger.debug(f"DB lookup failed for {work_item_id}: {e}")

    # Fallback: search via Plane API.
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10)
        resp.raise_for_status()
        # Handles both {"results": [...]} and a bare list. Calling .get() on a
        # bare list (the previous form) raised AttributeError.
        results = _extract_issue_list(resp.json())

        # M-6: the numeric suffix of the work item id is the per-project
        # sequence number; None when the id has no parsable suffix.
        try:
            target_num = int(work_item_id.rsplit("-", 1)[1])
        except (IndexError, ValueError):
            target_num = None

        for issue in results:
            if target_num is not None and issue.get("sequence_id") == target_num:
                return issue["id"]
            # ``name`` may be null in the payload; treat that as empty.
            if work_item_id in (issue.get("name") or ""):
                return issue["id"]

        # Last resort: unfiltered listing, matched by sequence number only.
        if target_num is not None:
            resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
            resp2.raise_for_status()
            for issue in _extract_issue_list(resp2.json()):
                if issue.get("sequence_id") == target_num:
                    return issue["id"]
    except Exception as e:
        logger.error(f"Failed to find issue for {work_item_id}: {e}")
    return None
def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
    """Move a Plane issue to the state that corresponds to a pipeline stage.

    ORCH-10: the state UUID is resolved for the issue's own project through
    ``stage_to_state`` rather than a global table.

    Args:
        work_item_id: Orchestrator work item id.
        stage: Orchestrator stage name; unknown stages are a silent no-op.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.

    Returns:
        None. PATCH failures are logged as errors, not raised; a missing
        issue is logged as a warning.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    target_state = stage_to_state(stage, resolved_pid)
    if not target_state:
        return

    plane_issue = find_issue_id(work_item_id, resolved_pid)
    if not plane_issue:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
        return

    issue_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{resolved_pid}/issues/{plane_issue}/"
    )
    try:
        response = httpx.patch(
            issue_url,
            headers=PLANE_HEADERS,
            json={"state": target_state},
            timeout=10,
        )
        response.raise_for_status()
        logger.info(f"Plane: {work_item_id} state -> {stage} ({target_state[:8]}...)")
    except Exception as exc:
        logger.error(f"Failed to update Plane state for {work_item_id}: {exc}")
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
    """Post a comment on a Plane issue.

    feat(plane): when ``author`` (an agent role) has a configured bot token,
    the POST is made with that token so Plane attributes the comment to the
    bot; otherwise the shared orchestrator token is used (see
    ``_headers_for``).

    Args:
        work_item_id: Orchestrator work item id.
        text: Comment body. It is wrapped in ``<p>...</p>`` verbatim — no
            HTML escaping is applied here.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.
        author: Agent role used to choose the credentials.

    Returns:
        None. POST failures are logged as errors, not raised; a missing
        issue is logged as a warning and the comment is skipped.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    plane_issue = find_issue_id(work_item_id, resolved_pid)
    if not plane_issue:
        logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment")
        return

    comments_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{resolved_pid}/issues/{plane_issue}/comments/"
    )
    payload = {"comment_html": f"<p>{text}</p>"}
    try:
        response = httpx.post(
            comments_url,
            headers=_headers_for(author),
            json=payload,
            timeout=10,
        )
        response.raise_for_status()
        logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
    except Exception as exc:
        logger.error(f"Failed to add comment to {work_item_id}: {exc}")
def set_issue_needs_input(work_item_id: str, project_id: str = None):
    """Set the issue to 'Needs Input' — waiting for a stakeholder response.

    The state UUID is the project's own ``needs_input`` entry from
    ``get_project_states``.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["needs_input"], resolved_pid)
def set_issue_in_review(work_item_id: str, project_id: str = None):
    """Set the issue to 'In Review' — waiting for :approved: or :rejected:.

    The state UUID is the project's own ``in_review`` entry from
    ``get_project_states``.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["in_review"], resolved_pid)
def set_issue_blocked(work_item_id: str, project_id: str = None):
    """Set the issue to 'Blocked' — manual intervention needed.

    The state UUID is the project's own ``blocked`` entry from
    ``get_project_states``.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["blocked"], resolved_pid)
def set_issue_done(work_item_id: str, project_id: str = None):
    """Observability fix: force the issue into the terminal 'Done' state.

    Meant for the deploy -> done success path, so that a completed task
    reliably ends up in the terminal Plane state. The state UUID is the
    project's own ``done`` entry from ``get_project_states`` (ORCH-10).
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["done"], resolved_pid)
def set_issue_in_progress(work_item_id: str, project_id: str = None):
    """Set the issue to 'In Progress' — an agent is working on it.

    The state UUID is the project's own ``in_progress`` entry from
    ``get_project_states``.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["in_progress"], resolved_pid)
def set_issue_analysis(work_item_id: str, project_id: str = None):
    """ORCH-066: set the issue to 'Analysis' — the analyst is working.

    Uses the ``analysis`` entry from ``get_project_states``, which degrades to
    the project's In Progress UUID when no 'Analysis' status exists
    (alias-fallback).
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["analysis"], resolved_pid)
def set_issue_code_review(work_item_id: str, project_id: str = None):
    """ORCH-066: set the issue to 'Code-Review' — review stage indication.

    Uses the ``code_review`` entry from ``get_project_states``, which degrades
    to the project's Review UUID when no 'Code-Review' status exists.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["code_review"], resolved_pid)
def set_issue_awaiting_deploy(work_item_id: str, project_id: str = None):
    """ORCH-066: set the issue to 'Awaiting Deploy' (self-deploy Phase A,
    approval pending).

    Uses the ``awaiting_deploy`` entry from ``get_project_states``, which
    degrades to the project's In Review UUID when no 'Awaiting Deploy' status
    exists.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["awaiting_deploy"], resolved_pid)
def set_issue_deploying(work_item_id: str, project_id: str = None):
    """ORCH-066: set the issue to 'Deploying' (self-deploy Phase B, prod
    deploy in flight).

    Uses the ``deploying`` entry from ``get_project_states``, which degrades
    to the project's In Progress UUID when no 'Deploying' status exists.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["deploying"], resolved_pid)
def set_issue_monitoring(work_item_id: str, project_id: str = None):
    """ORCH-066: set the issue to 'Monitoring after Deploy' (post-deploy
    window open).

    Uses the ``monitoring`` entry from ``get_project_states``, which degrades
    to the project's Done UUID when no 'Monitoring after Deploy' status
    exists — the board then shows Done.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    states = get_project_states(resolved_pid)
    _set_issue_state_direct(work_item_id, states["monitoring"], resolved_pid)
def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None):
    """Feature 3: move the issue to the board status of a pipeline stage.

    Only stages present in ``STAGE_VISIBILITY_STATE`` are driven from here;
    any other stage is a no-op that touches neither the DB nor the Plane API.
    By design this helper does not set Needs Input / In Review / Blocked —
    those have their own dedicated setters.

    Args:
        work_item_id: Orchestrator work item id.
        stage: Orchestrator stage name.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.
    """
    logical_key = STAGE_VISIBILITY_STATE.get(stage)
    if logical_key:
        resolved_pid = _resolve_project_id(work_item_id, project_id)
        # ORCH-10: resolve the UUID for this specific project.
        board_state = get_project_states(resolved_pid)[logical_key]
        _set_issue_state_direct(work_item_id, board_state, resolved_pid)
def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None):
    """PATCH a Plane issue to an explicit state UUID.

    Args:
        work_item_id: Orchestrator work item id, used to locate the issue.
        state_id: Target Plane state UUID, sent as-is.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.

    Returns:
        None. PATCH failures are logged as errors, not raised; a missing
        issue is logged as a warning.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    plane_issue = find_issue_id(work_item_id, resolved_pid)
    if not plane_issue:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
        return

    issue_url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}"
        f"/projects/{resolved_pid}/issues/{plane_issue}/"
    )
    try:
        response = httpx.patch(
            issue_url,
            headers=PLANE_HEADERS,
            json={"state": state_id},
            timeout=10,
        )
        response.raise_for_status()
        logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
    except Exception as exc:
        logger.error(f"Failed to update Plane state for {work_item_id}: {exc}")
def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None):
    """Notify Plane about a stage transition: update the state, then comment.

    The comment states the transition (``old -> new``), optionally the agent
    being launched, and — best effort — a link to the task's branch plus, for
    the review/testing/deploy stages, a link to the first open PR for that
    branch. Link enrichment never blocks the notification: any failure there
    is logged at debug level and the comment is posted without the links.

    Args:
        work_item_id: Orchestrator work item id.
        old_stage: Stage being left.
        new_stage: Stage being entered; also drives the Plane state update.
        agent: Optional agent name appended as ``(launching <agent>)``.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    update_issue_state(work_item_id, new_stage, project_id)

    # An arrow separates the two stages; without it they ran together
    # (e.g. "developmentreview").
    msg = f"{EMOJI_STAGE} Stage: {old_stage} \u2192 {new_stage}"
    if agent:
        msg += f" (launching {agent})"

    # Add relevant links.
    # NOTE(review): the link base and the "/admin/" owner segment are
    # hard-coded here, while the PR lookup below uses settings.gitea_url /
    # settings.gitea_owner — confirm they are meant to differ.
    gitea_base = "http://git.mva154.duckdns.org"
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
            ).fetchone()
        finally:
            # Release the connection even when the query raises.
            conn.close()

        if row:
            branch, repo = row
            # A task without a branch/repo yet simply gets no links.
            if branch and repo:
                msg += f"\n📂 Branch: [{branch}]({gitea_base}/admin/{repo}/src/branch/{branch})"
                if new_stage in ("review", "testing", "deploy"):
                    import httpx as _httpx
                    from .config import settings

                    _headers = {"Authorization": f"token {settings.gitea_token}"}
                    _resp = _httpx.get(
                        f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
                        params={"state": "open", "head": branch},
                        headers=_headers,
                        timeout=5,
                    )
                    if _resp.status_code == 200:
                        _prs = _resp.json()
                        if _prs:
                            pr_num = _prs[0]["number"]
                            msg += f"\n🔗 PR: [#{pr_num}]({gitea_base}/admin/{repo}/pulls/{pr_num})"
    except Exception as e:
        # Best effort: keep the notification, but leave a trace of why the
        # links are missing instead of swallowing the error silently.
        logger.debug(f"notify_stage_change: link enrichment skipped for {work_item_id}: {e}")

    # Stage transition is the orchestrator's own voice -> attribute to stream.
    add_comment(work_item_id, msg, project_id, author="stream")
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
    """Post a quality-gate failure comment on the Plane issue.

    The comment is attributed to the agent role that owns the failing stage
    (``STAGE_AUTHORS``), or to ``"stream"`` for a stage with no owner.

    Args:
        work_item_id: Orchestrator work item id.
        stage: Stage at which the quality gate failed.
        check: Name of the failed check.
        reason: Human-readable failure reason.
        project_id: Plane project UUID; passed through to ``add_comment``,
            which resolves it when omitted.
    """
    # An em dash separates the check name from the reason; without it the two
    # ran together in the comment.
    add_comment(
        work_item_id,
        f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} \u2014 {reason}",
        project_id,
        author=STAGE_AUTHORS.get(stage, "stream"),
    )
def notify_done(work_item_id: str, project_id: str = None):
    """Mark the issue as Done in Plane and post a completion comment.

    Args:
        work_item_id: Orchestrator work item id.
        project_id: Plane project UUID; resolved via ``_resolve_project_id``
            when omitted.
    """
    resolved_pid = _resolve_project_id(work_item_id, project_id)
    update_issue_state(work_item_id, "done", resolved_pid)
    # Deploy finished the task -> the completion comment is the Deployer's.
    completion_text = f"{EMOJI_DONE} Task completed! PR merged and deployed."
    add_comment(work_item_id, completion_text, resolved_pid, author="deployer")