"""ORCH-053: stuck-task reconciler (sweeper for lost webhooks).

The pipeline advances ONLY on incoming webhooks (Plane status / Gitea CI/PR).
A dropped event (a 502 while the instance is rebuilding, no retries from
Plane/Gitea, an unresolved ``sha->branch``) leaves the source of truth (the
gate / the Plane status) changed while the task stays put — a silently stuck
task (incident ORCH-044). None of the existing resilience layers
(``requeue_running_jobs``, orphan-recovery, events de-dup, ``ci_poll``)
reconcile this "source-of-truth != task-stage" drift; they all work at the
jobs/agent_runs level, not at the stage transition.

This module is a background daemon thread (modelled on ``queue_worker``) that
periodically replays the missed transition through the SAME standard gates /
handlers a webhook would use:

* **F-1 gate-side** (``reconcile_gate_once``): for each task with
  ``stage != 'done'``, no active job and
  ``age(updated_at) >= grace_for_stage(stage)``, do a read-only
  pre-evaluation of the stage's canonical quality gate; green -> advance
  through the unchanged ``stage_engine.advance_stage(..., finished_agent=None)``
  (invoked here via ``stage_engine.advance_if_gate_passed``); red -> silence
  (no advance, no notification). ``analysis`` is NOT reconciled here (human
  gate; owned by F-2).

  **ORCH-060:** before the gate is even evaluated, F-1 silently skips tasks
  that are waiting for a human. Guard 1 — escalated by developer retries
  (``developer_retry_count >= MAX_DEVELOPER_RETRIES``; deterministic, local;
  closes the ET-013 bounce loop) — is checked first, then Guard 2 — an
  explicit Plane ``Blocked`` / ``Needs Input`` state (Variant A: networked,
  never-raise -> conservative skip).

* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
  project (``list_issues_by_state``) and replay In Progress / Approved /
  Rejected through ``webhooks.plane.handle_status_start`` /
  ``handle_verdict`` (no logic duplicated).

Invariants: the source of truth is the gate / Plane (not the event); advance
only via ``advance_stage``; idempotency (active-job guard + atomic
create-claim + grace + ``max_concurrency=1``); never-raise per unit of work;
silence when in sync; restart-safe; kill-switch ``ORCH_RECONCILE_ENABLED``
(+ ``ORCH_RECONCILE_PLANE_ENABLED`` mutes only F-2). The DB schema and the
registries (``STAGE_TRANSITIONS`` / ``QG_CHECKS``) are unchanged.

See docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md and the
cross-cutting docs/architecture/adr/adr-0007-reconciler.md.
"""

import asyncio
import json
import logging
import threading
from datetime import datetime, timezone

from .config import settings
from .db import (
    get_active_tasks_for_reconcile,
    get_task_by_plane_id,
    has_active_job_for_task,
)
from .stage_engine import (
    advance_if_gate_passed,
    developer_retry_count,
    MAX_DEVELOPER_RETRIES,
)
from .stages import get_qg_for_stage
from .plane_sync import fetch_issue_state, get_project_states, list_issues_by_state
from .webhooks.plane import handle_status_start, handle_verdict
from .notifications import send_telegram
from . import projects

logger = logging.getLogger("orchestrator.reconciler")


def _parse_grace_overrides(raw: str) -> dict[str, int]:
    """Parse ``reconcile_grace_overrides_json`` into {stage: seconds}.

    Invalid / non-object JSON -> {} (the caller falls back to the default
    grace), mirroring the never-raise contract of
    ``agent_timeout_overrides_json``. Individual entries whose value is not
    int-convertible are dropped with a warning; the remaining entries are kept.
    """
    if not raw or not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.warning(f"reconcile_grace_overrides_json is not valid JSON, ignoring: {e}")
        return {}
    if not isinstance(data, dict):
        logger.warning("reconcile_grace_overrides_json must be a JSON object, ignoring")
        return {}
    out: dict[str, int] = {}
    for k, v in data.items():
        try:
            out[str(k)] = int(v)
        except (ValueError, TypeError):
            logger.warning(f"reconcile_grace_overrides_json[{k}] is not an int, ignoring")
    return out


def grace_for_stage(stage: str) -> int:
    """Per-stage "stuck" threshold (seconds): override from JSON, else default."""
    overrides = _parse_grace_overrides(settings.reconcile_grace_overrides_json)
    return overrides.get(stage, settings.reconcile_grace_default_s)


def _age_seconds_iso(ts: str) -> float | None:
    """Age in seconds of a Plane ISO-8601 timestamp (e.g. issue.updated_at).

    Returns None when the value is missing / unparseable (the caller decides
    the fallback). Handles a trailing 'Z' and treats naive timestamps as UTC.
    A timestamp in the future yields a negative age.
    """
    if not ts:
        return None
    try:
        text = ts.strip()
        if text.endswith("Z"):
            # datetime.fromisoformat only accepts 'Z' on Python 3.11+.
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return (datetime.now(timezone.utc) - dt).total_seconds()
    except Exception:
        return None


class Reconciler:
    """Background daemon that reconciles webhook-induced stage drift.

    Modelled on ``QueueWorker``: a plain ``threading.Thread(daemon=True)`` +
    ``threading.Event`` for a clean stop. No correctness-critical state is
    held in memory — every tick re-reads the DB / Plane; the observability
    counters (``last_run_ts`` / ``unblocked_total`` / ``last_unblocked``) are
    best-effort and may reset on restart (AC-11 allows this).
    """

    def __init__(self, interval_s: float | None = None):
        """Create an idle reconciler; call ``start()`` to launch the thread.

        Args:
            interval_s: seconds to wait between ticks; defaults to
                ``settings.reconcile_interval_s``.
        """
        self.interval_s = (
            interval_s if interval_s is not None else settings.reconcile_interval_s
        )
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        # Best-effort observability (F-4).
        self.last_run_ts: float | None = None
        self.unblocked_total: int = 0
        self.last_unblocked: str | None = None

    # -- F-1: gate-side ----------------------------------------------------

    def reconcile_gate_once(self) -> None:
        """One F-1 pass over all non-terminal tasks (per-task never-raise).

        A failure of an individual task is logged and isolated. A failure of
        the task listing itself propagates to the caller; ``_tick`` isolates
        it so that F-2 still runs.
        """
        if not settings.reconcile_enabled:
            return
        for task in get_active_tasks_for_reconcile():
            try:
                self._reconcile_gate_task(task)
            except Exception as e:  # noqa: BLE001 - isolate one task's failure
                logger.error(
                    f"reconciler F-1: task {task.get('id')} "
                    f"(stage={task.get('stage')}) failed: {e}"
                )

    def _reconcile_gate_task(self, task: dict) -> None:
        """Advance one task if it is stuck and its stage gate is green.

        Guards run cheapest-first; any guard that fires returns silently
        (AC-10: silence when in sync / not stuck).
        """
        task_id = task["id"]
        stage = task["stage"]
        # AC-16: analysis is a human gate -> owned by F-2, never F-1.
        if stage == "analysis":
            return
        # created / done have no gate to evaluate.
        if get_qg_for_stage(stage) is None:
            return
        # AC-3: a queued/running job means the task is legitimately in flight
        # (or a live webhook just enqueued one) -> do not touch it.
        if has_active_job_for_task(task_id):
            return
        # AC-5: respect the per-stage grace ("stuck", not just busy). A missing
        # age is treated as 0, i.e. "not stuck yet" (conservative).
        age_s = task.get("age_s") or 0
        if age_s < grace_for_stage(stage):
            return
        # ORCH-060 Guard 1: escalated tasks (developer retries reached the cap)
        # are terminal — they wait for a human, not the sweeper. Without this, a
        # task whose CI is green but whose reviewer kept sending REQUEST_CHANGES
        # until the cap would be re-unblocked every tick (incident ET-013,
        # infinite bounce). Deterministic, local SQL, no network — and checked
        # FIRST (cheapest).
        if developer_retry_count(task_id) >= MAX_DEVELOPER_RETRIES:
            return
        # ORCH-060 Guard 2: respect an explicit human gate (Blocked / Needs
        # Input). Networked; runs after Guard 1 so escalated tasks never hit Plane.
        if self._is_blocked_or_needs_input(task):
            return
        result = advance_if_gate_passed(
            task_id,
            stage,
            task["repo"],
            task.get("work_item_id") or "",
            task.get("branch") or "",
        )
        if result is not None and getattr(result, "advanced", False):
            self._note_unblock(task.get("work_item_id") or str(task_id), stage)

    def _is_blocked_or_needs_input(self, task: dict) -> bool:
        """ORCH-060 Guard 2: is this issue in an explicit human Plane gate?

        Variant A (no schema migration): resolve the task's Plane project,
        fetch the issue's current state uuid and compare it against the
        project's ``blocked`` / ``needs_input`` states. ``tasks`` has no status
        column, so the live Plane state is the source of truth.

        **Never-raise, conservative fallback.** Any error / unresolved project /
        missing Plane issue id / missing state -> return ``True`` (treat as
        "possibly blocked" -> skip): NOT unblocking a task is always safe,
        whereas wrongly unblocking a human-gated task re-introduces the bounce
        we are trying to kill.

        The sub-flag ``reconcile_skip_blocked_enabled`` disables ONLY this
        networked guard (escape hatch for a Plane outage); Guard 1 stays active.
        """
        if not settings.reconcile_skip_blocked_enabled:
            return False
        try:
            proj = projects.get_project_by_repo(task.get("repo") or "")
            if proj is None:
                return True  # cannot resolve the project -> conservative skip
            issue_id = task.get("plane_id") or task.get("plane_issue_id") or ""
            if not issue_id:
                # No Plane issue to inspect: the state cannot be verified, so
                # skip conservatively without spending network calls.
                return True
            pid = proj.plane_project_id
            # Fetch the issue state first: if Plane is unreachable this costs
            # one call instead of two.
            cur = fetch_issue_state(issue_id, pid)
            if cur is None:
                return True  # Plane unreachable / no state -> conservative skip
            states = get_project_states(pid)
            return cur in {states.get("blocked"), states.get("needs_input")}
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.warning(
                f"reconciler Guard 2: blocked-check failed for task "
                f"{task.get('id')}, skipping conservatively: {e}"
            )
            return True

    # -- F-2: plane-side ---------------------------------------------------

    def reconcile_plane_once(self) -> None:
        """One F-2 pass: poll Plane per project and replay missed transitions."""
        if not settings.reconcile_enabled or not settings.reconcile_plane_enabled:
            return
        for proj in projects.PROJECTS:
            try:
                self._reconcile_plane_project(proj)
            except Exception as e:  # noqa: BLE001 - isolate one project's failure
                logger.error(f"reconciler F-2: project {proj.repo} failed: {e}")

    def _reconcile_plane_project(self, proj) -> None:
        """Reconcile every actionable Plane issue of one project.

        A project whose state map lacks one of the actionable keys raises
        ``KeyError``, which ``reconcile_plane_once`` logs per project.
        """
        pid = proj.plane_project_id
        # Resolve the actionable state uuids per-project (never hardcode).
        states = get_project_states(pid)
        in_progress = states["in_progress"]
        approved = states["approved"]
        rejected = states["rejected"]
        issues = list_issues_by_state(pid, [in_progress, approved, rejected])
        for issue in issues:
            try:
                self._reconcile_plane_issue(
                    issue, pid, in_progress, approved, rejected
                )
            except Exception as e:  # noqa: BLE001 - isolate one issue's failure
                logger.error(
                    f"reconciler F-2: issue {issue.get('id')} failed: {e}"
                )

    def _reconcile_plane_issue(
        self,
        issue: dict,
        project_id: str,
        in_progress: str,
        approved: str,
        rejected: str,
    ) -> None:
        """Replay the missed Plane transition for one issue, if any.

        In Progress with no task -> start the pipeline; Approved / Rejected
        with a task -> replay the verdict; anything else -> silence.
        """
        issue_id = str(issue.get("id") or "")
        if not issue_id:
            return
        state = issue.get("state")
        # Plane may return the state either expanded ({"id": ...}) or as a bare id.
        new_state = state.get("id") if isinstance(state, dict) else state
        # Grace ("lost, not merely delayed"): use the issue's own updated_at age.
        # A missing/unparseable timestamp is treated as old enough (the
        # active-job guard + atomic create-claim still prevent doubling).
        age = _age_seconds_iso(issue.get("updated_at") or "")
        if age is not None and age < settings.reconcile_grace_default_s:
            return
        task = get_task_by_plane_id(issue_id)
        # AC-3/AC-4: a live webhook is in flight for this task -> skip.
        if task is not None and has_active_job_for_task(task["id"]):
            return
        # issue_data in the shape the plane handlers expect; missing name /
        # description are pulled by the handlers themselves (fetch_issue_fields).
        issue_data = {
            "id": issue_id,
            "state": {"id": new_state},
            "project": project_id,
            "name": issue.get("name", ""),
            "description_stripped": issue.get("description_stripped", ""),
        }
        if new_state == in_progress and task is None:
            # In Progress without a task -> start the pipeline (lost start webhook).
            self._dispatch(handle_status_start, issue_data, project_id)
            self._note_unblock(issue_id, "analysis")
        elif new_state == approved and task is not None:
            # Approved but the stage never advanced -> replay the verdict.
            # NOTE(review): this branch does not itself check task["stage"];
            # it relies on handle_verdict being a no-op for an already-advanced
            # task and on Plane leaving the Approved state afterwards —
            # otherwise _note_unblock would fire on every tick. Confirm.
            self._dispatch(handle_verdict, issue_data, project_id, approved=True)
            self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
        elif new_state == rejected and task is not None:
            # Rejected but never rolled back -> replay the verdict.
            self._dispatch(handle_verdict, issue_data, project_id, approved=False)
            self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
        # else: everything is in sync -> silence (AC-10).

    @staticmethod
    def _dispatch(coro_fn, *args, **kwargs) -> None:
        """Run an async plane handler from this sync thread.

        ``asyncio.run`` spins a fresh event loop per call, which is required
        because ``handle_verdict -> _try_advance_stage`` uses
        ``asyncio.to_thread`` (needs a running loop). The handlers are REUSED
        verbatim — no pipeline logic is duplicated here.
        """
        asyncio.run(coro_fn(*args, **kwargs))

    # -- observability (F-4) ----------------------------------------------

    def _note_unblock(self, work_item_id: str, stage: str) -> None:
        """Record + announce that a stuck task was unblocked (AC-12).

        Fires only on an actual state change (an advance / replayed
        transition), never per idle tick, so it does not conflict with
        AC-9 / AC-10. The Telegram notification is best-effort and gated by
        ``settings.reconcile_notify_unblock``.
        """
        self.unblocked_total += 1
        self.last_unblocked = work_item_id
        # Message text (Russian): "<id> <stage> unblocked (webhook lost)".
        logger.info(
            f"reconciler: {work_item_id} {stage} разблокирована (потерян webhook)"
        )
        if settings.reconcile_notify_unblock:
            try:
                send_telegram(
                    f"\U0001f527 reconciler: {work_item_id} {stage} "
                    f"разблокирована (потерян webhook)"
                )
            except Exception as e:  # noqa: BLE001 - never break the tick
                logger.warning(f"reconciler: unblock telegram failed: {e}")

    # -- loop / lifecycle --------------------------------------------------

    def _tick(self) -> None:
        """Run one reconcile pass: F-1, then F-2.

        Each phase is isolated: an error escaping one phase (e.g. the DB task
        listing failing in F-1) is logged and does not prevent the other phase
        from running, nor the ``last_run_ts`` update.
        """
        if settings.reconcile_enabled:
            try:
                self.reconcile_gate_once()  # F-1
            except Exception as e:  # noqa: BLE001 - a failed F-1 must not starve F-2
                logger.error(f"reconciler F-1 pass failed: {e}")
        if settings.reconcile_plane_enabled:
            try:
                self.reconcile_plane_once()  # F-2
            except Exception as e:  # noqa: BLE001 - never break the tick
                logger.error(f"reconciler F-2 pass failed: {e}")
        self.last_run_ts = datetime.now(timezone.utc).timestamp()

    def _run(self) -> None:
        """Thread body: tick, then wait ``interval_s`` until ``stop()`` is called."""
        logger.info(
            f"Reconciler started (interval={self.interval_s}s, "
            f"enabled={settings.reconcile_enabled}, "
            f"plane_enabled={settings.reconcile_plane_enabled})"
        )
        while not self._stop.is_set():
            try:
                self._tick()
            except Exception as e:  # noqa: BLE001 - outer never-raise
                logger.error(f"Reconciler loop error: {e}")
            # Event.wait returns early as soon as stop() sets the event.
            self._stop.wait(self.interval_s)
        logger.info("Reconciler stopped")

    def start(self) -> None:
        """Start the daemon thread (idempotent: a live thread is a no-op)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="reconciler", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        """Signal the loop to exit and wait up to ``timeout`` seconds for it."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Reconcile snapshot for /queue observability."""
        return {
            "enabled": settings.reconcile_enabled,
            "plane_enabled": settings.reconcile_plane_enabled,
            "interval": self.interval_s,
            "last_run_ts": self.last_run_ts,
            "unblocked_total": self.unblocked_total,
            "last_unblocked": self.last_unblocked,
        }


# Module-level singleton used by the FastAPI lifespan.
reconciler = Reconciler()