Compare commits

...

4 Commits

Author SHA1 Message Date
Dev Agent
c69e11348b test(pipeline): cover status-start description fetch and work_item_id uniqueness
- test_status_start_fetches_description: empty payload description -> pulled from
  Plane API (mocked) -> QG-0 passes, analyst enqueued.
- test_status_start_empty_api_still_blocks: empty API -> honest QG-0 fail.
- test_work_item_id_uniqueness: ET-006 taken -> next free id, per-repo isolation.
- test_collision_reassigns_in_start_pipeline: end-to-end collision reassignment.
- test_worktree_per_task: two tasks never share a worktree path.
2026-06-03 21:12:59 +03:00
Dev Agent
ac9f5a05a6 fix(work-item): prevent work_item_id collision and bind branch per task
ET-006 was handed to two different tasks because M-6 derives work_item_id from
the Plane sequence_id, which can collide -> the two tasks shared a branch/worktree
slug prefix and stepped on each other.

2a: ensure_unique_work_item_id() is a uniqueness-guard LAYERED ON TOP of the M-6
derive (derive is untouched): if the derived ET-NNN already exists in tasks for
the repo, it walks forward to the next free number. Applied in start_pipeline
after the derive.

2b (defense-in-depth): worktree is keyed by branch; if the resulting branch is
already owned by another task in the repo, disambiguate it with the unique
work_item_id + plane id so two tasks can never share a worktree.
2026-06-03 21:12:51 +03:00
Dev Agent
fa746105fd fix(webhook): fetch description from Plane API on status-start
Plane issue.updated (status -> In Progress) ships only changed fields, so the
webhook payload has no description and QG-0 wrongly blocked issues. start_pipeline
now pulls the full description from the Plane issue detail API (reusing the same
GET endpoint + shared token as fetch_issue_sequence_id) when the payload field is
empty/short, before QG-0 runs. Empty API -> honest QG-0 fail (truly empty ticket).
2026-06-03 21:12:38 +03:00
4773137b52 Merge pull request 'feat: pipeline UX — status-trigger, verdict statuses, stage visibility, token usage' (#10) from feature/pipeline-ux into main 2026-06-03 18:27:07 +03:00
4 changed files with 339 additions and 0 deletions

View File

@@ -159,6 +159,44 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
return f"{prefix}-{next_num:03d}"
def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
"""BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive.
M-6 derives the work_item_id from the Plane sequence_id. That number can
collide (e.g. an issue was deleted and the sequence reused, or two issues
map to the same number) -> the SAME ET-NNN gets handed to two different
tasks, which then physically share a branch/worktree slug prefix and step on
each other (see ET-006: task 8 and task 25).
This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it):
given the derived id, if that exact <PREFIX>-NNN already exists in the tasks
table for this repo, walk forward (ET-007, ET-008, ...) until a free number
is found and return that instead. If the derived id is free, it is returned
unchanged.
"""
if not work_item_id or "-" not in work_item_id:
return work_item_id
prefix, num_str = work_item_id.rsplit("-", 1)
try:
num = int(num_str)
except ValueError:
return work_item_id
width = len(num_str)
conn = get_db()
try:
candidate = work_item_id
while conn.execute(
"SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
(repo, candidate),
).fetchone() is not None:
num += 1
candidate = f"{prefix}-{num:0{width}d}"
return candidate
finally:
conn.close()
# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------

View File

@@ -155,6 +155,48 @@ def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
return None
import re as _re
def _strip_html(html: str) -> str:
"""Crude HTML -> text: drop tags and collapse whitespace. Good enough to
feed QG-0's length check when Plane only gives us description_html."""
if not html:
return ""
text = _re.sub(r"<[^>]+>", " ", html)
return _re.sub(r"\s+", " ", text).strip()
def fetch_issue_description(issue_id: str, project_id: str) -> str:
"""BUG 1: GET the Plane issue by UUID and return its description text.
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
CHANGED fields, so ``description``/``description_stripped`` are usually
absent there. start_pipeline calls this to pull the full description from the
issue detail endpoint so QG-0 does not blow up on an empty payload field.
Reuses the exact GET issue detail endpoint / shared token already used by
``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers
``description_stripped``; falls back to stripping ``description_html``.
Returns "" on network error, non-2xx, or a missing field - never raises, so
a Plane outage degrades to the honest "empty description" QG-0 path instead
of crashing the webhook.
"""
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp.raise_for_status()
body = resp.json()
desc = body.get("description_stripped")
if desc and desc.strip():
return desc
return _strip_html(body.get("description_html") or "")
except Exception as e:
logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
return ""
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)

View File

@@ -13,6 +13,7 @@ from ..db import (
get_db,
get_task_by_plane_id,
get_next_work_item_id,
ensure_unique_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
@@ -261,6 +262,23 @@ async def start_pipeline(data: dict, project_id: str = ""):
repo = proj.repo
plane_project_id = proj.plane_project_id
# BUG 1: Plane's issue.updated webhook (status change -> In Progress) sends
# only the CHANGED fields, so description / description_stripped are usually
# empty here even though the issue HAS a description. If the payload's
# description is missing/too short, pull the full one from the Plane issue
# detail API (same GET endpoint + shared token already used by
# fetch_issue_sequence_id) before QG-0 runs. If the API is also empty, QG-0
# legitimately fails (truly empty ticket).
if not description or len(description.strip()) < 20:
from ..plane_sync import fetch_issue_description
fetched = fetch_issue_description(plane_id, plane_project_id)
if fetched and len(fetched.strip()) >= len(description.strip()):
description = fetched
logger.info(
f"start_pipeline: pulled description from Plane API for {plane_id} "
f"({len(description.strip())} chars)"
)
# QG-0 validation (hard gate on pipeline start)
errors = _qg0_errors(name, description)
if errors:
@@ -300,10 +318,41 @@ async def start_pipeline(data: dict, project_id: str = ""):
f"fell back to DB increment: {work_item_id}"
)
# BUG 2a: uniqueness-guard LAYERED ON TOP of the M-6 derive above (the derive
# itself is untouched). If the derived ET-NNN is already taken by another
# task in this repo (collision -> two tasks would share branch/worktree, see
# ET-006), bump to the next free number.
_derived = work_item_id
work_item_id = ensure_unique_work_item_id(work_item_id, repo)
if work_item_id != _derived:
logger.warning(
f"work_item_id collision: derived {_derived} already in use for "
f"{repo}; reassigned {plane_id} -> {work_item_id}"
)
# Create slug from name
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
branch = f"feature/{work_item_id}-{slug}"
# BUG 2b (defense-in-depth): the worktree/path is keyed by BRANCH
# (git_worktree.get_worktree_path) and tasks are reverse-resolved by
# (repo, branch). With 2a the work_item_id is unique, so the branch prefix is
# too; but the slug could still collide (e.g. two issues with the same title
# under different ids -> fine) or, worse, an identical branch already exist.
# Guard physically: if this exact branch is already owned by another task in
# this repo, disambiguate with the (now unique) work_item_id so two tasks can
# never share a worktree.
_conn_b = get_db()
_branch_taken = _conn_b.execute(
"SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch)
).fetchone()
_conn_b.close()
if _branch_taken is not None:
branch = f"feature/{work_item_id}-{plane_id[:8]}"
logger.warning(
f"branch collision for {repo}; disambiguated to unique branch {branch}"
)
# Insert task into DB
conn = get_db()
conn.execute(

View File

@@ -0,0 +1,210 @@
"""Tests for the two pipeline-start bugs surfaced by the ET-006 live run.
BUG 1: issue.updated (status -> In Progress) ships a payload WITHOUT the
description, so start_pipeline must pull it from the Plane issue API
before QG-0 runs (otherwise QG-0 wrongly blocks the issue).
BUG 2a: M-6 derives work_item_id from the Plane sequence_id, which can collide.
ensure_unique_work_item_id() must hand out the next FREE id instead of
reusing one that is already in the tasks table.
BUG 2b: two tasks with an (artificially) identical work_item_id must not share a
branch/worktree.
launcher / Gitea / Plane network are mocked. Real FastAPI endpoint via
TestClient for the BUG 1 end-to-end path.
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_pipeline_bugs.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db, ensure_unique_work_item_id # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
from src.git_worktree import get_worktree_path # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _insert_task(work_item_id, branch, plane_id="x"):
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, "enduro-trails", branch, "analysis", plane_id),
)
conn.commit()
conn.close()
def _count(plane_id):
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
conn.close()
return n
def _task(plane_id):
conn = get_db()
row = conn.execute("SELECT * FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
conn.close()
return row
# --------------------------------------------------------------------------- #
# BUG 1
# --------------------------------------------------------------------------- #
def _to_in_progress_no_desc(plane_id="bug1"):
"""issue.updated payload WITHOUT description (only changed fields)."""
return client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": plane_id, "name": "A valid backlog item title",
# NO description / description_stripped here, exactly like Plane sends
# on a status change.
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
@patch("src.plane_sync.fetch_issue_description",
return_value="This is a sufficiently long description fetched from Plane API.")
def test_status_start_fetches_description(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
Plane API -> QG-0 passes -> task created + analyst enqueued (NOT blocked)."""
resp = _to_in_progress_no_desc("bug1")
assert resp.status_code == 200
# description was pulled from the API
mock_desc.assert_called_once()
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
assert _count("bug1") == 1
assert _task("bug1")["stage"] == "analysis"
mock_enqueue.assert_called_once()
assert mock_enqueue.call_args.args[0] == "analyst"
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
@patch("src.plane_sync.fetch_issue_description", return_value="")
def test_status_start_empty_api_still_blocks(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
fails -> NO task is created (truly empty ticket)."""
resp = _to_in_progress_no_desc("bug1-empty")
assert resp.status_code == 200
mock_desc.assert_called_once()
assert _count("bug1-empty") == 0
mock_enqueue.assert_not_called()
# --------------------------------------------------------------------------- #
# BUG 2a
# --------------------------------------------------------------------------- #
def test_work_item_id_uniqueness():
"""BUG 2a: if ET-006 is already in tasks, the guard returns the next free
id (ET-007), not ET-006 again."""
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="old")
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-007"
# ET-006 AND ET-007 taken -> next free is ET-008.
_insert_task("ET-007", "feature/ET-007-something", plane_id="old2")
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-008"
# A free id is returned unchanged.
assert ensure_unique_work_item_id("ET-099", "enduro-trails") == "ET-099"
# Per-repo isolation: a different repo with the same id is not a collision.
assert ensure_unique_work_item_id("ET-006", "other-repo") == "ET-006"
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
@patch("src.plane_sync.fetch_issue_description",
return_value="A sufficiently long description for QG-0 to pass cleanly.")
def test_collision_reassigns_in_start_pipeline(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
Plane sequence_id is also 6 must NOT reuse ET-006."""
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "task25", "name": "Popup enduro trails feature",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
assert resp.status_code == 200
new_id = _task("task25")["work_item_id"]
assert new_id != "ET-006"
assert new_id == "ET-007"
# --------------------------------------------------------------------------- #
# BUG 2b
# --------------------------------------------------------------------------- #
def test_worktree_per_task():
"""BUG 2b: two tasks must not resolve to the same worktree path. With the
uniqueness guard the branches differ, so the worktree paths differ too."""
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
# The second task gets a unique id via the guard...
new_id = ensure_unique_work_item_id("ET-006", "enduro-trails")
assert new_id == "ET-007"
branch_a = "feature/ET-006-gpx-upload"
branch_b = f"feature/{new_id}-popup-enduro-trails"
wt_a = get_worktree_path("enduro-trails", branch_a)
wt_b = get_worktree_path("enduro-trails", branch_b)
assert wt_a != wt_b, "two tasks must not share a worktree path"