fix(stage-engine): durable transition-ownership lease + expected-stage CAS (ORCH-114)

Close the root class of the ORCH-110/111/112/113 incident chain: side-effectful
stage transitions had no single ownership. `advance_stage` is re-enterable and wrote
the stage with a bare `UPDATE ... WHERE id=?` (no compare-and-swap), while >=5 actors
(monitor / Plane-webhook / reconciler F-1 / job-reaper / deploy-finalizer) enter the
same transition independently. A concurrent or post-restart re-entry therefore
re-applied irreversible effects (merge_pr / coverage-ratchet / image-rebuild /
prod-deploy initiation) and produced a contradictory rollback<->done (incident
ORCH-111, job 1914 / PR #130).

Two complementary layers, both additive, under one kill-switch, never-raise:
  1. Durable transition-lease (new table `transition_lease`) — owner-exclusion on
     ENTRY to the side-effectful region: a second actor that sees a LIVE owner does
     not start the heavy sub-gates at all (prevention, not post-hoc repair).
  2. Expected-stage CAS (`db.update_task_stage_cas`) — atomicity on the stage WRITE:
     a lost race aborts with NO side effect. Also closes the 6 paths that write the
     stage in bypass of advance_stage (gitea x5 + plane rollback).

Owner liveness = owner_pid + owner_boot_id (NOT a heartbeat — a blocking 900s merge
re-test cannot beat one; ADR-001 D3), making restart recovery free (a fresh boot_id
renders every prior lease stale -> reclaimed by recover_on_startup). The lease has no
own TTL: its hard age ceiling is the reaper Tier-3 backstop reaper_max_running_s, so
the cross-cutting budget invariant ORCH-065/109/110/113 is untouched.

Generalises ORCH-113 finalizer-liveness (process-local, Tier-2, deploy-staging) to a
durable cross-path lease: the reaper consults it on all relevant paths (defer live,
reclaim dead; Tier-3 ignores the marker -> bounded; a reap force-releases the lease);
reconciler F-1 and the Plane webhook defer on an active lease; main.lifespan calls
recover_on_startup() after requeue_running_jobs. finalizer_liveness.py is unchanged
(it remains the kill-switch-off fallback).

Scope self-hosting (transition_lease_repos="" -> orchestrator only; enduro untouched).
Kill-switch ORCH_TRANSITION_LEASE_ENABLED=false -> CAS degenerates to the prior
unconditional update_task_stage, lease inert, reaper -> ORCH-113 fallback (byte-for-
byte pre-ORCH-114). STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict keys /
existing table schemas — byte-for-byte (one additive table, no epoch column on tasks).

Observability: read-only `transition_lease` block in GET /queue + a Telegram alert on
forced/stale reclaim + optional POST /transition-lease/release?work_item=<id>.

Coverage: tests/test_orch114_transition_ownership.py (TC-01 mandatory regression of
the ORCH-111 class — red before fix, green after; TC-02..TC-14). Full suite green
(2048 passed); the 4 webhook tests that spied on the removed gitea.update_task_stage
were updated to spy on the new commit_stage_cas write path.

ADR: docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md
Cross-cutting: docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md

Refs: ORCH-114
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 17:37:11 +03:00
committed by deployer
parent cc03e68847
commit 6ea4402942
15 changed files with 1591 additions and 82 deletions

View File

@@ -13,7 +13,6 @@ from ..config import settings
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
@@ -24,6 +23,7 @@ from ..notifications import notify_stage_change, notify_qg_failure, notify_error
from ..agents.launcher import launcher
from ..plane_sync import notify_stage_change as plane_notify_stage
from ..projects import get_project_by_repo
from .. import transition_lease
logger = logging.getLogger("orchestrator.webhooks.gitea")
@@ -124,18 +124,25 @@ async def handle_push(payload: dict):
if has_adr:
# Advance to development
next_stage = "development"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# ORCH-114 (adr-0045 / D4, TR-4): this push-driven advance writes the stage
# in BYPASS of advance_stage -> route through the expected-stage CAS so it
# cannot clobber a concurrent authoritative write; a lost race skips the
# notify + enqueue (no duplicate agent). Kill-switch off -> unconditional
# (byte-for-byte).
if transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo_name):
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
logger.info(f"Task {task_id}: push-advance stage-CAS lost ({current_stage}->{next_stage}); another writer moved it")
elif current_stage == "development":
# Source files pushed — just log, wait for CI
@@ -239,18 +246,22 @@ async def handle_ci_status(payload: dict):
passed, reason = check_ci_green(repo_name, branch)
if passed:
next_stage = "review"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# ORCH-114 (adr-0045 / D4, TR-4): CI-green advance in BYPASS of
# advance_stage -> expected-stage CAS; a lost race skips notify + enqueue.
if transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo_name):
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
logger.info(f"Task {task_id}: CI-green stage-CAS lost ({current_stage}->{next_stage}); another writer moved it")
else:
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
@@ -330,18 +341,22 @@ async def handle_pr(payload: dict):
passed, reason = check_review_approved(repo_name, pr_number)
if passed:
next_stage = "testing"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# ORCH-114 (adr-0045 / D4, TR-4): PR-approved advance in BYPASS of
# advance_stage -> expected-stage CAS; a lost race skips notify + enqueue.
if transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo_name):
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
logger.info(f"Task {task_id}: PR-approved stage-CAS lost ({current_stage}->{next_stage}); another writer moved it")
else:
notify_qg_failure(task_id, current_stage, "check_review_approved", reason)
@@ -355,18 +370,24 @@ async def handle_pr(payload: dict):
conn.close()
if retry_count < MAX_DEV_RETRIES:
# Back to development, relaunch developer
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
# Back to development, relaunch developer.
# ORCH-114 (adr-0045 / D4, TR-4): REQUEST_CHANGES rollback writes the
# stage in BYPASS of advance_stage -> expected-stage CAS so it cannot
# clobber a concurrent authoritative write (e.g. a task that already
# advanced); a lost race skips the rollback + developer relaunch.
if transition_lease.commit_stage_cas(task_id, current_stage, "development", repo_name):
notify_stage_change(task_id, current_stage, "development")
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
else:
logger.info(f"Task {task_id}: REQUEST_CHANGES rollback stage-CAS lost ({current_stage}->development); another writer moved it")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached, escalating")
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
@@ -395,6 +416,11 @@ async def handle_pr(payload: dict):
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
)
return
update_task_stage(task_id, "done")
notify_stage_change(task_id, current_stage, "done")
logger.info(f"Task {task_id}: PR merged, stage → done")
# ORCH-114 (adr-0045 / D4, TR-4): merge-driven done writes the stage in BYPASS
# of advance_stage -> expected-stage CAS so a concurrent authoritative writer
# is not clobbered; a lost race skips the (idempotent) notify.
if transition_lease.commit_stage_cas(task_id, current_stage, "done", repo_name):
notify_stage_change(task_id, current_stage, "done")
logger.info(f"Task {task_id}: PR merged, stage → done")
else:
logger.info(f"Task {task_id}: merge-driven done stage-CAS lost ({current_stage}->done); another writer moved it")

View File

@@ -14,7 +14,6 @@ from ..db import (
get_task_by_plane_id,
get_next_work_item_id,
ensure_unique_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
create_task_atomic,
@@ -35,6 +34,7 @@ from ..projects import (
get_project_by_repo,
known_plane_project_ids,
)
from .. import transition_lease
logger = logging.getLogger("orchestrator.webhooks.plane")
@@ -803,7 +803,17 @@ async def _rollback_stage(
if not prev_stage:
logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
return
update_task_stage(task_id, prev_stage)
# ORCH-114 (adr-0045 / D4, TR-4): this Rejected-rollback writes the stage in
# BYPASS of advance_stage. Route it through the expected-stage CAS so it can never
# clobber an authoritative write made by a concurrent owner (e.g. a deploy->done
# finalizer) — a lost race aborts the rollback WITHOUT its side effects. Kill-switch
# off / repo out of scope -> unconditional update (byte-for-byte).
if not transition_lease.commit_stage_cas(task_id, current_stage, prev_stage, repo):
logger.info(
f"Task {task_id}: rollback stage-CAS lost ({current_stage}->{prev_stage}) "
f"— task already moved by another writer; skipping rollback"
)
return
notify_stage_change(task_id, current_stage, prev_stage)
# Feature 3: plane_notify_stage moves the board to the prev stage's status.
plane_notify_stage(work_item_id, current_stage, prev_stage)
@@ -857,10 +867,25 @@ async def _try_advance_stage(
advance_stage). It is True ONLY on the "Confirm Deploy" path
(handle_confirm_deploy) and gates Phase B of the self-hosting prod deploy; the
plain Approved path (handle_verdict) leaves it at the default False.
ORCH-114 (adr-0045 / FR-5, AC-8): if a live actor already owns this task's
side-effectful transition (transition-lease active), DEFER — do not re-enter the
transition in parallel. The late legitimate signal is not lost: once the owner
releases (or dies and the reaper reclaims), a re-approve / the reconciler re-drives
it, or advance_stage becomes an idempotent no-op against the authoritative facts
(SHA-in-main / INITIATED). never raises; no-op when the lease is disabled / repo
out of scope.
"""
import asyncio
from ..stage_engine import advance_stage
if transition_lease.is_held_by_live_owner(task_id):
logger.info(
f"Task {task_id}: transition-lease active — deferring webhook advance "
f"from {current_stage} (confirm_deploy={confirm_deploy})"
)
return
await asyncio.to_thread(
advance_stage,
task_id,