Files
orchestrator/src/plane_sync.py
claude-bot a6d0ba51c0 feat(labels): auto-mode by Plane labels — autoApprove + autoDeploy (ORCH-089)
Lift the two HUMAN gates that block an autonomous batch run (epic ORCH-088):
the BRD gate (analysis: manual Approved) and the prod-deploy gate (deploy
Phase A: manual Confirm Deploy, ORCH-059). Selective (a Plane label on the
issue), declarative, reversible, and WITHOUT touching a single technical check.

Additive, mirroring the conditional sub-gates (ORCH-035/043/058/088): leaf
src/labels.py (never-raise) + two point insertions + config flags.
STAGE_TRANSITIONS / QG_CHECKS / check_* / DB schema are NOT touched.

- autoApprove: insertion point in _handle_analysis_approved_flow (files_ok branch) ->
  set_issue_approved + log/Telegram/Plane-comment + advance_stage(
  finished_agent=None) — the SAME path a human Approved takes (approved-via-
  status -> analysis->architecture + mark_brd_review_ended). No duplicated
  transition logic; re-entrancy safe.
- autoDeploy: insertion point in _handle_self_deploy_phase_a after advance to deploy +
  clear_state -> log/Telegram/Plane-comment + _handle_self_deploy_phase_b
  (INITIATED marker, Deploying, finalizer). Only the indicative human steps are
  skipped. BR-5 holds structurally: Phase A is reached only after the green edge
  sub-gates, so autoDeploy can never deploy a broken build.
- plane_sync: fetch_issue_labels (None on error != []), get_project_labels
  ({normalized_name->uuid}, TTL cache, ambiguity sentinel), set_issue_approved.
- config flags: auto_label_enabled (kill-switch), auto_approve_label/
  auto_deploy_label, auto_label_repos (empty -> self-hosting only),
  auto_label_states_ttl_s. applies() (local) checked FIRST; has_label (network)
  only when applies==True -> zero network / zero regression when disabled (AC-8).
- Fail-safe (never auto on doubt), transparency via log+Telegram+Plane+card,
  read-only auto_labels block in GET /queue.
- Tests TC-01..TC-26 across 7 modules; docs (CLAUDE.md, architecture README,
  CHANGELOG) updated in the same PR.

Refs: ORCH-089

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 12:31:24 +03:00

1076 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Plane API sync — update issue state and add comments."""
import logging
import time
import httpx
from .config import settings
logger = logging.getLogger("orchestrator.plane_sync")
# L-3: emoji literals used in Plane comment bodies, named for readability.
# Written as escapes so the source stays ASCII; the rendered message text stays
# byte-for-byte identical to the previous output.
EMOJI_STAGE = "\U0001F504" # stage transition
EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure
EMOJI_DONE = "\u2705" # task completed
# Root of the Plane REST API (v1), derived from the configured base URL.
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
# Default request headers: authenticate with the shared orchestrator API token.
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
# Workspace slug interpolated into the request URLs built below.
WORKSPACE = settings.plane_workspace_slug
# feat(plane): per-agent comment authorship.
# Agent role -> that role's dedicated Plane bot token (read from settings).
# _headers_for() turns a role into request headers: when the role's token is
# set the request is made under that bot, otherwise it falls back to the shared
# orchestrator token (PLANE_HEADERS), so commenting stays autonomous.
PLANE_BOT_TOKENS = {
    "analyst": settings.plane_bot_analyst,
    "architect": settings.plane_bot_architect,
    "developer": settings.plane_bot_developer,
    "reviewer": settings.plane_bot_reviewer,
    "tester": settings.plane_bot_tester,
    "deployer": settings.plane_bot_deployer,
    "stream": settings.plane_bot_stream,
}
# Pipeline stage -> agent role that owns the work in that stage. Used to choose
# the comment author for rollback/stage notifications aimed at a given stage.
STAGE_AUTHORS = {
    "analysis": "analyst",
    "architecture": "architect",
    "development": "developer",
    "review": "reviewer",
    "testing": "tester",
    "deploy": "deployer",
}
def _headers_for(author: str | None) -> dict:
    """Build the ``X-API-Key`` headers for a comment written by ``author``.

    Args:
        author: agent role name (a key of ``PLANE_BOT_TOKENS``) or ``None``.

    Returns:
        Headers carrying the role's bot token when one is configured; otherwise
        the shared ``PLANE_HEADERS``. The fallback covers a ``None``/empty role,
        an unknown role, and a role whose token is empty, so a comment can
        always be posted (attributed to the orchestrator when no bot is set).
    """
    if author:
        bot_token = PLANE_BOT_TOKENS.get(author)
        if bot_token:
            return {"X-API-Key": bot_token}
    return PLANE_HEADERS
# Legacy default Plane project id: the configured value, or the hard-coded UUID
# below when settings.plane_project_id is empty. Used as the last-resort result
# of _resolve_project_id().
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str:
"""ORCH-6: resolve the Plane project id for a sync call.
Priority:
1. explicit project_id arg (caller already knows the project),
2. project derived from the task's repo in the DB (by work_item_id),
3. legacy default PROJECT_ID (enduro) for backward compatibility.
"""
if project_id:
return project_id
if work_item_id:
try:
from .db import get_db
from .projects import get_project_by_repo
conn = get_db()
row = conn.execute(
"SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1",
(work_item_id,),
).fetchone()
conn.close()
if row and row[0]:
proj = get_project_by_repo(row[0])
if proj:
return proj.plane_project_id
except Exception as e:
logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
return PROJECT_ID
# ORCH-10: per-project state resolution.
#
# _DEFAULT_STATES holds the original enduro-trails state UUIDs. It is the safe
# fallback returned by get_project_states() when no project id is given or the
# Plane API cannot be used, and it fills any key a project does not define.
# PLANE_STATES (defined right after) aliases this dict for existing call sites.
_DEFAULT_STATES = {
    # Base board statuses.
    "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
    "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
    "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
    "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
    "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
    "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
    "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
    "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
    # Feature 3 (stage visibility): one status per pipeline stage.
    "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
    "development": "9920609b-f140-4e46-ab95-89acda8412c8",
    "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
    "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
    # Feature 2 (verdict statuses): Approved / Rejected.
    "approved": "a519a341-dada-4a91-8910-7604f82b79c5",
    "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
    # ORCH-066 (meaningful Plane status model, layer B): six newer logical keys.
    # Each value repeats the UUID of the key's BASE status (the pairing listed
    # in _STATE_ALIAS_FALLBACK), so a project that lacks these statuses keeps
    # the previous behaviour instead of sending an invalid PATCH state.
    # get_project_states() replaces them with the project's own base UUID on
    # the success path; the values here are the last resort.
    "to_analyse": "b873d9eb-993c-48cd-97ac-99a9b1623967",  # same as in_progress
    "analysis": "b873d9eb-993c-48cd-97ac-99a9b1623967",  # same as in_progress
    "code_review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",  # same as review
    "awaiting_deploy": "38fb1f64-aa1e-48a3-92e0-0b109679046b",  # same as in_review
    "deploying": "b873d9eb-993c-48cd-97ac-99a9b1623967",  # same as in_progress
    "monitoring": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",  # same as done
}
# Backward-compat alias — do NOT remove (tests + webhooks/plane.py import it).
# This binds the SAME dict object as _DEFAULT_STATES, not a copy, so a mutation
# through either name is visible through the other.
PLANE_STATES = _DEFAULT_STATES
# Plane state *name* (exactly as the API returns it) -> logical key used by the
# orchestrator. get_project_states() ignores any state name that is not listed.
_PLANE_NAME_TO_KEY: dict[str, str] = {
    "Backlog": "backlog",
    "Todo": "todo",
    "In Progress": "in_progress",
    "Architecture": "architecture",
    "Development": "development",
    "Review": "review",
    "Testing": "testing",
    "Approved": "approved",
    "Rejected": "rejected",
    "Done": "done",
    "Cancelled": "cancelled",
    "Needs Input": "needs_input",
    "In Review": "in_review",
    "Blocked": "blocked",
    # ORCH-059: dedicated prod-deploy trigger status, distinct from the
    # human-gate "Approved". It is resolved from the live Plane API only and is
    # intentionally ABSENT from _DEFAULT_STATES, so environments without this
    # board status (enduro / API fallback) fail closed: no UUID, no
    # confirm-deploy branch, and no KeyError as long as it is read via .get.
    "Confirm Deploy": "confirm_deploy",
    # ORCH-066: meaningful per-stage / human-input statuses (layer B).
    "To Analyse": "to_analyse",
    "Analysis": "analysis",
    "Code-Review": "code_review",
    "Awaiting Deploy": "awaiting_deploy",
    "Deploying": "deploying",
    "Monitoring after Deploy": "monitoring",
}
# ORCH-066 (BR-12): project-relative alias fallback, NEW logical key -> BASE key.
# After get_project_states() resolves a project's states by name, every new key
# the project does not define takes the UUID of its base key FROM THE SAME
# PROJECT, so the PATCH stays valid for a partially configured project. A
# project with none of the new statuses (enduro) maps every new key onto its
# base, i.e. exactly the pre-ORCH-066 behaviour. Strengthened ORCH-059 AC-7
# pattern.
_STATE_ALIAS_FALLBACK: dict[str, str] = {
    "to_analyse": "in_progress",
    "analysis": "in_progress",
    "code_review": "review",
    "awaiting_deploy": "in_review",
    "deploying": "in_progress",
    "monitoring": "done",
}
# Per-project state cache (ORCH-10 + ORCH-068), keyed by Plane project id.
#
# Each entry is a RECORD, not a bare mapping:
# {"states": {logical_key: state_uuid}, # the ORCH-10 mapping (unchanged shape)
# "groups": {state_uuid: group}, # ORCH-068 D1: {uuid -> Plane state.group}
# "ts": monotonic timestamp} # ORCH-068 TR-4: for TTL self-heal
# get_project_states() still RETURNS the bare {logical_key: state_uuid} mapping
# (backward compatible — AC-13); the richer record is internal.
# reload_project_states() rebinds this name to a new dict on a full flush.
_STATES_CACHE: dict[str, dict] = {}
def _cache_record_fresh(record: dict) -> bool:
    """ORCH-068 (TR-4): report whether a states-cache record is within its TTL.

    The TTL is ``settings.plane_states_ttl_s``. A value ``<= 0`` switches the
    TTL off, so a record never expires (the earlier lifetime-cache behaviour).
    A record without a ``"ts"`` entry is aged as if stamped at ``0.0``.

    Args:
        record: a ``_STATES_CACHE`` entry.

    Returns:
        ``True`` when the record may still be served, ``False`` when stale.
    """
    max_age = settings.plane_states_ttl_s
    if max_age <= 0:
        return True
    age = time.monotonic() - record.get("ts", 0.0)
    return age <= max_age
def get_project_states(project_id: str) -> dict[str, str]:
    """ORCH-10: resolve ``{logical_key -> state_uuid}`` for one Plane project.

    Source of truth is ``GET /projects/<project_id>/states/``. The result is
    cached per project; ORCH-068 (TR-4) re-fetches a cached record once
    ``_cache_record_fresh`` reports it stale, so a status added in Plane after
    start-up is picked up without a restart.

    Args:
        project_id: Plane project id; empty/``None`` short-circuits to defaults.

    Returns:
        A mapping that contains at least every key of ``_DEFAULT_STATES``:
          * the fresh cached mapping, when there is one;
          * otherwise the mapping built from the API response (then cached);
          * the stale cached mapping when the refresh fails;
          * ``_DEFAULT_STATES`` itself when ``project_id`` is falsy, or when the
            fetch fails / yields no recognisable state and nothing is cached.

    Never raises: every failure of the fetch or parse is caught and logged.
    """
    if not project_id:
        return _DEFAULT_STATES

    record = _STATES_CACHE.get(project_id)
    if record is not None and _cache_record_fresh(record):
        return record["states"]

    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/states/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()

        # Plane answers with either {"results": [...]} or a bare list.
        if isinstance(payload, dict):
            rows = payload.get("results", payload)
        else:
            rows = payload
        if not isinstance(rows, list):
            raise ValueError(f"unexpected states response shape: {type(rows)}")

        by_key: dict[str, str] = {}
        group_of: dict[str, str] = {}
        for row in rows:
            state_name = row.get("name", "")
            state_uuid = row.get("id", "")
            logical_key = _PLANE_NAME_TO_KEY.get(state_name)
            if logical_key and state_uuid:
                by_key[logical_key] = state_uuid
            # ORCH-068 D1: remember {uuid -> group} from the same response (no
            # extra request). The group separates terminal
            # (completed/cancelled) states from review/work ones and is kept
            # for EVERY state, including names that have no logical key.
            state_group = row.get("group", "")
            if state_uuid and state_group:
                group_of[state_uuid] = state_group

        if not by_key:
            raise ValueError("no recognisable states in API response")

        # ORCH-066 (BR-12): a new key the project does not define takes the
        # UUID of its base key from THIS project. This must run before the
        # defaults are filled in so the project's own UUID wins over the static
        # enduro default.
        for alias_key, base_key in _STATE_ALIAS_FALLBACK.items():
            if alias_key in by_key:
                continue
            if by_key.get(base_key):
                by_key[alias_key] = by_key[base_key]

        # Complete the mapping from the defaults so callers always see every
        # key, even for a partially configured project.
        for default_key, default_uuid in _DEFAULT_STATES.items():
            by_key.setdefault(default_key, default_uuid)

        _STATES_CACHE[project_id] = {
            "states": by_key,
            "groups": group_of,
            "ts": time.monotonic(),
        }
        logger.debug(
            f"get_project_states: cached {len(by_key)} states / "
            f"{len(group_of)} groups for project {project_id[:8]}..."
        )
        return by_key
    except Exception as e:
        if record is None:
            logger.warning(
                f"get_project_states: API failed for project {project_id[:8]}..., "
                f"falling back to _DEFAULT_STATES. Error: {e}"
            )
            return _DEFAULT_STATES
        # A stale but project-correct mapping is safer than enduro defaults.
        logger.warning(
            f"get_project_states: API refresh failed for project "
            f"{project_id[:8]}..., serving stale cached set. Error: {e}"
        )
        return record["states"]
def get_project_state_groups(project_id: str) -> dict[str, str]:
    """ORCH-068 (D1): return ``{state_uuid -> group}`` for a Plane project.

    This only reads the cache record written by ``get_project_states``; it makes
    no network call and does not check freshness. Call
    ``get_project_states(project_id)`` first so the record exists.

    Args:
        project_id: Plane project id used as the cache key.

    Returns:
        The cached groups mapping, or ``{}`` when the project has no record or
        the record holds no dict under ``"groups"``.
    """
    record = _STATES_CACHE.get(project_id)
    groups = record.get("groups") if isinstance(record, dict) else None
    return groups if isinstance(groups, dict) else {}
def reload_project_states(project_id: str = None) -> None:
    """ORCH-10: drop cached project states.

    Args:
        project_id: evict only this project's record; ``None`` (the default)
            flushes the whole cache, e.g. in tests or after a config reload.
    """
    global _STATES_CACHE
    if project_id is not None:
        _STATES_CACHE.pop(project_id, None)
        logger.debug(f"reload_project_states: evicted project {project_id[:8]}...")
        return
    # A full flush rebinds the module global to a new dict.
    _STATES_CACHE = {}
    logger.debug("reload_project_states: full cache cleared")
# ---------------------------------------------------------------------------
# ORCH-089: label reading (auto-mode by Plane labels) + Approved setter.
#
# Source of truth for an issue's labels is the Plane API, NOT the webhook payload
# (both auto-mode insertion points are launcher-path events where the payload is
# absent; src/webhooks/plane.py does not carry `labels`). All three helpers honour
# a never-raise contract: a failure degrades to "no label" / "no-op", so the
# auto-mode falls back to the manual gate (fail-safe, BR-6/AC-6).
# ---------------------------------------------------------------------------
# Per-project label-map cache (mirrors _STATES_CACHE / ORCH-068 TTL self-heal),
# keyed by Plane project id.
# Each entry: {"map": {normalized_name -> uuid}, "ts": monotonic timestamp}.
# reload_project_labels() rebinds this name to a new dict on a full flush.
_LABELS_CACHE: dict[str, dict] = {}
def _normalize_label(name: str) -> str:
    """Return the matching key for a label name.

    Leading and trailing whitespace is removed and the text is case-folded, so
    matching ignores case and surrounding blanks. Whitespace inside the name is
    kept as is. A falsy ``name`` (``None`` or ``""``) yields ``""``.
    """
    if not name:
        return ""
    return name.strip().casefold()
def _labels_record_fresh(record: dict) -> bool:
    """ORCH-089: report whether a label-map cache record is within its TTL.

    The TTL is ``settings.auto_label_states_ttl_s``. If reading the setting
    raises, the TTL is treated as ``0``. A TTL ``<= 0`` means the record never
    expires (lifetime cache, mirroring ``_cache_record_fresh``). A record
    without a ``"ts"`` entry is aged as if stamped at ``0.0``.

    Args:
        record: a ``_LABELS_CACHE`` entry.

    Returns:
        ``True`` when the record may still be served, ``False`` when stale.
    """
    try:
        max_age = settings.auto_label_states_ttl_s
    except Exception:  # noqa: BLE001
        max_age = 0
    if max_age <= 0:
        return True
    age = time.monotonic() - record.get("ts", 0.0)
    return age <= max_age
def reload_project_labels(project_id: str = None) -> None:
    """ORCH-089: drop cached project label maps (tests / config reload).

    Args:
        project_id: evict only this project's record; ``None`` (the default)
            flushes the whole cache.
    """
    global _LABELS_CACHE
    if project_id is not None:
        _LABELS_CACHE.pop(project_id, None)
        return
    # A full flush rebinds the module global to a new dict.
    _LABELS_CACHE = {}
def get_project_labels(project_id: str) -> dict[str, str]:
    """ORCH-089: resolve ``{normalized_label_name -> uuid}`` for a Plane project.

    Source of truth is ``GET /projects/<pid>/labels/``. The map is cached per
    project and re-fetched once ``_labels_record_fresh`` reports the record
    stale, so the API is not hit on every gate.

    Ambiguity guard (D1.4): when two labels with different ids normalise to the
    same name, that name maps to the sentinel ``"__AMBIGUOUS__"`` instead of
    either uuid. The sentinel is not a label uuid, so a membership test against
    an issue's label uuids comes out False (fail-safe).

    Args:
        project_id: Plane project id; falsy -> ``{}`` with no request made.

    Returns:
        The fresh or newly fetched map; the stale cached map when the refresh
        fails; ``{}`` when the fetch fails and nothing is cached.

    Never raises: every failure of the fetch or parse is caught and logged.
    """
    if not project_id:
        return {}

    record = _LABELS_CACHE.get(project_id)
    if record is not None and _labels_record_fresh(record):
        return record["map"]

    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/labels/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()

        # Same two response shapes as the states endpoint: envelope or list.
        if isinstance(payload, dict):
            rows = payload.get("results", payload)
        else:
            rows = payload
        if not isinstance(rows, list):
            raise ValueError(f"unexpected labels response shape: {type(rows)}")

        uuid_by_name: dict[str, str] = {}
        colliding: set[str] = set()
        for row in rows:
            label_uuid = row.get("id", "")
            label_key = _normalize_label(row.get("name", ""))
            if not label_uuid or not label_key:
                continue
            # Stored values are always truthy uuids, so None means "not seen".
            seen_uuid = uuid_by_name.get(label_key)
            if seen_uuid is not None and seen_uuid != label_uuid:
                # Two different labels share one normalized name.
                colliding.add(label_key)
            uuid_by_name[label_key] = label_uuid

        for label_key in colliding:
            # Replace the uuid with the sentinel so the name can never match.
            uuid_by_name[label_key] = "__AMBIGUOUS__"
            logger.warning(
                "get_project_labels: ambiguous label name %r in project %s "
                "-> treated as no-match (fail-safe)", label_key, project_id[:8],
            )

        _LABELS_CACHE[project_id] = {"map": uuid_by_name, "ts": time.monotonic()}
        logger.debug(
            "get_project_labels: cached %d labels for project %s...",
            len(uuid_by_name), project_id[:8],
        )
        return uuid_by_name
    except Exception as e:  # noqa: BLE001 - never-raise
        if record is None:
            logger.warning(
                "get_project_labels: API failed for project %s..., no cache -> {}. "
                "Error: %s", project_id[:8], e,
            )
            return {}
        logger.warning(
            "get_project_labels: API refresh failed for project %s..., "
            "serving stale cached map. Error: %s", project_id[:8], e,
        )
        return record["map"]
def fetch_issue_labels(work_item_id: str, project_id: str = None) -> list[str] | None:
    """ORCH-089: GET the issue and return its ``labels`` entries as strings.

    Args:
        work_item_id: task identifier used to locate the Plane issue.
        project_id: optional Plane project id; resolved through
            ``_resolve_project_id`` when omitted.

    Returns:
        * a list with each ``labels`` entry converted by ``str()`` (``[]`` when
          the response has no ``labels`` key or the list is empty);
        * ``None`` when the issue cannot be found, the request fails, or
          ``labels`` is not a list. ``None`` is deliberately distinct from
          ``[]`` so a caller can tell "could not read" from "no labels".

    The request and the parsing are wrapped so their errors are logged and
    turned into ``None`` rather than raised.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    plane_issue_id = find_issue_id(work_item_id, pid)
    if not plane_issue_id:
        logger.debug("fetch_issue_labels: issue not found for %s", work_item_id)
        return None

    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/{plane_issue_id}/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        raw_labels = response.json().get("labels", [])
        if isinstance(raw_labels, list):
            return [str(label) for label in raw_labels]
        return None
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("fetch_issue_labels failed for %s: %s", work_item_id, e)
        return None
def set_issue_approved(work_item_id: str, project_id: str = None):
    """ORCH-089: move the issue to the project's 'Approved' state.

    Looks up the ``"approved"`` UUID in ``get_project_states`` for the resolved
    project and passes it to ``_set_issue_state_direct``, which performs the
    update. ``get_project_states`` always fills missing keys from
    ``_DEFAULT_STATES``, so the ``"approved"`` lookup has a value.

    Per ORCH-089 this marks an auto-approved BRD and is transient: the advance
    to ``architecture`` that follows overrides it, and the durable record is
    the log + Telegram + Plane comment (AC-7).

    Args:
        work_item_id: task identifier of the issue to update.
        project_id: optional Plane project id; resolved through
            ``_resolve_project_id`` when omitted.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    approved_uuid = get_project_states(pid)["approved"]
    _set_issue_state_direct(work_item_id, approved_uuid, pid)
# Feature 3: orchestrator stage -> logical Plane status shown on the board when
# the pipeline ENTERS that stage. ORCH-066 gave analysis and review dedicated
# statuses (Analysis / Code-Review). "deploy" is deliberately not listed: it
# keeps in_progress until its own Phase A/B/C statuses drive it. Needs Input /
# In Review / Blocked have higher priority and are set explicitly elsewhere —
# do NOT override them from here.
STAGE_VISIBILITY_STATE = {
    "analysis": "analysis",  # ORCH-066: Analysis status
    "architecture": "architecture",
    "development": "development",
    "review": "code_review",  # ORCH-066: Code-Review status
    "testing": "testing",
}
# STAGE_TO_STATE is kept only for backward compatibility (tests patch it).
# update_issue_state now resolves through stage_to_state() and no longer reads
# this table. Values are the static _DEFAULT_STATES UUIDs.
STAGE_TO_STATE = {
    "created": _DEFAULT_STATES["todo"],
    # ORCH-066: analysis -> Analysis, review -> Code-Review. In _DEFAULT_STATES
    # the new keys carry the same UUIDs as in_progress / review, so legacy
    # callers and tests comparing concrete UUIDs see unchanged values.
    "analysis": _DEFAULT_STATES["analysis"],
    "architecture": _DEFAULT_STATES["architecture"],
    "development": _DEFAULT_STATES["development"],
    "review": _DEFAULT_STATES["code_review"],
    "testing": _DEFAULT_STATES["testing"],
    "deploy": _DEFAULT_STATES["in_progress"],
    "done": _DEFAULT_STATES["done"],
}
# Orchestrator stage -> logical state key (project-independent); stage_to_state()
# turns the key into a per-project UUID.
# ORCH-066: analysis -> analysis and review -> code_review (previously
# in_progress / review). deploy stays on in_progress because Phase A/B/C drive
# its status directly rather than through update_issue_state.
_STAGE_TO_STATE_KEY = {
    "created": "todo",
    "analysis": "analysis",
    "architecture": "architecture",
    "development": "development",
    "review": "code_review",
    "testing": "testing",
    "deploy": "in_progress",
    "done": "done",
}
def stage_to_state(stage: str, project_id: str) -> str | None:
    """ORCH-10: return the Plane state UUID for a pipeline stage in a project.

    The stage is first mapped to a logical key via ``_STAGE_TO_STATE_KEY`` and
    the key is then looked up in ``get_project_states(project_id)``, so the
    project's own UUID is used.

    Args:
        stage: orchestrator stage name.
        project_id: Plane project id.

    Returns:
        The state UUID, or ``None`` for a stage with no mapping (or a key the
        resolved mapping lacks). ``get_project_states`` is not called at all
        for an unmapped stage.
    """
    state_key = _STAGE_TO_STATE_KEY.get(stage)
    return get_project_states(project_id).get(state_key) if state_key else None
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
    """M-6: GET the Plane issue by UUID and return its ``sequence_id``.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project id that owns the issue.

    Returns:
        ``sequence_id`` converted with ``int()``, or ``None`` when the field is
        absent/null or anything fails (network error, non-2xx, bad value).

    Never raises: failures are logged as warnings, letting the caller fall back
    to its own numbering.
    """
    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        raw_seq = response.json().get("sequence_id")
        if raw_seq is None:
            return None
        return int(raw_seq)
    except Exception as e:
        logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
        return None
def fetch_issue_state(issue_id: str, project_id: str, timeout: int = 10) -> str | None:
    """ORCH-060 (F-1 Guard 2): GET the Plane issue and return its state uuid.

    Uses the same issue-detail endpoint and shared token as
    ``fetch_issue_sequence_id`` / ``fetch_issue_fields``. The ``state`` field is
    accepted either as a bare value or nested as ``{"id": ...}``.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project id that owns the issue.
        timeout: request timeout in seconds (ORCH-067: optional so a caller on
            a synchronous render path can pass a shorter value; default 10).

    Returns:
        The state as a string, or ``None`` when it is missing/empty or anything
        fails (network error, non-2xx).

    Never raises: failures are logged as warnings so the caller can apply its
    own conservative fallback.
    """
    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=timeout)
        response.raise_for_status()
        raw_state = response.json().get("state")
        # Unwrap the nested {"id": ...} shape when that is what was returned.
        state_uuid = raw_state.get("id") if isinstance(raw_state, dict) else raw_state
        if not state_uuid:
            return None
        return str(state_uuid)
    except Exception as e:
        logger.warning(f"fetch_issue_state failed for {issue_id}: {e}")
        return None
def fetch_blocked_by_issue_ids(issue_id: str, project_id: str, timeout: int = 10) -> list[str]:
    """ORCH-026 (B-1): list the Plane issue UUIDs that ``issue_id`` is blocked by.

    Reads the issue-relation endpoint and parses it defensively, accepting:
      * a grouped object ``{"blocked_by": [...]}``;
      * a flat list of rows, either bare or under ``"results"`` /
        ``"relations"``. Dict rows whose ``relation_type`` is set to something
        other than ``"blocked_by"`` are skipped.
    A row may be a bare uuid string or a dict; for a dict the first truthy
    value among ``related_issue`` / ``issue`` / ``id`` is used (unwrapping a
    nested ``{"id": ...}``). Ids equal to ``issue_id`` are dropped.

    Args:
        issue_id: Plane issue UUID whose predecessors are wanted.
        project_id: Plane project id that owns the issue.
        timeout: request timeout in seconds.

    Returns:
        The related issue ids in response order, or ``[]`` when an argument is
        empty, the request fails, or the payload cannot be parsed.

    Never raises (AC-G1): failures are logged as warnings.
    """
    if not (issue_id and project_id):
        return []

    endpoint = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}"
        f"/issues/{issue_id}/issue-relation/"
    )
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except Exception as e:
        logger.warning(f"fetch_blocked_by_issue_ids failed for {issue_id}: {e}")
        return []

    def _extract_uuid(entry) -> str | None:
        # A row is either the uuid itself or a dict that carries it.
        if isinstance(entry, str):
            return entry
        if not isinstance(entry, dict):
            return None
        for field in ("related_issue", "issue", "id"):
            candidate = entry.get(field)
            if isinstance(candidate, dict):
                candidate = candidate.get("id")
            if candidate:
                return str(candidate)
        return None

    predecessors: list[str] = []
    try:
        entries = []
        if isinstance(payload, list):
            entries = payload
        elif isinstance(payload, dict):
            grouped = payload.get("blocked_by")
            if isinstance(grouped, list):
                # Grouped shape: {"blocked_by": [...], "blocking": [...], ...}
                entries = grouped
            else:
                # Flat shape nested under a common envelope key.
                entries = payload.get("results") or payload.get("relations") or []

        for entry in entries:
            # Flat shape: keep only rows that are blocked_by or carry no type.
            if isinstance(entry, dict) and entry.get("relation_type") not in (None, "blocked_by"):
                continue
            related = _extract_uuid(entry)
            if related and related != issue_id:
                predecessors.append(related)
    except Exception as e:
        logger.warning(f"fetch_blocked_by_issue_ids parse error for {issue_id}: {e}")
        return []
    return predecessors
import re as _re
def _strip_html(html: str) -> str:
    """Convert an HTML fragment to plain text.

    Tags are replaced by spaces, HTML entities are decoded (``&amp;`` -> ``&``,
    ``&nbsp;`` -> a space once whitespace is collapsed), and runs of whitespace
    are collapsed to one space. It is still a crude conversion, used to feed
    QG-0's length check when Plane only provides ``description_html``.

    Entities used to be left undecoded, which inflated the measured length and
    leaked markup such as ``&amp;`` into the description text.

    Args:
        html: HTML source; ``None`` or ``""`` is accepted.

    Returns:
        The plain text, or ``""`` for empty input.
    """
    if not html:
        return ""
    # Imported here because the parameter name shadows the ``html`` module.
    from html import unescape

    without_tags = _re.sub(r"<[^>]+>", " ", html)
    # Decode only AFTER removing tags, so an escaped "&lt;b&gt;" the author
    # typed as literal text is not mistaken for a tag and stripped.
    decoded = unescape(without_tags)
    # \s also matches the non-breaking space produced by &nbsp;.
    return _re.sub(r"\s+", " ", decoded).strip()
def fetch_issue_description(issue_id: str, project_id: str) -> str:
    """BUG 1: GET the Plane issue by UUID and return its description text.

    Plane's ``issue.updated`` webhook carries only the CHANGED fields, so the
    description is usually missing from the payload; this pulls it from the
    issue-detail endpoint (same URL and shared token as
    ``fetch_issue_sequence_id``).

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project id that owns the issue.

    Returns:
        ``description_stripped`` unchanged when it holds non-blank text;
        otherwise ``description_html`` run through ``_strip_html``; ``""`` when
        neither is usable or anything fails (network error, non-2xx).

    Never raises: failures are logged as warnings, so a Plane outage degrades
    to the "empty description" path.
    """
    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        issue = response.json()
        stripped = issue.get("description_stripped")
        if not (stripped and stripped.strip()):
            # No usable plain text: derive it from the HTML body.
            return _strip_html(issue.get("description_html") or "")
        return stripped
    except Exception as e:
        logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
        return ""
def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
    """BUG B: GET the Plane issue by UUID ONCE and return ``(name, description)``.

    Plane's ``issue.updated`` webhook carries only the CHANGED fields, so both
    the title and the description are usually missing from the payload. One
    issue-detail request supplies both, avoiding two separate GETs.

    Args:
        issue_id: Plane issue UUID.
        project_id: Plane project id that owns the issue.

    Returns:
        ``(name, description)`` where ``name`` is the stripped title (``""``
        when absent) and ``description`` follows the rule of
        ``fetch_issue_description``: ``description_stripped`` when non-blank,
        else ``description_html`` through ``_strip_html``. ``("", "")`` when
        anything fails (network error, non-2xx, unusable body).

    Never raises: failures are logged as warnings so the caller keeps its
    payload fallbacks.
    """
    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        issue = response.json()
        title = (issue.get("name") or "").strip()
        text = issue.get("description_stripped")
        if not (text and text.strip()):
            # No usable plain text: derive it from the HTML body.
            text = _strip_html(issue.get("description_html") or "")
        return title, text
    except Exception as e:
        logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}")
        return "", ""
def list_issues_by_state(project_id: str, state_uuids: list[str]) -> list[dict]:
    """ORCH-053 (F-2): list a project's issues whose state is in ``state_uuids``.

    GETs ``/workspaces/{ws}/projects/{pid}/issues/`` with ``per_page=100`` and
    follows cursor pagination (``next_page_results`` + ``next_cursor``) for at
    most 100 pages. Filtering is done client-side on ``issue["state"]``, which
    may be a ``{"id": ...}`` dict or a bare uuid, so it does not depend on
    Plane honouring a state query parameter.

    Args:
        project_id: Plane project id.
        state_uuids: state uuids to keep.

    Returns:
        The matching issue dicts in response order. ``[]`` when an argument is
        empty or on ANY error — matches collected before a failing page are
        discarded too.

    Never raises: failures are logged as warnings.
    """
    if not (project_id and state_uuids):
        return []

    wanted_states = set(state_uuids)
    matches: list[dict] = []
    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        next_cursor = None
        # Hard cap of 100 requests guards against a cursor that never ends.
        for _page in range(100):
            query: dict = {"per_page": 100}
            if next_cursor:
                query["cursor"] = next_cursor
            response = httpx.get(endpoint, headers=PLANE_HEADERS, params=query, timeout=10)
            response.raise_for_status()
            payload = response.json()

            paged = isinstance(payload, dict)
            if paged:
                page_items = payload.get("results", [])
            elif isinstance(payload, list):
                page_items = payload
            else:
                page_items = []

            for issue in page_items:
                raw_state = issue.get("state")
                state_uuid = raw_state.get("id") if isinstance(raw_state, dict) else raw_state
                if state_uuid in wanted_states:
                    matches.append(issue)

            # Only the envelope shape can announce a further page.
            if not paged:
                break
            next_cursor = payload.get("next_cursor")
            if not (payload.get("next_page_results") and next_cursor):
                break
        return matches
    except Exception as e:
        logger.warning(
            f"list_issues_by_state: API failed for project {project_id[:8]}..., "
            f"returning []. Error: {e}"
        )
        return []
def _issues_from_payload(payload) -> list:
    """Return the issue list of a Plane list response (paginated dict or bare list)."""
    if isinstance(payload, dict):
        results = payload.get("results")
        return results if isinstance(results, list) else []
    return payload if isinstance(payload, list) else []


def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
    """Find the Plane issue UUID for ``work_item_id`` (e.g. 'ET-002').

    Lookup order:
      1. the local ``tasks`` table (``plane_issue_id`` column);
      2. a Plane search for ``work_item_id``, matching on ``sequence_id``
         (the number after the last ``-``) or on the id appearing in the
         issue name;
      3. the unfiltered issue list (first response only), matching on
         ``sequence_id``.

    Args:
        work_item_id: Work item identifier such as ``'ET-002'``.
        project_id: Optional Plane project id; resolved through
            ``_resolve_project_id`` either way.

    Returns:
        The issue UUID, or ``None`` when nothing matches or the Plane API
        call fails (failures are logged, never raised).
    """
    project_id = _resolve_project_id(work_item_id, project_id)

    # Primary: lookup from DB (plane_issue_id column).
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL",
                (work_item_id,),
            ).fetchone()
        finally:
            # Release the handle even when the query raises; mirrors
            # notify_stage_change, which also closes what get_db() returns.
            conn.close()
        if row and row[0]:
            return row[0]
    except Exception as e:
        logger.debug(f"DB lookup failed for {work_item_id}: {e}")

    # Fallback: search via Plane API.
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10)
        resp.raise_for_status()
        # The payload may be a paginated dict or a bare list; calling .get()
        # on a list used to raise and abort the whole lookup.
        results = _issues_from_payload(resp.json())

        # M-6: match by sequence_id directly (the authoritative per-project
        # number), parsed from the work_item_id suffix - no hardcoded prefix.
        try:
            target_num = int(work_item_id.rsplit("-", 1)[1])
        except (IndexError, ValueError):
            target_num = None

        for issue in results:
            if target_num is not None and issue.get("sequence_id") == target_num:
                return issue["id"]
            # ``name`` can be null in the payload; treat it as empty.
            if work_item_id in (issue.get("name") or ""):
                return issue["id"]

        # Last resort: unfiltered listing, matched by sequence_id (any prefix).
        if target_num is not None:
            resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
            resp2.raise_for_status()
            for issue in _issues_from_payload(resp2.json()):
                if issue.get("sequence_id") == target_num:
                    return issue["id"]
    except Exception as e:
        logger.error(f"Failed to find issue for {work_item_id}: {e}")
    return None
def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
    """Move the Plane issue to the state mapped to an orchestrator ``stage``.

    The state UUID is resolved per project (ORCH-10) via ``stage_to_state``.
    Nothing happens when the stage has no mapped state; a missing issue is
    logged as a warning. A failed PATCH is logged as an error and not raised.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    target_state = stage_to_state(stage, pid)
    if not target_state:
        return

    plane_issue = find_issue_id(work_item_id, pid)
    if plane_issue:
        endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/{plane_issue}/"
        try:
            httpx.patch(
                endpoint, headers=PLANE_HEADERS, json={"state": target_state}, timeout=10
            ).raise_for_status()
            logger.info(f"Plane: {work_item_id} state -> {stage} ({target_state[:8]}...)")
        except Exception as e:
            logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
    else:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
    """Post ``text`` as a comment on the Plane issue of ``work_item_id``.

    feat(plane): the request headers come from ``_headers_for(author)``, so a
    comment whose ``author`` (an agent role) has a configured bot token is
    posted under that bot; otherwise the shared orchestrator token is used.
    The text is wrapped in a ``<p>`` element and sent as ``comment_html``.

    A missing issue is logged as a warning; a failed POST is logged as an
    error. Nothing is raised for either.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    plane_issue = find_issue_id(work_item_id, pid)
    if plane_issue:
        endpoint = (
            f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/{plane_issue}/comments/"
        )
        payload = {"comment_html": f"<p>{text}</p>"}
        try:
            response = httpx.post(endpoint, headers=_headers_for(author), json=payload, timeout=10)
            response.raise_for_status()
            logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
        except Exception as e:
            logger.error(f"Failed to add comment to {work_item_id}: {e}")
    else:
        logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment")
def set_issue_needs_input(work_item_id: str, project_id: str = None):
    """Put the issue into 'Needs Input' — waiting for a stakeholder response."""
    pid = _resolve_project_id(work_item_id, project_id)
    # Per-project UUID lookup under the "needs_input" key.
    _set_issue_state_direct(work_item_id, get_project_states(pid)["needs_input"], pid)
def set_issue_in_review(work_item_id: str, project_id: str = None):
    """Put the issue into 'In Review' — waiting for :approved: or :rejected:."""
    pid = _resolve_project_id(work_item_id, project_id)
    # Per-project UUID lookup under the "in_review" key.
    _set_issue_state_direct(work_item_id, get_project_states(pid)["in_review"], pid)
def set_issue_blocked(work_item_id: str, project_id: str = None):
    """Put the issue into 'Blocked' — manual intervention is needed."""
    pid = _resolve_project_id(work_item_id, project_id)
    # Per-project UUID lookup under the "blocked" key.
    _set_issue_state_direct(work_item_id, get_project_states(pid)["blocked"], pid)
def set_issue_done(work_item_id: str, project_id: str = None):
    """Observability fix: push the issue into the terminal Done state.

    Intended for the deploy->done success path, so that a finished task ends
    up in the terminal Plane state rather than lingering on In Progress (the
    merge webhook used to bypass the stage engine). The UUID is resolved per
    project through ``get_project_states`` (ORCH-10).
    """
    pid = _resolve_project_id(work_item_id, project_id)
    _set_issue_state_direct(work_item_id, get_project_states(pid)["done"], pid)
def set_issue_in_progress(work_item_id: str, project_id: str = None):
    """Put the issue into 'In Progress' — an agent is working on it."""
    pid = _resolve_project_id(work_item_id, project_id)
    # Per-project UUID lookup under the "in_progress" key.
    _set_issue_state_direct(work_item_id, get_project_states(pid)["in_progress"], pid)
def set_issue_analysis(work_item_id: str, project_id: str = None):
    """ORCH-066: put the issue into 'Analysis' — the analyst is working (start / resume).

    The UUID is read from ``get_project_states(...)["analysis"]``. Per the
    ORCH-066 design that mapping is expected to alias to the project's
    In Progress UUID when no 'Analysis' status exists (the fallback lives in
    ``get_project_states``, not here). The PATCH itself goes through
    ``_set_issue_state_direct``, which logs failures instead of raising.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    _set_issue_state_direct(work_item_id, get_project_states(pid)["analysis"], pid)
def set_issue_code_review(work_item_id: str, project_id: str = None):
    """ORCH-066: put the issue into 'Code-Review' — review stage indication.

    The UUID is read from ``get_project_states(...)["code_review"]``; per the
    ORCH-066 design that mapping is expected to alias to the project's Review
    UUID when 'Code-Review' has not been created.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    _set_issue_state_direct(work_item_id, get_project_states(pid)["code_review"], pid)
def set_issue_awaiting_deploy(work_item_id: str, project_id: str = None):
    """ORCH-066: put the issue into 'Awaiting Deploy' — self-deploy Phase A, approval pending.

    The UUID is read from ``get_project_states(...)["awaiting_deploy"]``; per
    the ORCH-066 design that mapping is expected to alias to the project's
    In Review UUID when 'Awaiting Deploy' has not been created.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    _set_issue_state_direct(work_item_id, get_project_states(pid)["awaiting_deploy"], pid)
def set_issue_deploying(work_item_id: str, project_id: str = None):
    """ORCH-066: put the issue into 'Deploying' — self-deploy Phase B, prod deploy in flight.

    The UUID is read from ``get_project_states(...)["deploying"]``; per the
    ORCH-066 design that mapping is expected to alias to the project's
    In Progress UUID when 'Deploying' has not been created.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    _set_issue_state_direct(work_item_id, get_project_states(pid)["deploying"], pid)
def set_issue_monitoring(work_item_id: str, project_id: str = None):
    """ORCH-066: put the issue into 'Monitoring after Deploy' — post-deploy window open.

    The UUID is read from ``get_project_states(...)["monitoring"]``; per the
    ORCH-066 design that mapping is expected to alias to the project's Done
    UUID when 'Monitoring after Deploy' has not been created, so the board
    shows Done exactly as it did before ORCH-066.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    _set_issue_state_direct(work_item_id, get_project_states(pid)["monitoring"], pid)
def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None):
    """Feature 3: move the issue to the board status that represents ``stage``.

    The stage is looked up in ``STAGE_VISIBILITY_STATE``; a stage with no
    entry there is a no-op, which leaves the in_progress / in_review /
    needs_input helpers in charge of it. By design this function never sets
    Needs Input / In Review / Blocked — those have priority and are set
    explicitly by their own helpers.
    """
    board_key = STAGE_VISIBILITY_STATE.get(stage)
    if board_key:
        pid = _resolve_project_id(work_item_id, project_id)
        # ORCH-10: the UUID is resolved per project.
        _set_issue_state_direct(work_item_id, get_project_states(pid)[board_key], pid)
def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None):
    """PATCH the Plane issue of ``work_item_id`` to the given ``state_id``.

    A missing issue is logged as a warning; a failed request is logged as an
    error. Neither case raises.
    """
    pid = _resolve_project_id(work_item_id, project_id)
    plane_issue = find_issue_id(work_item_id, pid)
    if plane_issue:
        endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/{plane_issue}/"
        try:
            httpx.patch(
                endpoint, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10
            ).raise_for_status()
            logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
        except Exception as e:
            logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
    else:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
def _pr_for_branch(open_prs, branch):
    """Return the first PR whose head ref is ``branch`` (or that carries no head ref), else None."""
    for pr in open_prs:
        if not isinstance(pr, dict):
            continue
        head = pr.get("head")
        head_ref = head.get("ref") if isinstance(head, dict) else None
        # Entries without head metadata cannot be verified; accept them so
        # payloads that were already filtered upstream keep working.
        if head_ref is None or head_ref == branch:
            return pr
    return None


def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None):
    """Report a stage transition to Plane: update the issue state and post a comment.

    The comment names the transition (and the launched agent, if any). When
    the ``tasks`` table has a branch/repo for the work item, a branch link is
    appended; for the review / testing / deploy stages an open PR for that
    branch is looked up in Gitea and linked too. Link enrichment is
    best-effort: any failure is logged and the comment is still posted with
    whatever was gathered so far.

    Args:
        work_item_id: Work item identifier.
        old_stage: Stage being left.
        new_stage: Stage being entered; also drives the Plane state update.
        agent: Optional name of the agent being launched.
        project_id: Optional Plane project id (resolved via ``_resolve_project_id``).
    """
    project_id = _resolve_project_id(work_item_id, project_id)
    update_issue_state(work_item_id, new_stage, project_id)

    # Explicit separator: the two stage names used to be glued together.
    msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}"
    if agent:
        msg += f" (launching {agent})"

    # Add relevant links.
    gitea_base = "http://git.mva154.duckdns.org"
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
            ).fetchone()
        finally:
            # Close the handle even when the query raises.
            conn.close()

        branch, repo = row if row else (None, None)
        # No branch/repo recorded -> nothing to link to.
        if branch and repo:
            msg += f"\n📂 Branch: [{branch}]({gitea_base}/admin/{repo}/src/branch/{branch})"
            if new_stage in ("review", "testing", "deploy"):
                import httpx as _httpx
                from .config import settings

                _headers = {"Authorization": f"token {settings.gitea_token}"}
                _resp = _httpx.get(
                    f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
                    params={"state": "open", "head": branch},
                    headers=_headers,
                    timeout=5,
                )
                if _resp.status_code == 200:
                    # NOTE(review): the `head` query filter does not appear in
                    # Gitea's list-pulls parameters, so the response may hold
                    # PRs of other branches — confirm; we match client-side.
                    pr = _pr_for_branch(_resp.json() or [], branch)
                    if pr is not None:
                        pr_num = pr["number"]
                        msg += f"\n🔗 PR: [#{pr_num}]({gitea_base}/admin/{repo}/pulls/{pr_num})"
    except Exception as e:
        # Best-effort enrichment: keep going, but leave a trace in the log.
        logger.warning(f"notify_stage_change: link enrichment failed for {work_item_id}: {e}")

    # Stage transition is the orchestrator's own voice -> attribute to stream.
    add_comment(work_item_id, msg, project_id, author="stream")
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
    """Post a Plane comment reporting a quality-gate failure.

    Args:
        work_item_id: Work item identifier.
        stage: Stage whose quality gate failed.
        check: Name of the failing check.
        reason: Explanation of the failure.
        project_id: Optional Plane project id, passed through to ``add_comment``.
    """
    # QG failure belongs to the agent that owns the failing stage; stages
    # without an entry in STAGE_AUTHORS are attributed to "stream".
    add_comment(
        work_item_id,
        # Separator between check and reason: they used to be glued together.
        f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}",
        project_id,
        author=STAGE_AUTHORS.get(stage, "stream"),
    )
def notify_done(work_item_id: str, project_id: str = None):
    """Mark the issue as done in Plane and post the completion comment."""
    pid = _resolve_project_id(work_item_id, project_id)
    update_issue_state(work_item_id, "done", pid)
    # Deploy finished the task -> the completion comment is attributed to Deployer.
    completion_note = f"{EMOJI_DONE} Task completed! PR merged and deployed."
    add_comment(work_item_id, completion_note, pid, author="deployer")