ORCH-10 root cause: PLANE_STATES was a global dict hardcoding enduro-trails
UUIDs. The webhook comparison only
matched ET UUID (b873d9eb) and silently ignored the ORCH in_progress UUID
(e331bfb3), blocking pipeline start for all orchestrator-project tasks.
Changes:
- src/plane_sync.py:
* Rename PLANE_STATES -> _DEFAULT_STATES (enduro UUIDs kept as safe fallback).
* PLANE_STATES preserved as alias to _DEFAULT_STATES (backward compat).
* Add get_project_states(project_id) -> {logical_key: state_uuid}:
fetches Plane API GET /projects/<id>/states/, maps by state name,
caches per project_id, falls back to _DEFAULT_STATES on API failure.
* Add _STATES_CACHE: dict, reload_project_states(project_id=None).
* Add _PLANE_NAME_TO_KEY mapping and _STAGE_TO_STATE_KEY for clean lookup.
* Add stage_to_state(stage, project_id) using get_project_states().
* update_issue_state() uses stage_to_state() instead of STAGE_TO_STATE dict.
* set_issue_{needs_input,in_review,blocked,done,in_progress,stage_state}()
all resolve state UUID via get_project_states(project_id) instead of
the global PLANE_STATES dict.
- src/webhooks/plane.py:
* handle_issue_updated: import get_project_states, resolve proj_states per
incoming project_id, compare new_state against proj_states["in_progress"],
proj_states["approved"], proj_states["rejected"].
* start_pipeline QG-0 blocked path: use get_project_states(plane_project_id)
instead of PLANE_STATES["blocked"].
- tests/test_orch10_states.py: 23 new tests covering:
* get_project_states returns correct UUIDs for both ET and ORCH projects.
* API failure / empty response / None project_id -> _DEFAULT_STATES fallback.
* Caching and reload_project_states (per-project and full flush).
* stage_to_state() per-project resolution.
* Webhook in_progress triggers pipeline for BOTH b873d9eb (ET) and e331bfb3 (ORCH).
* Webhook approved/rejected routes correctly per project.
* PLANE_STATES alias and _DEFAULT_STATES backward compat.
593 lines
24 KiB
Python
593 lines
24 KiB
Python
"""Plane API sync — update issue state and add comments."""
|
|
|
|
import logging
|
|
import httpx
|
|
from .config import settings
|
|
|
|
logger = logging.getLogger("orchestrator.plane_sync")
|
|
|
|
# L-3: emoji literals used in Plane comment bodies, named for readability.
|
|
# Message text stays byte-for-byte identical to the previous output.
|
|
EMOJI_STAGE = "\U0001F504" # stage transition
|
|
EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure
|
|
EMOJI_DONE = "\u2705" # task completed
|
|
|
|
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
|
|
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
|
|
WORKSPACE = settings.plane_workspace_slug
|
|
|
|
# feat(plane): per-agent comment authorship.
|
|
# Map an agent role -> its dedicated Plane bot token (read from config / env).
|
|
# When the token is present, add_comment() POSTs under that bot so Plane shows
|
|
# the real author. Empty/unknown role -> fallback to the shared orchestrator
|
|
# token (PLANE_HEADERS), so commenting stays autonomous.
|
|
PLANE_BOT_TOKENS = {
|
|
"analyst": settings.plane_bot_analyst,
|
|
"architect": settings.plane_bot_architect,
|
|
"developer": settings.plane_bot_developer,
|
|
"reviewer": settings.plane_bot_reviewer,
|
|
"tester": settings.plane_bot_tester,
|
|
"deployer": settings.plane_bot_deployer,
|
|
"stream": settings.plane_bot_stream,
|
|
}
|
|
|
|
# Map a pipeline stage -> the agent role that owns work in that stage. Used to
|
|
# pick an author for rollback/stage notifications targeting a specific stage.
|
|
STAGE_AUTHORS = {
|
|
"analysis": "analyst",
|
|
"architecture": "architect",
|
|
"development": "developer",
|
|
"review": "reviewer",
|
|
"testing": "tester",
|
|
"deploy": "deployer",
|
|
}
|
|
|
|
|
|
def _headers_for(author: str | None) -> dict:
|
|
"""Return X-API-Key headers for the given agent role.
|
|
|
|
Falls back to the shared orchestrator token (PLANE_HEADERS /
|
|
settings.plane_api_token) when the role is None, unknown, or its bot token
|
|
is not configured. This keeps comment posting autonomous: a comment is
|
|
always written, just attributed to the orchestrator if no bot is set.
|
|
"""
|
|
tok = PLANE_BOT_TOKENS.get(author or "") if author else None
|
|
return {"X-API-Key": tok} if tok else PLANE_HEADERS
|
|
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
|
|
|
|
|
def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str:
|
|
"""ORCH-6: resolve the Plane project id for a sync call.
|
|
|
|
Priority:
|
|
1. explicit project_id arg (caller already knows the project),
|
|
2. project derived from the task's repo in the DB (by work_item_id),
|
|
3. legacy default PROJECT_ID (enduro) for backward compatibility.
|
|
"""
|
|
if project_id:
|
|
return project_id
|
|
if work_item_id:
|
|
try:
|
|
from .db import get_db
|
|
from .projects import get_project_by_repo
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1",
|
|
(work_item_id,),
|
|
).fetchone()
|
|
conn.close()
|
|
if row and row[0]:
|
|
proj = get_project_by_repo(row[0])
|
|
if proj:
|
|
return proj.plane_project_id
|
|
except Exception as e:
|
|
logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
|
|
return PROJECT_ID
|
|
|
|
# ORCH-10: per-project state resolution.
|
|
#
|
|
# _DEFAULT_STATES keeps the original enduro-trails UUIDs as a safe fallback
|
|
# (used when the Plane API is unreachable and for backward compat).
|
|
# PLANE_STATES is preserved as an alias so existing call sites that reference
|
|
# it directly (QG-0 fast-path in webhooks/plane.py, tests) continue to work.
|
|
_DEFAULT_STATES = {
|
|
"backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
|
|
"todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
|
|
"in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
|
|
"needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
|
|
"in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
|
|
"blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
|
|
"done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
|
|
"cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
|
|
# Feature 3 (stage visibility) — per-stage statuses on the board.
|
|
"architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
|
|
"development": "9920609b-f140-4e46-ab95-89acda8412c8",
|
|
"review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
|
|
"testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
|
|
# Feature 2 (verdict statuses) — Approved / Rejected.
|
|
"approved": "a519a341-dada-4a91-8910-7604f82b79c5",
|
|
"rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
|
|
}
|
|
|
|
# Backward-compat alias — do NOT remove (tests + webhooks/plane.py import it).
|
|
PLANE_STATES = _DEFAULT_STATES
|
|
|
|
# Mapping: Plane state *name* (as returned by the API) -> logical key.
|
|
_PLANE_NAME_TO_KEY: dict[str, str] = {
|
|
"Backlog": "backlog",
|
|
"Todo": "todo",
|
|
"In Progress": "in_progress",
|
|
"Architecture": "architecture",
|
|
"Development": "development",
|
|
"Review": "review",
|
|
"Testing": "testing",
|
|
"Approved": "approved",
|
|
"Rejected": "rejected",
|
|
"Done": "done",
|
|
"Cancelled": "cancelled",
|
|
"Needs Input": "needs_input",
|
|
"In Review": "in_review",
|
|
"Blocked": "blocked",
|
|
}
|
|
|
|
# Per-project state cache: {project_id: {logical_key: state_uuid}}
|
|
_STATES_CACHE: dict[str, dict[str, str]] = {}
|
|
|
|
|
|
def get_project_states(project_id: str) -> dict[str, str]:
|
|
"""ORCH-10: resolve {logical_key -> state_uuid} for a specific Plane project.
|
|
|
|
Source of truth: Plane API GET /projects/<project_id>/states/.
|
|
Results are cached per project_id for the lifetime of the process.
|
|
Falls back to _DEFAULT_STATES (enduro-trails values) if:
|
|
* project_id is empty/None,
|
|
* the API call fails (network error, non-2xx),
|
|
* the response contains no recognisable states.
|
|
|
|
The enduro-trails project therefore returns the same UUIDs as before
|
|
(backward compatible). The orchestrator project returns its own UUIDs,
|
|
fixing the ORCH-10 blocker.
|
|
"""
|
|
if not project_id:
|
|
return _DEFAULT_STATES
|
|
|
|
if project_id in _STATES_CACHE:
|
|
return _STATES_CACHE[project_id]
|
|
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/states/"
|
|
try:
|
|
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
|
resp.raise_for_status()
|
|
body = resp.json()
|
|
# Plane returns {"results": [...]} or a bare list.
|
|
items = body.get("results", body) if isinstance(body, dict) else body
|
|
if not isinstance(items, list):
|
|
raise ValueError(f"unexpected states response shape: {type(items)}")
|
|
|
|
resolved: dict[str, str] = {}
|
|
for item in items:
|
|
name = item.get("name", "")
|
|
uid = item.get("id", "")
|
|
key = _PLANE_NAME_TO_KEY.get(name)
|
|
if key and uid:
|
|
resolved[key] = uid
|
|
|
|
if not resolved:
|
|
raise ValueError("no recognisable states in API response")
|
|
|
|
# Fill any missing keys from _DEFAULT_STATES so callers always get a
|
|
# complete mapping (defensive against partial Plane configs).
|
|
for k, v in _DEFAULT_STATES.items():
|
|
resolved.setdefault(k, v)
|
|
|
|
_STATES_CACHE[project_id] = resolved
|
|
logger.debug(
|
|
f"get_project_states: cached {len(resolved)} states for project {project_id[:8]}..."
|
|
)
|
|
return resolved
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"get_project_states: API failed for project {project_id[:8]}..., "
|
|
f"falling back to _DEFAULT_STATES. Error: {e}"
|
|
)
|
|
return _DEFAULT_STATES
|
|
|
|
|
|
def reload_project_states(project_id: str = None) -> None:
|
|
"""ORCH-10: clear the per-project states cache.
|
|
|
|
If project_id is given, evict only that project.
|
|
If None, flush the entire cache (useful in tests and after config reload).
|
|
"""
|
|
global _STATES_CACHE
|
|
if project_id is None:
|
|
_STATES_CACHE = {}
|
|
logger.debug("reload_project_states: full cache cleared")
|
|
else:
|
|
_STATES_CACHE.pop(project_id, None)
|
|
logger.debug(f"reload_project_states: evicted project {project_id[:8]}...")
|
|
|
|
|
|
# Feature 3: map an orchestrator stage -> the Plane status to show on the board
|
|
# when the pipeline ENTERS that stage. analysis stays driven by the existing
|
|
# in_progress/in_review/needs_input logic (no dedicated status). deploy keeps
|
|
# in_progress until done. Needs Input / In Review / Blocked remain higher
|
|
# priority and are set explicitly elsewhere — do NOT override them from here.
|
|
STAGE_VISIBILITY_STATE = {
|
|
"architecture": "architecture",
|
|
"development": "development",
|
|
"review": "review",
|
|
"testing": "testing",
|
|
}
|
|
|
|
# STAGE_TO_STATE kept for backward compat (used by tests that patch it).
|
|
# update_issue_state now calls stage_to_state() instead of looking up here.
|
|
STAGE_TO_STATE = {
|
|
"created": _DEFAULT_STATES["todo"],
|
|
"analysis": _DEFAULT_STATES["in_progress"],
|
|
"architecture": _DEFAULT_STATES["architecture"],
|
|
"development": _DEFAULT_STATES["development"],
|
|
"review": _DEFAULT_STATES["review"],
|
|
"testing": _DEFAULT_STATES["testing"],
|
|
"deploy": _DEFAULT_STATES["in_progress"],
|
|
"done": _DEFAULT_STATES["done"],
|
|
}
|
|
|
|
# Map orchestrator stage -> logical state key (project-independent).
|
|
_STAGE_TO_STATE_KEY = {
|
|
"created": "todo",
|
|
"analysis": "in_progress",
|
|
"architecture": "architecture",
|
|
"development": "development",
|
|
"review": "review",
|
|
"testing": "testing",
|
|
"deploy": "in_progress",
|
|
"done": "done",
|
|
}
|
|
|
|
|
|
def stage_to_state(stage: str, project_id: str) -> str | None:
|
|
"""ORCH-10: return the Plane state UUID for a pipeline stage in a project.
|
|
|
|
Resolves via get_project_states so the correct per-project UUID is used.
|
|
Returns None for unknown stages (same behaviour as the old STAGE_TO_STATE
|
|
dict lookup returning None).
|
|
"""
|
|
key = _STAGE_TO_STATE_KEY.get(stage)
|
|
if not key:
|
|
return None
|
|
return get_project_states(project_id).get(key)
|
|
|
|
|
|
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
|
|
"""M-6: GET the Plane issue by UUID and return its sequence_id (the
|
|
authoritative per-project number), or None if unavailable.
|
|
|
|
Returns None on network error, non-2xx, or a missing field - never raises,
|
|
so the webhook handler can fall back to DB increment and stay autonomous.
|
|
"""
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
|
try:
|
|
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
|
resp.raise_for_status()
|
|
seq = resp.json().get("sequence_id")
|
|
return int(seq) if seq is not None else None
|
|
except Exception as e:
|
|
logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
|
|
return None
|
|
|
|
|
|
import re as _re
|
|
|
|
|
|
def _strip_html(html: str) -> str:
|
|
"""Crude HTML -> text: drop tags and collapse whitespace. Good enough to
|
|
feed QG-0's length check when Plane only gives us description_html."""
|
|
if not html:
|
|
return ""
|
|
text = _re.sub(r"<[^>]+>", " ", html)
|
|
return _re.sub(r"\s+", " ", text).strip()
|
|
|
|
|
|
def fetch_issue_description(issue_id: str, project_id: str) -> str:
|
|
"""BUG 1: GET the Plane issue by UUID and return its description text.
|
|
|
|
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
|
|
CHANGED fields, so ``description``/``description_stripped`` are usually
|
|
absent there. start_pipeline calls this to pull the full description from the
|
|
issue detail endpoint so QG-0 does not blow up on an empty payload field.
|
|
|
|
Reuses the exact GET issue detail endpoint / shared token already used by
|
|
``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers
|
|
``description_stripped``; falls back to stripping ``description_html``.
|
|
|
|
Returns "" on network error, non-2xx, or a missing field - never raises, so
|
|
a Plane outage degrades to the honest "empty description" QG-0 path instead
|
|
of crashing the webhook.
|
|
"""
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
|
try:
|
|
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
|
resp.raise_for_status()
|
|
body = resp.json()
|
|
desc = body.get("description_stripped")
|
|
if desc and desc.strip():
|
|
return desc
|
|
return _strip_html(body.get("description_html") or "")
|
|
except Exception as e:
|
|
logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
|
|
return ""
|
|
|
|
|
|
def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
|
|
"""BUG B: GET the Plane issue by UUID ONCE and return (name, description).
|
|
|
|
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
|
|
CHANGED fields, so BOTH ``name`` and ``description`` are usually absent in
|
|
the payload. start_pipeline needs the real title (for the branch slug) and
|
|
the real description (for the analyst .task.md). To avoid issuing two
|
|
separate issue-detail GETs (one for name, one for description), this single
|
|
request returns both.
|
|
|
|
Reuses the exact GET issue detail endpoint / shared token already used by
|
|
``fetch_issue_sequence_id`` / ``fetch_issue_description``. For the
|
|
description it applies the same logic as ``fetch_issue_description``
|
|
(prefer ``description_stripped``, fall back to stripping
|
|
``description_html``).
|
|
|
|
Returns ("", "") on network error, non-2xx, or missing body - never raises,
|
|
so a Plane outage degrades gracefully (caller keeps its payload fallbacks).
|
|
"""
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
|
try:
|
|
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
|
resp.raise_for_status()
|
|
body = resp.json()
|
|
name = (body.get("name") or "").strip()
|
|
desc = body.get("description_stripped")
|
|
if desc and desc.strip():
|
|
description = desc
|
|
else:
|
|
description = _strip_html(body.get("description_html") or "")
|
|
return name, description
|
|
except Exception as e:
|
|
logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}")
|
|
return "", ""
|
|
|
|
|
|
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
|
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
# Primary: lookup from DB (plane_issue_id column)
|
|
try:
|
|
from .db import get_db
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL",
|
|
(work_item_id,)
|
|
).fetchone()
|
|
if row and row[0]:
|
|
return row[0]
|
|
except Exception as e:
|
|
logger.debug(f"DB lookup failed for {work_item_id}: {e}")
|
|
|
|
# Fallback: search via Plane API
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
|
|
try:
|
|
# First try search by work_item_id
|
|
resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
results = data.get("results", data if isinstance(data, list) else [])
|
|
# M-6: match by sequence_id directly (the authoritative per-project
|
|
# number), parsed from the work_item_id suffix - no hardcoded prefix.
|
|
try:
|
|
target_num = int(work_item_id.rsplit("-", 1)[1])
|
|
except (IndexError, ValueError):
|
|
target_num = None
|
|
for issue in results:
|
|
if target_num is not None and issue.get("sequence_id") == target_num:
|
|
return issue["id"]
|
|
if work_item_id in issue.get("name", ""):
|
|
return issue["id"]
|
|
# Fallback: get all issues and match by sequence_id number (any prefix)
|
|
if target_num is not None:
|
|
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
|
resp2.raise_for_status()
|
|
data2 = resp2.json()
|
|
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
|
|
for issue in results2:
|
|
if issue.get("sequence_id") == target_num:
|
|
return issue["id"]
|
|
except Exception as e:
|
|
logger.error(f"Failed to find issue for {work_item_id}: {e}")
|
|
return None
|
|
|
|
|
|
def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
|
|
"""Update Plane issue state based on orchestrator stage."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
# ORCH-10: resolve state UUID for this specific project (not global dict).
|
|
state_id = stage_to_state(stage, project_id)
|
|
if not state_id:
|
|
return
|
|
|
|
issue_id = find_issue_id(work_item_id, project_id)
|
|
if not issue_id:
|
|
logger.warning(f"Issue not found in Plane for {work_item_id}")
|
|
return
|
|
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
|
try:
|
|
resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
|
|
resp.raise_for_status()
|
|
logger.info(f"Plane: {work_item_id} state -> {stage} ({state_id[:8]}...)")
|
|
except Exception as e:
|
|
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
|
|
|
|
|
|
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
|
|
"""Add a comment to a Plane issue.
|
|
|
|
feat(plane): when ``author`` (an agent role) maps to a configured bot
|
|
token, the comment is POSTed under that bot so Plane shows the real author.
|
|
Otherwise it falls back to the shared orchestrator token (see
|
|
``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS.
|
|
"""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
issue_id = find_issue_id(work_item_id, project_id)
|
|
if not issue_id:
|
|
logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment")
|
|
return
|
|
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
|
|
html = f"<p>{text}</p>"
|
|
try:
|
|
resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10)
|
|
resp.raise_for_status()
|
|
logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
|
|
except Exception as e:
|
|
logger.error(f"Failed to add comment to {work_item_id}: {e}")
|
|
|
|
|
|
def set_issue_needs_input(work_item_id: str, project_id: str = None):
|
|
"""Set issue to 'Needs Input' state — waiting for stakeholder response."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
state_id = get_project_states(project_id)["needs_input"]
|
|
_set_issue_state_direct(work_item_id, state_id, project_id)
|
|
|
|
|
|
def set_issue_in_review(work_item_id: str, project_id: str = None):
|
|
"""Set issue to 'In Review' state — waiting for :approved: or :rejected:."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
state_id = get_project_states(project_id)["in_review"]
|
|
_set_issue_state_direct(work_item_id, state_id, project_id)
|
|
|
|
|
|
def set_issue_blocked(work_item_id: str, project_id: str = None):
|
|
"""Set issue to 'Blocked' state — manual intervention needed."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
state_id = get_project_states(project_id)["blocked"]
|
|
_set_issue_state_direct(work_item_id, state_id, project_id)
|
|
|
|
|
|
def set_issue_done(work_item_id: str, project_id: str = None):
|
|
"""Observability fix: force the issue into the TERMINAL Done state.
|
|
|
|
Used by the deploy->done success path so a completed task always reaches the
|
|
terminal Plane state (it used to stick on In Progress because the merge
|
|
webhook bypassed the stage engine). Resolves per-project UUID via
|
|
get_project_states (ORCH-10).
|
|
"""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
state_id = get_project_states(project_id)["done"]
|
|
_set_issue_state_direct(work_item_id, state_id, project_id)
|
|
|
|
|
|
def set_issue_in_progress(work_item_id: str, project_id: str = None):
|
|
"""Set issue to 'In Progress' state — agent working."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
state_id = get_project_states(project_id)["in_progress"]
|
|
_set_issue_state_direct(work_item_id, state_id, project_id)
|
|
|
|
|
|
def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None):
|
|
"""Feature 3: move the issue to the board status for a pipeline stage.
|
|
|
|
Only the visible-stage statuses (architecture/development/review/testing)
|
|
are driven here — stages without a dedicated status (analysis/deploy) are a
|
|
no-op so the existing in_progress/in_review/needs_input logic stays in
|
|
charge. By design this does NOT touch Needs Input / In Review / Blocked,
|
|
which are higher priority and set explicitly by their own helpers.
|
|
"""
|
|
state_key = STAGE_VISIBILITY_STATE.get(stage)
|
|
if not state_key:
|
|
return
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
# ORCH-10: resolve per-project UUID.
|
|
state_id = get_project_states(project_id)[state_key]
|
|
_set_issue_state_direct(work_item_id, state_id, project_id)
|
|
|
|
|
|
def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None):
|
|
"""Set issue state directly by state_id."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
issue_id = find_issue_id(work_item_id, project_id)
|
|
if not issue_id:
|
|
logger.warning(f"Issue not found in Plane for {work_item_id}")
|
|
return
|
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
|
try:
|
|
resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
|
|
resp.raise_for_status()
|
|
logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
|
|
except Exception as e:
|
|
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
|
|
|
|
|
|
def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None):
|
|
"""Notify Plane about stage transition with links."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
update_issue_state(work_item_id, new_stage, project_id)
|
|
|
|
msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}"
|
|
if agent:
|
|
msg += f" (launching {agent})"
|
|
|
|
# Add relevant links
|
|
gitea_base = "http://git.mva154.duckdns.org"
|
|
try:
|
|
from .db import get_db
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
|
|
).fetchone()
|
|
conn.close()
|
|
if row:
|
|
branch, repo = row
|
|
msg += chr(10) + "📂 Branch: [" + branch + "](" + gitea_base + "/admin/" + repo + "/src/branch/" + branch + ")"
|
|
if new_stage in ("review", "testing", "deploy"):
|
|
import httpx as _httpx
|
|
from .config import settings
|
|
_headers = {"Authorization": f"token {settings.gitea_token}"}
|
|
_resp = _httpx.get(
|
|
f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
|
|
params={"state": "open", "head": branch},
|
|
headers=_headers, timeout=5
|
|
)
|
|
if _resp.status_code == 200:
|
|
_prs = _resp.json()
|
|
if _prs:
|
|
pr_num = _prs[0]["number"]
|
|
msg += chr(10) + "🔗 PR: [#" + str(pr_num) + "](" + gitea_base + "/admin/" + repo + "/pulls/" + str(pr_num) + ")"
|
|
except Exception:
|
|
pass
|
|
|
|
# Stage transition is the orchestrator's own voice -> attribute to stream.
|
|
add_comment(work_item_id, msg, project_id, author="stream")
|
|
|
|
|
|
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
|
|
"""Notify Plane about QG failure."""
|
|
# QG failure belongs to the agent that owns the failing stage.
|
|
add_comment(
|
|
work_item_id,
|
|
f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}",
|
|
project_id,
|
|
author=STAGE_AUTHORS.get(stage, "stream"),
|
|
)
|
|
|
|
|
|
def notify_done(work_item_id: str, project_id: str = None):
|
|
"""Mark issue as Done in Plane."""
|
|
project_id = _resolve_project_id(work_item_id, project_id)
|
|
update_issue_state(work_item_id, "done", project_id)
|
|
# Deploy finished the task -> attribute the completion comment to Deployer.
|
|
add_comment(
|
|
work_item_id,
|
|
f"{EMOJI_DONE} Task completed! PR merged and deployed.",
|
|
project_id,
|
|
author="deployer",
|
|
)
|