"""ORCH-065: job-reaper + proactive merge-lease reclaim background daemon. Three failure classes share one root cause — "the thread/process died while it still held captured state" — and one inert recovery layer (``requeue_running_jobs``) that only fires on a process restart: * **A — zombie jobs.** A job's terminal status (``done``/``queued``/``failed``) is written ONLY inside ``launcher._monitor_agent -> _finalize_job`` in the live process. If that thread/process dies between ``proc.wait()`` and the status write (crash, OOM, self-restart mid-deploy) the ``jobs`` row stays ``running`` forever. At ``max_concurrency=1`` one zombie blocks the claim of EVERY project's jobs -> the whole shared pipeline stalls. * **B — stuck merge-lease.** The file lease ``.merge-lease-.json`` (ORCH-043) is reclaimed only lazily, by TTL, and only when ANOTHER task tries to acquire it. Holder liveness (pid) is never probed, so a death with the lease held blocks foreign merges until the TTL expires. This module is a background daemon thread modelled on ``reconciler`` (``threading.Thread(daemon=True)`` + ``threading.Event``, start/stop in ``main.lifespan``, ``/queue`` snapshot, per-unit never-raise, kill-switch). Each tick: (1) scans ``running`` jobs and reaps the dead ones via three-tier liveness detection; (2) proactively reclaims dead/stale merge-leases (mechanism B) for the in-scope repos. Liveness (defense in depth, ADR-001 Р-1): * **Tier-1 (primary): dead pid.** ``jobs.pid`` (stamped by ``launcher._spawn``) probed with ``merge_gate.pid_alive``. A job is reaped only after ``reaper_dead_ticks`` (>=2) CONSECUTIVE dead-pid ticks — an in-memory streak counter kills false positives (AC-3); a live agent within its timeout is never reaped. 
* **Tier-2 (completion race): exit_code recorded but job still running.** This
  window is AMBIGUOUS — it is both "the monitor died between writing
  ``agent_runs.exit_code`` and ``_finalize_job``" AND "a LIVE monitor is still
  finalizing" (``_monitor_agent`` writes ``exit_code`` FIRST, then git
  commit/push (+PR), the БАГ-8 check and network Plane usage comments — seconds
  to tens of seconds — and ONLY THEN ``_try_advance_stage`` ->
  ``_finalize_job``). The agent pid is already dead in BOTH cases, so it cannot
  disambiguate. The reaper therefore treats it as a dead monitor (KNOWN outcome)
  only after a finalization grace: ``exit_code`` recorded for >=
  ``reaper_finalize_grace_s`` (a live finalizing monitor is NEVER reaped,
  FR-1.3/AC-3). Within the grace window the row is left untouched.
* **Tier-3 (backstop): age ceiling.** A job ``running`` longer than
  ``reaper_max_running_s`` (deliberately > max ``agent_timeout`` + grace) is
  reaped even when liveness cannot be determined (pid reused / unknown).

Action on confirmed death reuses existing contracts (no new merge/stage logic):

* The reaper's ONLY mutating write to a job row is the atomic terminal flip
  ``db.reap_running_job(... WHERE status='running')`` — so a late-arriving
  monitor / the startup ``requeue_running_jobs`` / a second tick can never
  double-process a row (AC-5; the loser sees ``rowcount==0``).
* **exit0 (Tier-2): claim-BEFORE-act (ADR-001 Р-1).** The source of truth is the
  canonical quality gate, NOT "exit0". If the stage already advanced -> atomic
  ``done`` claim only (idempotent cleanup). Else evaluate the canonical QG
  READ-ONLY (no side effects, the reconciler pattern): red (e.g. the monitor
  died before git-push, so no artifact) -> failure path (no false ``done``);
  green -> atomically claim ``done`` FIRST, and only the claim winner then runs
  ``launcher._try_advance_stage`` (advance + ``enqueue_job`` of the next stage).
  A tick that loses the claim performs NO side effects, so a late-finalizing
  monitor / the startup ``requeue_running_jobs`` can never be double-advanced
  or double-enqueued.
* **exit!=0 (Tier-2) / unknown outcome (Tier-1 dead pid, Tier-3 backstop):**
  ``attempts < max_attempts`` -> ``queued`` (mirrors ``requeue_running_jobs``);
  budget exhausted -> ``failed`` + Telegram. We never fabricate exit0.

Invariants (tech spec §8 / ADR-001): never-raise per unit of work; idempotency
(atomic guard + gate-driven advance); restart-safe (the reaper starts AFTER the
startup ``requeue_running_jobs``); silence when nothing is anomalous; the reaper
NEVER restarts/kills the prod container and NEVER pushes ``main``.
``STAGE_TRANSITIONS`` / ``QG_CHECKS`` and every ``check_*`` signature are
unchanged.

See docs/work-items/ORCH-065/06-adr/ADR-001-job-reaper-and-lease-reclaim.md and
the cross-cutting docs/architecture/adr/adr-0011-job-reaper-lease-reclaim.md.
"""

import logging
import threading
from datetime import datetime, timezone

from .config import settings
from .db import (
    get_db,
    get_running_jobs,
    reap_running_job,
)
from .stages import STAGE_TRANSITIONS, get_agent_for_stage

logger = logging.getLogger("orchestrator.job_reaper")


def reclaim_all_stale_leases() -> int:
    """Proactively reclaim dead/stale merge-leases for every in-scope repo.

    Used both at startup (``main.lifespan``, next to ``requeue_running_jobs``)
    and on every reaper tick (mechanism B). Iterates the merge-gate scope
    (``merge_gate_repos`` CSV, else self-hosting ``orchestrator``) and calls the
    never-raise ``merge_gate.reclaim_stale_lease`` per repo. Returns the number
    of leases actually reclaimed. Never raises (per-repo isolation).
    """
    if not settings.lease_reclaim_enabled:
        return 0
    reclaimed = 0
    try:
        from . import merge_gate

        raw = (settings.merge_gate_repos or "").strip()
        if raw:
            repos = [r.strip() for r in raw.split(",") if r.strip()]
        else:
            from .qg.checks import SELF_HOSTING_REPO

            repos = [SELF_HOSTING_REPO]
        for repo in repos:
            try:
                if merge_gate.reclaim_stale_lease(repo):
                    reclaimed += 1
            except Exception as e:  # noqa: BLE001 - isolate one repo's failure
                logger.error("lease-reclaim failed for repo %s: %s", repo, e)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.error("reclaim_all_stale_leases error: %s", e)
    return reclaimed


class JobReaper:
    """Background daemon that reaps zombie jobs and reclaims stale merge-leases.

    Modelled on ``Reconciler``: a ``threading.Thread(daemon=True)`` + a
    ``threading.Event`` for a clean stop. The only in-memory state is the
    best-effort Tier-1 dead-pid streak counter (``_streak``) and the
    observability counters (``reaped_total`` / ``last_reaped`` /
    ``lease_reclaimed_total`` / ``last_run_ts``); all reset on restart, which is
    safe because the startup ``requeue_running_jobs`` covers the restart path.
    """

    def __init__(self, interval_s: float | None = None):
        self.interval_s = (
            interval_s if interval_s is not None else settings.reaper_interval_s
        )
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        # Tier-1 anti-false-positive: {job_id: consecutive dead-pid ticks}.
        self._streak: dict[int, int] = {}
        # Best-effort observability (Р-6).
        self.last_run_ts: float | None = None
        self.reaped_total: int = 0
        self.last_reaped: dict | None = None
        self.lease_reclaimed_total: int = 0

    # -- A: zombie-job reaping --------------------------------------------

    def reap_once(self) -> None:
        """One scan over all ``running`` jobs (per-job never-raise) + lease reclaim."""
        if settings.reaper_enabled:
            try:
                running = get_running_jobs()
            except Exception as e:  # noqa: BLE001 - never break the tick
                logger.error("reaper: get_running_jobs failed: %s", e)
                running = []
            seen: set[int] = set()
            for job in running:
                jid = job.get("id")
                if jid is not None:
                    seen.add(jid)
                try:
                    self._reap_job(job)
                except Exception as e:  # noqa: BLE001 - isolate one job's failure
                    logger.error(
                        "reaper: job %s (agent=%s) failed: %s",
                        job.get("id"), job.get("agent"), e,
                    )
            # Forget streaks for rows that are no longer running (reaped / requeued
            # / finished by the monitor) so the dict cannot grow unbounded.
            self._streak = {k: v for k, v in self._streak.items() if k in seen}

        # Mechanism B: proactive stale/dead lease reclaim (own kill-switch).
        try:
            self.lease_reclaimed_total += reclaim_all_stale_leases()
        except Exception as e:  # noqa: BLE001 - never break the tick
            logger.error("reaper: lease reclaim sweep failed: %s", e)

    def _reap_job(self, job: dict) -> None:
        """Apply the three-tier liveness policy to a single running job."""
        from . import merge_gate

        job_id = job["id"]
        pid = job.get("pid")
        age = int(job.get("running_age_s") or 0)
        exit_code = job.get("exit_code")  # from the LEFT JOIN on agent_runs

        # Tier-2: the process finished (exit_code recorded) but the job is still
        # 'running'. This is AMBIGUOUS: it is BOTH "the monitor died mid-finalize"
        # AND "a LIVE monitor is still finalizing" — _monitor_agent writes exit_code
        # FIRST, then does git commit/push (+PR), the БАГ-8 check, network Plane
        # usage comments (seconds..tens of seconds), and ONLY THEN _try_advance_stage
        # -> _finalize_job. The agent pid is already dead in BOTH cases, so pid can
        # NOT disambiguate. We treat it as a dead monitor (KNOWN outcome) only after
        # a finalization grace: exit_code must have been recorded for at least
        # `reaper_finalize_grace_s` (FR-1.3/AC-3 — a live finalizing monitor is never
        # reaped). Within the grace window we leave the row alone (and fall through to
        # the Tier-3 backstop only, which never trips before the grace given a sane
        # config where reaper_max_running_s > reaper_finalize_grace_s).
        if exit_code is not None:
            self._streak.pop(job_id, None)
            finished_age = job.get("finished_age_s")
            grace = int(settings.reaper_finalize_grace_s)
            if finished_age is not None and int(finished_age) >= grace:
                self._reap_known_outcome(job, int(exit_code))
                return
            logger.info(
                "reaper: job %s exit_code=%s recorded %ss ago (< grace %ss) — "
                "deferring (monitor may still be finalizing)",
                job_id, exit_code, finished_age, grace,
            )
            # fall through to the Tier-3 backstop guard below.
        else:
            # Tier-1: dead pid, only after `reaper_dead_ticks` consecutive dead ticks.
            if pid is not None and not merge_gate.pid_alive(pid):
                n = self._streak.get(job_id, 0) + 1
                self._streak[job_id] = n
                if n >= max(int(settings.reaper_dead_ticks), 1):
                    self._streak.pop(job_id, None)
                    self._reap_unknown_outcome(job, reason=f"dead pid={pid}")
                    return
                logger.info(
                    "reaper: job %s pid=%s dead (streak %d/%d) — deferring",
                    job_id, pid, n, settings.reaper_dead_ticks,
                )
            else:
                # Alive / no pid -> reset the streak (must be CONSECUTIVE).
                self._streak.pop(job_id, None)

        # Tier-3: backstop ceiling (one-shot; reaps even when liveness is unknown).
        if age >= int(settings.reaper_max_running_s):
            self._streak.pop(job_id, None)
            self._reap_unknown_outcome(
                job, reason=f"backstop age={age}s>={settings.reaper_max_running_s}s"
            )

    # -- reap actions ------------------------------------------------------

    def _reap_known_outcome(self, job: dict, exit_code: int) -> None:
        """Tier-2: the agent's exit_code is known; drive the job's terminal status."""
        if exit_code == 0:
            self._reap_exit0(job)
        else:
            self._reap_unknown_outcome(job, reason=f"exit={exit_code}")

    def _reap_exit0(self, job: dict) -> None:
        """Reap an exit0 Tier-2 job with claim-BEFORE-act (ADR-001 Р-1).

        The atomic ``reap_running_job`` claim (guard ``WHERE status='running'``)
        MUST precede any ``advance_stage`` / ``enqueue_job`` side effect, so a
        reaper tick that LOSES the row (to a late-finalizing monitor or the
        startup ``requeue_running_jobs``) performs NO side effects — no duplicate
        advance, no duplicate ``enqueue_job`` of the next stage (FR-1.2/AC-4).

        Because the claim flips the row OUT of 'running', we cannot run the
        advance first to learn the gate colour. Instead we evaluate the canonical
        quality gate READ-ONLY (no side effects — the pattern the reconciler uses)
        to choose the terminal status BEFORE claiming:

        * already advanced past this agent -> idempotent clean ``done`` (no
          advance);
        * gate green -> claim ``done`` first, THEN advance exactly once;
        * gate red (e.g. monitor died before git-push -> no artifact) -> NOT a
          real success: route to the retry/fail contract (never a false ``done``).
        """
        job_id = job["id"]
        run_id = job.get("run_id")
        agent = job.get("agent")
        branch, stage, work_item_id = self._task_meta(job)
        candidates = {s for s in STAGE_TRANSITIONS if get_agent_for_stage(s) == agent}

        if stage is None or stage not in candidates:
            # Stage already advanced past this agent (or unknown) -> a clean 'done'
            # is correct WITHOUT re-advancing. Atomic claim only (idempotent cleanup).
            if reap_running_job(job_id, "done", run_id=run_id):
                self._note_reap(job, "done", reason="exit0, already advanced")
            return

        if not branch or not self._gate_is_green(stage, job, branch, work_item_id):
            # exit0 but the gate is red -> do NOT fabricate 'done'; treat as failure
            # (retry within budget, else failed + Telegram).
            self._reap_unknown_outcome(job, reason="exit0 but gate red")
            return

        # Gate green. CLAIM-BEFORE-ACT: own the row atomically FIRST.
        if not reap_running_job(job_id, "done", run_id=run_id):
            # Lost the race -> the winner (late monitor / startup requeue) owns the
            # advance; we do NOTHING (no duplicate side effects).
            return
        # We exclusively own the row now -> drive the gate-based advance exactly once.
        self._gate_driven_advance(job)
        self._note_reap(job, "done", reason="exit0, gate green")

    def _gate_is_green(
        self, stage: str, job: dict, branch: str, work_item_id: str | None
    ) -> bool:
        """Read-only canonical-QG evaluation for a reaped exit0 job (no side effects).

        Mirrors the reconciler's cheap pre-evaluation: dispatch the stage's QG via
        the SAME ``_run_qg`` the webhook path uses, returning its pass/fail WITHOUT
        running ``advance_stage`` (so no stage move / enqueue / notification
        happens here). A stage with no registered gate is treated as green
        (nothing blocks a clean 'done'). Never raises -> any error returns False
        (conservative: route to retry, never a false 'done').
        """
        try:
            from .stages import get_qg_for_stage
            from .stage_engine import _run_qg

            qg_name = get_qg_for_stage(stage)
            if not qg_name:
                return True
            passed, _reason = _run_qg(qg_name, job.get("repo"), work_item_id, branch)
            return bool(passed)
        except Exception as e:  # noqa: BLE001 - never break the reap
            logger.warning(
                "reaper: gate pre-eval failed for job %s (stage=%s): %s",
                job.get("id"), stage, e,
            )
            return False

    def _reap_unknown_outcome(self, job: dict, reason: str) -> None:
        """Tier-1/Tier-3 (or exit!=0): outcome not a clean success.

        Mirrors ``requeue_running_jobs`` / the permanent-failure contract:
        ``attempts < max_attempts`` -> ``queued`` (a retry); budget exhausted ->
        ``failed`` + Telegram. The terminal flip is the atomic
        ``reap_running_job`` guard, so a racing requeue/monitor never
        double-processes the row.
        """
        job_id = job["id"]
        run_id = job.get("run_id")
        attempts = int(job.get("attempts") or 0)
        max_attempts = int(job.get("max_attempts") or 2)
        err = f"reaped: {reason} (run_id={run_id})"

        if attempts < max_attempts:
            if reap_running_job(job_id, "queued", run_id=run_id, error=err):
                self._note_reap(job, "queued", reason=reason)
        else:
            if reap_running_job(job_id, "failed", run_id=run_id, error=err):
                self._note_reap(job, "failed", reason=reason)
                self._notify_failed(job, reason)

    def _gate_driven_advance(self, job: dict) -> bool:
        """Idempotent, gate-driven stage advance for a reaped exit0 job.

        Returns True iff the stage is (or has become) advanced past this agent's
        stage, i.e. the canonical quality gate is satisfied and a clean ``done``
        is correct. Returns False when the gate is still red or the advance
        could not run. ``_reap_exit0`` calls this only after the read-only gate
        pre-evaluation and the atomic ``done`` claim, and ignores the return
        value. The advance itself reuses the UNCHANGED
        ``launcher._try_advance_stage`` (which runs the canonical QG and the
        unified ``advance_stage``); the reaper never duplicates
        ``update_task_stage`` / ``enqueue_job``.
        """
        agent = job.get("agent")
        repo = job.get("repo")
        run_id = job.get("run_id")
        branch, stage, _wid = self._task_meta(job)

        # Candidate stages whose finishing agent is THIS agent (deployer maps to
        # both 'testing' and 'deploy-staging', hence a set).
        candidates = {s for s in STAGE_TRANSITIONS if get_agent_for_stage(s) == agent}
        if stage is None or stage not in candidates:
            # Stage already advanced past this agent (or unknown) -> idempotent
            # cleanup: a clean 'done' is correct without re-advancing.
            return True
        if not branch:
            return False
        try:
            from .agents.launcher import launcher

            launcher._try_advance_stage(run_id, agent, repo, branch)
        except Exception as e:  # noqa: BLE001 - never break the reap
            logger.error(
                "reaper: gate-driven advance failed for job %s: %s", job.get("id"), e
            )
            return False
        # Re-read the stage: advanced out of the candidate set -> gate was green.
        _branch, new_stage, _wid2 = self._task_meta(job)
        return new_stage is None or new_stage not in candidates

    @staticmethod
    def _task_meta(job: dict) -> tuple[str | None, str | None, str | None]:
        """Resolve (branch, stage, work_item_id) for the job's task. Never raises."""
        task_id = job.get("task_id")
        if not task_id:
            return None, None, None
        try:
            conn = get_db()
            try:
                row = conn.execute(
                    "SELECT branch, stage, work_item_id FROM tasks WHERE id = ?",
                    (task_id,),
                ).fetchone()
            finally:
                # Close the connection even when the query raises.
                conn.close()
            if not row:
                return None, None, None
            return row["branch"], row["stage"], row["work_item_id"]
        except Exception as e:  # noqa: BLE001 - never-raise contract
            logger.warning("reaper: task lookup failed for job %s: %s", job.get("id"), e)
            return None, None, None

    def _notify_failed(self, job: dict, reason: str) -> None:
        try:
            from .notifications import send_telegram

            send_telegram(
                f"\U0001f6a8 reaper: job {job.get('id')} ({job.get('agent')}, "
                f"repo {job.get('repo')}) reaped as FAILED: {reason}"
            )
        except Exception as e:  # noqa: BLE001 - telegram best-effort
            logger.warning("reaper: failed-notify telegram error: %s", e)

    def _note_reap(self, job: dict, outcome: str, reason: str) -> None:
        """Record + log one successful reap (Р-6 observability)."""
        self.reaped_total += 1
        self.last_reaped = {
            "job_id": job.get("id"),
            "agent": job.get("agent"),
            "outcome": outcome,
        }
        logger.warning(
            "reaper: job %s (agent=%s, repo=%s, run_id=%s, pid=%s) reaped -> %s (%s)",
            job.get("id"), job.get("agent"), job.get("repo"),
            job.get("run_id"), job.get("pid"), outcome, reason,
        )

    # -- loop / lifecycle --------------------------------------------------

    def _tick(self) -> None:
        try:
            self.reap_once()
        finally:
            self.last_run_ts = datetime.now(timezone.utc).timestamp()

    def _run(self) -> None:
        logger.info(
            "JobReaper started (interval=%ss, enabled=%s, dead_ticks=%s, "
            "max_running_s=%s, lease_reclaim=%s)",
            self.interval_s, settings.reaper_enabled, settings.reaper_dead_ticks,
            settings.reaper_max_running_s, settings.lease_reclaim_enabled,
        )
        while not self._stop.is_set():
            try:
                self._tick()
            except Exception as e:  # noqa: BLE001 - outer never-raise
                logger.error("JobReaper loop error: %s", e)
            self._stop.wait(self.interval_s)
        logger.info("JobReaper stopped")

    def start(self) -> None:
        """Start the daemon thread (idempotent: a live thread is a no-op)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="job-reaper", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Reaper snapshot for /queue observability (Р-6)."""
        return {
            "enabled": settings.reaper_enabled,
            "interval": self.interval_s,
            "last_run_ts": self.last_run_ts,
            "reaped_total": self.reaped_total,
            "last_reaped": self.last_reaped,
            "lease_reclaimed_total": self.lease_reclaimed_total,
        }


# Module-level singleton used by the FastAPI lifespan.
reaper = JobReaper()