feat(webhook): start pipeline on In Progress status (not on create)

Feature 1. work_item.created no longer starts the pipeline (soft QG-0 log only);
the issue stays in the backlog until moved to In Progress. The pipeline-start body
is extracted into start_pipeline(); a new issue updated handler routes a state
change to In Progress -> handle_status_start, which is idempotent: an existing task
for the plane_id is NOT re-created or restarted (protects handle_comment, which also
flips issues to In Progress). Real Plane payload: event=issue, action=updated,
data.state.id. Existing m6/plane_webhook/dedup tests updated to drive the new
trigger; new test_status_trigger.py covers created-no-op / start / idempotent.
This commit is contained in:
Dev Agent
2026-06-03 18:18:26 +03:00
parent a4668c0303
commit 09b1c5e1b9
5 changed files with 391 additions and 52 deletions

View File

@@ -92,38 +92,139 @@ async def plane_webhook(request: Request):
return {"status": "ignored", "reason": "unknown project"}
if (event == "work_item.created") or (event == "issue" and action == "created"):
# Feature 1: creation NO LONGER starts the pipeline. Slava keeps the
# backlog until he moves an issue to In Progress. We only run a soft
# QG-0 sanity log here (no branch, no analyst, no task row).
await handle_work_item_created(data, project_id)
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
# Feature 1 & 2: status changes drive the pipeline.
# Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent)
# -> Approved : advance (== :approved: comment)
# -> Rejected : rollback (== :rejected: comment)
await handle_issue_updated(data, project_id)
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
await handle_comment(data, project_id)
return {"status": "accepted"}
async def handle_work_item_created(data: dict, project_id: str = ""):
def _state_id(data: dict) -> str:
"""Extract the new Plane state UUID from an 'issue updated' payload.
Real payload (verified from prod events): data.state is
{id, name, color, group}. Some payloads carry state as a bare UUID string.
"""
New work item created in Plane.
QG-0: validate title, description, priority.
If valid: create branch, init docs, launch analyst.
If invalid: comment with what's missing, set Blocked.
state = data.get("state")
if isinstance(state, dict):
return state.get("id", "") or ""
if isinstance(state, str):
return state
return ""
async def handle_issue_updated(data: dict, project_id: str = ""):
"""Feature 1 & 2: react to a Plane issue status change.
Routes the NEW state UUID (data.state.id) to:
- in_progress : start the pipeline if this issue has no task yet
(idempotent — an existing task is NOT restarted; protects handle_comment
which also flips issues to In Progress during approve/answer flows).
- approved : same as a :approved: comment (advance current stage).
- rejected : same as a :rejected: comment (rollback + relaunch).
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
is ignored here — those are statuses the orchestrator itself sets.
"""
from ..plane_sync import PLANE_STATES
plane_id = str(data.get("id") or "")
new_state = _state_id(data)
if not plane_id or not new_state:
logger.info("issue updated without id/state, ignoring")
return
if new_state == PLANE_STATES["in_progress"]:
await handle_status_start(data, project_id)
elif new_state == PLANE_STATES["approved"]:
await handle_verdict(data, project_id, approved=True)
elif new_state == PLANE_STATES["rejected"]:
await handle_verdict(data, project_id, approved=False)
else:
logger.info(f"issue {plane_id} updated to state {new_state[:8]}..., no pipeline action")
async def handle_status_start(data: dict, project_id: str = ""):
"""Feature 1: an issue moved into In Progress -> start the pipeline.
Idempotent: if a task already exists for this plane_id, do nothing (no dup,
no analyst restart). This is what makes handle_comment's set_issue_in_progress
safe — by then the task already exists, so the start is skipped.
"""
plane_id = str(data.get("id") or "")
existing = get_task_by_plane_id(plane_id)
if existing:
logger.info(
f"Status->In Progress for {plane_id}: task already exists "
f"(stage={existing.get('stage')}), not restarting"
)
return
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
await start_pipeline(data, project_id)
async def handle_verdict(data: dict, project_id: str, approved: bool):
"""Feature 2 (variant B): a status verdict mirrors the comment verdicts.
Approved status == :approved: comment -> _try_advance_stage.
Rejected status == :rejected: comment -> rollback to previous stage + relaunch
(reason is unknown from a status change; Slava writes it in a separate
comment, so we pass a fixed note).
"""
plane_id = str(data.get("id") or "")
task = get_task_by_plane_id(plane_id)
if not task:
logger.warning(f"Verdict status for {plane_id} but no task found, ignoring")
return
task_id = task["id"]
current_stage = task["stage"]
repo = task["repo"]
work_item_id = task.get("work_item_id", "")
branch = task.get("branch", "")
if approved:
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
# Rejected: mirror the :rejected: comment rollback branch.
reason = "(rejected via status, see latest comment)"
await _rollback_stage(
task_id, current_stage, repo, work_item_id, branch, reason
)
async def handle_work_item_created(data: dict, project_id: str = ""):
"""Feature 1: creation does NOT start the pipeline anymore.
The pipeline is started when Slava moves the issue into In Progress
(handle_status_start -> start_pipeline). On creation we only run a SOFT QG-0
sanity check and log the result — NO branch, NO docs, NO analyst, NO task row
— so the issue can sit in the backlog until Slava is ready.
"""
plane_id = data.get("id", "")
name = data.get("name", "untitled")
description = data.get("description_stripped", data.get("description", ""))
priority = data.get("priority", {})
priority_name = priority if isinstance(priority, str) else priority.get("name", "")
errors = _qg0_errors(name, description)
if errors:
logger.info(f"work_item.created {plane_id}: soft QG-0 warnings: {errors}")
else:
logger.info(f"work_item.created {plane_id} ('{name}'): in backlog, awaiting In Progress")
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
# the single hardcoded default_repo.
if not project_id:
project_id = data.get("project") or data.get("project_id") or ""
proj = get_project_by_plane_id(project_id)
if not proj:
logger.warning(f"handle_work_item_created: unknown project '{project_id}', ignoring {plane_id}")
return
repo = proj.repo
plane_project_id = proj.plane_project_id
# QG-0 validation
def _qg0_errors(name: str, description: str) -> list:
"""QG-0 validation: returns a list of human-readable problems (empty = OK)."""
errors = []
if not name or len(name) < 5:
errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
@@ -132,6 +233,36 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
if not description or len(description.strip()) < 20:
errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
return errors
async def start_pipeline(data: dict, project_id: str = ""):
"""Feature 1: start the pipeline for an issue (moved to In Progress).
This is the body extracted from the old handle_work_item_created: resolve the
project, run QG-0 (hard — blocks on failure), create the work item id +
branch + initial docs, insert the task row, and enqueue the analyst.
Callers (handle_status_start) already guarantee no existing task for this
plane_id, so this never duplicates.
"""
plane_id = data.get("id", "")
name = data.get("name", "untitled")
description = data.get("description_stripped", data.get("description", ""))
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
# the single hardcoded default_repo.
if not project_id:
project_id = data.get("project") or data.get("project_id") or ""
proj = get_project_by_plane_id(project_id)
if not proj:
logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}")
return
repo = proj.repo
plane_project_id = proj.plane_project_id
# QG-0 validation (hard gate on pipeline start)
errors = _qg0_errors(name, description)
if errors:
# QG-0 failed
error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
@@ -240,36 +371,7 @@ async def handle_comment(data: dict, project_id: str = ""):
if ":rejected:" in comment_body:
# Extract reason (text after :rejected:)
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
if current_stage == "analysis":
# Already in analysis — just relaunch analyst with rejection reason
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
else:
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
if prev_stage:
update_task_stage(task_id, prev_stage)
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
notify_stage_change(task_id, current_stage, prev_stage)
plane_notify_stage(work_item_id, current_stage, prev_stage)
from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS
_plane_comment(
work_item_id,
f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
author=STAGE_AUTHORS.get(prev_stage, "stream"),
)
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason)
return
if ":approved:" in comment_body:
@@ -338,6 +440,72 @@ async def handle_comment(data: dict, project_id: str = ""):
logger.error(f"Failed to check issue state: {e}")
async def _rollback_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
reason: str,
):
"""Shared :rejected: / Rejected-status rollback (Feature 2 variant B).
Both the :rejected: comment and a status change to Rejected funnel here so
the two mechanisms behave identically:
- at analysis: relaunch the analyst with the rejection reason;
- otherwise: roll back to the previous stage and relaunch its agent
(via the existing rollback notify + an enqueue of the prev-stage agent).
"""
if current_stage == "analysis":
# Already in analysis — just relaunch analyst with rejection reason
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
return
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
if not prev_stage:
logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
return
update_task_stage(task_id, prev_stage)
notify_stage_change(task_id, current_stage, prev_stage)
# Feature 3: plane_notify_stage moves the board to the prev stage's status.
plane_notify_stage(work_item_id, current_stage, prev_stage)
# Then put it back to In Progress so the relaunched agent is clearly working.
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS
_plane_comment(
work_item_id,
f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
author=STAGE_AUTHORS.get(prev_stage, "stream"),
)
# Relaunch the previous stage's agent so the rollback actually re-runs work.
# STAGE_AUTHORS maps a stage directly to the role that OWNS work in it
# (analysis->analyst, architecture->architect, ...), which is exactly the
# agent we must re-run on a rollback into prev_stage.
from ..plane_sync import STAGE_AUTHORS as _STAGE_AUTHORS
prev_agent = _STAGE_AUTHORS.get(prev_stage)
if prev_agent:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: {prev_stage}\nNote: Stakeholder REJECTED. Reason: {reason}\n"
f"Revise and improve."
)
new_job = enqueue_job(prev_agent, repo, task_desc, task_id=task_id)
logger.info(
f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, "
f"enqueued {prev_agent} (job_id={new_job})"
)
else:
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
async def _try_advance_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
):

View File

@@ -102,16 +102,22 @@ def test_fetch_sequence_id_missing_field_returns_none():
# handle_work_item_created: seq available -> prefix-NNN
# ---------------------------------------------------------------------------
# Feature 1: pipeline starts on a status change to In Progress, not on creation.
_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"):
return client.post(
"/webhook/plane",
json={
"event": "work_item.created",
"event": "issue",
"action": "updated",
"data": {
"id": plane_id,
"name": name,
"description_stripped": "This is a sufficiently long description.",
"project": plane_project_id,
"state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"},
},
},
)

View File

@@ -73,16 +73,24 @@ def setup(monkeypatch):
os.unlink(_test_db)
# Feature 1: the pipeline now starts on a status change to In Progress (not on
# creation). _post_created drives that status-change event so these ORCH-6
# routing tests still exercise task creation through the new trigger.
_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
def _post_created(plane_project_id, plane_id="wi-1", name="A valid work item title"):
return client.post(
"/webhook/plane",
json={
"event": "work_item.created",
"event": "issue",
"action": "updated",
"data": {
"id": plane_id,
"name": name,
"description_stripped": "This is a sufficiently long description.",
"project": plane_project_id,
"state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"},
},
},
)

View File

@@ -0,0 +1,150 @@
"""Feature 1: pipeline starts on status -> In Progress, not on creation.
* work_item.created / issue created -> NO task, NO branch, NO analyst.
* issue updated -> In Progress (from backlog) -> task created + analyst enqueued.
* a second In Progress update for the same issue -> NO duplicate, NO restart
(protects handle_comment, which also flips issues to In Progress).
launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient.
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_trigger.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _created(plane_id="st-created"):
return client.post("/webhook/plane", json={
"event": "issue", "action": "created",
"data": {
"id": plane_id, "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": BACKLOG, "name": "Backlog", "group": "backlog"},
},
})
def _to_in_progress(plane_id="st-1"):
return client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": plane_id, "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
def _count(plane_id):
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
conn.close()
return n
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
def test_created_does_not_start_pipeline(mock_branch, mock_docs, mock_enqueue):
resp = _created("st-created")
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
# No task, no branch, no analyst enqueue.
assert _count("st-created") == 0
mock_branch.assert_not_called()
mock_enqueue.assert_not_called()
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqueue):
mock_enqueue.return_value = 1
resp = _to_in_progress("st-1")
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
assert _count("st-1") == 1
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE plane_id='st-1'").fetchone()
conn.close()
assert task["stage"] == "analysis"
assert task["repo"] == "enduro-trails"
mock_branch.assert_called_once()
# analyst enqueued exactly once
assert mock_enqueue.call_count == 1
assert mock_enqueue.call_args.args[0] == "analyst"
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock_enqueue):
mock_enqueue.return_value = 1
_to_in_progress("st-2")
assert _count("st-2") == 1
assert mock_enqueue.call_count == 1
# Second In Progress update (e.g. handle_comment re-set the status). Use a
# DISTINCT body (different activity old_value) so webhook dedup does NOT
# short-circuit it — this exercises the existing-task idempotency guard in
# handle_status_start, not the delivery-dedup layer.
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "st-2", "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "some-other-state"},
})
assert resp.status_code == 200
assert _count("st-2") == 1 # still exactly one task
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued

View File

@@ -211,14 +211,21 @@ def test_gitea_fallback_hash_when_no_delivery_header():
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.
Feature 1: the pipeline now starts on a status change to In Progress, not on
creation, so this drives the dedup test with an 'issue updated' event.
"""
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
body = {
"event": "work_item.created",
"event": "issue",
"action": "updated",
"data": {
"id": "pd-001",
"name": "Dedup plane task",
"description_stripped": "A sufficiently long description for QG-0 to pass.",
"project": "proj-1",
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
}
r1 = client.post("/webhook/plane", json=body)