"""ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS. Leaf module — pure, never-raise (pattern of ``serial_gate`` / ``coverage_gate`` / ``finalizer_liveness``: imports only ``db`` + ``config`` and lazily ``merge_gate.pid_alive`` / ``qg.checks.is_self_hosting_repo`` / ``notifications``; it NEVER imports ``stage_engine`` / ``launcher`` and talks to no network). The bug class it closes ----------------------- ``stage_engine.advance_stage`` is the single entry to side-effectful transitions (the heavy ``deploy-staging -> deploy`` edge sub-gates — security / merge-gate re-test / coverage / image-freshness — and the ``deploy -> done`` merge-verify: ``merge_pr`` / coverage-ratchet / proof-of-merge). It is RE-ENTERABLE: at least five actors (monitor / Plane-webhook / reconciler F-1 / job-reaper / deploy finalizer) can enter the SAME transition independently, and the stage write was a bare ``UPDATE … WHERE id=?`` with no compare-and-swap. Two concurrent — or a post-restart re-driven — entry therefore re-applied irreversible effects and produced contradictory outcomes (one path rolled back to ``development`` while another merged + finished — incident ORCH-111, job 1914 / PR #130). ORCH-113 closed only the in-memory, Tier-2, ``deploy-staging``-only slice of this; it is lost on restart. Two complementary layers (ADR-001 D1), both gated by one kill-switch: 1. **Durable lease (owner-exclusion on ENTRY).** A row in the additive ``transition_lease`` table (one per task) records "an actor owns this task's side-effectful transition". A second actor that sees a LIVE owner does not start the heavy sub-gates AT ALL (prevention, not post-hoc repair). 2. **Expected-stage CAS (atomicity on the WRITE).** ``update_task_stage_cas`` writes the stage only when the task is still at the expected stage; a lost race aborts with NO side effect. It also closes the six paths that write the stage in BYPASS of ``advance_stage`` (gitea / plane direct ``update_task_stage``). Liveness without a heartbeat (ADR-001 D3) ----------------------------------------- An owner is LIVE ⇔ ``owner_boot_id == `` AND ``merge_gate.pid_alive(owner_pid)``. There is NO heartbeat (a blocking 900 s merge re-test cannot beat one — the very argument ORCH-113 used to reject heartbeats). This makes restart recovery free: a new process has a new ``boot_id`` so every row written by a previous process is instantly stale and reclaimed (``recover_on_startup``). Within the one-process model every live owner shares the SAME boot id and pid, so a same-boot row is by definition owned by the (alive) current process; only a different-boot row can be stale — which is why the acquire/recover logic keys staleness on the boot id. No own TTL (ADR-001 D8): the lease's hard age ceiling IS the reaper Tier-3 backstop ``reaper_max_running_s`` (the reaper force-releases the lease when it reaps), so the cross-cutting budget invariant ORCH-065/109/110/113 is untouched. never-raise (ADR-001 D9 / NFR-1): every public function is isolated. The directional defaults: * ``acquire`` error -> ``False`` (busy): the caller DEFERS/aborts a side-effectful transition rather than risk a double effect (fail-CLOSED to no-double-effect). * ``is_held_by_live_owner`` error -> ``True`` (treat as held): the consulting reconciler / webhook / reaper conservatively DEFERS (the safe action; the reaper Tier-3 backstop still bounds a genuinely stuck task). * ``commit_stage_cas`` error on the CAS path -> ``False``: abort the write, never a blind overwrite. The hot claim path (``db.claim_next_job``) is deliberately NOT touched, so a lease bug can never wedge the shared queue of all projects (AC-8 ORCH-088 intact). See docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md and the cross-cutting docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md. """ from __future__ import annotations import logging import os import secrets import threading from . import db from .config import settings logger = logging.getLogger("orchestrator.transition_lease") # Per-process boot nonce (ADR-001 D3). Generated ONCE at import: every lease row a # previous process wrote carries a DIFFERENT boot id and is therefore instantly # stale after a restart -> reclaimed by recover_on_startup / acquire. Not derived # from the clock so it cannot collide across a fast restart. _BOOT_ID = secrets.token_hex(16) # Best-effort observability counters (reset on restart, like the reaper's). Guarded # by a lock because the monitor / reaper / reconciler / webhook threads all touch # them. Never a source of truth — purely for GET /queue. _LOCK = threading.Lock() _COUNTERS: dict[str, int] = { "acquired_total": 0, # leases successfully acquired "busy_total": 0, # acquire deferred — a live owner already held it "released_total": 0, # normal try/finally releases "cas_lost_total": 0, # stage-CAS lost the race (aborted without side effect) "stale_reclaims_total": 0, # rows reclaimed because the owner was not live "force_reclaims_total": 0, # rows force-released (reaper / operator) } def _bump(key: str, n: int = 1) -> None: try: with _LOCK: _COUNTERS[key] = _COUNTERS.get(key, 0) + n except Exception: # noqa: BLE001 - counters never break a caller pass def boot_id() -> str: """This process's boot nonce (exposed for tests / observability).""" return _BOOT_ID # --------------------------------------------------------------------------- # Conditionality (mirrors coverage_gate_applies — self-hosting-only by default) # --------------------------------------------------------------------------- def _enabled() -> bool: try: return bool(getattr(settings, "transition_lease_enabled", False)) except Exception: # noqa: BLE001 return False def applies(repo: str) -> bool: """Whether the transition-lease + CAS are REAL for this repo (ADR-001 D10). * ``transition_lease_enabled=False`` -> always False (kill-switch; the lease is neither written nor read AND ``commit_stage_cas`` degenerates to the prior unconditional ``update_task_stage`` -> behaviour byte-for-byte as before ORCH-114). * ``transition_lease_repos`` (CSV) non-empty -> real only for the listed repos. * empty CSV -> real ONLY for the self-hosting repo (``orchestrator``), where the irreversible side-effectful edges live (mirrors coverage_gate_repos -> enduro untouched at the default). Never raises -> False on error (the safe "mechanism inert" default == kill-switch off). """ try: if not _enabled(): return False raw = (getattr(settings, "transition_lease_repos", "") or "").strip() if raw: allowed = {r.strip().lower() for r in raw.split(",") if r.strip()} return (repo or "").strip().lower() in allowed from .qg.checks import is_self_hosting_repo return is_self_hosting_repo(repo) except Exception as e: # noqa: BLE001 - never-raise contract logger.warning("transition_lease.applies error for %s: %s", repo, e) return False # --------------------------------------------------------------------------- # Liveness # --------------------------------------------------------------------------- def _pid_alive(pid) -> bool: """Probe ``pid`` liveness via ``merge_gate.pid_alive`` (ADR-001 references it for a single shared semantics). Lazy import keeps this module a leaf; on import error fall back to a conservative ``True`` (a lease whose pid we cannot probe is treated as live — the boot-id check below + the Tier-3 backstop still bound it). """ try: from .merge_gate import pid_alive return pid_alive(pid) except Exception: # noqa: BLE001 return True def _row_is_live(owner_boot_id, owner_pid) -> bool: """True iff the lease owner is LIVE (this process's boot AND a live pid). A row from a DIFFERENT boot id (a previous process) is dead by construction (ADR-001 D3); a same-boot row is owned by the current — alive — process, but we still probe the pid for forward-compatibility with a future multi-process model. """ if owner_boot_id != _BOOT_ID: return False return _pid_alive(owner_pid) def is_held_by_live_owner(task_id) -> bool: """True iff an active lease row for ``task_id`` is owned by a LIVE actor. Consulted by the reconciler F-1 / Plane-webhook DEFER guards and the reaper. Returns ``False`` when there is no row or the owner is stale. Fail-CLOSED on any error -> ``True`` (treat as held): the consulting caller DEFERS, which is always the safe-against-double-effect action (the reaper Tier-3 backstop still bounds a truly stuck task). When the mechanism is disabled -> ``False`` (no defer). """ if task_id is None: return False if not _enabled(): return False try: conn = db.get_db() try: row = conn.execute( "SELECT owner_boot_id, owner_pid FROM transition_lease WHERE task_id=?", (task_id,), ).fetchone() finally: conn.close() if row is None: return False return _row_is_live(row["owner_boot_id"], row["owner_pid"]) except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt (conservative defer) logger.warning( "transition_lease.is_held_by_live_owner error for task %s -> " "fail-CLOSED (defer): %s", task_id, e, ) return True # --------------------------------------------------------------------------- # Acquire / release / reclaim # --------------------------------------------------------------------------- def _clear_stale_row(conn, task_id) -> int: """Delete the lease row for ``task_id`` IFF its owner is not live. Returns the rowcount. Uses the caller's connection (same transaction as the INSERT in ``acquire``). Raises on a real DB fault (the caller swallows).""" row = conn.execute( "SELECT owner_boot_id, owner_pid FROM transition_lease WHERE task_id=?", (task_id,), ).fetchone() if row is None: return 0 if _row_is_live(row["owner_boot_id"], row["owner_pid"]): return 0 cur = conn.execute("DELETE FROM transition_lease WHERE task_id=?", (task_id,)) return cur.rowcount or 0 def acquire(task_id, owner: str, run_id=None, stage: str | None = None) -> bool: """Acquire the side-effectful-transition lease for ``task_id`` (ADR-001 D5). Atomic rowcount-guard (pattern ``claim_next_job`` / ``reap_running_job``): a stale row (owner from a previous boot / dead pid) is cleared first, then an ``INSERT … ON CONFLICT(task_id) DO NOTHING`` competes only with LIVE same-process owners. ``rowcount == 1`` -> WE won. ``rowcount == 0`` -> a live owner already holds it -> ``False`` (the caller DEFERS without starting the heavy region). Kill-switch off -> ``True`` (no-op acquire; the caller proceeds exactly as before ORCH-114; ``release`` is then an idempotent no-op). ``task_id is None`` -> ``True`` (a job with no task cannot be leased — legacy direct ``launch()``; proceed). never-raise: any error -> ``False`` (busy) so the caller DEFERS a side-effectful transition rather than risk a double effect (fail-CLOSED to no-double-effect, ADR-001 D9). """ if not _enabled(): return True if task_id is None: return True try: conn = db.get_db() try: _clear_stale_row(conn, task_id) cur = conn.execute( "INSERT INTO transition_lease " "(task_id, owner, owner_pid, owner_boot_id, run_id, stage) " "VALUES (?, ?, ?, ?, ?, ?) " "ON CONFLICT(task_id) DO NOTHING", (task_id, owner or "engine", os.getpid(), _BOOT_ID, run_id, stage), ) conn.commit() won = (cur.rowcount == 1) finally: conn.close() if won: _bump("acquired_total") return True _bump("busy_total") logger.info( "transition_lease: task %s busy (a live owner holds the transition); " "%s defers", task_id, owner, ) return False except Exception as e: # noqa: BLE001 - fail-CLOSED (busy) to avoid double effects logger.warning("transition_lease.acquire error for task %s: %s", task_id, e) return False def release(task_id, force: bool = False) -> None: """Release the lease for ``task_id`` (ADR-001 D5). Idempotent, never raises. * ``force=False`` (normal try/finally release in ``advance_stage``): delete only a row owned by THIS process (``owner_boot_id == boot``), so a release delayed past a reaper-reclaim-then-reacquire can never delete a lease a DIFFERENT process/owner acquired afterwards (holder-aware, mirrors ``release_merge_lease``). * ``force=True`` (reaper reap / operator endpoint): delete unconditionally. """ if task_id is None: return if not _enabled(): return try: conn = db.get_db() try: if force: cur = conn.execute( "DELETE FROM transition_lease WHERE task_id=?", (task_id,) ) else: cur = conn.execute( "DELETE FROM transition_lease WHERE task_id=? AND owner_boot_id=?", (task_id, _BOOT_ID), ) conn.commit() n = cur.rowcount or 0 finally: conn.close() if n: _bump("force_reclaims_total" if force else "released_total", n) except Exception as e: # noqa: BLE001 - never-raise (a leaked lease is bounded by Tier-3) logger.warning("transition_lease.release error for task %s: %s", task_id, e) def reclaim_if_stale(task_id) -> bool: """Reclaim (delete) the lease row for ``task_id`` IFF its owner is not live. Returns True iff a stale row was reclaimed. Used by the operator endpoint and as a backstop. never-raise -> False on error. """ if task_id is None or not _enabled(): return False try: conn = db.get_db() try: n = _clear_stale_row(conn, task_id) conn.commit() finally: conn.close() if n: _bump("stale_reclaims_total", n) logger.warning("transition_lease: reclaimed stale lease for task %s", task_id) return n > 0 except Exception as e: # noqa: BLE001 - never-raise logger.warning("transition_lease.reclaim_if_stale error for task %s: %s", task_id, e) return False def recover_on_startup() -> int: """Clear every lease left by a PREVIOUS process boot (ADR-001 D7). Called from ``main.lifespan`` right after ``requeue_running_jobs`` and BEFORE the reaper starts. A fresh process boot id means every existing row predates this process -> stale -> deleted, so the requeued jobs re-drive their transitions cleanly (idempotency comes from the authoritative durable facts — SHA-in-main, the INITIATED self-deploy marker, the coverage-ratchet CAS — NOT from a new recovery brain). Returns the number of rows cleared. never-raise -> 0 on error. """ if not _enabled(): return 0 try: conn = db.get_db() try: cur = conn.execute( "DELETE FROM transition_lease " "WHERE owner_boot_id IS NULL OR owner_boot_id != ?", (_BOOT_ID,), ) conn.commit() n = cur.rowcount or 0 finally: conn.close() if n: _bump("stale_reclaims_total", n) logger.warning( "transition_lease.recover_on_startup: cleared %d stale lease(s) from a " "previous boot", n, ) # FR-6 / AC-12: a forced/stale reclaim is observable (Telegram alert). A # restart-time bulk reclaim is summarised (per-task clickable alerts come # from the operator endpoint). best-effort, never-raise. try: from .notifications import send_telegram send_telegram( f"♻️ Transition-lease recovery: сброшено {n} устаревш" f"(ий/их) lease после рестарта (переходы будут пере-исполнены " f"последовательно)." ) except Exception: # noqa: BLE001 - alert is best-effort pass return n except Exception as e: # noqa: BLE001 - never-raise logger.warning("transition_lease.recover_on_startup error: %s", e) return 0 # --------------------------------------------------------------------------- # Stage write — compare-and-swap wrapper (ADR-001 D5 / FR-2) # --------------------------------------------------------------------------- def commit_stage_cas(task_id, expected_stage: str, new_stage: str, repo: str) -> bool: """Write the task stage under the ORCH-114 contract. Returns True iff the write was applied (and the caller may proceed with side effects), False iff the writer lost the CAS race (the caller MUST abort WITHOUT side effects). * ``applies(repo)`` False (kill-switch off / repo out of scope) -> the prior unconditional ``update_task_stage`` (byte-for-byte) -> always True. Not wrapped in a swallowing try, so a DB error propagates EXACTLY as before ORCH-114. * ``applies(repo)`` True -> ``update_task_stage_cas`` (expected-stage compare-and- swap). A lost race -> False (no side effect). never-raise on the CAS path: a DB error -> False (abort the write; never a blind overwrite). """ try: scoped = applies(repo) except Exception: # noqa: BLE001 - applies already never-raises; belt-and-suspenders scoped = False if not scoped: db.update_task_stage(task_id, new_stage) return True try: won = db.update_task_stage_cas(task_id, expected_stage, new_stage) if not won: _bump("cas_lost_total") return won except Exception as e: # noqa: BLE001 - abort the write (no blind overwrite) logger.warning( "transition_lease.commit_stage_cas error for task %s (%s->%s): %s", task_id, expected_stage, new_stage, e, ) return False # --------------------------------------------------------------------------- # Observability snapshot for GET /queue (ADR-001 D10 / FR-6) # --------------------------------------------------------------------------- def snapshot() -> dict: """Read-only transition-lease summary for GET /queue. Additive block; existing /queue keys untouched. never-raise -> a minimal dict on error. """ try: enabled = _enabled() except Exception: # noqa: BLE001 enabled = False try: repos_cfg = getattr(settings, "transition_lease_repos", "") or "" except Exception: # noqa: BLE001 repos_cfg = "" holders: list[dict] = [] try: conn = db.get_db() try: rows = conn.execute( "SELECT task_id, owner, owner_pid, owner_boot_id, run_id, stage, " "acquired_at, " "CAST(strftime('%s','now') - strftime('%s', acquired_at) AS INTEGER) " " AS age_s " "FROM transition_lease ORDER BY task_id" ).fetchall() finally: conn.close() for r in rows: holders.append({ "task_id": r["task_id"], "owner": r["owner"], "stage": r["stage"], "run_id": r["run_id"], "age_s": r["age_s"], "live": _row_is_live(r["owner_boot_id"], r["owner_pid"]), }) except Exception as e: # noqa: BLE001 - never break /queue logger.warning("transition_lease.snapshot error: %s", e) try: with _LOCK: counters = dict(_COUNTERS) except Exception: # noqa: BLE001 counters = {} return { "enabled": enabled, "repos": repos_cfg, "boot_id": _BOOT_ID, "active": len(holders), "holders": holders, "counters": counters, }