Конвейер продвигается только входящими webhook; потерянное событие (502 на ребилде, отсутствие ретраев у Plane/Gitea, неразрезолвленный sha→branch) оставляет задачу молча застрявшей (класс инцидента ORCH-044). Новый фоновый daemon-поток src/reconciler.py (паттерн queue_worker) доигрывает пропущенный переход через те же штатные гейты/обработчики, что и webhook: - F-1 gate-side: для задач stage≠done, без активного job и age(updated_at) ≥ grace_for_stage(stage) — read-only пред-оценка канонического QG; зелёный → stage_engine.advance_stage(..., finished_agent=None); красный → тишина (спам нотификаций структурно невозможен). analysis F-1 не трогает (человеческий гейт). - F-2 plane-side: опрос Plane API per-project (plane_sync.list_issues_by_state, курсорная пагинация, never-raise) → реплей In Progress/Approved/Rejected через существующие handle_status_start/handle_verdict (async из sync-потока, asyncio.run). - F-3: усиление sha→branch в handle_ci_status — БД-fallback по единственной development-задаче repo (неоднозначность → не резолвим), debug→info. - Анти-дубль на создании (db.create_task_atomic под process-wide Lock): гонка reconcile↔webhook не плодит второй task/branch/worktree/analyst-job (AC-4). - F-4 observability: лог-строка разблокировки + Telegram + блок reconcile в /queue. Старт/стоп в main.lifespan (после worker.start() / перед worker.stop()), restart-safe, never-raise на единицу работы. Kill-switches ORCH_RECONCILE_ENABLED / ORCH_RECONCILE_PLANE_ENABLED + grace-настройки. Схема БД и реестры STAGE_TRANSITIONS/QG_CHECKS не менялись. Тесты: test_reconciler.py, test_reconciler_plane.py, test_gitea_sha_resolve.py, test_config.py (33 новых, 563 всего зелёные). Документация обновлена (golden source): architecture/README.md, INFRA.md, README.md, CHANGELOG.md, adr-0007 → accepted. Refs: ORCH-053 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
333 lines
14 KiB
Python
333 lines
14 KiB
Python
"""ORCH-053: stuck-task reconciler (sweeper for lost webhooks).

The pipeline advances ONLY on incoming webhooks (Plane status / Gitea CI/PR). A
dropped event (502 on a rebuilding instance, no Plane/Gitea retries, an
unresolved ``sha->branch``) leaves the source of truth (the gate / the Plane
status) changed while the task stays put — a silently stuck task (incident
ORCH-044). None of the existing resilience layers (``requeue_running_jobs``,
orphan-recovery, events de-dup, ``ci_poll``) reconcile this
"source-of-truth != task-stage" drift; they all work at the jobs/agent_runs
level, not the stage transition.

This module is a background daemon thread (modelled on ``queue_worker``) that
periodically replays the missed transition through the SAME standard gates /
handlers a webhook would use:

* **F-1 gate-side** (``reconcile_gate_once``): for each task with
  ``stage != 'done'``, no active job and ``age(updated_at) >=
  grace_for_stage(stage)``, do a read-only pre-evaluation of the stage's
  canonical quality gate; green -> advance through the unchanged
  ``stage_engine.advance_stage(..., finished_agent=None)``; red -> silence
  (no advance, no notification). ``analysis`` is NOT reconciled here (human
  gate; owned by F-2).

* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
  project (``list_issues_by_state``) and replay In Progress / Approved /
  Rejected through ``webhooks.plane.handle_status_start`` /
  ``handle_verdict`` (no logic duplicated).

Invariants: source of truth is the gate / Plane (not the event); advance only
via ``advance_stage``; idempotency (active-job guard + atomic create-claim +
grace + ``max_concurrency=1``); never-raise per unit of work; silence when in
sync; restart-safe; kill-switch ``ORCH_RECONCILE_ENABLED``
(+ ``ORCH_RECONCILE_PLANE_ENABLED`` mutes only F-2). The DB schema and the
registries (``STAGE_TRANSITIONS`` / ``QG_CHECKS``) are unchanged.

See docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md and the
cross-cutting docs/architecture/adr/adr-0007-reconciler.md.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
|
|
from .config import settings
|
|
from .db import (
|
|
get_active_tasks_for_reconcile,
|
|
get_task_by_plane_id,
|
|
has_active_job_for_task,
|
|
)
|
|
from .stage_engine import advance_if_gate_passed
|
|
from .stages import get_qg_for_stage
|
|
from .plane_sync import get_project_states, list_issues_by_state
|
|
from .webhooks.plane import handle_status_start, handle_verdict
|
|
from .notifications import send_telegram
|
|
from . import projects
|
|
|
|
# Module-wide logger; every reconciler log line is emitted under the
# "orchestrator.reconciler" name.
logger = logging.getLogger("orchestrator.reconciler")
|
|
|
|
|
|
def _parse_grace_overrides(raw: str) -> dict[str, int]:
    """Parse ``reconcile_grace_overrides_json`` into ``{stage: seconds}``.

    The function never raises on bad configuration; it degrades instead:

    * empty / whitespace-only input      -> ``{}``
    * invalid JSON or a non-object value -> ``{}`` (a warning is logged)
    * an entry whose value cannot be turned into an ``int`` -> that entry is
      dropped (a warning is logged), the remaining entries are kept

    Args:
        raw: The raw JSON text of the setting.

    Returns:
        Mapping of stage name to grace period in seconds. Stages that are
        absent fall back to the default grace in the caller.
    """
    if not raw or not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.warning(f"reconcile_grace_overrides_json is not valid JSON, ignoring: {e}")
        return {}
    if not isinstance(data, dict):
        logger.warning("reconcile_grace_overrides_json must be a JSON object, ignoring")
        return {}
    out: dict[str, int] = {}
    for k, v in data.items():
        try:
            out[str(k)] = int(v)
        # OverflowError: ``json.loads`` accepts ``Infinity`` / ``1e999`` and
        # yields a float infinity, for which ``int()`` raises OverflowError
        # (not ValueError). Without catching it a single bad entry would break
        # the never-raise contract.
        except (ValueError, TypeError, OverflowError):
            logger.warning(f"reconcile_grace_overrides_json[{k}] is not an int, ignoring")
    return out
|
|
|
|
|
|
def grace_for_stage(stage: str) -> int:
    """Return how many seconds a task may sit in ``stage`` before it counts as stuck.

    The per-stage overrides parsed from ``settings.reconcile_grace_overrides_json``
    win; a stage without an override uses ``settings.reconcile_grace_default_s``.
    """
    per_stage = _parse_grace_overrides(settings.reconcile_grace_overrides_json)
    fallback = settings.reconcile_grace_default_s
    if stage in per_stage:
        return per_stage[stage]
    return fallback
|
|
|
|
|
|
def _age_seconds_iso(ts: str) -> float | None:
|
|
"""Age in seconds of a Plane ISO-8601 timestamp (e.g. issue.updated_at).
|
|
|
|
Returns None when the value is missing / unparseable (caller decides the
|
|
fallback). Handles a trailing 'Z' and treats naive timestamps as UTC.
|
|
"""
|
|
if not ts:
|
|
return None
|
|
try:
|
|
text = ts.strip()
|
|
if text.endswith("Z"):
|
|
text = text[:-1] + "+00:00"
|
|
dt = datetime.fromisoformat(text)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return (datetime.now(timezone.utc) - dt).total_seconds()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class Reconciler:
|
|
"""Background daemon that reconciles webhook-induced stage drift.
|
|
|
|
Modelled on ``QueueWorker``: a plain ``threading.Thread(daemon=True)`` +
|
|
``threading.Event`` for a clean stop. No correctness-critical state is held
|
|
in memory — every tick re-reads the DB / Plane; the observability counters
|
|
(``last_run_ts`` / ``unblocked_total`` / ``last_unblocked``) are best-effort
|
|
and may reset on restart (AC-11 allows this).
|
|
"""
|
|
|
|
def __init__(self, interval_s: float | None = None):
|
|
self.interval_s = (
|
|
interval_s if interval_s is not None else settings.reconcile_interval_s
|
|
)
|
|
self._stop = threading.Event()
|
|
self._thread: threading.Thread | None = None
|
|
# Best-effort observability (F-4).
|
|
self.last_run_ts: float | None = None
|
|
self.unblocked_total: int = 0
|
|
self.last_unblocked: str | None = None
|
|
|
|
# -- F-1: gate-side ----------------------------------------------------
|
|
def reconcile_gate_once(self) -> None:
|
|
"""One F-1 pass over all non-terminal tasks (per-task never-raise)."""
|
|
if not settings.reconcile_enabled:
|
|
return
|
|
for task in get_active_tasks_for_reconcile():
|
|
try:
|
|
self._reconcile_gate_task(task)
|
|
except Exception as e: # noqa: BLE001 - isolate one task's failure
|
|
logger.error(
|
|
f"reconciler F-1: task {task.get('id')} "
|
|
f"(stage={task.get('stage')}) failed: {e}"
|
|
)
|
|
|
|
def _reconcile_gate_task(self, task: dict) -> None:
|
|
task_id = task["id"]
|
|
stage = task["stage"]
|
|
# AC-16: analysis is a human gate -> owned by F-2, never F-1.
|
|
if stage == "analysis":
|
|
return
|
|
# created / done have no gate to evaluate.
|
|
if get_qg_for_stage(stage) is None:
|
|
return
|
|
# AC-3: a queued/running job means the task is legitimately in flight (or
|
|
# a live webhook just enqueued one) -> do not touch it.
|
|
if has_active_job_for_task(task_id):
|
|
return
|
|
# AC-5: respect the per-stage grace ("stuck", not just busy).
|
|
age_s = task.get("age_s") or 0
|
|
if age_s < grace_for_stage(stage):
|
|
return
|
|
result = advance_if_gate_passed(
|
|
task_id,
|
|
stage,
|
|
task["repo"],
|
|
task.get("work_item_id") or "",
|
|
task.get("branch") or "",
|
|
)
|
|
if result is not None and getattr(result, "advanced", False):
|
|
self._note_unblock(task.get("work_item_id") or str(task_id), stage)
|
|
|
|
# -- F-2: plane-side ---------------------------------------------------
|
|
def reconcile_plane_once(self) -> None:
|
|
"""One F-2 pass: poll Plane per project and replay missed transitions."""
|
|
if not settings.reconcile_enabled or not settings.reconcile_plane_enabled:
|
|
return
|
|
for proj in projects.PROJECTS:
|
|
try:
|
|
self._reconcile_plane_project(proj)
|
|
except Exception as e: # noqa: BLE001 - isolate one project's failure
|
|
logger.error(f"reconciler F-2: project {proj.repo} failed: {e}")
|
|
|
|
def _reconcile_plane_project(self, proj) -> None:
|
|
pid = proj.plane_project_id
|
|
# Resolve the actionable state uuids per-project (never hardcode).
|
|
states = get_project_states(pid)
|
|
in_progress = states["in_progress"]
|
|
approved = states["approved"]
|
|
rejected = states["rejected"]
|
|
issues = list_issues_by_state(pid, [in_progress, approved, rejected])
|
|
for issue in issues:
|
|
try:
|
|
self._reconcile_plane_issue(
|
|
issue, pid, in_progress, approved, rejected
|
|
)
|
|
except Exception as e: # noqa: BLE001 - isolate one issue's failure
|
|
logger.error(
|
|
f"reconciler F-2: issue {issue.get('id')} failed: {e}"
|
|
)
|
|
|
|
def _reconcile_plane_issue(
|
|
self, issue: dict, project_id: str,
|
|
in_progress: str, approved: str, rejected: str,
|
|
) -> None:
|
|
issue_id = str(issue.get("id") or "")
|
|
if not issue_id:
|
|
return
|
|
state = issue.get("state")
|
|
new_state = state.get("id") if isinstance(state, dict) else state
|
|
|
|
# Grace ("lost, not merely delayed"): use the issue's own updated_at age.
|
|
# A missing/unparseable timestamp is treated as old enough (the active-job
|
|
# guard + atomic create-claim still prevent doubling).
|
|
age = _age_seconds_iso(issue.get("updated_at") or "")
|
|
if age is not None and age < settings.reconcile_grace_default_s:
|
|
return
|
|
|
|
task = get_task_by_plane_id(issue_id)
|
|
# AC-3/AC-4: a live webhook is in flight for this task -> skip.
|
|
if task is not None and has_active_job_for_task(task["id"]):
|
|
return
|
|
|
|
# issue_data in the shape the plane handlers expect; missing name /
|
|
# description are pulled by the handlers themselves (fetch_issue_fields).
|
|
issue_data = {
|
|
"id": issue_id,
|
|
"state": {"id": new_state},
|
|
"project": project_id,
|
|
"name": issue.get("name", ""),
|
|
"description_stripped": issue.get("description_stripped", ""),
|
|
}
|
|
|
|
if new_state == in_progress and task is None:
|
|
# In Progress without a task -> start the pipeline (lost start webhook).
|
|
self._dispatch(handle_status_start, issue_data, project_id)
|
|
self._note_unblock(issue_id, "analysis")
|
|
elif new_state == approved and task is not None:
|
|
# Approved but the stage never advanced -> replay the verdict.
|
|
self._dispatch(handle_verdict, issue_data, project_id, approved=True)
|
|
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
|
|
elif new_state == rejected and task is not None:
|
|
# Rejected but never rolled back -> replay the verdict.
|
|
self._dispatch(handle_verdict, issue_data, project_id, approved=False)
|
|
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
|
|
# else: everything is in sync -> silence (AC-10).
|
|
|
|
@staticmethod
|
|
def _dispatch(coro_fn, *args, **kwargs) -> None:
|
|
"""Run an async plane handler from this sync thread.
|
|
|
|
``asyncio.run`` spins a fresh event loop per call, which is required
|
|
because ``handle_verdict -> _try_advance_stage`` uses
|
|
``asyncio.to_thread`` (needs a running loop). The handlers are
|
|
REUSED verbatim — no pipeline logic is duplicated here.
|
|
"""
|
|
asyncio.run(coro_fn(*args, **kwargs))
|
|
|
|
# -- observability (F-4) ----------------------------------------------
|
|
def _note_unblock(self, work_item_id: str, stage: str) -> None:
|
|
"""Record + announce that a stuck task was unblocked (AC-12).
|
|
|
|
Fires only on an actual state change (an advance / replayed transition),
|
|
never per idle tick, so it does not conflict with AC-9 / AC-10.
|
|
"""
|
|
self.unblocked_total += 1
|
|
self.last_unblocked = work_item_id
|
|
logger.info(
|
|
f"reconciler: {work_item_id} {stage} разблокирована (потерян webhook)"
|
|
)
|
|
if settings.reconcile_notify_unblock:
|
|
try:
|
|
send_telegram(
|
|
f"\U0001f527 reconciler: {work_item_id} {stage} "
|
|
f"разблокирована (потерян webhook)"
|
|
)
|
|
except Exception as e: # noqa: BLE001 - never break the tick
|
|
logger.warning(f"reconciler: unblock telegram failed: {e}")
|
|
|
|
# -- loop / lifecycle --------------------------------------------------
|
|
def _tick(self) -> None:
|
|
if settings.reconcile_enabled:
|
|
self.reconcile_gate_once() # F-1
|
|
if settings.reconcile_plane_enabled:
|
|
self.reconcile_plane_once() # F-2
|
|
self.last_run_ts = datetime.now(timezone.utc).timestamp()
|
|
|
|
def _run(self) -> None:
|
|
logger.info(
|
|
f"Reconciler started (interval={self.interval_s}s, "
|
|
f"enabled={settings.reconcile_enabled}, "
|
|
f"plane_enabled={settings.reconcile_plane_enabled})"
|
|
)
|
|
while not self._stop.is_set():
|
|
try:
|
|
self._tick()
|
|
except Exception as e: # noqa: BLE001 - outer never-raise
|
|
logger.error(f"Reconciler loop error: {e}")
|
|
self._stop.wait(self.interval_s)
|
|
logger.info("Reconciler stopped")
|
|
|
|
def start(self) -> None:
|
|
"""Start the daemon thread (idempotent: a live thread is a no-op)."""
|
|
if self._thread and self._thread.is_alive():
|
|
return
|
|
self._stop.clear()
|
|
self._thread = threading.Thread(
|
|
target=self._run, name="reconciler", daemon=True
|
|
)
|
|
self._thread.start()
|
|
|
|
def stop(self, timeout: float = 5.0) -> None:
|
|
self._stop.set()
|
|
if self._thread:
|
|
self._thread.join(timeout=timeout)
|
|
|
|
def status(self) -> dict:
|
|
"""Reconcile snapshot for /queue observability."""
|
|
return {
|
|
"enabled": settings.reconcile_enabled,
|
|
"plane_enabled": settings.reconcile_plane_enabled,
|
|
"interval": self.interval_s,
|
|
"last_run_ts": self.last_run_ts,
|
|
"unblocked_total": self.unblocked_total,
|
|
"last_unblocked": self.last_unblocked,
|
|
}
|
|
|
|
|
|
# Module-level singleton used by the FastAPI lifespan.
|
|
# Constructing the instance does not start the thread; callers must invoke
# reconciler.start() / reconciler.stop() explicitly.
reconciler = Reconciler()
|