Закрывает F-1-пробел ORCH-068: терминал-исключение и in-memory dedup (изначально только F-2) распространены на gate-side путь реконсилятора, устраняя ложное «🔧 reconciler: ET-002 done разблокирована (потерян webhook)» (особенно после рестарта). - D1: новый _resolve_issue_status — один сетевой резолв Plane-статуса задачи за тик (states, groups, state_uuid) после дешёвых локальных гардов; never-raise -> ({}, {}, None) при сбое. - D2: безусловный терминал-скип ДО Guard 2 (группа Plane completed/ cancelled, fallback на логические ключи done/cancelled, либо стадия в БД орка ∈ {done, cancelled}); skipped_terminal_total++, не подчинён reconcile_skip_blocked_enabled. - D3: _is_blocked_or_needs_input переиспользует резолв D1 (опц. аргументы, _UNSET -> самостоятельный резолв для прямых/легаси-вызовов; 1:1). - D4: вызов _note_unblock на F-1 теперь передаёт state_uuid -> dedup работает на обоих путях (deduped_total++ на повторе). Анти-регресс: легитимный unblock не-терминальной застрявшей задачи по-прежнему advance + один Telegram. STAGE_TRANSITIONS / QG_CHECKS / схема БД / сигнатуры advance_*/_note_unblock / форма status() / новые флаги — без изменений; never-raise сохранён. Тесты: tests/test_reconciler.py TC-86-01..09/11, tests/test_reconciler_plane.py TC-86-10. Полный прогон зелёный (1069). Refs: ORCH-086 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
604 lines
28 KiB
Python
604 lines
28 KiB
Python
"""ORCH-053: stuck-task reconciler (sweeper for lost webhooks).
|
|
|
|
The pipeline advances ONLY on incoming webhooks (Plane status / Gitea CI/PR). A
|
|
dropped event (502 on a rebuilding instance, no Plane/Gitea retries, an
|
|
unresolved ``sha->branch``) leaves the source of truth (the gate / the Plane
|
|
status) changed while the task stays put — a silently stuck task (incident
|
|
ORCH-044). None of the existing resilience layers (``requeue_running_jobs``,
|
|
orphan-recovery, events de-dup, ``ci_poll``) reconcile this
|
|
"source-of-truth != task-stage" drift; they all work at the jobs/agent_runs
|
|
level, not the stage transition.
|
|
|
|
This module is a background daemon thread (modelled on ``queue_worker``) that
|
|
periodically replays the missed transition through the SAME standard gates /
|
|
handlers a webhook would use:
|
|
|
|
* **F-1 gate-side** (``reconcile_gate_once``): for each task with
|
|
``stage != 'done'``, no active job and ``age(updated_at) >=
|
|
grace_for_stage(stage)``, do a read-only pre-evaluation of the stage's
|
|
canonical quality gate; green -> advance through the unchanged
|
|
``stage_engine.advance_stage(..., finished_agent=None)``; red -> silence
|
|
(no advance, no notification). ``analysis`` is NOT reconciled here (human
|
|
gate; owned by F-2). **ORCH-060:** before the gate is even evaluated, F-1
|
|
skips (silently) tasks that are waiting for a human — Guard 1: escalated by
|
|
developer retries (``developer_retry_count >= MAX_DEVELOPER_RETRIES``,
|
|
deterministic, local; closes the ET-013 bounce loop) checked first, then
|
|
Guard 2: an explicit Plane ``Blocked`` / ``Needs Input`` state (Variant A —
|
|
networked, never-raise -> conservative skip).
|
|
|
|
* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
|
|
project (``list_issues_by_state``) and replay In Progress / Approved /
|
|
Rejected through ``webhooks.plane.handle_status_start`` /
|
|
``handle_verdict`` (no logic duplicated).
|
|
|
|
Invariants: source of truth is the gate / Plane (not the event); advance only
|
|
via ``advance_stage``; idempotency (active-job guard + atomic create-claim +
|
|
grace + ``max_concurrency=1``); never-raise per unit of work; silence when in
|
|
sync; restart-safe; kill-switch ``ORCH_RECONCILE_ENABLED``
|
|
(+ ``ORCH_RECONCILE_PLANE_ENABLED`` mutes only F-2). The DB schema and the
|
|
registries (``STAGE_TRANSITIONS`` / ``QG_CHECKS``) are unchanged.
|
|
|
|
See docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md and the
|
|
cross-cutting docs/architecture/adr/adr-0007-reconciler.md.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
|
|
from .config import settings
|
|
from .db import (
|
|
get_active_tasks_for_reconcile,
|
|
get_task_by_plane_id,
|
|
has_active_job_for_task,
|
|
)
|
|
from .stage_engine import (
|
|
advance_if_gate_passed,
|
|
developer_retry_count,
|
|
MAX_DEVELOPER_RETRIES,
|
|
)
|
|
from .stages import get_qg_for_stage
|
|
from .plane_sync import (
|
|
fetch_issue_state,
|
|
get_project_states,
|
|
get_project_state_groups,
|
|
list_issues_by_state,
|
|
)
|
|
from .webhooks.plane import handle_status_start, handle_verdict
|
|
from .notifications import send_telegram, link_for
|
|
from . import projects
|
|
from . import task_deps
|
|
|
|
# Named module logger shared by every function and method in this file.
logger: logging.Logger = logging.getLogger("orchestrator.reconciler")
|
|
|
|
# ORCH-086 (D3): private "argument omitted" marker for the ``state_uuid``
# parameter of ``Reconciler._is_blocked_or_needs_input``. Checking
# ``state_uuid is _UNSET`` distinguishes a legacy call that supplied no
# pre-resolved state (the guard then resolves it itself) from an explicit
# ``None`` (Plane unreachable -> conservative skip).
_UNSET: object = object()
|
|
|
|
|
|
def _parse_grace_overrides(raw: str) -> dict[str, int]:
    """Parse ``reconcile_grace_overrides_json`` into ``{stage: seconds}``.

    Never raises, mirroring the contract of ``agent_timeout_overrides_json``:
    blank input, invalid JSON or a non-object document yields ``{}`` (the
    caller then falls back to the default grace), and each entry whose value
    cannot be converted with ``int()`` is dropped with a warning.

    Args:
        raw: Raw JSON text from settings; may be empty / whitespace-only.

    Returns:
        Mapping of stage name to grace period in seconds (possibly empty).
    """
    if not raw or not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.warning(f"reconcile_grace_overrides_json is not valid JSON, ignoring: {e}")
        return {}
    if not isinstance(data, dict):
        logger.warning("reconcile_grace_overrides_json must be a JSON object, ignoring")
        return {}
    out: dict[str, int] = {}
    for k, v in data.items():
        try:
            out[str(k)] = int(v)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: ``json.loads`` accepts the non-standard
            # ``Infinity`` / ``-Infinity`` literals, and ``int(inf)`` raises
            # OverflowError (``int(nan)`` raises ValueError). Without it a
            # single bad entry would escape and break every grace lookup.
            logger.warning(f"reconcile_grace_overrides_json[{k}] is not an int, ignoring")
    return out
|
|
|
|
|
|
def grace_for_stage(stage: str) -> int:
    """Return the "stuck" threshold for ``stage``, in seconds.

    A per-stage value from ``reconcile_grace_overrides_json`` takes
    precedence; a stage without an override uses
    ``reconcile_grace_default_s``. The override JSON is re-parsed on every
    call.
    """
    per_stage = _parse_grace_overrides(settings.reconcile_grace_overrides_json)
    fallback_s = settings.reconcile_grace_default_s
    return per_stage[stage] if stage in per_stage else fallback_s
|
|
|
|
|
|
def _age_seconds_iso(ts: str) -> float | None:
|
|
"""Age in seconds of a Plane ISO-8601 timestamp (e.g. issue.updated_at).
|
|
|
|
Returns None when the value is missing / unparseable (caller decides the
|
|
fallback). Handles a trailing 'Z' and treats naive timestamps as UTC.
|
|
"""
|
|
if not ts:
|
|
return None
|
|
try:
|
|
text = ts.strip()
|
|
if text.endswith("Z"):
|
|
text = text[:-1] + "+00:00"
|
|
dt = datetime.fromisoformat(text)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return (datetime.now(timezone.utc) - dt).total_seconds()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class Reconciler:
    """Background daemon that reconciles webhook-induced stage drift.

    Modelled on ``QueueWorker``: a plain ``threading.Thread(daemon=True)`` +
    ``threading.Event`` for a clean stop. No correctness-critical state is held
    in memory — every tick re-reads the DB / Plane. The observability counters
    (``last_run_ts`` / ``unblocked_total`` / ``last_unblocked`` /
    ``skipped_terminal_total`` / ``deduped_total``) and the notification dedup
    map are best-effort and may reset on restart (AC-11 allows this).
    """

    def __init__(self, interval_s: float | None = None):
        self.interval_s = (
            interval_s if interval_s is not None else settings.reconcile_interval_s
        )
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        # Best-effort observability (F-4).
        self.last_run_ts: float | None = None
        self.unblocked_total: int = 0
        self.last_unblocked: str | None = None
        # ORCH-068 observability: terminal-state skips and dedup suppressions.
        self.skipped_terminal_total: int = 0
        self.deduped_total: int = 0
        # ORCH-068 (TR-3): in-memory dedup guard {issue key -> last unblocked
        # state uuid}. Best-effort (resets on restart, like unblocked_total);
        # suppresses a repeat unblock notification for the same issue+state.
        self._unblock_dedup: dict[str, str] = {}

    # -- F-1: gate-side ----------------------------------------------------
    def reconcile_gate_once(self) -> None:
        """One F-1 pass over all non-terminal tasks (per-task never-raise).

        A failure to list the candidate tasks is logged and treated as an
        empty batch. It therefore cannot escape into ``_tick`` and skip the
        F-2 pass that runs right after this one, and the dependency-cycle
        backstop below still runs.
        """
        if not settings.reconcile_enabled:
            return
        try:
            tasks = get_active_tasks_for_reconcile()
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.error(f"reconciler F-1: listing active tasks failed: {e}")
            tasks = []
        for task in tasks:
            try:
                self._reconcile_gate_task(task)
            except Exception as e:  # noqa: BLE001 - isolate one task's failure
                logger.error(
                    f"reconciler F-1: task {task.get('id')} "
                    f"(stage={task.get('stage')}) failed: {e}"
                )
        # ORCH-026 (B-3) backstop: surface ANY dependency deadlock in the declared
        # graph, even one whose tasks are not individually evaluated above (e.g. no
        # active queued job). One alert per cycle; never-raise.
        if settings.task_deps_enabled:
            try:
                cyc = task_deps.find_any_cycle()
                if cyc:
                    task_deps.handle_cycle(cyc)
            except Exception as e:  # noqa: BLE001 - never break the sweep
                logger.error(f"reconciler F-1: cycle backstop failed: {e}")

    def _reconcile_gate_task(self, task: dict) -> None:
        """Evaluate one task for F-1 and advance it when its gate is green.

        Guards run cheapest-first. Any early ``return`` means "leave the task
        alone this tick". Only a real advance (``result.advanced``) reaches
        ``_note_unblock``.
        """
        task_id = task["id"]
        stage = task["stage"]
        # AC-16: analysis is a human gate -> owned by F-2, never F-1.
        if stage == "analysis":
            return
        # ORCH-086 D2 (DB-side terminal drift): ``get_active_tasks_for_reconcile``
        # filters ``stage != 'done'`` but NOT ``cancelled``. A task already
        # terminal in the orchestrator DB is fully in sync by definition -> skip
        # before any gate/network work, mirroring the F-2 terminal-skip counter
        # (single semantics with ``_reconcile_plane_issue``). Local, no network.
        if stage in ("done", "cancelled"):
            self.skipped_terminal_total += 1
            return
        # created / done have no gate to evaluate.
        if get_qg_for_stage(stage) is None:
            return
        # AC-3: a queued/running job means the task is legitimately in flight (or
        # a live webhook just enqueued one) -> do not touch it.
        if has_active_job_for_task(task_id):
            return
        # AC-5: respect the per-stage grace ("stuck", not just busy).
        age_s = task.get("age_s") or 0
        if age_s < grace_for_stage(stage):
            return
        # ORCH-060 Guard 1: escalated tasks (developer retries reached the cap) are
        # terminal — they wait for a human, not the sweeper. Without this, a task
        # whose CI is green but whose reviewer kept sending REQUEST_CHANGES until the
        # cap would be re-unblocked every tick (incident ET-013, infinite bounce).
        # Deterministic, local SQL, no network — and checked FIRST (cheapest).
        if developer_retry_count(task_id) >= MAX_DEVELOPER_RETRIES:
            return
        # ORCH-086 D1: single networked resolve per task per tick, AFTER the cheap
        # local guards (so busy/young/escalated tasks never hit Plane). Feeds the
        # Plane-side terminal-skip (D2), Guard 2 (D3) and the state_uuid handed to
        # _note_unblock (D4) — no duplicate fetch.
        states, groups, state_uuid = self._resolve_issue_status(task)
        # ORCH-086 D2 (Plane-side terminal-skip), UNCONDITIONAL (not gated by
        # reconcile_skip_blocked_enabled, which gates ONLY Guard 2). A task whose
        # Plane status is terminal (group completed/cancelled, or the logical
        # done/cancelled fallback) is fully in sync -> never a real unblock.
        # Runs BEFORE Guard 2 so terminal tasks correctly bump skipped_terminal_total
        # instead of being swallowed by Guard 2's conservative path. Closes the F-1
        # gap of ORCH-068 (which only covered F-2); fixes the spurious
        # "ET-002 ... unblocked" Telegram notification.
        if self._is_terminal_state(state_uuid, states, groups):
            self.skipped_terminal_total += 1
            return
        # ORCH-060 Guard 2: respect an explicit human gate (Blocked / Needs Input).
        # Reuses the D1 resolve (ORCH-086 D3) so the tick makes a single fetch.
        if self._is_blocked_or_needs_input(task, states, state_uuid):
            return
        # ORCH-026 Guard 3 (B-5): a task blocked by an unfinished declared
        # dependency is legitimately waiting, NOT stuck -> F-1 must not advance it
        # past its depends-on (mirrors the Blocked/Needs-Input skip). Local DB,
        # never-raise (is_task_ready fails OPEN). If the wait is actually a
        # dependency DEADLOCK (cycle), surface it (Blocked + alert) once.
        if settings.task_deps_enabled:
            ready, _waiting = task_deps.is_task_ready(task_id)
            if not ready:
                cyc = task_deps.detect_cycle(task_id)
                if cyc:
                    task_deps.handle_cycle(cyc)
                return
        result = advance_if_gate_passed(
            task_id,
            stage,
            task["repo"],
            task.get("work_item_id") or "",
            task.get("branch") or "",
        )
        if result is not None and getattr(result, "advanced", False):
            # ORCH-086 D4: pass state_uuid so the in-memory dedup guard covers F-1
            # too (a repeat tick for the same issue+state is suppressed; survives
            # the "first pass after restart" symptom together with the D2 skip).
            self._note_unblock(
                task.get("work_item_id") or str(task_id), stage, state_uuid
            )

    def _resolve_issue_status(
        self, task: dict
    ) -> tuple[dict, dict, str | None]:
        """ORCH-086 D1: one networked resolve per task per tick.

        Returns ``(states, groups, current_state_uuid)``. Makes a single
        ``fetch_issue_state`` call plus ``get_project_states`` /
        ``get_project_state_groups`` (cached with a TTL since ORCH-068). The
        result feeds the terminal-skip (D2), Guard 2 (D3) and the
        ``state_uuid`` handed to ``_note_unblock`` (D4), so the tick never
        fetches the same issue twice.

        A task with no linked Plane issue id skips the issue fetch entirely
        and yields ``(states, groups, None)``.

        **never-raise.** Any failure, an unresolved project or a missing state
        yields ``None`` as the state uuid (and ``{}`` for states/groups on an
        exception or unresolved project). Callers then apply their
        conservative fallback (terminal-skip = not terminal; Guard 2 = skip).
        """
        try:
            proj = projects.get_project_by_repo(task.get("repo") or "")
            if proj is None:
                return {}, {}, None
            pid = proj.plane_project_id
            states = get_project_states(pid)
            groups = get_project_state_groups(pid)
            issue_id = task.get("plane_id") or task.get("plane_issue_id") or ""
            if not issue_id:
                # No Plane issue linked: a fetch with an empty id is pointless
                # network traffic; report "state unknown" and let callers fall
                # back conservatively.
                return states or {}, groups or {}, None
            state_uuid = fetch_issue_state(issue_id, pid)
            return states or {}, groups or {}, state_uuid
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.warning(
                f"reconciler D1: status resolve failed for task "
                f"{task.get('id')}, treating as unresolved: {e}"
            )
            return {}, {}, None

    def _is_blocked_or_needs_input(
        self, task: dict, states: dict | None = None, state_uuid=_UNSET
    ) -> bool:
        """Guard 2 (ORCH-060 + ORCH-066): is this issue waiting for a human OR in
        an active orchestrator wait that F-1 must not "revive"?

        Variant A (no schema migration): resolve the task's Plane project, fetch
        the issue's current state uuid and compare against a skip-set. ``tasks``
        has no status column, so the live Plane state is the source of truth.

        Skip-set = explicit human gates (``blocked`` / ``needs_input``) PLUS the
        ORCH-066 active waits (``awaiting_deploy`` / ``deploying`` / ``monitoring``,
        BR-13). **Anti-regress (CRITICAL):** the active-wait keys alias onto
        ``in_review`` / ``in_progress`` / ``done`` on a project that did not create
        them. Adding them verbatim would make F-1 wrongly skip enduro
        In Progress / Done tasks (regression of ORCH-053/060). So they are
        included ONLY when DISTINCT from the project's base working statuses
        (i.e. actually created as separate statuses): enduro collapses them to {}
        -> zero regress; orchestrator keeps three real statuses -> BR-13.

        **Never-raise, conservative fallback.** Any error / unresolved project /
        missing state -> return ``True`` (treat as "possibly blocked" -> skip):
        NOT unblocking a task is always safe, whereas wrongly unblocking a
        human-gated task re-introduces the bounce we are trying to kill. The
        sub-flag ``reconcile_skip_blocked_enabled`` disables ONLY this networked
        guard (escape hatch for a Plane outage); Guard 1 stays active.

        **ORCH-086 D3:** the production caller (``_reconcile_gate_task``) passes
        the already-resolved ``(states, state_uuid)`` from the single D1 fetch, so
        the tick does not hit Plane twice. When ``state_uuid`` is left ``_UNSET``
        (direct/legacy 1-arg call) Guard 2 self-resolves via ``_resolve_issue_status``
        — behaviour identical to the pre-ORCH-086 code.
        """
        if not settings.reconcile_skip_blocked_enabled:
            return False
        try:
            if state_uuid is _UNSET:
                # Backward-compatible self-resolve (direct callers / tests).
                states, _groups, state_uuid = self._resolve_issue_status(task)
            if not states or state_uuid is None:
                return True  # unresolved project / Plane unreachable -> conservative skip
            cur = state_uuid
            # ORCH-066 BR-13: active orchestrator waits, minus base working
            # statuses so aliased (enduro) keys never widen the skip-set.
            base_working = {
                states.get(k) for k in (
                    "backlog", "todo", "in_progress", "in_review", "review",
                    "architecture", "development", "testing",
                    "approved", "rejected", "done",
                )
            }
            extra_waits = {
                states.get("awaiting_deploy"),
                states.get("deploying"),
                states.get("monitoring"),
            } - base_working - {None}
            skip_set = {states.get("blocked"), states.get("needs_input")} | extra_waits
            return cur in skip_set
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.warning(
                f"reconciler Guard 2: blocked-check failed for task "
                f"{task.get('id')}, skipping conservatively: {e}"
            )
            return True

    # -- F-2: plane-side ---------------------------------------------------
    def reconcile_plane_once(self) -> None:
        """One F-2 pass: poll Plane per project and replay missed transitions."""
        if not settings.reconcile_enabled or not settings.reconcile_plane_enabled:
            return
        for proj in projects.PROJECTS:
            try:
                self._reconcile_plane_project(proj)
            except Exception as e:  # noqa: BLE001 - isolate one project's failure
                logger.error(f"reconciler F-2: project {proj.repo} failed: {e}")

    def _reconcile_plane_project(self, proj) -> None:
        """Replay actionable Plane states (To Analyse / Approved / Rejected) for one project."""
        pid = proj.plane_project_id
        # Resolve the actionable state uuids per-project (never hardcode).
        # ORCH-066 (AC-19): the start/resume trigger is `To Analyse` (was
        # In Progress). On a project without that status, `to_analyse` aliases to
        # the project's own `in_progress` UUID, so enduro behaviour is identical
        # (and `list_issues_by_state` deduplicates the uuid via its internal set).
        states = get_project_states(pid)
        to_analyse = states["to_analyse"]
        # ORCH-068 D1: {uuid -> group} from the SAME cache record (no extra
        # fetch); empty when the API was unreachable -> per-issue fallback by key.
        groups = get_project_state_groups(pid)
        approved = states["approved"]
        rejected = states["rejected"]
        issues = list_issues_by_state(pid, [to_analyse, approved, rejected])
        for issue in issues:
            try:
                self._reconcile_plane_issue(
                    issue, pid, to_analyse, approved, rejected, states, groups
                )
            except Exception as e:  # noqa: BLE001 - isolate one issue's failure
                logger.error(
                    f"reconciler F-2: issue {issue.get('id')} failed: {e}"
                )

    def _is_terminal_state(
        self, state_uuid: str | None, states: dict, groups: dict
    ) -> bool:
        """ORCH-068 D1: is ``state_uuid`` a terminal (completed/cancelled) state?

        Primary discriminator is the Plane **state group** (project-independent,
        robust to UUID aliasing after status renames): ``group`` in
        ``{completed, cancelled}`` -> terminal. When the group is unavailable
        (API gave no ``group`` / we fell back to ``_DEFAULT_STATES``), fall back
        to the logical terminal keys ``done`` / ``cancelled``. A missing
        ``state_uuid`` (``None`` / empty) is never terminal.
        """
        if not state_uuid:
            return False
        grp = groups.get(state_uuid)
        if grp:
            return grp in {"completed", "cancelled"}
        # Fallback (group unknown): logical terminal keys for this project.
        return state_uuid in {states.get("done"), states.get("cancelled")}

    def _reconcile_plane_issue(
        self, issue: dict, project_id: str,
        to_analyse: str, approved: str, rejected: str,
        states: dict, groups: dict,
    ) -> None:
        """Replay a single Plane issue's missed transition through the plane handlers."""
        issue_id = str(issue.get("id") or "")
        if not issue_id:
            return
        state = issue.get("state")
        new_state = state.get("id") if isinstance(state, dict) else state

        # ORCH-068 D1: a terminal issue (Done / Cancelled) is fully in sync by
        # definition -> never actionable. Excluded per-issue (not by narrowing
        # `wanted`) because UUID aliasing can make a terminal uuid collide with
        # an actionable one — only the state GROUP disentangles them. Restores
        # the silence-when-in-sync invariant (AC-1/AC-2).
        if self._is_terminal_state(new_state, states, groups):
            self.skipped_terminal_total += 1
            return

        # Grace ("lost, not merely delayed"): use the issue's own updated_at age.
        # A missing/unparseable timestamp is treated as old enough (the active-job
        # guard + atomic create-claim still prevent doubling).
        age = _age_seconds_iso(issue.get("updated_at") or "")
        if age is not None and age < settings.reconcile_grace_default_s:
            return

        task = get_task_by_plane_id(issue_id)
        # AC-3/AC-4: a live webhook is in flight for this task -> skip.
        if task is not None and has_active_job_for_task(task["id"]):
            return

        # issue_data in the shape the plane handlers expect; missing name /
        # description are pulled by the handlers themselves (fetch_issue_fields).
        issue_data = {
            "id": issue_id,
            "state": {"id": new_state},
            "project": project_id,
            "name": issue.get("name", ""),
            "description_stripped": issue.get("description_stripped", ""),
        }

        if new_state == to_analyse and task is None:
            # To Analyse without a task -> start the pipeline (lost start webhook).
            # ORCH-068 D2: confirm a REAL change (the task now exists) before
            # announcing — a no-op dispatch stays silent.
            self._dispatch(handle_status_start, issue_data, project_id)
            if get_task_by_plane_id(issue_id) is not None:
                self._note_unblock(issue_id, "analysis", new_state)
        elif new_state == to_analyse and task is not None:
            # To Analyse with an existing (idle) task -> resume the analyst from
            # Needs Input (lost resume webhook). handle_status_start applies its
            # own busy-guard / start-vs-resume fork.
            # NOTE(review): unlike the other branches, this one announces without
            # confirming a real change after _dispatch; only the in-memory dedup
            # (which resets on restart) limits repeats. Verify against the
            # no-op cases of handle_status_start.
            self._dispatch(handle_status_start, issue_data, project_id)
            self._note_unblock(task.get("work_item_id") or issue_id, task["stage"], new_state)
        elif new_state == approved and task is not None:
            # Approved but the stage never advanced -> replay the verdict.
            stage_before = task["stage"]
            self._dispatch(handle_verdict, issue_data, project_id, approved=True)
            if self._stage_changed(issue_id, stage_before):
                self._note_unblock(
                    task.get("work_item_id") or issue_id, stage_before, new_state
                )
        elif new_state == rejected and task is not None:
            # Rejected but never rolled back -> replay the verdict.
            stage_before = task["stage"]
            self._dispatch(handle_verdict, issue_data, project_id, approved=False)
            if self._stage_changed(issue_id, stage_before):
                self._note_unblock(
                    task.get("work_item_id") or issue_id, stage_before, new_state
                )
        # else: everything is in sync -> silence (AC-10).

    @staticmethod
    def _stage_changed(issue_id: str, stage_before: str) -> bool:
        """ORCH-068 D2: did the dispatched handler actually move the stage?

        Re-reads the task after ``_dispatch`` and compares to the captured
        ``stage_before``. A no-op replay (the task was already in the target
        state) leaves the stage unchanged -> no unblock notification. A task
        that can no longer be found counts as unchanged.
        """
        after = get_task_by_plane_id(issue_id)
        stage_after = after["stage"] if after else stage_before
        return stage_after != stage_before

    @staticmethod
    def _dispatch(coro_fn, *args, **kwargs) -> None:
        """Run an async plane handler from this sync thread.

        ``asyncio.run`` spins a fresh event loop per call, which is required
        because ``handle_verdict -> _try_advance_stage`` uses
        ``asyncio.to_thread`` (needs a running loop). The handlers are
        REUSED verbatim — no pipeline logic is duplicated here.
        """
        asyncio.run(coro_fn(*args, **kwargs))

    # -- observability (F-4) ----------------------------------------------
    def _note_unblock(
        self, work_item_id: str, stage: str, state_uuid: str | None = None
    ) -> None:
        """Record + announce that a stuck task was unblocked (AC-12).

        Fires only on an actual state change (an advance / replayed transition),
        never per idle tick, so it does not conflict with AC-9 / AC-10.

        ORCH-068 (TR-3): an in-memory dedup guard keyed by ``work_item_id ->
        state_uuid`` suppresses a repeat notification for the same issue+state
        (bumping ``deduped_total``). ``state_uuid`` is the issue's Plane state;
        ``None`` bypasses the dedup. ``work_item_id`` doubles as the issue id
        for the pipeline-start case (which has no work item yet).
        """
        dedup_key = work_item_id
        # NOTE(review): the dedup key ignores ``stage``. If several stages share
        # one Plane state uuid, a later genuine F-1 advance of the same task
        # would be counted as a duplicate and not announced — confirm intended.
        if state_uuid is not None and self._unblock_dedup.get(dedup_key) == state_uuid:
            self.deduped_total += 1
            return
        if state_uuid is not None:
            self._unblock_dedup[dedup_key] = state_uuid

        self.unblocked_total += 1
        self.last_unblocked = work_item_id
        logger.info(
            f"reconciler: {work_item_id} {stage} разблокирована (потерян webhook)"
        )
        if settings.reconcile_notify_unblock:
            try:
                send_telegram(
                    f"\U0001f527 reconciler: {link_for(work_item_id)} {stage} "
                    f"разблокирована (потерян webhook)"
                )
            except Exception as e:  # noqa: BLE001 - never break the tick
                logger.warning(f"reconciler: unblock telegram failed: {e}")

    # -- loop / lifecycle --------------------------------------------------
    def _tick(self) -> None:
        """Run F-1 then F-2 (each honouring its own flags) and stamp ``last_run_ts``."""
        if settings.reconcile_enabled:
            self.reconcile_gate_once()  # F-1
            if settings.reconcile_plane_enabled:
                self.reconcile_plane_once()  # F-2
        self.last_run_ts = datetime.now(timezone.utc).timestamp()

    def _run(self) -> None:
        """Thread body: tick every ``interval_s`` until ``stop()`` sets the event."""
        logger.info(
            f"Reconciler started (interval={self.interval_s}s, "
            f"enabled={settings.reconcile_enabled}, "
            f"plane_enabled={settings.reconcile_plane_enabled})"
        )
        while not self._stop.is_set():
            try:
                self._tick()
            except Exception as e:  # noqa: BLE001 - outer never-raise
                logger.error(f"Reconciler loop error: {e}")
            self._stop.wait(self.interval_s)
        logger.info("Reconciler stopped")

    def start(self) -> None:
        """Start the daemon thread (idempotent: a live thread is a no-op)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="reconciler", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        """Signal the loop to exit and wait up to ``timeout`` seconds for the thread."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Reconcile snapshot for /queue observability."""
        return {
            "enabled": settings.reconcile_enabled,
            "plane_enabled": settings.reconcile_plane_enabled,
            "interval": self.interval_s,
            "last_run_ts": self.last_run_ts,
            "unblocked_total": self.unblocked_total,
            "last_unblocked": self.last_unblocked,
            # ORCH-068 observability.
            "skipped_terminal_total": self.skipped_terminal_total,
            "deduped_total": self.deduped_total,
        }
|
|
|
|
|
|
# Single shared instance at module level; the FastAPI lifespan uses this
# object. ``interval_s=None`` means "take reconcile_interval_s from settings".
reconciler: Reconciler = Reconciler(interval_s=None)
|