Compare commits

..

8 Commits

Author SHA1 Message Date
Dev Agent
857bad314c feat(webhook): pull reject reason from latest comment
handle_verdict(rejected): the reason is now pulled from the issue latest Plane
comment (_latest_comment_reason: GET comments, newest by created_at, HTML
stripped) instead of a fixed stub. Slava writes the reason in a comment before
flipping the status to Rejected. Falls back to a fixed note when there is no
comment / the API call fails.

tests: add test_status_only_verdict.py (test_inreview_comment_does_not_revert
[bug 3 root], test_any_comment_no_pipeline_action,
test_approved_status_advances_without_inprogress_reset,
test_rejected_status_pulls_reason_from_comment) and
test_inprogress_from_needs_input_relaunches_analyst in test_status_trigger.py.
Rewrote the comment-based tests (test_verdict_status, test_plane_approved/
rejected in test_webhooks) under the status-only model: comments are no-ops,
verdicts come from status changes.
2026-06-03 22:18:24 +03:00
Dev Agent
c4be50ee20 fix(webhook): drop redundant in_progress reset on Approved
handle_verdict(approved): removed set_issue_in_progress(work_item_id) before
_try_advance_stage. _try_advance_stage -> advance_stage -> plane_notify_stage
already PATCHes the issue to the NEXT stage status, so the reset only made the
board flicker In Progress before the next stage (part of bug 3).
2026-06-03 22:18:13 +03:00
Dev Agent
6b3e144949 fix(webhook): remove comment-based approve, keep status-only verdict
Status-only verdict model: comments NEVER drive the pipeline. Removed the
whole comment-based control mechanism from handle_comment (:approved: /
:rejected: / answer-to-questions) which caused bug 3 (echo self-hit): the
analyst posts its own "waiting for approval" comment, handle_comment catches
its own comment and reverts In Review -> In Progress. handle_comment is now a
pure logger with no side effects.

handle_status_start: a return to In Progress on an EXISTING task (Slava
answered the analyst questions in Needs Input) now RELAUNCHES the stage agent
instead of being a no-op. Distinguished from a duplicate In Progress webhook
via has_active_job_for_task() (new db helper): no active job => agent idle =>
relaunch; active job => busy => skip (no double launch).
2026-06-03 22:18:02 +03:00
cd73c75cda Merge pull request 'fix: pipeline-start bugs (ET-006) — fetch description on status-start + work_item_id collision guard' (#11) from fix/pipeline-start-bugs into main 2026-06-03 21:14:44 +03:00
Dev Agent
c69e11348b test(pipeline): cover status-start description fetch and work_item_id uniqueness
- test_status_start_fetches_description: empty payload description -> pulled from
  Plane API (mocked) -> QG-0 passes, analyst enqueued.
- test_status_start_empty_api_still_blocks: empty API -> honest QG-0 fail.
- test_work_item_id_uniqueness: ET-006 taken -> next free id, per-repo isolation.
- test_collision_reassigns_in_start_pipeline: end-to-end collision reassignment.
- test_worktree_per_task: two tasks never share a worktree path.
2026-06-03 21:12:59 +03:00
Dev Agent
ac9f5a05a6 fix(work-item): prevent work_item_id collision and bind branch per task
ET-006 was handed to two different tasks because M-6 derives work_item_id from
the Plane sequence_id, which can collide -> the two tasks shared a branch/worktree
slug prefix and stepped on each other.

2a: ensure_unique_work_item_id() is a uniqueness-guard LAYERED ON TOP of the M-6
derive (derive is untouched): if the derived ET-NNN already exists in tasks for
the repo, it walks forward to the next free number. Applied in start_pipeline
after the derive.

2b (defense-in-depth): worktree is keyed by branch; if the resulting branch is
already owned by another task in the repo, disambiguate it with the unique
work_item_id + plane id so two tasks can never share a worktree.
2026-06-03 21:12:51 +03:00
Dev Agent
fa746105fd fix(webhook): fetch description from Plane API on status-start
Plane issue.updated (status -> In Progress) ships only changed fields, so the
webhook payload has no description and QG-0 wrongly blocked issues. start_pipeline
now pulls the full description from the Plane issue detail API (reusing the same
GET endpoint + shared token as fetch_issue_sequence_id) when the payload field is
empty/short, before QG-0 runs. Empty API -> honest QG-0 fail (truly empty ticket).
2026-06-03 21:12:38 +03:00
4773137b52 Merge pull request 'feat: pipeline UX — status-trigger, verdict statuses, stage visibility, token usage' (#10) from feature/pipeline-ux into main 2026-06-03 18:27:07 +03:00
8 changed files with 926 additions and 181 deletions

View File

@@ -159,6 +159,44 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
return f"{prefix}-{next_num:03d}"
def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
"""BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive.
M-6 derives the work_item_id from the Plane sequence_id. That number can
collide (e.g. an issue was deleted and the sequence reused, or two issues
map to the same number) -> the SAME ET-NNN gets handed to two different
tasks, which then physically share a branch/worktree slug prefix and step on
each other (see ET-006: task 8 and task 25).
This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it):
given the derived id, if that exact <PREFIX>-NNN already exists in the tasks
table for this repo, walk forward (ET-007, ET-008, ...) until a free number
is found and return that instead. If the derived id is free, it is returned
unchanged.
"""
if not work_item_id or "-" not in work_item_id:
return work_item_id
prefix, num_str = work_item_id.rsplit("-", 1)
try:
num = int(num_str)
except ValueError:
return work_item_id
width = len(num_str)
conn = get_db()
try:
candidate = work_item_id
while conn.execute(
"SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
(repo, candidate),
).fetchone() is not None:
num += 1
candidate = f"{prefix}-{num:0{width}d}"
return candidate
finally:
conn.close()
# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------
@@ -313,6 +351,23 @@ def mark_job(
conn.close()
def has_active_job_for_task(task_id: int) -> bool:
"""True if the task already has a queued or running job.
Used by the status-only verdict model (handle_status_start) to guard against
double-launching an agent when a duplicate In Progress webhook arrives or a
job is still in flight. The events de-dup absorbs identical webhook bodies;
this guards against distinct webhooks while a job is pending/running.
"""
conn = get_db()
row = conn.execute(
"SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1",
(task_id,),
).fetchone()
conn.close()
return row is not None
def count_running_jobs() -> int:
"""Number of jobs currently in 'running' status (for max_concurrency)."""
conn = get_db()

View File

@@ -155,6 +155,48 @@ def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
return None
import re as _re
def _strip_html(html: str) -> str:
"""Crude HTML -> text: drop tags and collapse whitespace. Good enough to
feed QG-0's length check when Plane only gives us description_html."""
if not html:
return ""
text = _re.sub(r"<[^>]+>", " ", html)
return _re.sub(r"\s+", " ", text).strip()
def fetch_issue_description(issue_id: str, project_id: str) -> str:
"""BUG 1: GET the Plane issue by UUID and return its description text.
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
CHANGED fields, so ``description``/``description_stripped`` are usually
absent there. start_pipeline calls this to pull the full description from the
issue detail endpoint so QG-0 does not blow up on an empty payload field.
Reuses the exact GET issue detail endpoint / shared token already used by
``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers
``description_stripped``; falls back to stripping ``description_html``.
Returns "" on network error, non-2xx, or a missing field - never raises, so
a Plane outage degrades to the honest "empty description" QG-0 path instead
of crashing the webhook.
"""
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp.raise_for_status()
body = resp.json()
desc = body.get("description_stripped")
if desc and desc.strip():
return desc
return _strip_html(body.get("description_html") or "")
except Exception as e:
logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
return ""
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)

View File

@@ -13,6 +13,7 @@ from ..db import (
get_db,
get_task_by_plane_id,
get_next_work_item_id,
ensure_unique_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
@@ -97,10 +98,12 @@ async def plane_webhook(request: Request):
# QG-0 sanity log here (no branch, no analyst, no task row).
await handle_work_item_created(data, project_id)
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
# Feature 1 & 2: status changes drive the pipeline.
# Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent)
# -> Approved : advance (== :approved: comment)
# -> Rejected : rollback (== :rejected: comment)
# Status-only verdict model: status changes drive the pipeline.
# Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the
# stage agent if returned from
# Needs Input.
# -> Approved : advance to the next stage.
# -> Rejected : rollback (reason from latest comment).
await handle_issue_updated(data, project_id)
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
await handle_comment(data, project_id)
@@ -126,11 +129,11 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
"""Feature 1 & 2: react to a Plane issue status change.
Routes the NEW state UUID (data.state.id) to:
- in_progress : start the pipeline if this issue has no task yet
(idempotent — an existing task is NOT restarted; protects handle_comment
which also flips issues to In Progress during approve/answer flows).
- approved : same as a :approved: comment (advance current stage).
- rejected : same as a :rejected: comment (rollback + relaunch).
- in_progress : start the pipeline if this issue has no task yet; if a
task already exists and the stage agent is idle (returned from Needs
Input), relaunch the stage agent so it reads Slava's fresh comments.
- approved : advance to the next stage.
- rejected : rollback to the previous stage (reason from latest comment).
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
is ignored here — those are statuses the orchestrator itself sets.
"""
@@ -153,31 +156,105 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
async def handle_status_start(data: dict, project_id: str = ""):
"""Feature 1: an issue moved into In Progress -> start the pipeline.
"""An issue moved into In Progress.
Idempotent: if a task already exists for this plane_id, do nothing (no dup,
no analyst restart). This is what makes handle_comment's set_issue_in_progress
safe — by then the task already exists, so the start is skipped.
Two cases under the status-only verdict model:
1. No task yet for this plane_id -> START the pipeline (start_pipeline).
2. A task already exists -> this is Slava returning the issue from
Needs Input to In Progress after answering the analyst's questions. We
must RELAUNCH the current stage's agent so it reads the fresh comments
from Plane (the answer-to-questions flow used to live in handle_comment;
it is now status-driven).
KEY FORK — telling "answer to questions" apart from a plain duplicate In
Progress webhook (the dedup-protection case):
The tasks table stores no Plane status, and the issue.updated payload only
carries the NEW state (In Progress), so we cannot read the previous status
from here. Instead we use the only reliable local signal: whether the
stage's agent is currently in flight.
- The orchestrator sets In Progress itself while an agent runs. When the
agent FINISHES it leaves the issue in Needs Input or In Review and has
NO queued/running job. So: an existing task with NO active job means the
agent is idle / waiting -> a return to In Progress is a genuine relaunch
request -> enqueue the stage agent.
- If a queued/running job already exists for the task, the agent is busy
(or a duplicate webhook arrived) -> SKIP (no double launch). The events
de-dup at the top of plane_webhook already absorbs identical webhook
bodies; this job guard additionally covers distinct webhooks fired while
a job is still pending/running.
"""
from ..db import has_active_job_for_task
plane_id = str(data.get("id") or "")
existing = get_task_by_plane_id(plane_id)
if existing:
if not existing:
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
await start_pipeline(data, project_id)
return
task_id = existing["id"]
current_stage = existing["stage"]
repo = existing["repo"]
work_item_id = existing.get("work_item_id", "")
branch = existing.get("branch", "")
# Duplicate / busy guard: a job is already pending or running for this task.
if has_active_job_for_task(task_id):
logger.info(
f"Status->In Progress for {plane_id}: task already exists "
f"(stage={existing.get('stage')}), not restarting"
f"Status->In Progress for {plane_id}: task {task_id} already has an "
f"active job (stage={current_stage}), not relaunching"
)
return
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
await start_pipeline(data, project_id)
# Agent is idle -> Slava answered questions and returned the issue to In
# Progress. Relaunch the current stage's agent to read the fresh comments.
from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment
stage_agent = STAGE_AUTHORS.get(current_stage)
if not stage_agent:
logger.info(
f"Status->In Progress for {plane_id}: no agent for stage "
f"'{current_stage}', not relaunching"
)
return
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
f"Progress (answered your questions). Read the latest comments in Plane "
f"and revise your artifacts."
)
job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id)
logger.info(
f"Task {task_id}: returned to In Progress (Needs Input answered), "
f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})"
)
try:
_add_comment(
work_item_id,
"\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.",
author=stage_agent,
)
except Exception as e:
logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}")
async def handle_verdict(data: dict, project_id: str, approved: bool):
"""Feature 2 (variant B): a status verdict mirrors the comment verdicts.
"""Status-only verdict: a Plane status change drives advance / rollback.
Approved status == :approved: comment -> _try_advance_stage.
Rejected status == :rejected: comment -> rollback to previous stage + relaunch
(reason is unknown from a status change; Slava writes it in a separate
comment, so we pass a fixed note).
Approved status -> _try_advance_stage. We do NOT touch the issue status here:
_try_advance_stage -> advance_stage -> plane_notify_stage already PATCHes the
issue to the NEXT stage's status. The old set_issue_in_progress call reset
the status to In Progress first, which made the board flicker In Progress
before the next stage (part of bug 3); it is removed.
Rejected status -> rollback to the previous stage. The reason is pulled from
the issue's latest comment (Slava writes the reason in a comment before/with
flipping the status to Rejected).
"""
plane_id = str(data.get("id") or "")
task = get_task_by_plane_id(plane_id)
@@ -192,19 +269,68 @@ async def handle_verdict(data: dict, project_id: str, approved: bool):
branch = task.get("branch", "")
if approved:
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
# NOTE: no set_issue_in_progress here — _try_advance_stage sets the next
# stage's status itself (advance_stage -> plane_notify_stage).
logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
# Rejected: mirror the :rejected: comment rollback branch.
reason = "(rejected via status, see latest comment)"
# Rejected: pull the rejection reason from the issue's latest comment.
issue_id = task.get("plane_issue_id") or task.get("plane_id") or plane_id
reason = _latest_comment_reason(issue_id, repo, project_id)
await _rollback_stage(
task_id, current_stage, repo, work_item_id, branch, reason
)
def _latest_comment_reason(issue_id: str, repo: str, project_id: str = "") -> str:
"""Fetch the issue's most recent comment text (HTML stripped) as the reject
reason. Slava writes the reason in a comment before/with flipping the status
to Rejected.
Returns a fixed fallback when there is no comment / the API call fails.
"""
from ..plane_sync import (
PLANE_BASE,
PLANE_HEADERS,
WORKSPACE,
PROJECT_ID as _DEFAULT_PROJECT_ID,
)
fallback = "Rejected via status, no reason comment"
if not issue_id:
return fallback
_proj = get_project_by_repo(repo)
pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
url = (
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/"
f"{issue_id}/comments/"
)
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
if resp.status_code != 200:
logger.warning(
f"reject-reason: GET comments for {issue_id} returned "
f"{resp.status_code}"
)
return fallback
payload = resp.json()
comments = payload.get("results", payload) if isinstance(payload, dict) else payload
if not comments:
return fallback
latest = max(comments, key=lambda c: c.get("created_at", "") or "")
raw = (
latest.get("comment_stripped")
or latest.get("comment_html")
or latest.get("comment")
or ""
)
text = re.sub(r"<[^>]+>", "", raw).strip()
return text[:300] if text else fallback
except Exception as e:
logger.error(f"reject-reason: failed to fetch comments for {issue_id}: {e}")
return fallback
async def handle_work_item_created(data: dict, project_id: str = ""):
"""Feature 1: creation does NOT start the pipeline anymore.
@@ -261,6 +387,23 @@ async def start_pipeline(data: dict, project_id: str = ""):
repo = proj.repo
plane_project_id = proj.plane_project_id
# BUG 1: Plane's issue.updated webhook (status change -> In Progress) sends
# only the CHANGED fields, so description / description_stripped are usually
# empty here even though the issue HAS a description. If the payload's
# description is missing/too short, pull the full one from the Plane issue
# detail API (same GET endpoint + shared token already used by
# fetch_issue_sequence_id) before QG-0 runs. If the API is also empty, QG-0
# legitimately fails (truly empty ticket).
if not description or len(description.strip()) < 20:
from ..plane_sync import fetch_issue_description
fetched = fetch_issue_description(plane_id, plane_project_id)
if fetched and len(fetched.strip()) >= len(description.strip()):
description = fetched
logger.info(
f"start_pipeline: pulled description from Plane API for {plane_id} "
f"({len(description.strip())} chars)"
)
# QG-0 validation (hard gate on pipeline start)
errors = _qg0_errors(name, description)
if errors:
@@ -300,10 +443,41 @@ async def start_pipeline(data: dict, project_id: str = ""):
f"fell back to DB increment: {work_item_id}"
)
# BUG 2a: uniqueness-guard LAYERED ON TOP of the M-6 derive above (the derive
# itself is untouched). If the derived ET-NNN is already taken by another
# task in this repo (collision -> two tasks would share branch/worktree, see
# ET-006), bump to the next free number.
_derived = work_item_id
work_item_id = ensure_unique_work_item_id(work_item_id, repo)
if work_item_id != _derived:
logger.warning(
f"work_item_id collision: derived {_derived} already in use for "
f"{repo}; reassigned {plane_id} -> {work_item_id}"
)
# Create slug from name
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
branch = f"feature/{work_item_id}-{slug}"
# BUG 2b (defense-in-depth): the worktree/path is keyed by BRANCH
# (git_worktree.get_worktree_path) and tasks are reverse-resolved by
# (repo, branch). With 2a the work_item_id is unique, so the branch prefix is
# too; but the slug could still collide (e.g. two issues with the same title
# under different ids -> fine) or, worse, an identical branch already exist.
# Guard physically: if this exact branch is already owned by another task in
# this repo, disambiguate with the (now unique) work_item_id so two tasks can
# never share a worktree.
_conn_b = get_db()
_branch_taken = _conn_b.execute(
"SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch)
).fetchone()
_conn_b.close()
if _branch_taken is not None:
branch = f"feature/{work_item_id}-{plane_id[:8]}"
logger.warning(
f"branch collision for {repo}; disambiguated to unique branch {branch}"
)
# Insert task into DB
conn = get_db()
conn.execute(
@@ -346,108 +520,34 @@ async def start_pipeline(data: dict, project_id: str = ""):
async def handle_comment(data: dict, project_id: str = ""):
"""Status-only verdict model: comments NEVER drive the pipeline.
The whole comment-based control mechanism (``:approved:`` / ``:rejected:``
and the analysis answer-to-questions flow) was removed. It caused bug 3
(echo self-hit): the analyst posts its own "waiting for approval" comment,
handle_comment catches its own comment and reverts In Review -> In Progress.
Comments are now logged only — no status change, no enqueue, no side effect.
The pipeline is driven solely by status changes (handle_issue_updated):
- Approved -> advance
- Rejected -> rollback (reason pulled from the latest comment)
- In Progress (returned from Needs Input) -> relaunch the stage agent
"""
Handle comment event — check for :approved: or :rejected:.
Advance or rollback stage accordingly.
"""
comment_body = data.get("comment_stripped", data.get("comment", data.get("body", data.get("comment_html", ""))))
plane_id = str(data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "")
if not plane_id:
logger.warning("Comment event without work_item_id, skipping")
return
task = get_task_by_plane_id(plane_id)
if not task:
logger.warning(f"No task found for plane_id={plane_id}")
return
task_id = task["id"]
current_stage = task["stage"]
repo = task["repo"]
work_item_id = task.get("work_item_id", "")
branch = task.get("branch", "")
if ":rejected:" in comment_body:
# Extract reason (text after :rejected:)
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason)
return
if ":approved:" in comment_body:
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
# Try to advance stage
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
# Task 3: If neither :approved: nor :rejected: — check if this is an answer to questions
if current_stage == "analysis":
from ..plane_sync import PLANE_STATES, set_issue_in_progress
issue_id = task.get("plane_issue_id") or task.get("plane_id")
if not issue_id:
issue_id = plane_id
if issue_id:
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE
from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID
# ORCH-6: route to this task's own Plane project (resolved from repo).
_proj = get_project_by_repo(repo)
_pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
import httpx as _httpx
try:
_resp = _httpx.get(
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/",
headers=PLANE_HEADERS, timeout=10
)
if _resp.status_code == 200:
issue_data = _resp.json()
if issue_data.get("state") == PLANE_STATES["needs_input"]:
# Task 11: Check analyst retry count (max 3 question rounds)
conn3 = get_db()
analyst_runs = conn3.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'",
(task_id,)
).fetchone()[0]
conn3.close()
if analyst_runs >= 4: # initial + 3 retries
from ..plane_sync import set_issue_blocked, add_comment as _pc
set_issue_blocked(work_item_id)
_pc(
work_item_id,
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.",
author="analyst",
)
from ..notifications import send_telegram
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
return
# This is an answer to analyst's questions — relaunch
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder answered your questions. "
f"Read the latest comment in Plane and revise your artifacts.\n"
f"Answer: {comment_body[:500]}"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _pc2
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst")
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
return
except Exception as e:
logger.error(f"Failed to check issue state: {e}")
plane_id = str(
data.get("work_item_id") or data.get("issue_id") or data.get("issue") or ""
)
logger.info(
f"comment.created for {plane_id}: logged only, no pipeline action "
f"(status-only verdict model)"
)
async def _rollback_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
reason: str,
):
"""Shared :rejected: / Rejected-status rollback (Feature 2 variant B).
"""Rollback triggered by a status change to Rejected.
Both the :rejected: comment and a status change to Rejected funnel here so
the two mechanisms behave identically:
- at analysis: relaunch the analyst with the rejection reason;
- otherwise: roll back to the previous stage and relaunch its agent
(via the existing rollback notify + an enqueue of the prev-stage agent).
@@ -516,10 +616,10 @@ async def _try_advance_stage(
is synchronous. We run it off the event loop via asyncio.to_thread so there
is exactly one implementation shared with the launcher.
finished_agent is None on this webhook path (a human :approved: comment, not
a finished agent), so the agent-specific rollback branches inside the engine
intentionally do not trigger — identical to the old plane behavior, which
only ran the QG and either advanced or reported the failure.
finished_agent is None on this webhook path (a human Approved status change,
not a finished agent), so the agent-specific rollback branches inside the
engine intentionally do not trigger — the webhook path only runs the QG and
either advances or reports the failure.
"""
import asyncio
from ..stage_engine import advance_stage

View File

@@ -0,0 +1,210 @@
"""Tests for the two pipeline-start bugs surfaced by the ET-006 live run.
BUG 1: issue.updated (status -> In Progress) ships a payload WITHOUT the
description, so start_pipeline must pull it from the Plane issue API
before QG-0 runs (otherwise QG-0 wrongly blocks the issue).
BUG 2a: M-6 derives work_item_id from the Plane sequence_id, which can collide.
ensure_unique_work_item_id() must hand out the next FREE id instead of
reusing one that is already in the tasks table.
BUG 2b: two tasks with an (artificially) identical work_item_id must not share a
branch/worktree.
launcher / Gitea / Plane network are mocked. Real FastAPI endpoint via
TestClient for the BUG 1 end-to-end path.
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_pipeline_bugs.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db, ensure_unique_work_item_id # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
from src.git_worktree import get_worktree_path # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _insert_task(work_item_id, branch, plane_id="x"):
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, "enduro-trails", branch, "analysis", plane_id),
)
conn.commit()
conn.close()
def _count(plane_id):
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
conn.close()
return n
def _task(plane_id):
conn = get_db()
row = conn.execute("SELECT * FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
conn.close()
return row
# --------------------------------------------------------------------------- #
# BUG 1
# --------------------------------------------------------------------------- #
def _to_in_progress_no_desc(plane_id="bug1"):
"""issue.updated payload WITHOUT description (only changed fields)."""
return client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": plane_id, "name": "A valid backlog item title",
# NO description / description_stripped here, exactly like Plane sends
# on a status change.
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
@patch("src.plane_sync.fetch_issue_description",
return_value="This is a sufficiently long description fetched from Plane API.")
def test_status_start_fetches_description(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
Plane API -> QG-0 passes -> task created + analyst enqueued (NOT blocked)."""
resp = _to_in_progress_no_desc("bug1")
assert resp.status_code == 200
# description was pulled from the API
mock_desc.assert_called_once()
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
assert _count("bug1") == 1
assert _task("bug1")["stage"] == "analysis"
mock_enqueue.assert_called_once()
assert mock_enqueue.call_args.args[0] == "analyst"
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
@patch("src.plane_sync.fetch_issue_description", return_value="")
def test_status_start_empty_api_still_blocks(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
fails -> NO task is created (truly empty ticket)."""
resp = _to_in_progress_no_desc("bug1-empty")
assert resp.status_code == 200
mock_desc.assert_called_once()
assert _count("bug1-empty") == 0
mock_enqueue.assert_not_called()
# --------------------------------------------------------------------------- #
# BUG 2a
# --------------------------------------------------------------------------- #
def test_work_item_id_uniqueness():
"""BUG 2a: if ET-006 is already in tasks, the guard returns the next free
id (ET-007), not ET-006 again."""
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="old")
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-007"
# ET-006 AND ET-007 taken -> next free is ET-008.
_insert_task("ET-007", "feature/ET-007-something", plane_id="old2")
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-008"
# A free id is returned unchanged.
assert ensure_unique_work_item_id("ET-099", "enduro-trails") == "ET-099"
# Per-repo isolation: a different repo with the same id is not a collision.
assert ensure_unique_work_item_id("ET-006", "other-repo") == "ET-006"
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
@patch("src.plane_sync.fetch_issue_description",
return_value="A sufficiently long description for QG-0 to pass cleanly.")
def test_collision_reassigns_in_start_pipeline(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
Plane sequence_id is also 6 must NOT reuse ET-006."""
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "task25", "name": "Popup enduro trails feature",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
assert resp.status_code == 200
new_id = _task("task25")["work_item_id"]
assert new_id != "ET-006"
assert new_id == "ET-007"
# --------------------------------------------------------------------------- #
# BUG 2b
# --------------------------------------------------------------------------- #
def test_worktree_per_task():
"""BUG 2b: two tasks must not resolve to the same worktree path. With the
uniqueness guard the branches differ, so the worktree paths differ too."""
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
# The second task gets a unique id via the guard...
new_id = ensure_unique_work_item_id("ET-006", "enduro-trails")
assert new_id == "ET-007"
branch_a = "feature/ET-006-gpx-upload"
branch_b = f"feature/{new_id}-popup-enduro-trails"
wt_a = get_worktree_path("enduro-trails", branch_a)
wt_b = get_worktree_path("enduro-trails", branch_b)
assert wt_a != wt_b, "two tasks must not share a worktree path"

View File

@@ -0,0 +1,200 @@
"""Status-only verdict model (bug 3 fix).
The comment-based control mechanism (:approved: / :rejected: / answer-to-questions)
was removed. The pipeline is driven SOLELY by Plane status changes. These tests
lock in the new behaviour:
* test_inreview_comment_does_not_revert — bug 3 root: an In Review task,
any comment arrives -> status NOT reverted, no agent launched.
* test_any_comment_no_pipeline_action — :approved: / :rejected: / plain
text comment -> no status change, no enqueue.
* test_approved_status_advances_without_inprogress_reset — Approved status
advances WITHOUT an intermediate set_issue_in_progress reset.
* test_rejected_status_pulls_reason_from_comment — Rejected status pulls the
reason from the issue's latest comment (mocked GET comments).
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_only.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
APPROVED = "a519a341-dada-4a91-8910-7604f82b79c5"
REJECTED = "ba958f3c-5db5-461d-8f82-89425e413b97"
IN_REVIEW = "38fb1f64-aa1e-48a3-92e0-0b109679046b"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
# Seed a task at the 'review' stage for plane_id 'r-1'.
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
("r-1", "ET-700", "enduro-trails", "feature/ET-700-x", "review", "r-1"),
)
conn.commit()
conn.close()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
class _FakeResp:
def __init__(self, status_code, payload):
self.status_code = status_code
self._payload = payload
def json(self):
return self._payload
def _comment(text, plane_id="r-1"):
return client.post("/webhook/plane", json={
"event": "issue_comment", "action": "created",
"data": {"work_item_id": plane_id, "comment_stripped": text,
"project": ENDURO_PLANE_ID},
})
def _status(state_id, plane_id="r-1", old="prev"):
return client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": plane_id, "name": "Status task", "project": ENDURO_PLANE_ID,
"state": {"id": state_id, "name": "X", "group": "started"},
},
"activity": {"field": "state", "new_value": state_id, "old_value": old},
})
def _stage(plane_id="r-1"):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
conn.close()
return row[0] if row else None
# --------------------------------------------------------------------------- #
# Bug 3 root: In Review must not revert on a comment.
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.enqueue_job")
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.plane_sync._set_issue_state_direct")
@patch("src.plane_sync.update_issue_state")
def test_inreview_comment_does_not_revert(
mock_update_state, mock_set_direct, mock_sip, mock_enqueue
):
"""Bug 3: task in In Review, ANY comment arrives -> status NOT reverted to
In Progress, NO agent launched. The analyst's own 'waiting for approval'
comment used to echo back and self-hit -> reverted In Review -> In Progress.
"""
# analyst's own echo comment
resp = _comment("Готово, жду approved")
assert resp.status_code == 200
# no status changes whatsoever
mock_sip.assert_not_called()
mock_set_direct.assert_not_called()
mock_update_state.assert_not_called()
# no agent launched
mock_enqueue.assert_not_called()
# stage untouched
assert _stage() == "review"
# --------------------------------------------------------------------------- #
# Any comment -> zero pipeline side-effects.
# --------------------------------------------------------------------------- #
@pytest.mark.parametrize("text", [":approved:", ":rejected: bad", "plain text", ""])
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.plane_sync._set_issue_state_direct")
def test_any_comment_no_pipeline_action(
mock_set_direct, mock_sip, mock_rollback, mock_advance, mock_enqueue, text
):
resp = _comment(text)
assert resp.status_code == 200
mock_advance.assert_not_called()
mock_rollback.assert_not_called()
mock_sip.assert_not_called()
mock_set_direct.assert_not_called()
mock_enqueue.assert_not_called()
assert _stage() == "review"
# --------------------------------------------------------------------------- #
# Approved status advances WITHOUT in_progress reset.
# --------------------------------------------------------------------------- #
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
def test_approved_status_advances_without_inprogress_reset(mock_advance, mock_sip):
resp = _status(APPROVED)
assert resp.status_code == 200
mock_advance.assert_awaited_once()
# work_item_id passed positionally
assert "ET-700" in mock_advance.call_args.args
# bug 3 (cause B): NO intermediate set_issue_in_progress before advance.
mock_sip.assert_not_called()
# --------------------------------------------------------------------------- #
# Rejected status pulls reason from latest comment.
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.httpx.get")
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
def test_rejected_status_pulls_reason_from_comment(mock_rollback, mock_get):
mock_get.return_value = _FakeResp(200, {"results": [
{"comment_stripped": "old comment", "created_at": "2026-06-03T09:00:00Z"},
{"comment_html": "<p>Needs more test coverage</p>",
"created_at": "2026-06-03T11:30:00Z"},
]})
resp = _status(REJECTED)
assert resp.status_code == 200
mock_rollback.assert_awaited_once()
reason = mock_rollback.call_args.args[-1]
# latest by created_at, HTML stripped
assert "Needs more test coverage" in reason
assert "<p>" not in reason
@patch("src.webhooks.plane.httpx.get")
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
def test_rejected_status_no_comment_uses_fallback(mock_rollback, mock_get):
mock_get.return_value = _FakeResp(200, {"results": []})
resp = _status(REJECTED)
assert resp.status_code == 200
mock_rollback.assert_awaited_once()
reason = mock_rollback.call_args.args[-1]
assert "no reason comment" in reason

View File

@@ -2,8 +2,9 @@
* work_item.created / issue created -> NO task, NO branch, NO analyst.
* issue updated -> In Progress (from backlog) -> task created + analyst enqueued.
* a second In Progress update for the same issue -> NO duplicate, NO restart
(protects handle_comment, which also flips issues to In Progress).
* a second In Progress update while the agent is busy -> NO duplicate, NO
restart (busy-guard).
* In Progress returned from Needs Input (agent idle) -> agent RELAUNCHED.
launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient.
"""
@@ -125,15 +126,34 @@ def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqu
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock_enqueue):
def test_repeat_in_progress_while_job_active_does_not_relaunch(
mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""Status-only model busy-guard: a duplicate In Progress webhook that arrives
while the stage agent still has a queued/running job must NOT relaunch the
agent (no double launch).
"""
mock_enqueue.return_value = 1
_to_in_progress("st-2")
assert _count("st-2") == 1
assert mock_enqueue.call_count == 1
# Second In Progress update (e.g. handle_comment re-set the status). Use a
# DISTINCT body (different activity old_value) so webhook dedup does NOT
# short-circuit it — this exercises the existing-task idempotency guard in
# enqueue_job is mocked above, so no real job row exists. Seed an ACTIVE
# (queued) job for the task so has_active_job_for_task() reports the agent as
# busy -> the busy-guard fires.
conn = get_db()
task_id = conn.execute(
"SELECT id FROM tasks WHERE plane_id='st-2'"
).fetchone()[0]
conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'queued')",
("analyst", "enduro-trails", task_id),
)
conn.commit()
conn.close()
# Second In Progress update. DISTINCT body (different activity old_value) so
# webhook dedup does NOT short-circuit it — this exercises the busy-guard in
# handle_status_start, not the delivery-dedup layer.
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
@@ -147,4 +167,77 @@ def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock
})
assert resp.status_code == 200
assert _count("st-2") == 1 # still exactly one task
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued (busy-guard)
@patch("src.webhooks.plane.add_comment", create=True)
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
def test_inprogress_from_needs_input_relaunches_analyst(
mock_seq, mock_branch, mock_docs, mock_enqueue, mock_comment
):
"""Status-only answer-to-questions flow: an existing analysis task whose agent
is IDLE (no active job — it went to Needs Input) is returned to In Progress
-> the analyst is relaunched to read Slava's fresh comments.
+ double-webhook protection: a second In Progress while the relaunch job is
active does NOT relaunch again.
"""
mock_enqueue.return_value = 1
# First In Progress: starts the pipeline (creates task + enqueues analyst).
_to_in_progress("st-ni")
assert _count("st-ni") == 1
assert mock_enqueue.call_count == 1
# The analyst finished and asked questions -> Needs Input. In our model that
# means NO active job for the task (enqueue_job is mocked, so no job row).
conn = get_db()
task_id = conn.execute(
"SELECT id FROM tasks WHERE plane_id='st-ni'"
).fetchone()[0]
has_job = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND status IN ('queued','running')",
(task_id,),
).fetchone()[0]
conn.close()
assert has_job == 0 # agent idle
# Slava answers + returns the issue to In Progress (distinct body).
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "st-ni", "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "needs-input"},
})
assert resp.status_code == 200
assert _count("st-ni") == 1 # no duplicate task
assert mock_enqueue.call_count == 2 # analyst RELAUNCHED
assert mock_enqueue.call_args.args[0] == "analyst"
# Seed an active job for the relaunch, then a SECOND In Progress webhook must
# NOT relaunch again (busy-guard against double webhooks).
conn = get_db()
conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'running')",
("analyst", "enduro-trails", task_id),
)
conn.commit()
conn.close()
resp2 = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "st-ni", "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "x-y-z"},
})
assert resp2.status_code == 200
assert mock_enqueue.call_count == 2 # still 2 — busy-guard held

View File

@@ -1,12 +1,14 @@
"""Feature 2 (variant B): verdict statuses Approved / Rejected.
"""Status-only verdict model: verdict statuses Approved / Rejected.
* issue updated -> Approved : calls _try_advance_stage (== :approved: comment).
* issue updated -> Rejected : calls _rollback_stage (== :rejected: comment).
* the :approved: / :rejected: COMMENT mechanisms still work (both paths live).
* issue updated -> Approved : calls _try_advance_stage, with NO intermediate
set_issue_in_progress reset (bug 3 fix).
* issue updated -> Rejected : calls _rollback_stage, with the reason pulled
from the issue's latest comment.
* COMMENTS NEVER trigger the pipeline: a :approved: / :rejected: comment is a
pure no-op (the comment-based control mechanism was removed).
We mock the shared engine entry points (_try_advance_stage / _rollback_stage)
and assert they fire for both the status and the comment trigger, so the two
mechanisms are proven to funnel into the same logic.
and assert they fire ONLY for the status trigger, never for a comment.
"""
import os
@@ -83,8 +85,21 @@ def _comment(text, plane_id="v-1"):
})
class _FakeResp:
def __init__(self, status_code, payload):
self.status_code = status_code
self._payload = payload
def json(self):
return self._payload
def _comments_response(comments):
return _FakeResp(200, {"results": comments})
# --------------------------------------------------------------------------- #
# Approved status -> advance
# Approved status -> advance (no in_progress reset)
# --------------------------------------------------------------------------- #
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
@@ -95,36 +110,52 @@ def test_approved_status_advances(mock_advance, mock_sip):
# advanced the right task (ET-500 at review)
args = mock_advance.call_args.args
assert "ET-500" in args # work_item_id is passed positionally
# bug 3 fix: handle_verdict no longer resets the status to In Progress.
mock_sip.assert_not_called()
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
def test_approved_comment_still_advances(mock_advance, mock_sip):
def test_approved_comment_is_noop(mock_advance, mock_rollback, mock_sip):
"""Status-only model: a :approved: comment NEVER advances the pipeline."""
resp = _comment(":approved:")
assert resp.status_code == 200
mock_advance.assert_awaited_once()
mock_advance.assert_not_called()
mock_rollback.assert_not_called()
mock_sip.assert_not_called()
# --------------------------------------------------------------------------- #
# Rejected status -> rollback
# Rejected status -> rollback (reason from latest comment)
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.httpx.get")
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
def test_rejected_status_rolls_back(mock_rollback):
def test_rejected_status_rolls_back(mock_rollback, mock_get):
mock_get.return_value = _comments_response(
[{"comment_stripped": "ADR missing tradeoffs",
"created_at": "2026-06-03T10:00:00Z"}]
)
resp = _status(REJECTED)
assert resp.status_code == 200
mock_rollback.assert_awaited_once()
# reason note for a status reject (no inline reason available)
kwargs_reason = mock_rollback.call_args.args[-1]
assert "rejected via status" in kwargs_reason
# reason pulled from the latest comment
reason = mock_rollback.call_args.args[-1]
assert "ADR missing tradeoffs" in reason
@patch("src.webhooks.plane.httpx.get")
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
def test_rejected_comment_still_rolls_back(mock_rollback):
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
def test_rejected_comment_is_noop(mock_advance, mock_rollback, mock_sip, mock_get):
"""Status-only model: a :rejected: comment NEVER rolls back the pipeline."""
resp = _comment(":rejected: bad ADR")
assert resp.status_code == 200
mock_rollback.assert_awaited_once()
reason = mock_rollback.call_args.args[-1]
assert "bad ADR" in reason
mock_advance.assert_not_called()
mock_rollback.assert_not_called()
mock_sip.assert_not_called()
mock_get.assert_not_called()
# --------------------------------------------------------------------------- #

View File

@@ -95,27 +95,32 @@ def test_plane_webhook_generates_sequential_ids(mock_docs, mock_branch):
assert ids[1] == "ET-002"
APPROVED_STATE = "a519a341-dada-4a91-8910-7604f82b79c5"
REJECTED_STATE = "ba958f3c-5db5-461d-8f82-89425e413b97"
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane.launcher")
def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch):
"""Comment :approved: at stage=analysis advance to architecture."""
"""Status-only model: Approved STATUS at stage=analysis -> advance to
architecture. A comment never triggers this.
"""
# Patch repos_dir for QG check
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
# Create task first
client.post("/webhook/plane", json={
"event": "work_item.created",
"data": {"id": "adv-001", "name": "Advance test", "project": "proj-1"}
})
# Get the task to find work_item_id
# Seed an analysis task directly (creation no longer makes a task post-PR#11).
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
("adv-001", "ET-001", "enduro-trails", "feature/ET-001-x", "analysis", "adv-001"),
)
conn.commit()
conn.close()
work_item_id = task["work_item_id"]
work_item_id = "ET-001"
# Create required analysis files
# Create required analysis files so the analysis QG passes.
wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id
wi_dir.mkdir(parents=True)
(wi_dir / "01-brd.md").write_text("# BRD")
@@ -123,16 +128,15 @@ def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tm
(wi_dir / "03-acceptance-criteria.md").write_text("# AC")
(wi_dir / "04-test-plan.yaml").write_text("tests: []")
# Mock launcher
mock_launcher.launch.return_value = 1
# Send approved comment
# Send Approved STATUS change.
resp = client.post("/webhook/plane", json={
"event": "comment.created",
"event": "issue", "action": "updated",
"data": {
"work_item_id": "adv-001",
"comment": "Looks good :approved:"
}
"id": "adv-001", "name": "Advance test", "project": "proj-1",
"state": {"id": APPROVED_STATE, "name": "Approved", "group": "completed"},
},
})
assert resp.status_code == 200
@@ -143,29 +147,39 @@ def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tm
assert task["stage"] == "architecture"
@patch("src.webhooks.plane.httpx.get")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_rejected_rolls_back(mock_docs, mock_branch):
"""Comment :rejected: rolls back stage."""
# Create task
client.post("/webhook/plane", json={
"event": "work_item.created",
"data": {"id": "rej-001", "name": "Reject test", "project": "proj-1"}
})
def test_plane_rejected_rolls_back(mock_docs, mock_branch, mock_get):
"""Status-only model: Rejected STATUS rolls back stage. A comment never
triggers this; the reason is pulled from the latest comment.
"""
class _R:
status_code = 200
@staticmethod
def json():
return {"results": [
{"comment_stripped": "missing ADR", "created_at": "2026-06-03T10:00:00Z"}
]}
mock_get.return_value = _R()
# Manually set stage to architecture
# Seed an architecture task directly.
conn = get_db()
conn.execute("UPDATE tasks SET stage = 'architecture' WHERE plane_id = 'rej-001'")
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
("rej-001", "ET-002", "enduro-trails", "feature/ET-002-x", "architecture", "rej-001"),
)
conn.commit()
conn.close()
# Send rejected comment
# Send Rejected STATUS change.
resp = client.post("/webhook/plane", json={
"event": "comment.created",
"event": "issue", "action": "updated",
"data": {
"work_item_id": "rej-001",
"comment": "Not ready :rejected:"
}
"id": "rej-001", "name": "Reject test", "project": "proj-1",
"state": {"id": REJECTED_STATE, "name": "Rejected", "group": "cancelled"},
},
})
assert resp.status_code == 200