Files
orchestrator/src/projects.py
2026-06-05 19:45:19 +03:00

152 lines
5.7 KiB
Python

"""ORCH-6: Project registry — map Plane project id -> repo / work-item prefix.
Root cause of the 2026-06-02 incident: the Plane webhook listened to the whole
workspace and hardcoded ``repo = settings.default_repo`` (enduro-trails). Every
issue from any project was funneled into one repo with one prefix (ET).
This module introduces a small registry keyed by the Plane project uuid so the
orchestrator can:
* filter webhooks by project (ignore unknown projects),
* resolve the gitea repo + work-item prefix for a known project,
* route Plane sync (state/comment) into the issue's own project.
Source of truth: ``settings.projects_json`` (a JSON array set via the
``ORCH_PROJECTS_JSON`` env var). If unset/empty/invalid, a built-in default
registry is used so the system works out of the box.
"""
import json
import logging
from dataclasses import dataclass, field
from .config import settings
logger = logging.getLogger("orchestrator.projects")
@dataclass(frozen=True)
class ProjectConfig:
    """One registry entry: a Plane project and the repo / prefix it maps to.

    Instances are frozen (attributes cannot be rebound) and hashable. The two
    override dicts take part in equality but are excluded from the generated
    hash: a dict is unhashable, so including them would make ``hash(cfg)``
    (and therefore set / dict-key usage) raise TypeError.
    """

    plane_project_id: str  # uuid of the Plane project (registry key)
    repo: str  # gitea repo name (== folder under /repos)
    work_item_prefix: str  # ET / ORCH
    name: str  # human-readable label
    # ORCH-41: optional per-project agent->model / agent->effort overrides parsed
    # from projects_json. A mutable default must go through
    # field(default_factory=dict); a bare {} default raises ValueError.
    # Empty dict = no override (old records work).
    # hash=False keeps the instance hashable; the fields still count for ==.
    agent_models: dict[str, str] = field(default_factory=dict, hash=False)
    agent_efforts: dict[str, str] = field(default_factory=dict, hash=False)
# Fallback registry, used whenever ORCH_PROJECTS_JSON is empty or invalid.
# enduro-trails stays in first position so the pre-existing behaviour remains
# the safe default. Each row is (plane project uuid, repo, work-item prefix);
# the human-readable name is the repo name for both built-in projects.
_DEFAULT_PROJECTS = [
    ProjectConfig(
        plane_project_id=project_uuid,
        repo=repo_name,
        work_item_prefix=prefix,
        name=repo_name,
    )
    for project_uuid, repo_name, prefix in (
        ("7a79f0a9-5278-49cd-9007-9a338f238f9c", "enduro-trails", "ET"),
        ("8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a", "orchestrator", "ORCH"),
    )
]
def _coerce_str_map(value, idx, field_name) -> dict:
"""ORCH-41: coerce an optional projects_json sub-object into a {str: str} dict.
Missing / null -> {} (no override). A non-object value is logged and dropped so
one malformed entry can never brick the whole registry; non-string keys/values
are stringified for safety.
"""
if value is None:
return {}
if not isinstance(value, dict):
logger.error(
f"ORCH_PROJECTS_JSON[{idx}].{field_name} is not an object, ignoring"
)
return {}
return {str(k): str(v) for k, v in value.items()}
def _read_required_fields(item: dict, idx: int) -> dict[str, str] | None:
    """Return the required keys of one entry as strings, or None if unusable.

    A key that is absent, JSON ``null`` or blank makes the entry unusable; the
    first such key is logged and None is returned so the caller can skip it.
    """
    values: dict[str, str] = {}
    for key in ("plane_project_id", "repo", "work_item_prefix"):
        if key not in item:
            logger.error(f"ORCH_PROJECTS_JSON[{idx}] missing required key {key!r}, skipping")
            return None
        raw_value = item[key]
        # json.loads maps null -> None; str(None) would silently yield "None".
        if raw_value is None or not str(raw_value).strip():
            logger.error(
                f"ORCH_PROJECTS_JSON[{idx}] required key {key!r} is null/empty, skipping"
            )
            return None
        values[key] = str(raw_value)
    return values


def _parse_projects_json(raw: str) -> list[ProjectConfig] | None:
    """Parse ORCH_PROJECTS_JSON into a list of project configs.

    Args:
        raw: the raw JSON text; expected to be a JSON array of objects.

    Returns:
        The list of valid entries, or None when the input is empty/blank, is
        not valid JSON, is not an array, or contains no valid entry. None
        tells the caller to use the built-in default registry.

    Every problem is logged and never raised. An individual entry is skipped
    when it is not an object or when a required key (``plane_project_id``,
    ``repo``, ``work_item_prefix``) is missing, null or blank. ``name`` is
    optional and falls back to the repo name when missing, null or blank.
    """
    if not raw or not raw.strip():
        return None
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.error(f"ORCH_PROJECTS_JSON is not valid JSON, falling back to default: {e}")
        return None
    if not isinstance(data, list):
        logger.error("ORCH_PROJECTS_JSON must be a JSON array, falling back to default")
        return None

    parsed: list[ProjectConfig] = []
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            logger.error(f"ORCH_PROJECTS_JSON[{i}] is not an object, skipping")
            continue
        # Validate required keys first so a rejected entry produces exactly
        # one log line and its optional sub-objects are never inspected.
        required = _read_required_fields(item, i)
        if required is None:
            continue
        name = item.get("name")
        if name is None or not str(name).strip():
            # Avoid the literal label "None" / "" for a null or blank name.
            name = required["repo"]
        parsed.append(
            ProjectConfig(
                plane_project_id=required["plane_project_id"],
                repo=required["repo"],
                work_item_prefix=required["work_item_prefix"],
                name=str(name),
                agent_models=_coerce_str_map(item.get("agent_models"), i, "agent_models"),
                agent_efforts=_coerce_str_map(item.get("agent_efforts"), i, "agent_efforts"),
            )
        )

    if not parsed:
        logger.error("ORCH_PROJECTS_JSON produced no valid entries, falling back to default")
        return None
    return parsed
def _load_projects() -> list[ProjectConfig]:
    """Build the project list from settings, or from the built-in default.

    Reads ``settings.projects_json`` (a missing or falsy attribute is treated
    as an empty string) and parses it. When parsing yields nothing usable, a
    shallow copy of ``_DEFAULT_PROJECTS`` is returned instead.
    """
    raw_json = getattr(settings, "projects_json", "") or ""
    configured = _parse_projects_json(raw_json)
    if configured is None:
        # Copy so callers never hold the module-level default list itself.
        return list(_DEFAULT_PROJECTS)
    logger.info(f"Project registry loaded from ORCH_PROJECTS_JSON: {len(configured)} project(s)")
    return configured
# Registry state, computed a single time when the module is imported.
# PROJECTS keeps the entries in load order; the two dicts index the same
# objects by Plane project uuid and by repo name. If two entries share a key,
# the later entry in PROJECTS is the one kept in the index.
PROJECTS: list[ProjectConfig] = _load_projects()
_BY_PLANE_ID: dict[str, ProjectConfig] = {
    project.plane_project_id: project for project in PROJECTS
}
_BY_REPO: dict[str, ProjectConfig] = {
    project.repo: project for project in PROJECTS
}
def get_project_by_plane_id(plane_project_id: str) -> ProjectConfig | None:
    """Look up a project config by its Plane project uuid.

    Returns None for a falsy id (None, empty string) without consulting the
    index, and None when the id is not in the registry.
    """
    return _BY_PLANE_ID.get(plane_project_id) if plane_project_id else None
def get_project_by_repo(repo: str) -> ProjectConfig | None:
    """Look up a project config by its gitea repo name.

    Returns None for a falsy name (None, empty string) without consulting the
    index, and None when the repo is not in the registry.
    """
    return _BY_REPO.get(repo) if repo else None
def known_plane_project_ids() -> set[str]:
    """Return a new set holding every Plane project id in the registry.

    The set is a copy: mutating it does not affect the registry.
    """
    # Iterating a dict yields its keys, so this copies the key set.
    return set(_BY_PLANE_ID)
def reload_projects() -> None:
    """Rebuild the registry from current settings (used by tests).

    Rebinds the module-level ``PROJECTS``, ``_BY_PLANE_ID`` and ``_BY_REPO``
    names to freshly built objects; the previous list and dicts are not
    mutated.
    """
    global PROJECTS, _BY_PLANE_ID, _BY_REPO
    fresh_projects = _load_projects()
    plane_index: dict[str, ProjectConfig] = {}
    repo_index: dict[str, ProjectConfig] = {}
    for project in fresh_projects:
        # Later entries overwrite earlier ones that share a key.
        plane_index[project.plane_project_id] = project
        repo_index[project.repo] = project
    PROJECTS, _BY_PLANE_ID, _BY_REPO = fresh_projects, plane_index, repo_index