Compare commits
4 Commits
feature/pi
...
fix/pipeli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c69e11348b | ||
|
|
ac9f5a05a6 | ||
|
|
fa746105fd | ||
| 4773137b52 |
38
src/db.py
38
src/db.py
@@ -159,6 +159,44 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
return f"{prefix}-{next_num:03d}"
|
||||
|
||||
|
||||
def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
|
||||
"""BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive.
|
||||
|
||||
M-6 derives the work_item_id from the Plane sequence_id. That number can
|
||||
collide (e.g. an issue was deleted and the sequence reused, or two issues
|
||||
map to the same number) -> the SAME ET-NNN gets handed to two different
|
||||
tasks, which then physically share a branch/worktree slug prefix and step on
|
||||
each other (see ET-006: task 8 and task 25).
|
||||
|
||||
This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it):
|
||||
given the derived id, if that exact <PREFIX>-NNN already exists in the tasks
|
||||
table for this repo, walk forward (ET-007, ET-008, ...) until a free number
|
||||
is found and return that instead. If the derived id is free, it is returned
|
||||
unchanged.
|
||||
"""
|
||||
if not work_item_id or "-" not in work_item_id:
|
||||
return work_item_id
|
||||
prefix, num_str = work_item_id.rsplit("-", 1)
|
||||
try:
|
||||
num = int(num_str)
|
||||
except ValueError:
|
||||
return work_item_id
|
||||
width = len(num_str)
|
||||
|
||||
conn = get_db()
|
||||
try:
|
||||
candidate = work_item_id
|
||||
while conn.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
|
||||
(repo, candidate),
|
||||
).fetchone() is not None:
|
||||
num += 1
|
||||
candidate = f"{prefix}-{num:0{width}d}"
|
||||
return candidate
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-5 (M-7): idempotent webhook event logging
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -155,6 +155,48 @@ def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
|
||||
return None
|
||||
|
||||
|
||||
import re as _re
|
||||
|
||||
|
||||
def _strip_html(html: str) -> str:
|
||||
"""Crude HTML -> text: drop tags and collapse whitespace. Good enough to
|
||||
feed QG-0's length check when Plane only gives us description_html."""
|
||||
if not html:
|
||||
return ""
|
||||
text = _re.sub(r"<[^>]+>", " ", html)
|
||||
return _re.sub(r"\s+", " ", text).strip()
|
||||
|
||||
|
||||
def fetch_issue_description(issue_id: str, project_id: str) -> str:
|
||||
"""BUG 1: GET the Plane issue by UUID and return its description text.
|
||||
|
||||
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
|
||||
CHANGED fields, so ``description``/``description_stripped`` are usually
|
||||
absent there. start_pipeline calls this to pull the full description from the
|
||||
issue detail endpoint so QG-0 does not blow up on an empty payload field.
|
||||
|
||||
Reuses the exact GET issue detail endpoint / shared token already used by
|
||||
``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers
|
||||
``description_stripped``; falls back to stripping ``description_html``.
|
||||
|
||||
Returns "" on network error, non-2xx, or a missing field - never raises, so
|
||||
a Plane outage degrades to the honest "empty description" QG-0 path instead
|
||||
of crashing the webhook.
|
||||
"""
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
||||
try:
|
||||
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
desc = body.get("description_stripped")
|
||||
if desc and desc.strip():
|
||||
return desc
|
||||
return _strip_html(body.get("description_html") or "")
|
||||
except Exception as e:
|
||||
logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
||||
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
|
||||
@@ -13,6 +13,7 @@ from ..db import (
|
||||
get_db,
|
||||
get_task_by_plane_id,
|
||||
get_next_work_item_id,
|
||||
ensure_unique_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
@@ -261,6 +262,23 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
repo = proj.repo
|
||||
plane_project_id = proj.plane_project_id
|
||||
|
||||
# BUG 1: Plane's issue.updated webhook (status change -> In Progress) sends
|
||||
# only the CHANGED fields, so description / description_stripped are usually
|
||||
# empty here even though the issue HAS a description. If the payload's
|
||||
# description is missing/too short, pull the full one from the Plane issue
|
||||
# detail API (same GET endpoint + shared token already used by
|
||||
# fetch_issue_sequence_id) before QG-0 runs. If the API is also empty, QG-0
|
||||
# legitimately fails (truly empty ticket).
|
||||
if not description or len(description.strip()) < 20:
|
||||
from ..plane_sync import fetch_issue_description
|
||||
fetched = fetch_issue_description(plane_id, plane_project_id)
|
||||
if fetched and len(fetched.strip()) >= len(description.strip()):
|
||||
description = fetched
|
||||
logger.info(
|
||||
f"start_pipeline: pulled description from Plane API for {plane_id} "
|
||||
f"({len(description.strip())} chars)"
|
||||
)
|
||||
|
||||
# QG-0 validation (hard gate on pipeline start)
|
||||
errors = _qg0_errors(name, description)
|
||||
if errors:
|
||||
@@ -300,10 +318,41 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
f"fell back to DB increment: {work_item_id}"
|
||||
)
|
||||
|
||||
# BUG 2a: uniqueness-guard LAYERED ON TOP of the M-6 derive above (the derive
|
||||
# itself is untouched). If the derived ET-NNN is already taken by another
|
||||
# task in this repo (collision -> two tasks would share branch/worktree, see
|
||||
# ET-006), bump to the next free number.
|
||||
_derived = work_item_id
|
||||
work_item_id = ensure_unique_work_item_id(work_item_id, repo)
|
||||
if work_item_id != _derived:
|
||||
logger.warning(
|
||||
f"work_item_id collision: derived {_derived} already in use for "
|
||||
f"{repo}; reassigned {plane_id} -> {work_item_id}"
|
||||
)
|
||||
|
||||
# Create slug from name
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
|
||||
branch = f"feature/{work_item_id}-{slug}"
|
||||
|
||||
# BUG 2b (defense-in-depth): the worktree/path is keyed by BRANCH
|
||||
# (git_worktree.get_worktree_path) and tasks are reverse-resolved by
|
||||
# (repo, branch). With 2a the work_item_id is unique, so the branch prefix is
|
||||
# too; but the slug could still collide (e.g. two issues with the same title
|
||||
# under different ids -> fine) or, worse, an identical branch already exist.
|
||||
# Guard physically: if this exact branch is already owned by another task in
|
||||
# this repo, disambiguate with the (now unique) work_item_id so two tasks can
|
||||
# never share a worktree.
|
||||
_conn_b = get_db()
|
||||
_branch_taken = _conn_b.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch)
|
||||
).fetchone()
|
||||
_conn_b.close()
|
||||
if _branch_taken is not None:
|
||||
branch = f"feature/{work_item_id}-{plane_id[:8]}"
|
||||
logger.warning(
|
||||
f"branch collision for {repo}; disambiguated to unique branch {branch}"
|
||||
)
|
||||
|
||||
# Insert task into DB
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
|
||||
210
tests/test_pipeline_start_bugs.py
Normal file
210
tests/test_pipeline_start_bugs.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""Tests for the two pipeline-start bugs surfaced by the ET-006 live run.
|
||||
|
||||
BUG 1: issue.updated (status -> In Progress) ships a payload WITHOUT the
|
||||
description, so start_pipeline must pull it from the Plane issue API
|
||||
before QG-0 runs (otherwise QG-0 wrongly blocks the issue).
|
||||
|
||||
BUG 2a: M-6 derives work_item_id from the Plane sequence_id, which can collide.
|
||||
ensure_unique_work_item_id() must hand out the next FREE id instead of
|
||||
reusing one that is already in the tasks table.
|
||||
|
||||
BUG 2b: two tasks with an (artificially) identical work_item_id must not share a
|
||||
branch/worktree.
|
||||
|
||||
launcher / Gitea / Plane network are mocked. Real FastAPI endpoint via
|
||||
TestClient for the BUG 1 end-to-end path.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_pipeline_bugs.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
import pytest # noqa: E402
|
||||
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db, ensure_unique_work_item_id # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
from src.git_worktree import get_worktree_path # noqa: E402
|
||||
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
yield
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
def _insert_task(work_item_id, branch, plane_id="x"):
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, "enduro-trails", branch, "analysis", plane_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def _count(plane_id):
|
||||
conn = get_db()
|
||||
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
def _task(plane_id):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT * FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
|
||||
conn.close()
|
||||
return row
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG 1
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _to_in_progress_no_desc(plane_id="bug1"):
|
||||
"""issue.updated payload WITHOUT description (only changed fields)."""
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": plane_id, "name": "A valid backlog item title",
|
||||
# NO description / description_stripped here, exactly like Plane sends
|
||||
# on a status change.
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
||||
@patch("src.plane_sync.fetch_issue_description",
|
||||
return_value="This is a sufficiently long description fetched from Plane API.")
|
||||
def test_status_start_fetches_description(
|
||||
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
|
||||
Plane API -> QG-0 passes -> task created + analyst enqueued (NOT blocked)."""
|
||||
resp = _to_in_progress_no_desc("bug1")
|
||||
assert resp.status_code == 200
|
||||
# description was pulled from the API
|
||||
mock_desc.assert_called_once()
|
||||
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
|
||||
assert _count("bug1") == 1
|
||||
assert _task("bug1")["stage"] == "analysis"
|
||||
mock_enqueue.assert_called_once()
|
||||
assert mock_enqueue.call_args.args[0] == "analyst"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
||||
@patch("src.plane_sync.fetch_issue_description", return_value="")
|
||||
def test_status_start_empty_api_still_blocks(
|
||||
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
|
||||
fails -> NO task is created (truly empty ticket)."""
|
||||
resp = _to_in_progress_no_desc("bug1-empty")
|
||||
assert resp.status_code == 200
|
||||
mock_desc.assert_called_once()
|
||||
assert _count("bug1-empty") == 0
|
||||
mock_enqueue.assert_not_called()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG 2a
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_work_item_id_uniqueness():
|
||||
"""BUG 2a: if ET-006 is already in tasks, the guard returns the next free
|
||||
id (ET-007), not ET-006 again."""
|
||||
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="old")
|
||||
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-007"
|
||||
|
||||
# ET-006 AND ET-007 taken -> next free is ET-008.
|
||||
_insert_task("ET-007", "feature/ET-007-something", plane_id="old2")
|
||||
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-008"
|
||||
|
||||
# A free id is returned unchanged.
|
||||
assert ensure_unique_work_item_id("ET-099", "enduro-trails") == "ET-099"
|
||||
|
||||
# Per-repo isolation: a different repo with the same id is not a collision.
|
||||
assert ensure_unique_work_item_id("ET-006", "other-repo") == "ET-006"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
|
||||
@patch("src.plane_sync.fetch_issue_description",
|
||||
return_value="A sufficiently long description for QG-0 to pass cleanly.")
|
||||
def test_collision_reassigns_in_start_pipeline(
|
||||
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
|
||||
Plane sequence_id is also 6 must NOT reuse ET-006."""
|
||||
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "task25", "name": "Popup enduro trails feature",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
new_id = _task("task25")["work_item_id"]
|
||||
assert new_id != "ET-006"
|
||||
assert new_id == "ET-007"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG 2b
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_worktree_per_task():
|
||||
"""BUG 2b: two tasks must not resolve to the same worktree path. With the
|
||||
uniqueness guard the branches differ, so the worktree paths differ too."""
|
||||
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
|
||||
# The second task gets a unique id via the guard...
|
||||
new_id = ensure_unique_work_item_id("ET-006", "enduro-trails")
|
||||
assert new_id == "ET-007"
|
||||
branch_a = "feature/ET-006-gpx-upload"
|
||||
branch_b = f"feature/{new_id}-popup-enduro-trails"
|
||||
|
||||
wt_a = get_worktree_path("enduro-trails", branch_a)
|
||||
wt_b = get_worktree_path("enduro-trails", branch_b)
|
||||
assert wt_a != wt_b, "two tasks must not share a worktree path"
|
||||
Reference in New Issue
Block a user