fix(reconciler): skip escalated / Blocked / Needs-Input tasks in F-1
Reconciler F-1 could not tell "stuck by a lost webhook" from "escalated:
max developer retries reached, waiting for a human". With CI green and a
reviewer that kept sending REQUEST_CHANGES up to the cap, every tick
re-unblocked development -> review -> rollback -> re-unblock (incident
ET-013, infinite bounce: wasted agent runs, Telegram spam, parasitic load
on the shared self-hosting instance).
Add two pre-gate guards in Reconciler._reconcile_gate_task (after the
existing analysis/no-gate/active-job/grace guards, before the gate
pre-evaluation), each an early silent return (no advance, no unblocked_total
increment, no notifications):
- Guard 1 (escalated, deterministic, no network, checked first):
developer_retry_count(task_id) >= MAX_DEVELOPER_RETRIES. Promote
stage_engine._developer_retry_count to public developer_retry_count
(single source of truth; private alias kept). Limit from the constant,
not a literal 3.
- Guard 2 (explicit human Plane gate, Variant A, no DB migration): new
never-raise plane_sync.fetch_issue_state + Reconciler._is_blocked_or_needs_input;
any error/None/unresolved project -> conservative skip. New sub-flag
ORCH_RECONCILE_SKIP_BLOCKED_ENABLED mutes only the networked Guard 2.
F-2 unchanged: Blocked/Needs Input are outside {in_progress, approved,
rejected} so they are never replayed (regression test added). DB schema,
STAGE_TRANSITIONS, QG_CHECKS, never-raise, analysis carve-out and
kill-switches untouched.
Refs: ORCH-060
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -234,12 +234,20 @@ class Settings(BaseSettings):
|
||||
# JSON -> default (mirrors agent_timeout_overrides_json).
|
||||
# reconcile_notify_unblock -> send a Telegram message when a stuck task is
|
||||
# unblocked (F-4 observability).
|
||||
# reconcile_skip_blocked_enabled -> ORCH-060 Guard 2: skip F-1 reconciliation of
|
||||
# issues a human moved to Blocked / Needs Input
|
||||
# (per-candidate Plane state lookup). Disabling it
|
||||
# mutes ONLY the networked Guard 2; Guard 1
|
||||
# (escalated-by-retries, local + deterministic) is
|
||||
# always active. Manual escape hatch during a Plane
|
||||
# outage.
|
||||
reconcile_enabled: bool = True
|
||||
reconcile_interval_s: int = 120
|
||||
reconcile_plane_enabled: bool = True
|
||||
reconcile_grace_default_s: int = 600
|
||||
reconcile_grace_overrides_json: str = ""
|
||||
reconcile_notify_unblock: bool = True
|
||||
reconcile_skip_blocked_enabled: bool = True
|
||||
|
||||
# Telegram notifications
|
||||
telegram_bot_token: str = ""
|
||||
|
||||
@@ -278,6 +278,33 @@ def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
|
||||
return None
|
||||
|
||||
|
||||
def fetch_issue_state(issue_id: str, project_id: str) -> str | None:
|
||||
"""ORCH-060 (F-1 Guard 2): GET the Plane issue and return its current state uuid.
|
||||
|
||||
Used by the reconciler to honour an explicit human gate: an issue a person
|
||||
moved to **Blocked** / **Needs Input** must not be auto-unblocked by the
|
||||
sweeper. Reuses the exact GET issue-detail endpoint / shared token already
|
||||
used by ``fetch_issue_sequence_id`` / ``fetch_issue_fields``.
|
||||
|
||||
Plane returns ``state`` as a bare uuid string; older shapes may nest it as a
|
||||
``{"id": ...}`` dict — both are handled.
|
||||
|
||||
Returns None on network error, non-2xx, or a missing field — never raises, so
|
||||
the caller can apply its conservative fallback (treat as "possibly blocked").
|
||||
"""
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
||||
try:
|
||||
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp.raise_for_status()
|
||||
state = resp.json().get("state")
|
||||
if isinstance(state, dict):
|
||||
state = state.get("id")
|
||||
return str(state) if state else None
|
||||
except Exception as e:
|
||||
logger.warning(f"fetch_issue_state failed for {issue_id}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
import re as _re
|
||||
|
||||
|
||||
|
||||
@@ -19,7 +19,12 @@ handlers a webhook would use:
|
||||
canonical quality gate; green -> advance through the unchanged
|
||||
``stage_engine.advance_stage(..., finished_agent=None)``; red -> silence
|
||||
(no advance, no notification). ``analysis`` is NOT reconciled here (human
|
||||
gate; owned by F-2).
|
||||
gate; owned by F-2). **ORCH-060:** before the gate is even evaluated, F-1
|
||||
skips (silently) tasks that are waiting for a human — Guard 1: escalated by
|
||||
developer retries (``developer_retry_count >= MAX_DEVELOPER_RETRIES``,
|
||||
deterministic, local; closes the ET-013 bounce loop) checked first, then
|
||||
Guard 2: an explicit Plane ``Blocked`` / ``Needs Input`` state (Variant A —
|
||||
networked, never-raise -> conservative skip).
|
||||
|
||||
* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
|
||||
project (``list_issues_by_state``) and replay In Progress / Approved /
|
||||
@@ -49,9 +54,13 @@ from .db import (
|
||||
get_task_by_plane_id,
|
||||
has_active_job_for_task,
|
||||
)
|
||||
from .stage_engine import advance_if_gate_passed
|
||||
from .stage_engine import (
|
||||
advance_if_gate_passed,
|
||||
developer_retry_count,
|
||||
MAX_DEVELOPER_RETRIES,
|
||||
)
|
||||
from .stages import get_qg_for_stage
|
||||
from .plane_sync import get_project_states, list_issues_by_state
|
||||
from .plane_sync import fetch_issue_state, get_project_states, list_issues_by_state
|
||||
from .webhooks.plane import handle_status_start, handle_verdict
|
||||
from .notifications import send_telegram
|
||||
from . import projects
|
||||
@@ -162,6 +171,17 @@ class Reconciler:
|
||||
age_s = task.get("age_s") or 0
|
||||
if age_s < grace_for_stage(stage):
|
||||
return
|
||||
# ORCH-060 Guard 1: escalated tasks (developer retries reached the cap) are
|
||||
# terminal — they wait for a human, not the sweeper. Without this, a task
|
||||
# whose CI is green but whose reviewer kept sending REQUEST_CHANGES until the
|
||||
# cap would be re-unblocked every tick (incident ET-013, infinite bounce).
|
||||
# Deterministic, local SQL, no network — and checked FIRST (cheapest).
|
||||
if developer_retry_count(task_id) >= MAX_DEVELOPER_RETRIES:
|
||||
return
|
||||
# ORCH-060 Guard 2: respect an explicit human gate (Blocked / Needs Input).
|
||||
# Networked; runs after Guard 1 so escalated tasks never hit Plane.
|
||||
if self._is_blocked_or_needs_input(task):
|
||||
return
|
||||
result = advance_if_gate_passed(
|
||||
task_id,
|
||||
stage,
|
||||
@@ -172,6 +192,41 @@ class Reconciler:
|
||||
if result is not None and getattr(result, "advanced", False):
|
||||
self._note_unblock(task.get("work_item_id") or str(task_id), stage)
|
||||
|
||||
def _is_blocked_or_needs_input(self, task: dict) -> bool:
|
||||
"""ORCH-060 Guard 2: is this issue in an explicit human Plane gate?
|
||||
|
||||
Variant A (no schema migration): resolve the task's Plane project, fetch
|
||||
the issue's current state uuid and compare against the project's
|
||||
``blocked`` / ``needs_input`` states. ``tasks`` has no status column, so
|
||||
the live Plane state is the source of truth.
|
||||
|
||||
**Never-raise, conservative fallback.** Any error / unresolved project /
|
||||
missing state -> return ``True`` (treat as "possibly blocked" -> skip):
|
||||
NOT unblocking a task is always safe, whereas wrongly unblocking a
|
||||
human-gated task re-introduces the bounce we are trying to kill. The
|
||||
sub-flag ``reconcile_skip_blocked_enabled`` disables ONLY this networked
|
||||
guard (escape hatch for a Plane outage); Guard 1 stays active.
|
||||
"""
|
||||
if not settings.reconcile_skip_blocked_enabled:
|
||||
return False
|
||||
try:
|
||||
proj = projects.get_project_by_repo(task.get("repo") or "")
|
||||
if proj is None:
|
||||
return True # cannot resolve the project -> conservative skip
|
||||
pid = proj.plane_project_id
|
||||
states = get_project_states(pid)
|
||||
issue_id = task.get("plane_id") or task.get("plane_issue_id") or ""
|
||||
cur = fetch_issue_state(issue_id, pid)
|
||||
if cur is None:
|
||||
return True # Plane unreachable / no state -> conservative skip
|
||||
return cur in {states.get("blocked"), states.get("needs_input")}
|
||||
except Exception as e: # noqa: BLE001 - never break the tick
|
||||
logger.warning(
|
||||
f"reconciler Guard 2: blocked-check failed for task "
|
||||
f"{task.get('id')}, skipping conservatively: {e}"
|
||||
)
|
||||
return True
|
||||
|
||||
# -- F-2: plane-side ---------------------------------------------------
|
||||
def reconcile_plane_once(self) -> None:
|
||||
"""One F-2 pass: poll Plane per project and replay missed transitions."""
|
||||
|
||||
@@ -142,8 +142,14 @@ def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, bra
|
||||
return False, f"Error finding PR: {e}"
|
||||
|
||||
|
||||
def _developer_retry_count(task_id: int) -> int:
|
||||
"""How many developer runs have already happened for this task."""
|
||||
def developer_retry_count(task_id: int) -> int:
|
||||
"""How many developer runs have already happened for this task.
|
||||
|
||||
Single source of truth for the developer-retry count: the rollback path
|
||||
(REQUEST_CHANGES / test-fail / merge-gate) and the ORCH-060 reconciler guard
|
||||
both read the cap from here, so the SQL is never duplicated. ``task`` is
|
||||
considered *escalated* once this reaches ``MAX_DEVELOPER_RETRIES``.
|
||||
"""
|
||||
conn = get_db()
|
||||
n = conn.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
@@ -153,6 +159,10 @@ def _developer_retry_count(task_id: int) -> int:
|
||||
return n
|
||||
|
||||
|
||||
# Backward-compat private alias — existing internal call sites keep working.
|
||||
_developer_retry_count = developer_retry_count
|
||||
|
||||
|
||||
def advance_stage(
|
||||
task_id: int,
|
||||
current_stage: str,
|
||||
|
||||
Reference in New Issue
Block a user