From 6b3e14494976bc75e0c3ee2f63c0591c0d22ca63 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 22:18:02 +0300 Subject: [PATCH] fix(webhook): remove comment-based approve, keep status-only verdict Status-only verdict model: comments NEVER drive the pipeline. Removed the whole comment-based control mechanism from handle_comment (:approved: / :rejected: / answer-to-questions) which caused bug 3 (echo self-hit): the analyst posts its own "waiting for approval" comment, handle_comment catches its own comment and reverts In Review -> In Progress. handle_comment is now a pure logger with no side effects. handle_status_start: a return to In Progress on an EXISTING task (Slava answered the analyst questions in Needs Input) now RELAUNCHES the stage agent instead of being a no-op. Distinguished from a duplicate In Progress webhook via has_active_job_for_task() (new db helper): no active job => agent idle => relaunch; active job => busy => skip (no double launch). --- src/db.py | 17 +++ src/webhooks/plane.py | 240 +++++++++++++++++++++--------------------- 2 files changed, 135 insertions(+), 122 deletions(-) diff --git a/src/db.py b/src/db.py index 51ae5a1..816e2ac 100644 --- a/src/db.py +++ b/src/db.py @@ -351,6 +351,23 @@ def mark_job( conn.close() +def has_active_job_for_task(task_id: int) -> bool: + """True if the task already has a queued or running job. + + Used by the status-only verdict model (handle_status_start) to guard against + double-launching an agent when a duplicate In Progress webhook arrives or a + job is still in flight. The events de-dup absorbs identical webhook bodies; + this guards against distinct webhooks while a job is pending/running. + """ + conn = get_db() + row = conn.execute( + "SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1", + (task_id,), + ).fetchone() + conn.close() + return row is not None + + def count_running_jobs() -> int: """Number of jobs currently in 'running' status (for max_concurrency).""" conn = get_db() diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 9efe6be..e888b6b 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -98,10 +98,12 @@ async def plane_webhook(request: Request): # QG-0 sanity log here (no branch, no analyst, no task row). await handle_work_item_created(data, project_id) elif (event == "work_item.updated") or (event == "issue" and action == "updated"): - # Feature 1 & 2: status changes drive the pipeline. - # Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent) - # -> Approved : advance (== :approved: comment) - # -> Rejected : rollback (== :rejected: comment) + # Status-only verdict model: status changes drive the pipeline. + # Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the + # stage agent if returned from + # Needs Input. + # -> Approved : advance to the next stage. + # -> Rejected : rollback (reason from latest comment). await handle_issue_updated(data, project_id) elif (event == "comment.created") or (event == "issue_comment" and action == "created"): await handle_comment(data, project_id) @@ -127,11 +129,11 @@ async def handle_issue_updated(data: dict, project_id: str = ""): """Feature 1 & 2: react to a Plane issue status change. Routes the NEW state UUID (data.state.id) to: - - in_progress : start the pipeline if this issue has no task yet - (idempotent — an existing task is NOT restarted; protects handle_comment - which also flips issues to In Progress during approve/answer flows). - - approved : same as a :approved: comment (advance current stage). - - rejected : same as a :rejected: comment (rollback + relaunch). + - in_progress : start the pipeline if this issue has no task yet; if a + task already exists and the stage agent is idle (returned from Needs + Input), relaunch the stage agent so it reads Slava's fresh comments. + - approved : advance to the next stage. + - rejected : rollback to the previous stage (reason from latest comment). Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.) is ignored here — those are statuses the orchestrator itself sets. """ @@ -154,31 +156,98 @@ async def handle_issue_updated(data: dict, project_id: str = ""): async def handle_status_start(data: dict, project_id: str = ""): - """Feature 1: an issue moved into In Progress -> start the pipeline. + """An issue moved into In Progress. - Idempotent: if a task already exists for this plane_id, do nothing (no dup, - no analyst restart). This is what makes handle_comment's set_issue_in_progress - safe — by then the task already exists, so the start is skipped. + Two cases under the status-only verdict model: + + 1. No task yet for this plane_id -> START the pipeline (start_pipeline). + + 2. A task already exists -> this is Slava returning the issue from + Needs Input to In Progress after answering the analyst's questions. We + must RELAUNCH the current stage's agent so it reads the fresh comments + from Plane (the answer-to-questions flow used to live in handle_comment; + it is now status-driven). + + KEY FORK — telling "answer to questions" apart from a plain duplicate In + Progress webhook (the dedup-protection case): + + The tasks table stores no Plane status, and the issue.updated payload only + carries the NEW state (In Progress), so we cannot read the previous status + from here. Instead we use the only reliable local signal: whether the + stage's agent is currently in flight. + + - The orchestrator sets In Progress itself while an agent runs. When the + agent FINISHES it leaves the issue in Needs Input or In Review and has + NO queued/running job. So: an existing task with NO active job means the + agent is idle / waiting -> a return to In Progress is a genuine relaunch + request -> enqueue the stage agent. + - If a queued/running job already exists for the task, the agent is busy + (or a duplicate webhook arrived) -> SKIP (no double launch). The events + de-dup at the top of plane_webhook already absorbs identical webhook + bodies; this job guard additionally covers distinct webhooks fired while + a job is still pending/running. """ + from ..db import has_active_job_for_task + plane_id = str(data.get("id") or "") existing = get_task_by_plane_id(plane_id) - if existing: + + if not existing: + logger.info(f"Status->In Progress for {plane_id}: starting pipeline") + await start_pipeline(data, project_id) + return + + task_id = existing["id"] + current_stage = existing["stage"] + repo = existing["repo"] + work_item_id = existing.get("work_item_id", "") + branch = existing.get("branch", "") + + # Duplicate / busy guard: a job is already pending or running for this task. + if has_active_job_for_task(task_id): logger.info( - f"Status->In Progress for {plane_id}: task already exists " - f"(stage={existing.get('stage')}), not restarting" + f"Status->In Progress for {plane_id}: task {task_id} already has an " + f"active job (stage={current_stage}), not relaunching" ) return - logger.info(f"Status->In Progress for {plane_id}: starting pipeline") - await start_pipeline(data, project_id) + + # Agent is idle -> Slava answered questions and returned the issue to In + # Progress. Relaunch the current stage's agent to read the fresh comments. + from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment + stage_agent = STAGE_AUTHORS.get(current_stage) + if not stage_agent: + logger.info( + f"Status->In Progress for {plane_id}: no agent for stage " + f"'{current_stage}', not relaunching" + ) + return + + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" + f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In " + f"Progress (answered your questions). Read the latest comments in Plane " + f"and revise your artifacts." + ) + job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id) + logger.info( + f"Task {task_id}: returned to In Progress (Needs Input answered), " + f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})" + ) + try: + _add_comment( + work_item_id, + "\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", + author=stage_agent, + ) + except Exception as e: + logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}") async def handle_verdict(data: dict, project_id: str, approved: bool): - """Feature 2 (variant B): a status verdict mirrors the comment verdicts. + """Status-only verdict: a Plane status change drives advance / rollback. - Approved status == :approved: comment -> _try_advance_stage. - Rejected status == :rejected: comment -> rollback to previous stage + relaunch - (reason is unknown from a status change; Slava writes it in a separate - comment, so we pass a fixed note). + Approved status -> _try_advance_stage. + Rejected status -> rollback to the previous stage. """ plane_id = str(data.get("id") or "") task = get_task_by_plane_id(plane_id) @@ -199,7 +268,8 @@ async def handle_verdict(data: dict, project_id: str, approved: bool): await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch) return - # Rejected: mirror the :rejected: comment rollback branch. + # Rejected: roll back to the previous stage (reason note placeholder; the + # reason-from-comment lookup is added in a follow-up commit). reason = "(rejected via status, see latest comment)" await _rollback_stage( task_id, current_stage, repo, work_item_id, branch, reason @@ -395,108 +465,34 @@ async def start_pipeline(data: dict, project_id: str = ""): async def handle_comment(data: dict, project_id: str = ""): + """Status-only verdict model: comments NEVER drive the pipeline. + + The whole comment-based control mechanism (``:approved:`` / ``:rejected:`` + and the analysis answer-to-questions flow) was removed. It caused bug 3 + (echo self-hit): the analyst posts its own "waiting for approval" comment, + handle_comment catches its own comment and reverts In Review -> In Progress. + + Comments are now logged only — no status change, no enqueue, no side effect. + The pipeline is driven solely by status changes (handle_issue_updated): + - Approved -> advance + - Rejected -> rollback (reason pulled from the latest comment) + - In Progress (returned from Needs Input) -> relaunch the stage agent """ - Handle comment event — check for :approved: or :rejected:. - Advance or rollback stage accordingly. - """ - comment_body = data.get("comment_stripped", data.get("comment", data.get("body", data.get("comment_html", "")))) - plane_id = str(data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "") - - if not plane_id: - logger.warning("Comment event without work_item_id, skipping") - return - - task = get_task_by_plane_id(plane_id) - if not task: - logger.warning(f"No task found for plane_id={plane_id}") - return - - task_id = task["id"] - current_stage = task["stage"] - repo = task["repo"] - work_item_id = task.get("work_item_id", "") - branch = task.get("branch", "") - - if ":rejected:" in comment_body: - # Extract reason (text after :rejected:) - reason = comment_body.split(":rejected:", 1)[-1].strip()[:300] - await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason) - return - - if ":approved:" in comment_body: - from ..plane_sync import set_issue_in_progress - set_issue_in_progress(work_item_id) - # Try to advance stage - await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch) - return - - # Task 3: If neither :approved: nor :rejected: — check if this is an answer to questions - if current_stage == "analysis": - from ..plane_sync import PLANE_STATES, set_issue_in_progress - issue_id = task.get("plane_issue_id") or task.get("plane_id") - if not issue_id: - issue_id = plane_id - if issue_id: - from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE - from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID - # ORCH-6: route to this task's own Plane project (resolved from repo). - _proj = get_project_by_repo(repo) - _pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID) - import httpx as _httpx - try: - _resp = _httpx.get( - f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/", - headers=PLANE_HEADERS, timeout=10 - ) - if _resp.status_code == 200: - issue_data = _resp.json() - if issue_data.get("state") == PLANE_STATES["needs_input"]: - # Task 11: Check analyst retry count (max 3 question rounds) - conn3 = get_db() - analyst_runs = conn3.execute( - "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'", - (task_id,) - ).fetchone()[0] - conn3.close() - - if analyst_runs >= 4: # initial + 3 retries - from ..plane_sync import set_issue_blocked, add_comment as _pc - set_issue_blocked(work_item_id) - _pc( - work_item_id, - "\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. " - "\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.", - author="analyst", - ) - from ..notifications import send_telegram - send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.") - return - - # This is an answer to analyst's questions — relaunch - set_issue_in_progress(work_item_id) - task_desc = ( - f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" - f"Stage: analysis\nNote: Stakeholder answered your questions. " - f"Read the latest comment in Plane and revise your artifacts.\n" - f"Answer: {comment_body[:500]}" - ) - new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) - from ..plane_sync import add_comment as _pc2 - _pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst") - logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})") - return - except Exception as e: - logger.error(f"Failed to check issue state: {e}") + plane_id = str( + data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "" + ) + logger.info( + f"comment.created for {plane_id}: logged only, no pipeline action " + f"(status-only verdict model)" + ) async def _rollback_stage( task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str, reason: str, ): - """Shared :rejected: / Rejected-status rollback (Feature 2 variant B). + """Rollback triggered by a status change to Rejected. - Both the :rejected: comment and a status change to Rejected funnel here so - the two mechanisms behave identically: - at analysis: relaunch the analyst with the rejection reason; - otherwise: roll back to the previous stage and relaunch its agent (via the existing rollback notify + an enqueue of the prev-stage agent). @@ -565,10 +561,10 @@ async def _try_advance_stage( is synchronous. We run it off the event loop via asyncio.to_thread so there is exactly one implementation shared with the launcher. - finished_agent is None on this webhook path (a human :approved: comment, not - a finished agent), so the agent-specific rollback branches inside the engine - intentionally do not trigger — identical to the old plane behavior, which - only ran the QG and either advanced or reported the failure. + finished_agent is None on this webhook path (a human Approved status change, + not a finished agent), so the agent-specific rollback branches inside the + engine intentionally do not trigger — the webhook path only runs the QG and + either advances or reports the failure. """ import asyncio from ..stage_engine import advance_stage