fix(webhook): remove comment-based approve, keep status-only verdict

Status-only verdict model: comments NEVER drive the pipeline. Removed the
whole comment-based control mechanism from handle_comment (:approved: /
:rejected: / answer-to-questions) which caused bug 3 (echo self-hit): the
analyst posts its own "waiting for approval" comment, handle_comment catches
its own comment and reverts In Review -> In Progress. handle_comment is now a
pure logger with no side effects.

handle_status_start: a return to In Progress on an EXISTING task (Slava
answered the analyst questions in Needs Input) now RELAUNCHES the stage agent
instead of being a no-op. Distinguished from a duplicate In Progress webhook
via has_active_job_for_task() (new db helper): no active job => agent idle =>
relaunch; active job => busy => skip (no double launch).
This commit is contained in:
Dev Agent
2026-06-03 22:18:02 +03:00
parent cd73c75cda
commit 6b3e144949
2 changed files with 135 additions and 122 deletions

View File

@@ -351,6 +351,23 @@ def mark_job(
conn.close()
def has_active_job_for_task(task_id: int) -> bool:
"""True if the task already has a queued or running job.
Used by the status-only verdict model (handle_status_start) to guard against
double-launching an agent when a duplicate In Progress webhook arrives or a
job is still in flight. The events de-dup absorbs identical webhook bodies;
this guards against distinct webhooks while a job is pending/running.
"""
conn = get_db()
row = conn.execute(
"SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1",
(task_id,),
).fetchone()
conn.close()
return row is not None
def count_running_jobs() -> int:
"""Number of jobs currently in 'running' status (for max_concurrency)."""
conn = get_db()

View File

@@ -98,10 +98,12 @@ async def plane_webhook(request: Request):
# QG-0 sanity log here (no branch, no analyst, no task row).
await handle_work_item_created(data, project_id)
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
# Feature 1 & 2: status changes drive the pipeline.
# Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent)
# -> Approved : advance (== :approved: comment)
# -> Rejected : rollback (== :rejected: comment)
# Status-only verdict model: status changes drive the pipeline.
# Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the
# stage agent if returned from
# Needs Input.
# -> Approved : advance to the next stage.
# -> Rejected : rollback (reason from latest comment).
await handle_issue_updated(data, project_id)
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
await handle_comment(data, project_id)
@@ -127,11 +129,11 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
"""Feature 1 & 2: react to a Plane issue status change.
Routes the NEW state UUID (data.state.id) to:
- in_progress : start the pipeline if this issue has no task yet
(idempotent — an existing task is NOT restarted; protects handle_comment
which also flips issues to In Progress during approve/answer flows).
- approved : same as a :approved: comment (advance current stage).
- rejected : same as a :rejected: comment (rollback + relaunch).
- in_progress : start the pipeline if this issue has no task yet; if a
task already exists and the stage agent is idle (returned from Needs
Input), relaunch the stage agent so it reads Slava's fresh comments.
- approved : advance to the next stage.
- rejected : rollback to the previous stage (reason from latest comment).
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
is ignored here — those are statuses the orchestrator itself sets.
"""
@@ -154,31 +156,98 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
async def handle_status_start(data: dict, project_id: str = ""):
"""Feature 1: an issue moved into In Progress -> start the pipeline.
"""An issue moved into In Progress.
Idempotent: if a task already exists for this plane_id, do nothing (no dup,
no analyst restart). This is what makes handle_comment's set_issue_in_progress
safe — by then the task already exists, so the start is skipped.
Two cases under the status-only verdict model:
1. No task yet for this plane_id -> START the pipeline (start_pipeline).
2. A task already exists -> this is Slava returning the issue from
Needs Input to In Progress after answering the analyst's questions. We
must RELAUNCH the current stage's agent so it reads the fresh comments
from Plane (the answer-to-questions flow used to live in handle_comment;
it is now status-driven).
KEY FORK — telling "answer to questions" apart from a plain duplicate In
Progress webhook (the dedup-protection case):
The tasks table stores no Plane status, and the issue.updated payload only
carries the NEW state (In Progress), so we cannot read the previous status
from here. Instead we use the only reliable local signal: whether the
stage's agent is currently in flight.
- The orchestrator sets In Progress itself while an agent runs. When the
agent FINISHES it leaves the issue in Needs Input or In Review and has
NO queued/running job. So: an existing task with NO active job means the
agent is idle / waiting -> a return to In Progress is a genuine relaunch
request -> enqueue the stage agent.
- If a queued/running job already exists for the task, the agent is busy
(or a duplicate webhook arrived) -> SKIP (no double launch). The events
de-dup at the top of plane_webhook already absorbs identical webhook
bodies; this job guard additionally covers distinct webhooks fired while
a job is still pending/running.
"""
from ..db import has_active_job_for_task
plane_id = str(data.get("id") or "")
existing = get_task_by_plane_id(plane_id)
if existing:
if not existing:
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
await start_pipeline(data, project_id)
return
task_id = existing["id"]
current_stage = existing["stage"]
repo = existing["repo"]
work_item_id = existing.get("work_item_id", "")
branch = existing.get("branch", "")
# Duplicate / busy guard: a job is already pending or running for this task.
if has_active_job_for_task(task_id):
logger.info(
f"Status->In Progress for {plane_id}: task already exists "
f"(stage={existing.get('stage')}), not restarting"
f"Status->In Progress for {plane_id}: task {task_id} already has an "
f"active job (stage={current_stage}), not relaunching"
)
return
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
await start_pipeline(data, project_id)
# Agent is idle -> Slava answered questions and returned the issue to In
# Progress. Relaunch the current stage's agent to read the fresh comments.
from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment
stage_agent = STAGE_AUTHORS.get(current_stage)
if not stage_agent:
logger.info(
f"Status->In Progress for {plane_id}: no agent for stage "
f"'{current_stage}', not relaunching"
)
return
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
f"Progress (answered your questions). Read the latest comments in Plane "
f"and revise your artifacts."
)
job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id)
logger.info(
f"Task {task_id}: returned to In Progress (Needs Input answered), "
f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})"
)
try:
_add_comment(
work_item_id,
"\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.",
author=stage_agent,
)
except Exception as e:
logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}")
async def handle_verdict(data: dict, project_id: str, approved: bool):
"""Feature 2 (variant B): a status verdict mirrors the comment verdicts.
"""Status-only verdict: a Plane status change drives advance / rollback.
Approved status == :approved: comment -> _try_advance_stage.
Rejected status == :rejected: comment -> rollback to previous stage + relaunch
(reason is unknown from a status change; Slava writes it in a separate
comment, so we pass a fixed note).
Approved status -> _try_advance_stage.
Rejected status -> rollback to the previous stage.
"""
plane_id = str(data.get("id") or "")
task = get_task_by_plane_id(plane_id)
@@ -199,7 +268,8 @@ async def handle_verdict(data: dict, project_id: str, approved: bool):
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
# Rejected: mirror the :rejected: comment rollback branch.
# Rejected: roll back to the previous stage (reason note placeholder; the
# reason-from-comment lookup is added in a follow-up commit).
reason = "(rejected via status, see latest comment)"
await _rollback_stage(
task_id, current_stage, repo, work_item_id, branch, reason
@@ -395,108 +465,34 @@ async def start_pipeline(data: dict, project_id: str = ""):
async def handle_comment(data: dict, project_id: str = ""):
"""Status-only verdict model: comments NEVER drive the pipeline.
The whole comment-based control mechanism (``:approved:`` / ``:rejected:``
and the analysis answer-to-questions flow) was removed. It caused bug 3
(echo self-hit): the analyst posts its own "waiting for approval" comment,
handle_comment catches its own comment and reverts In Review -> In Progress.
Comments are now logged only — no status change, no enqueue, no side effect.
The pipeline is driven solely by status changes (handle_issue_updated):
- Approved -> advance
- Rejected -> rollback (reason pulled from the latest comment)
- In Progress (returned from Needs Input) -> relaunch the stage agent
"""
Handle comment event — check for :approved: or :rejected:.
Advance or rollback stage accordingly.
"""
comment_body = data.get("comment_stripped", data.get("comment", data.get("body", data.get("comment_html", ""))))
plane_id = str(data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "")
if not plane_id:
logger.warning("Comment event without work_item_id, skipping")
return
task = get_task_by_plane_id(plane_id)
if not task:
logger.warning(f"No task found for plane_id={plane_id}")
return
task_id = task["id"]
current_stage = task["stage"]
repo = task["repo"]
work_item_id = task.get("work_item_id", "")
branch = task.get("branch", "")
if ":rejected:" in comment_body:
# Extract reason (text after :rejected:)
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason)
return
if ":approved:" in comment_body:
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
# Try to advance stage
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
# Task 3: If neither :approved: nor :rejected: — check if this is an answer to questions
if current_stage == "analysis":
from ..plane_sync import PLANE_STATES, set_issue_in_progress
issue_id = task.get("plane_issue_id") or task.get("plane_id")
if not issue_id:
issue_id = plane_id
if issue_id:
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE
from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID
# ORCH-6: route to this task's own Plane project (resolved from repo).
_proj = get_project_by_repo(repo)
_pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
import httpx as _httpx
try:
_resp = _httpx.get(
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/",
headers=PLANE_HEADERS, timeout=10
)
if _resp.status_code == 200:
issue_data = _resp.json()
if issue_data.get("state") == PLANE_STATES["needs_input"]:
# Task 11: Check analyst retry count (max 3 question rounds)
conn3 = get_db()
analyst_runs = conn3.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'",
(task_id,)
).fetchone()[0]
conn3.close()
if analyst_runs >= 4: # initial + 3 retries
from ..plane_sync import set_issue_blocked, add_comment as _pc
set_issue_blocked(work_item_id)
_pc(
work_item_id,
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.",
author="analyst",
)
from ..notifications import send_telegram
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
return
# This is an answer to analyst's questions — relaunch
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder answered your questions. "
f"Read the latest comment in Plane and revise your artifacts.\n"
f"Answer: {comment_body[:500]}"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _pc2
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst")
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
return
except Exception as e:
logger.error(f"Failed to check issue state: {e}")
plane_id = str(
data.get("work_item_id") or data.get("issue_id") or data.get("issue") or ""
)
logger.info(
f"comment.created for {plane_id}: logged only, no pipeline action "
f"(status-only verdict model)"
)
async def _rollback_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
reason: str,
):
"""Shared :rejected: / Rejected-status rollback (Feature 2 variant B).
"""Rollback triggered by a status change to Rejected.
Both the :rejected: comment and a status change to Rejected funnel here so
the two mechanisms behave identically:
- at analysis: relaunch the analyst with the rejection reason;
- otherwise: roll back to the previous stage and relaunch its agent
(via the existing rollback notify + an enqueue of the prev-stage agent).
@@ -565,10 +561,10 @@ async def _try_advance_stage(
is synchronous. We run it off the event loop via asyncio.to_thread so there
is exactly one implementation shared with the launcher.
finished_agent is None on this webhook path (a human :approved: comment, not
a finished agent), so the agent-specific rollback branches inside the engine
intentionally do not trigger — identical to the old plane behavior, which
only ran the QG and either advanced or reported the failure.
finished_agent is None on this webhook path (a human Approved status change,
not a finished agent), so the agent-specific rollback branches inside the
engine intentionally do not trigger — the webhook path only runs the QG and
either advances or reports the failure.
"""
import asyncio
from ..stage_engine import advance_stage