Files
orchestrator/src/transition_lease.py
claude-bot 6ea4402942 fix(stage-engine): durable transition-ownership lease + expected-stage CAS (ORCH-114)
Close the root class of the ORCH-110/111/112/113 incident chain: side-effectful
stage transitions had no single ownership. `advance_stage` is re-enterable and wrote
the stage with a bare `UPDATE ... WHERE id=?` (no compare-and-swap), while >=5 actors
(monitor / Plane-webhook / reconciler F-1 / job-reaper / deploy-finalizer) enter the
same transition independently. A concurrent or post-restart re-entry therefore
re-applied irreversible effects (merge_pr / coverage-ratchet / image-rebuild /
prod-deploy initiation) and produced a contradictory rollback<->done (incident
ORCH-111, job 1914 / PR #130).

Two complementary layers, both additive, under one kill-switch, never-raise:
  1. Durable transition-lease (new table `transition_lease`) — owner-exclusion on
     ENTRY to the side-effectful region: a second actor that sees a LIVE owner does
     not start the heavy sub-gates at all (prevention, not post-hoc repair).
  2. Expected-stage CAS (`db.update_task_stage_cas`) — atomicity on the stage WRITE:
     a lost race aborts with NO side effect. Also closes the 6 paths that write the
     stage in bypass of advance_stage (gitea x5 + plane rollback).

Owner liveness = owner_pid + owner_boot_id (NOT a heartbeat — a blocking 900s merge
re-test cannot beat one; ADR-001 D3), making restart recovery free (a fresh boot_id
renders every prior lease stale -> reclaimed by recover_on_startup). The lease has no
own TTL: its hard age ceiling is the reaper Tier-3 backstop reaper_max_running_s, so
the cross-cutting budget invariant ORCH-065/109/110/113 is untouched.

Generalises ORCH-113 finalizer-liveness (process-local, Tier-2, deploy-staging) to a
durable cross-path lease: the reaper consults it on all relevant paths (defer live,
reclaim dead; Tier-3 ignores the marker -> bounded; a reap force-releases the lease);
reconciler F-1 and the Plane webhook defer on an active lease; main.lifespan calls
recover_on_startup() after requeue_running_jobs. finalizer_liveness.py is unchanged
(it remains the kill-switch-off fallback).

Scope self-hosting (transition_lease_repos="" -> orchestrator only; enduro untouched).
Kill-switch ORCH_TRANSITION_LEASE_ENABLED=false -> CAS degenerates to the prior
unconditional update_task_stage, lease inert, reaper -> ORCH-113 fallback (byte-for-
byte pre-ORCH-114). STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict keys /
existing table schemas — byte-for-byte (one additive table, no epoch column on tasks).

Observability: read-only `transition_lease` block in GET /queue + a Telegram alert on
forced/stale reclaim + optional POST /transition-lease/release?work_item=<id>.

Coverage: tests/test_orch114_transition_ownership.py (TC-01 mandatory regression of
the ORCH-111 class — red before fix, green after; TC-02..TC-14). Full suite green
(2048 passed); the 4 webhook tests that spied on the removed gitea.update_task_stage
were updated to spy on the new commit_stage_cas write path.

ADR: docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md
Cross-cutting: docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md

Refs: ORCH-114
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 19:28:38 +03:00

472 lines
20 KiB
Python

"""ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS.
Leaf module — pure, never-raise (pattern of ``serial_gate`` / ``coverage_gate`` /
``finalizer_liveness``: imports only ``db`` + ``config`` and lazily
``merge_gate.pid_alive`` / ``qg.checks.is_self_hosting_repo`` / ``notifications``;
it NEVER imports ``stage_engine`` / ``launcher`` and talks to no network).
The bug class it closes
-----------------------
``stage_engine.advance_stage`` is the single entry to side-effectful transitions
(the heavy ``deploy-staging -> deploy`` edge sub-gates — security / merge-gate
re-test / coverage / image-freshness — and the ``deploy -> done`` merge-verify:
``merge_pr`` / coverage-ratchet / proof-of-merge). It is RE-ENTERABLE: at least
five actors (monitor / Plane-webhook / reconciler F-1 / job-reaper / deploy
finalizer) can enter the SAME transition independently, and the stage write was a
bare ``UPDATE … WHERE id=?`` with no compare-and-swap. Two concurrent — or a
post-restart re-driven — entry therefore re-applied irreversible effects and
produced contradictory outcomes (one path rolled back to ``development`` while
another merged + finished — incident ORCH-111, job 1914 / PR #130). ORCH-113
closed only the in-memory, Tier-2, ``deploy-staging``-only slice of this; it is
lost on restart.
Two complementary layers (ADR-001 D1), both gated by one kill-switch:
1. **Durable lease (owner-exclusion on ENTRY).** A row in the additive
``transition_lease`` table (one per task) records "an actor owns this task's
side-effectful transition". A second actor that sees a LIVE owner does not
start the heavy sub-gates AT ALL (prevention, not post-hoc repair).
2. **Expected-stage CAS (atomicity on the WRITE).** ``update_task_stage_cas``
writes the stage only when the task is still at the expected stage; a lost
race aborts with NO side effect. It also closes the six paths that write the
stage in BYPASS of ``advance_stage`` (gitea / plane direct ``update_task_stage``).
Liveness without a heartbeat (ADR-001 D3)
-----------------------------------------
An owner is LIVE ⇔ ``owner_boot_id == <this process's boot id>`` AND
``merge_gate.pid_alive(owner_pid)``. There is NO heartbeat (a blocking 900 s merge
re-test cannot beat one — the very argument ORCH-113 used to reject heartbeats).
This makes restart recovery free: a new process has a new ``boot_id`` so every row
written by a previous process is instantly stale and reclaimed
(``recover_on_startup``). Within the one-process model every live owner shares the
SAME boot id and pid, so a same-boot row is by definition owned by the (alive)
current process; only a different-boot row can be stale — which is why the
acquire/recover logic keys staleness on the boot id.
No own TTL (ADR-001 D8): the lease's hard age ceiling IS the reaper Tier-3 backstop
``reaper_max_running_s`` (the reaper force-releases the lease when it reaps), so the
cross-cutting budget invariant ORCH-065/109/110/113 is untouched.
never-raise (ADR-001 D9 / NFR-1): every public function is isolated. The
directional defaults:
* ``acquire`` error -> ``False`` (busy): the caller DEFERS/aborts a side-effectful
transition rather than risk a double effect (fail-CLOSED to no-double-effect).
* ``is_held_by_live_owner`` error -> ``True`` (treat as held): the consulting
reconciler / webhook / reaper conservatively DEFERS (the safe action; the reaper
Tier-3 backstop still bounds a genuinely stuck task).
* ``commit_stage_cas`` error on the CAS path -> ``False``: abort the write, never a
blind overwrite.
The hot claim path (``db.claim_next_job``) is deliberately NOT touched, so a lease
bug can never wedge the shared queue of all projects (AC-8 ORCH-088 intact).
See docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md
and the cross-cutting docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md.
"""
from __future__ import annotations
import logging
import os
import secrets
import threading
from . import db
from .config import settings
logger = logging.getLogger("orchestrator.transition_lease")
# Per-process boot nonce (ADR-001 D3). Generated ONCE at import: every lease row a
# previous process wrote carries a DIFFERENT boot id and is therefore instantly
# stale after a restart -> reclaimed by recover_on_startup / acquire. Not derived
# from the clock so it cannot collide across a fast restart.
_BOOT_ID = secrets.token_hex(16)
# Best-effort observability counters (reset on restart, like the reaper's). Guarded
# by a lock because the monitor / reaper / reconciler / webhook threads all touch
# them. Never a source of truth — purely for GET /queue.
_LOCK = threading.Lock()
_COUNTERS: dict[str, int] = {
"acquired_total": 0, # leases successfully acquired
"busy_total": 0, # acquire deferred — a live owner already held it
"released_total": 0, # normal try/finally releases
"cas_lost_total": 0, # stage-CAS lost the race (aborted without side effect)
"stale_reclaims_total": 0, # rows reclaimed because the owner was not live
"force_reclaims_total": 0, # rows force-released (reaper / operator)
}
def _bump(key: str, n: int = 1) -> None:
try:
with _LOCK:
_COUNTERS[key] = _COUNTERS.get(key, 0) + n
except Exception: # noqa: BLE001 - counters never break a caller
pass
def boot_id() -> str:
"""This process's boot nonce (exposed for tests / observability)."""
return _BOOT_ID
# ---------------------------------------------------------------------------
# Conditionality (mirrors coverage_gate_applies — self-hosting-only by default)
# ---------------------------------------------------------------------------
def _enabled() -> bool:
try:
return bool(getattr(settings, "transition_lease_enabled", False))
except Exception: # noqa: BLE001
return False
def applies(repo: str) -> bool:
"""Whether the transition-lease + CAS are REAL for this repo (ADR-001 D10).
* ``transition_lease_enabled=False`` -> always False (kill-switch; the lease is
neither written nor read AND ``commit_stage_cas`` degenerates to the prior
unconditional ``update_task_stage`` -> behaviour byte-for-byte as before
ORCH-114).
* ``transition_lease_repos`` (CSV) non-empty -> real only for the listed repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``), where the
irreversible side-effectful edges live (mirrors coverage_gate_repos -> enduro
untouched at the default).
Never raises -> False on error (the safe "mechanism inert" default == kill-switch
off).
"""
try:
if not _enabled():
return False
raw = (getattr(settings, "transition_lease_repos", "") or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("transition_lease.applies error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# Liveness
# ---------------------------------------------------------------------------
def _pid_alive(pid) -> bool:
"""Probe ``pid`` liveness via ``merge_gate.pid_alive`` (ADR-001 references it for
a single shared semantics). Lazy import keeps this module a leaf; on import error
fall back to a conservative ``True`` (a lease whose pid we cannot probe is treated
as live — the boot-id check below + the Tier-3 backstop still bound it).
"""
try:
from .merge_gate import pid_alive
return pid_alive(pid)
except Exception: # noqa: BLE001
return True
def _row_is_live(owner_boot_id, owner_pid) -> bool:
"""True iff the lease owner is LIVE (this process's boot AND a live pid).
A row from a DIFFERENT boot id (a previous process) is dead by construction
(ADR-001 D3); a same-boot row is owned by the current — alive — process, but we
still probe the pid for forward-compatibility with a future multi-process model.
"""
if owner_boot_id != _BOOT_ID:
return False
return _pid_alive(owner_pid)
def is_held_by_live_owner(task_id) -> bool:
"""True iff an active lease row for ``task_id`` is owned by a LIVE actor.
Consulted by the reconciler F-1 / Plane-webhook DEFER guards and the reaper.
Returns ``False`` when there is no row or the owner is stale. Fail-CLOSED on any
error -> ``True`` (treat as held): the consulting caller DEFERS, which is always
the safe-against-double-effect action (the reaper Tier-3 backstop still bounds a
truly stuck task). When the mechanism is disabled -> ``False`` (no defer).
"""
if task_id is None:
return False
if not _enabled():
return False
try:
conn = db.get_db()
try:
row = conn.execute(
"SELECT owner_boot_id, owner_pid FROM transition_lease WHERE task_id=?",
(task_id,),
).fetchone()
finally:
conn.close()
if row is None:
return False
return _row_is_live(row["owner_boot_id"], row["owner_pid"])
except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt (conservative defer)
logger.warning(
"transition_lease.is_held_by_live_owner error for task %s -> "
"fail-CLOSED (defer): %s", task_id, e,
)
return True
# ---------------------------------------------------------------------------
# Acquire / release / reclaim
# ---------------------------------------------------------------------------
def _clear_stale_row(conn, task_id) -> int:
"""Delete the lease row for ``task_id`` IFF its owner is not live. Returns the
rowcount. Uses the caller's connection (same transaction as the INSERT in
``acquire``). Raises on a real DB fault (the caller swallows)."""
row = conn.execute(
"SELECT owner_boot_id, owner_pid FROM transition_lease WHERE task_id=?",
(task_id,),
).fetchone()
if row is None:
return 0
if _row_is_live(row["owner_boot_id"], row["owner_pid"]):
return 0
cur = conn.execute("DELETE FROM transition_lease WHERE task_id=?", (task_id,))
return cur.rowcount or 0
def acquire(task_id, owner: str, run_id=None, stage: str | None = None) -> bool:
"""Acquire the side-effectful-transition lease for ``task_id`` (ADR-001 D5).
Atomic rowcount-guard (pattern ``claim_next_job`` / ``reap_running_job``): a stale
row (owner from a previous boot / dead pid) is cleared first, then an
``INSERT … ON CONFLICT(task_id) DO NOTHING`` competes only with LIVE same-process
owners. ``rowcount == 1`` -> WE won. ``rowcount == 0`` -> a live owner already
holds it -> ``False`` (the caller DEFERS without starting the heavy region).
Kill-switch off -> ``True`` (no-op acquire; the caller proceeds exactly as before
ORCH-114; ``release`` is then an idempotent no-op). ``task_id is None`` -> ``True``
(a job with no task cannot be leased — legacy direct ``launch()``; proceed).
never-raise: any error -> ``False`` (busy) so the caller DEFERS a side-effectful
transition rather than risk a double effect (fail-CLOSED to no-double-effect,
ADR-001 D9).
"""
if not _enabled():
return True
if task_id is None:
return True
try:
conn = db.get_db()
try:
_clear_stale_row(conn, task_id)
cur = conn.execute(
"INSERT INTO transition_lease "
"(task_id, owner, owner_pid, owner_boot_id, run_id, stage) "
"VALUES (?, ?, ?, ?, ?, ?) "
"ON CONFLICT(task_id) DO NOTHING",
(task_id, owner or "engine", os.getpid(), _BOOT_ID, run_id, stage),
)
conn.commit()
won = (cur.rowcount == 1)
finally:
conn.close()
if won:
_bump("acquired_total")
return True
_bump("busy_total")
logger.info(
"transition_lease: task %s busy (a live owner holds the transition); "
"%s defers", task_id, owner,
)
return False
except Exception as e: # noqa: BLE001 - fail-CLOSED (busy) to avoid double effects
logger.warning("transition_lease.acquire error for task %s: %s", task_id, e)
return False
def release(task_id, force: bool = False) -> None:
"""Release the lease for ``task_id`` (ADR-001 D5). Idempotent, never raises.
* ``force=False`` (normal try/finally release in ``advance_stage``): delete only
a row owned by THIS process (``owner_boot_id == boot``), so a release delayed
past a reaper-reclaim-then-reacquire can never delete a lease a DIFFERENT
process/owner acquired afterwards (holder-aware, mirrors ``release_merge_lease``).
* ``force=True`` (reaper reap / operator endpoint): delete unconditionally.
"""
if task_id is None:
return
if not _enabled():
return
try:
conn = db.get_db()
try:
if force:
cur = conn.execute(
"DELETE FROM transition_lease WHERE task_id=?", (task_id,)
)
else:
cur = conn.execute(
"DELETE FROM transition_lease WHERE task_id=? AND owner_boot_id=?",
(task_id, _BOOT_ID),
)
conn.commit()
n = cur.rowcount or 0
finally:
conn.close()
if n:
_bump("force_reclaims_total" if force else "released_total", n)
except Exception as e: # noqa: BLE001 - never-raise (a leaked lease is bounded by Tier-3)
logger.warning("transition_lease.release error for task %s: %s", task_id, e)
def reclaim_if_stale(task_id) -> bool:
"""Reclaim (delete) the lease row for ``task_id`` IFF its owner is not live.
Returns True iff a stale row was reclaimed. Used by the operator endpoint and as
a backstop. never-raise -> False on error.
"""
if task_id is None or not _enabled():
return False
try:
conn = db.get_db()
try:
n = _clear_stale_row(conn, task_id)
conn.commit()
finally:
conn.close()
if n:
_bump("stale_reclaims_total", n)
logger.warning("transition_lease: reclaimed stale lease for task %s", task_id)
return n > 0
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("transition_lease.reclaim_if_stale error for task %s: %s", task_id, e)
return False
def recover_on_startup() -> int:
"""Clear every lease left by a PREVIOUS process boot (ADR-001 D7).
Called from ``main.lifespan`` right after ``requeue_running_jobs`` and BEFORE the
reaper starts. A fresh process boot id means every existing row predates this
process -> stale -> deleted, so the requeued jobs re-drive their transitions
cleanly (idempotency comes from the authoritative durable facts — SHA-in-main,
the INITIATED self-deploy marker, the coverage-ratchet CAS — NOT from a new
recovery brain). Returns the number of rows cleared. never-raise -> 0 on error.
"""
if not _enabled():
return 0
try:
conn = db.get_db()
try:
cur = conn.execute(
"DELETE FROM transition_lease "
"WHERE owner_boot_id IS NULL OR owner_boot_id != ?",
(_BOOT_ID,),
)
conn.commit()
n = cur.rowcount or 0
finally:
conn.close()
if n:
_bump("stale_reclaims_total", n)
logger.warning(
"transition_lease.recover_on_startup: cleared %d stale lease(s) from a "
"previous boot", n,
)
# FR-6 / AC-12: a forced/stale reclaim is observable (Telegram alert). A
# restart-time bulk reclaim is summarised (per-task clickable alerts come
# from the operator endpoint). best-effort, never-raise.
try:
from .notifications import send_telegram
send_telegram(
f"♻️ Transition-lease recovery: сброшено {n} устаревш"
f"(ий/их) lease после рестарта (переходы будут пере-исполнены "
f"последовательно)."
)
except Exception: # noqa: BLE001 - alert is best-effort
pass
return n
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("transition_lease.recover_on_startup error: %s", e)
return 0
# ---------------------------------------------------------------------------
# Stage write — compare-and-swap wrapper (ADR-001 D5 / FR-2)
# ---------------------------------------------------------------------------
def commit_stage_cas(task_id, expected_stage: str, new_stage: str, repo: str) -> bool:
"""Write the task stage under the ORCH-114 contract. Returns True iff the write
was applied (and the caller may proceed with side effects), False iff the writer
lost the CAS race (the caller MUST abort WITHOUT side effects).
* ``applies(repo)`` False (kill-switch off / repo out of scope) -> the prior
unconditional ``update_task_stage`` (byte-for-byte) -> always True. Not wrapped
in a swallowing try, so a DB error propagates EXACTLY as before ORCH-114.
* ``applies(repo)`` True -> ``update_task_stage_cas`` (expected-stage compare-and-
swap). A lost race -> False (no side effect). never-raise on the CAS path: a DB
error -> False (abort the write; never a blind overwrite).
"""
try:
scoped = applies(repo)
except Exception: # noqa: BLE001 - applies already never-raises; belt-and-suspenders
scoped = False
if not scoped:
db.update_task_stage(task_id, new_stage)
return True
try:
won = db.update_task_stage_cas(task_id, expected_stage, new_stage)
if not won:
_bump("cas_lost_total")
return won
except Exception as e: # noqa: BLE001 - abort the write (no blind overwrite)
logger.warning(
"transition_lease.commit_stage_cas error for task %s (%s->%s): %s",
task_id, expected_stage, new_stage, e,
)
return False
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (ADR-001 D10 / FR-6)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
"""Read-only transition-lease summary for GET /queue. Additive block; existing
/queue keys untouched. never-raise -> a minimal dict on error.
"""
try:
enabled = _enabled()
except Exception: # noqa: BLE001
enabled = False
try:
repos_cfg = getattr(settings, "transition_lease_repos", "") or ""
except Exception: # noqa: BLE001
repos_cfg = ""
holders: list[dict] = []
try:
conn = db.get_db()
try:
rows = conn.execute(
"SELECT task_id, owner, owner_pid, owner_boot_id, run_id, stage, "
"acquired_at, "
"CAST(strftime('%s','now') - strftime('%s', acquired_at) AS INTEGER) "
" AS age_s "
"FROM transition_lease ORDER BY task_id"
).fetchall()
finally:
conn.close()
for r in rows:
holders.append({
"task_id": r["task_id"],
"owner": r["owner"],
"stage": r["stage"],
"run_id": r["run_id"],
"age_s": r["age_s"],
"live": _row_is_live(r["owner_boot_id"], r["owner_pid"]),
})
except Exception as e: # noqa: BLE001 - never break /queue
logger.warning("transition_lease.snapshot error: %s", e)
try:
with _LOCK:
counters = dict(_COUNTERS)
except Exception: # noqa: BLE001
counters = {}
return {
"enabled": enabled,
"repos": repos_cfg,
"boot_id": _BOOT_ID,
"active": len(holders),
"holders": holders,
"counters": counters,
}