"""ORCH-053: stuck-task reconciler (sweeper for lost webhooks).

The pipeline advances ONLY on incoming webhooks (Plane status / Gitea CI/PR).
A dropped event (502 on a rebuilding instance, no Plane/Gitea retries, an
unresolved ``sha->branch``) leaves the source of truth (the gate / the Plane
status) changed while the task stays put — a silently stuck task (incident
ORCH-044). None of the existing resilience layers (``requeue_running_jobs``,
orphan-recovery, events de-dup, ``ci_poll``) reconcile this
"source-of-truth != task-stage" drift; they all work at the jobs/agent_runs
level, not the stage transition.

This module is a background daemon thread (modelled on ``queue_worker``) that
periodically replays the missed transition through the SAME standard gates /
handlers a webhook would use:

* **F-1 gate-side** (``reconcile_gate_once``): for each task with
  ``stage != 'done'``, no active job and
  ``age(updated_at) >= grace_for_stage(stage)``, do a read-only
  pre-evaluation of the stage's canonical quality gate (via
  ``stage_engine.advance_if_gate_passed``); green -> advance through the
  unchanged ``stage_engine.advance_stage(..., finished_agent=None)``;
  red -> silence (no advance, no notification). ``analysis`` is NOT
  reconciled here (human gate; owned by F-2).

  **ORCH-060:** before the gate is even evaluated, F-1 skips (silently) tasks
  that are waiting for a human — Guard 1: escalated by developer retries
  (``developer_retry_count >= MAX_DEVELOPER_RETRIES``, deterministic, local;
  closes the ET-013 bounce loop) checked first, then Guard 2: an explicit
  Plane ``Blocked`` / ``Needs Input`` state (Variant A — networked,
  never-raise -> conservative skip).

* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
  project (``list_issues_by_state``) and replay To Analyse (ORCH-066; it
  aliases to In Progress on projects without that status) / Approved /
  Rejected through ``webhooks.plane.handle_status_start`` /
  ``handle_verdict`` (no logic duplicated).

Invariants: source of truth is the gate / Plane (not the event); advance only
via ``advance_stage``; idempotency (active-job guard + atomic create-claim +
grace + ``max_concurrency=1``); never-raise per unit of work; silence when in
sync; restart-safe; kill-switch ``ORCH_RECONCILE_ENABLED``
(+ ``ORCH_RECONCILE_PLANE_ENABLED`` mutes only F-2). The DB schema and the
registries (``STAGE_TRANSITIONS`` / ``QG_CHECKS``) are unchanged.

See docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md and the
cross-cutting docs/architecture/adr/adr-0007-reconciler.md.
"""

import asyncio
import json
import logging
import threading
from datetime import datetime, timezone

from .config import settings
from .db import (
    get_active_tasks_for_reconcile,
    get_task_by_plane_id,
    has_active_job_for_task,
)
from .stage_engine import (
    advance_if_gate_passed,
    developer_retry_count,
    MAX_DEVELOPER_RETRIES,
)
from .stages import get_qg_for_stage
from .plane_sync import (
    fetch_issue_state,
    get_project_states,
    get_project_state_groups,
    list_issues_by_state,
)
from .webhooks.plane import handle_status_start, handle_verdict
from .notifications import send_telegram, link_for
from . import projects
from . import task_deps

logger = logging.getLogger("orchestrator.reconciler")

# ORCH-086 (D3): sentinel distinguishing "caller did not pass a pre-resolved
# state_uuid" (Guard 2 self-resolves, backward-compatible 1-arg call) from an
# explicit ``None`` (Plane unreachable -> conservative skip).
_UNSET = object()


def _parse_grace_overrides(raw: str) -> dict[str, int]:
    """Parse ``reconcile_grace_overrides_json`` into {stage: seconds}.

    Invalid / non-object JSON -> {} (caller falls back to the default grace),
    mirroring the never-raise contract of ``agent_timeout_overrides_json``.
    A single value that cannot be converted to ``int`` drops only that key.
    This includes ``Infinity``: ``json.loads`` accepts it, but ``int()``
    rejects it with ``OverflowError``.
    """
    if not raw or not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.warning(f"reconcile_grace_overrides_json is not valid JSON, ignoring: {e}")
        return {}
    if not isinstance(data, dict):
        logger.warning("reconcile_grace_overrides_json must be a JSON object, ignoring")
        return {}
    out: dict[str, int] = {}
    for k, v in data.items():
        try:
            out[str(k)] = int(v)
        except (ValueError, TypeError, OverflowError):
            # int(float("inf")) raises OverflowError rather than ValueError;
            # without catching it an ``{"x": Infinity}`` override would escape
            # this never-raise parser and break grace_for_stage for every task.
            logger.warning(f"reconcile_grace_overrides_json[{k}] is not an int, ignoring")
    return out


def grace_for_stage(stage: str) -> int:
    """Per-stage "stuck" threshold (seconds): override from JSON, else default."""
    overrides = _parse_grace_overrides(settings.reconcile_grace_overrides_json)
    return overrides.get(stage, settings.reconcile_grace_default_s)


def _age_seconds_iso(ts: str) -> float | None:
    """Age in seconds of a Plane ISO-8601 timestamp (e.g. issue.updated_at).

    Returns None when the value is missing / unparseable (caller decides the
    fallback). Handles a trailing 'Z' and treats naive timestamps as UTC.
    """
    if not ts:
        return None
    try:
        text = ts.strip()
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return (datetime.now(timezone.utc) - dt).total_seconds()
    except Exception:
        return None


class Reconciler:
    """Background daemon that reconciles webhook-induced stage drift.

    Modelled on ``QueueWorker``: a plain ``threading.Thread(daemon=True)`` +
    ``threading.Event`` for a clean stop. No correctness-critical state is held
    in memory — every tick re-reads the DB / Plane; the observability counters
    (``last_run_ts`` / ``unblocked_total`` / ``last_unblocked``) are
    best-effort and may reset on restart (AC-11 allows this).
    """

    def __init__(self, interval_s: float | None = None):
        self.interval_s = (
            interval_s if interval_s is not None else settings.reconcile_interval_s
        )
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        # Best-effort observability (F-4).
        self.last_run_ts: float | None = None
        self.unblocked_total: int = 0
        self.last_unblocked: str | None = None
        # ORCH-068 observability: terminal-state skips and dedup suppressions.
        self.skipped_terminal_total: int = 0
        self.deduped_total: int = 0
        # ORCH-068 (TR-3): in-memory dedup guard {key -> last unblocked state
        # uuid}. The key is whatever _note_unblock receives as work_item_id:
        # the work item id (F-1 falls back to str(task id), the F-2 pipeline
        # start uses the Plane issue id). Best-effort (resets on restart, like
        # unblocked_total); suppresses a repeat unblock notification for the
        # same key+state.
        self._unblock_dedup: dict[str, str] = {}

    # -- F-1: gate-side ----------------------------------------------------

    def reconcile_gate_once(self) -> None:
        """One F-1 pass over all non-terminal tasks (per-task never-raise)."""
        if not settings.reconcile_enabled:
            return
        for task in get_active_tasks_for_reconcile():
            try:
                self._reconcile_gate_task(task)
            except Exception as e:  # noqa: BLE001 - isolate one task's failure
                logger.error(
                    f"reconciler F-1: task {task.get('id')} "
                    f"(stage={task.get('stage')}) failed: {e}"
                )
        # ORCH-026 (B-3) backstop: surface ANY dependency deadlock in the
        # declared graph, even one whose tasks are not individually evaluated
        # above (e.g. no active queued job). One alert per cycle; never-raise.
        if settings.task_deps_enabled:
            try:
                cyc = task_deps.find_any_cycle()
                if cyc:
                    task_deps.handle_cycle(cyc)
            except Exception as e:  # noqa: BLE001 - never break the sweep
                logger.error(f"reconciler F-1: cycle backstop failed: {e}")

    def _reconcile_gate_task(self, task: dict) -> None:
        """Evaluate one task for F-1 and advance it if its gate is green.

        Guards run cheapest-first: local checks (stage, gate existence, active
        job, grace, Guard 1) before the single networked Plane resolve (D1),
        which then feeds the terminal skip (D2), Guard 2 (D3) and the dedup
        state passed to ``_note_unblock`` (D4).
        """
        task_id = task["id"]
        stage = task["stage"]
        # AC-16: analysis is a human gate -> owned by F-2, never F-1.
        if stage == "analysis":
            return
        # ORCH-086 D2 (DB-side terminal drift): ``get_active_tasks_for_reconcile``
        # filters ``stage != 'done'`` but NOT ``cancelled``. A task already
        # terminal in the orchestrator DB is fully in sync by definition -> skip
        # before any gate/network work, mirroring the F-2 terminal-skip counter
        # (single semantics with ``_reconcile_plane_issue``). Local, no network.
        if stage in ("done", "cancelled"):
            self.skipped_terminal_total += 1
            return
        # created / done have no gate to evaluate.
        if get_qg_for_stage(stage) is None:
            return
        # AC-3: a queued/running job means the task is legitimately in flight
        # (or a live webhook just enqueued one) -> do not touch it.
        if has_active_job_for_task(task_id):
            return
        # AC-5: respect the per-stage grace ("stuck", not just busy).
        age_s = task.get("age_s") or 0
        if age_s < grace_for_stage(stage):
            return
        # ORCH-060 Guard 1: escalated tasks (developer retries reached the cap)
        # are terminal — they wait for a human, not the sweeper. Without this, a
        # task whose CI is green but whose reviewer kept sending REQUEST_CHANGES
        # until the cap would be re-unblocked every tick (incident ET-013,
        # infinite bounce). Deterministic, local SQL, no network — and checked
        # FIRST (cheapest).
        if developer_retry_count(task_id) >= MAX_DEVELOPER_RETRIES:
            return
        # ORCH-086 D1: single networked resolve per task per tick, AFTER the
        # cheap local guards (so busy/young/escalated tasks never hit Plane).
        # Feeds the Plane-side terminal-skip (D2), Guard 2 (D3) and the
        # state_uuid handed to _note_unblock (D4) — no duplicate fetch.
        states, groups, state_uuid = self._resolve_issue_status(task)
        # ORCH-086 D2 (Plane-side terminal-skip), UNCONDITIONAL (not gated by
        # reconcile_skip_blocked_enabled, which gates ONLY Guard 2). A task
        # whose Plane status is terminal (group completed/cancelled, or the
        # logical done/cancelled fallback) is fully in sync -> never a real
        # unblock. Runs BEFORE Guard 2 so terminal tasks correctly bump
        # skipped_terminal_total instead of being swallowed by Guard 2's
        # conservative path. Closes the F-1 gap of ORCH-068 (which only covered
        # F-2); fixes the spurious "ET-002 ... разблокирована" ("... unblocked")
        # notification.
        if self._is_terminal_state(state_uuid, states, groups):
            self.skipped_terminal_total += 1
            return
        # ORCH-060 Guard 2: respect an explicit human gate (Blocked / Needs
        # Input). Reuses the D1 resolve (ORCH-086 D3) so the tick makes a
        # single fetch.
        if self._is_blocked_or_needs_input(task, states, state_uuid):
            return
        # ORCH-026 Guard 3 (B-5): a task blocked by an unfinished declared
        # dependency is legitimately waiting, NOT stuck -> F-1 must not advance
        # it past its depends-on (mirrors the Blocked/Needs-Input skip). Local
        # DB, never-raise (is_task_ready fails OPEN). If the wait is actually a
        # dependency DEADLOCK (cycle), surface it (Blocked + alert) once.
        if settings.task_deps_enabled:
            ready, _waiting = task_deps.is_task_ready(task_id)
            if not ready:
                cyc = task_deps.detect_cycle(task_id)
                if cyc:
                    task_deps.handle_cycle(cyc)
                return
        result = advance_if_gate_passed(
            task_id,
            stage,
            task["repo"],
            task.get("work_item_id") or "",
            task.get("branch") or "",
        )
        if result is not None and getattr(result, "advanced", False):
            # ORCH-086 D4: pass state_uuid so the in-memory dedup guard covers
            # F-1 too (a repeat tick for the same issue+state is suppressed;
            # survives the "first pass after restart" symptom together with
            # the D2 skip).
            self._note_unblock(
                task.get("work_item_id") or str(task_id), stage, state_uuid
            )

    def _resolve_issue_status(
        self, task: dict
    ) -> tuple[dict, dict, str | None]:
        """ORCH-086 D1: one networked resolve per task per tick.

        Returns ``(states, groups, current_state_uuid)``. A single
        ``fetch_issue_state`` plus the cached (ORCH-068 TTL)
        ``get_project_states`` / ``get_project_state_groups``. The result feeds
        the terminal-skip (D2), Guard 2 (D3) and the ``state_uuid`` handed to
        ``_note_unblock`` (D4), so the tick never fetches the same issue twice.

        A task with no Plane issue id skips the issue fetch and yields a
        ``None`` state (same downstream outcome as an unresolved state).

        **never-raise.** On any failure / unresolved project / missing state
        -> ``({} or states, {} or groups, None)`` so callers apply their
        conservative fallback (terminal-skip = not terminal; Guard 2 = skip).
        """
        try:
            proj = projects.get_project_by_repo(task.get("repo") or "")
            if proj is None:
                return {}, {}, None
            pid = proj.plane_project_id
            states = get_project_states(pid)
            groups = get_project_state_groups(pid)
            issue_id = task.get("plane_id") or task.get("plane_issue_id") or ""
            if not issue_id:
                # No linked Plane issue: nothing to fetch; don't issue a
                # request for an empty id.
                return states or {}, groups or {}, None
            state_uuid = fetch_issue_state(issue_id, pid)
            return states or {}, groups or {}, state_uuid
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.warning(
                f"reconciler D1: status resolve failed for task "
                f"{task.get('id')}, treating as unresolved: {e}"
            )
            return {}, {}, None

    def _is_blocked_or_needs_input(
        self, task: dict, states: dict | None = None, state_uuid=_UNSET
    ) -> bool:
        """Guard 2 (ORCH-060 + ORCH-066): is this issue waiting for a human OR
        in an active orchestrator wait that F-1 must not "revive"?

        Variant A (no schema migration): resolve the task's Plane project,
        fetch the issue's current state uuid and compare against a skip-set.
        ``tasks`` has no status column, so the live Plane state is the source
        of truth.

        Skip-set = explicit human gates (``blocked`` / ``needs_input``) PLUS the
        ORCH-066 active waits (``awaiting_deploy`` / ``deploying`` /
        ``monitoring``, BR-13).

        **Anti-regress (CRITICAL):** the active-wait keys alias onto
        ``in_review`` / ``in_progress`` / ``done`` on a project that did not
        create them. Adding them verbatim would make F-1 wrongly skip enduro
        In Progress / Done tasks (regression of ORCH-053/060). So they are
        included ONLY when DISTINCT from the project's base working statuses
        (i.e. actually created as separate statuses): enduro collapses them to
        {} -> zero regress; orchestrator keeps three real statuses -> BR-13.

        **Never-raise, conservative fallback.** Any error / unresolved project /
        missing state -> return ``True`` (treat as "possibly blocked" -> skip):
        NOT unblocking a task is always safe, whereas wrongly unblocking a
        human-gated task re-introduces the bounce we are trying to kill.

        The sub-flag ``reconcile_skip_blocked_enabled`` disables ONLY this
        networked guard (escape hatch for a Plane outage); Guard 1 stays active.

        **ORCH-086 D3:** the production caller (``_reconcile_gate_task``) passes
        the already-resolved ``(states, state_uuid)`` from the single D1 fetch,
        so the tick does not hit Plane twice. When ``state_uuid`` is left
        ``_UNSET`` (direct/legacy 1-arg call) Guard 2 self-resolves via
        ``_resolve_issue_status`` — behaviour identical to the pre-ORCH-086
        code.
        """
        if not settings.reconcile_skip_blocked_enabled:
            return False
        try:
            if state_uuid is _UNSET:
                # Backward-compatible self-resolve (direct callers / tests).
                states, _groups, state_uuid = self._resolve_issue_status(task)
            if not states or state_uuid is None:
                # Unresolved project / Plane unreachable -> conservative skip.
                return True
            cur = state_uuid
            # ORCH-066 BR-13: active orchestrator waits, minus base working
            # statuses so aliased (enduro) keys never widen the skip-set.
            base_working = {
                states.get(k)
                for k in (
                    "backlog",
                    "todo",
                    "in_progress",
                    "in_review",
                    "review",
                    "architecture",
                    "development",
                    "testing",
                    "approved",
                    "rejected",
                    "done",
                )
            }
            extra_waits = {
                states.get("awaiting_deploy"),
                states.get("deploying"),
                states.get("monitoring"),
            } - base_working - {None}
            skip_set = {states.get("blocked"), states.get("needs_input")} | extra_waits
            return cur in skip_set
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.warning(
                f"reconciler Guard 2: blocked-check failed for task "
                f"{task.get('id')}, skipping conservatively: {e}"
            )
            return True

    # -- F-2: plane-side ---------------------------------------------------

    def reconcile_plane_once(self) -> None:
        """One F-2 pass: poll Plane per project and replay missed transitions."""
        if not settings.reconcile_enabled or not settings.reconcile_plane_enabled:
            return
        for proj in projects.PROJECTS:
            try:
                self._reconcile_plane_project(proj)
            except Exception as e:  # noqa: BLE001 - isolate one project's failure
                logger.error(f"reconciler F-2: project {proj.repo} failed: {e}")

    def _reconcile_plane_project(self, proj) -> None:
        """Replay missed transitions for every actionable issue of one project."""
        pid = proj.plane_project_id
        # Resolve the actionable state uuids per-project (never hardcode).
        # ORCH-066 (AC-19): the start/resume trigger is `To Analyse` (was
        # In Progress). On a project without that status, `to_analyse` aliases
        # to the project's own `in_progress` UUID, so enduro behaviour is
        # identical (and `list_issues_by_state` deduplicates the uuid via its
        # internal set).
        states = get_project_states(pid)
        # ORCH-066 (AC-19): start/resume trigger is `To Analyse`.
        to_analyse = states["to_analyse"]
        # ORCH-068 D1: {uuid -> group} from the SAME cache record (no extra
        # fetch); empty when the API was unreachable -> per-issue fallback by
        # key.
        groups = get_project_state_groups(pid)
        approved = states["approved"]
        rejected = states["rejected"]
        issues = list_issues_by_state(pid, [to_analyse, approved, rejected])
        for issue in issues:
            try:
                self._reconcile_plane_issue(
                    issue, pid, to_analyse, approved, rejected, states, groups
                )
            except Exception as e:  # noqa: BLE001 - isolate one issue's failure
                logger.error(
                    f"reconciler F-2: issue {issue.get('id')} failed: {e}"
                )

    def _is_terminal_state(
        self, state_uuid: str | None, states: dict, groups: dict
    ) -> bool:
        """ORCH-068 D1: is ``state_uuid`` a terminal (completed/cancelled) state?

        Primary discriminator is the Plane **state group**
        (project-independent, robust to UUID aliasing after status renames):
        ``group`` in ``{completed, cancelled}`` -> terminal. When the group is
        unavailable (API gave no ``group`` / we fell back to
        ``_DEFAULT_STATES``), fall back to the logical terminal keys ``done`` /
        ``cancelled``. A falsy ``state_uuid`` is never terminal.
        """
        if not state_uuid:
            return False
        grp = groups.get(state_uuid)
        if grp:
            return grp in {"completed", "cancelled"}
        # Fallback (group unknown): logical terminal keys for this project.
        return state_uuid in {states.get("done"), states.get("cancelled")}

    def _reconcile_plane_issue(
        self,
        issue: dict,
        project_id: str,
        to_analyse: str,
        approved: str,
        rejected: str,
        states: dict,
        groups: dict,
    ) -> None:
        """Replay the missed webhook for one Plane issue, if it is actionable."""
        issue_id = str(issue.get("id") or "")
        if not issue_id:
            return
        state = issue.get("state")
        new_state = state.get("id") if isinstance(state, dict) else state
        # ORCH-068 D1: a terminal issue (Done / Cancelled) is fully in sync by
        # definition -> never actionable. Excluded per-issue (not by narrowing
        # `wanted`) because UUID aliasing can make a terminal uuid collide with
        # an actionable one — only the state GROUP disentangles them. Restores
        # the silence-when-in-sync invariant (AC-1/AC-2).
        if self._is_terminal_state(new_state, states, groups):
            self.skipped_terminal_total += 1
            return
        # Grace ("lost, not merely delayed"): use the issue's own updated_at
        # age. A missing/unparseable timestamp is treated as old enough (the
        # active-job guard + atomic create-claim still prevent doubling).
        age = _age_seconds_iso(issue.get("updated_at") or "")
        if age is not None and age < settings.reconcile_grace_default_s:
            return
        task = get_task_by_plane_id(issue_id)
        # AC-3/AC-4: a live webhook is in flight for this task -> skip.
        if task is not None and has_active_job_for_task(task["id"]):
            return
        # issue_data in the shape the plane handlers expect; missing name /
        # description are pulled by the handlers themselves
        # (fetch_issue_fields).
        issue_data = {
            "id": issue_id,
            "state": {"id": new_state},
            "project": project_id,
            "name": issue.get("name", ""),
            "description_stripped": issue.get("description_stripped", ""),
        }
        if new_state == to_analyse and task is None:
            # To Analyse without a task -> start the pipeline (lost start
            # webhook). ORCH-068 D2: confirm a REAL change (the task now
            # exists) before announcing — a no-op dispatch stays silent.
            self._dispatch(handle_status_start, issue_data, project_id)
            if get_task_by_plane_id(issue_id) is not None:
                self._note_unblock(issue_id, "analysis", new_state)
        elif new_state == to_analyse and task is not None:
            # To Analyse with an existing (idle) task -> resume the analyst
            # from Needs Input (lost resume webhook). handle_status_start
            # applies its own busy-guard / start-vs-resume fork.
            self._dispatch(handle_status_start, issue_data, project_id)
            self._note_unblock(
                task.get("work_item_id") or issue_id, task["stage"], new_state
            )
        elif new_state == approved and task is not None:
            # Approved but the stage never advanced -> replay the verdict.
            stage_before = task["stage"]
            self._dispatch(handle_verdict, issue_data, project_id, approved=True)
            if self._stage_changed(issue_id, stage_before):
                self._note_unblock(
                    task.get("work_item_id") or issue_id, stage_before, new_state
                )
        elif new_state == rejected and task is not None:
            # Rejected but never rolled back -> replay the verdict.
            stage_before = task["stage"]
            self._dispatch(handle_verdict, issue_data, project_id, approved=False)
            if self._stage_changed(issue_id, stage_before):
                self._note_unblock(
                    task.get("work_item_id") or issue_id, stage_before, new_state
                )
        # else: everything is in sync -> silence (AC-10).

    @staticmethod
    def _stage_changed(issue_id: str, stage_before: str) -> bool:
        """ORCH-068 D2: did the dispatched handler actually move the stage?

        Re-reads the task after ``_dispatch`` and compares to the captured
        ``stage_before``. A no-op replay (the task was already in the target
        state) leaves the stage unchanged -> no unblock notification. A task
        that can no longer be found counts as unchanged.
        """
        after = get_task_by_plane_id(issue_id)
        stage_after = after["stage"] if after else stage_before
        return stage_after != stage_before

    @staticmethod
    def _dispatch(coro_fn, *args, **kwargs) -> None:
        """Run an async plane handler from this sync thread.

        ``asyncio.run`` spins a fresh event loop per call, which is required
        because ``handle_verdict -> _try_advance_stage`` uses
        ``asyncio.to_thread`` (needs a running loop). The handlers are REUSED
        verbatim — no pipeline logic is duplicated here.
        """
        asyncio.run(coro_fn(*args, **kwargs))

    # -- observability (F-4) ----------------------------------------------

    def _note_unblock(
        self, work_item_id: str, stage: str, state_uuid: str | None = None
    ) -> None:
        """Record + announce that a stuck task was unblocked (AC-12).

        Fires only on an actual state change (an advance / replayed
        transition), never per idle tick, so it does not conflict with
        AC-9 / AC-10.

        ORCH-068 (TR-3): an in-memory dedup guard keyed by
        ``work_item_id -> state_uuid`` suppresses a repeat notification for the
        same key+state if a future no-op path ever reaches here.
        ``state_uuid`` is the issue's Plane state (``None`` bypasses the dedup
        guard); ``work_item_id`` doubles as the issue id for the pipeline-start
        case (which has no work item yet).
        """
        dedup_key = work_item_id
        if state_uuid is not None and self._unblock_dedup.get(dedup_key) == state_uuid:
            self.deduped_total += 1
            return
        if state_uuid is not None:
            self._unblock_dedup[dedup_key] = state_uuid
        self.unblocked_total += 1
        self.last_unblocked = work_item_id
        # Log / Telegram text reads: "<id> <stage> unblocked (lost webhook)".
        logger.info(
            f"reconciler: {work_item_id} {stage} разблокирована (потерян webhook)"
        )
        if settings.reconcile_notify_unblock:
            try:
                send_telegram(
                    f"\U0001f527 reconciler: {link_for(work_item_id)} {stage} "
                    f"разблокирована (потерян webhook)"
                )
            except Exception as e:  # noqa: BLE001 - never break the tick
                logger.warning(f"reconciler: unblock telegram failed: {e}")

    # -- loop / lifecycle --------------------------------------------------

    def _tick(self) -> None:
        """Run one sweep: F-1 then F-2, each isolated from the other's failure.

        An exception escaping one phase (e.g. the F-1 task query raising on a
        DB hiccup) is logged and no longer prevents the other phase from
        running in the same tick. ``last_run_ts`` is stamped only when every
        enabled phase completed without an escaping exception (as before), so
        a persistently failing phase still shows up as a stale timestamp.
        """
        ok = True
        if settings.reconcile_enabled:
            ok = self._run_phase("F-1", self.reconcile_gate_once) and ok
        if settings.reconcile_plane_enabled:
            ok = self._run_phase("F-2", self.reconcile_plane_once) and ok
        if ok:
            self.last_run_ts = datetime.now(timezone.utc).timestamp()

    @staticmethod
    def _run_phase(label: str, fn) -> bool:
        """Run one reconcile phase; log any escaping error.

        Returns True when ``fn`` completed normally, False when it raised.
        """
        try:
            fn()
        except Exception as e:  # noqa: BLE001 - isolate one phase's failure
            logger.error(f"Reconciler loop error ({label}): {e}")
            return False
        return True

    def _run(self) -> None:
        """Thread body: tick every ``interval_s`` until ``stop()`` is called."""
        logger.info(
            f"Reconciler started (interval={self.interval_s}s, "
            f"enabled={settings.reconcile_enabled}, "
            f"plane_enabled={settings.reconcile_plane_enabled})"
        )
        while not self._stop.is_set():
            try:
                self._tick()
            except Exception as e:  # noqa: BLE001 - outer never-raise
                logger.error(f"Reconciler loop error: {e}")
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop.wait(self.interval_s)
        logger.info("Reconciler stopped")

    def start(self) -> None:
        """Start the daemon thread (idempotent: a live thread is a no-op)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="reconciler", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        """Signal the loop to exit and wait up to ``timeout`` seconds for it."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Reconcile snapshot for /queue observability."""
        return {
            "enabled": settings.reconcile_enabled,
            "plane_enabled": settings.reconcile_plane_enabled,
            "interval": self.interval_s,
            "last_run_ts": self.last_run_ts,
            "unblocked_total": self.unblocked_total,
            "last_unblocked": self.last_unblocked,
            # ORCH-068 observability.
            "skipped_terminal_total": self.skipped_terminal_total,
            "deduped_total": self.deduped_total,
        }


# Module-level singleton used by the FastAPI lifespan.
reconciler = Reconciler()