Files
orchestrator/tests/test_orch114_transition_ownership.py
claude-bot c4a97a7a28 fix(stage-engine): address ORCH-114 review — env/docs canon + in-region rollback CAS
Resolves the REQUEST_CHANGES findings on ORCH-114 (durable transition-ownership
lease + expected-stage CAS):

P1 — documentation = golden source:
- .env.example: add ORCH_TRANSITION_LEASE_ENABLED / ORCH_TRANSITION_LEASE_REPOS
  (canon of 100% start keys, ORCH-101), next to the other gate kill-switches.
- CLAUDE.md: add the ORCH-114 passport section (mechanism, invariant, flags,
  ADR links) so a future agent editing advance_stage/reaper/webhooks finds the
  ownership invariant in the first mandatory-read doc (ORCH-078 traceability index).

P2 — should-fix:
- docs/overview/ (system showcase, ORCH-011): add transition_lease to
  tech-data-model.md (helper tables), tech-observability.md (/queue blocks) and
  tech-architecture.md (components).
- ADR-001 D4 alignment: the four side-effectful-edge rollback handlers
  (_handle_merge_gate_rollback / _handle_security_gate / _handle_coverage_gate /
  _handle_image_freshness) now write `development` through the expected-stage CAS
  via a shared _rollback_stage_cas helper (defence against the rollback↔done
  contradiction, BR-6) instead of a bare unconditional update_task_stage. Under the
  held lease the sole owner always wins; a lost race aborts WITHOUT side effects.
  Kill-switch off / out-of-scope repo -> degenerates to the prior write -> 1:1.
- Test isolation: make tests/test_webhooks.py order-independent by pinning the
  proj-1 registry per-test (mirrors test_webhook_dedup.proj_registry); it had only
  passed by relying on import order. Drop the needless module-level ORCH_DB_PATH
  setdefault in test_orch114 (fresh_db already isolates db_path).

New regression tests (TC-11): in-region rollback writes route through CAS;
rollback CAS wins when at expected stage; rollback CAS-lost does NOT clobber `done`;
kill-switch-off rollback degenerates to the unconditional write.

ruff clean (src/stage_engine.py, src/transition_lease.py); full suite 2052 passed.

Refs: ORCH-114
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 19:28:38 +03:00

715 lines
31 KiB
Python

"""ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS.
Covers FR-1…FR-7 / AC-1…AC-13 (TC-01..TC-14, see 04-test-plan.yaml). The mechanism
prevents a concurrent OR post-restart re-entry into a side-effectful stage transition
(``deploy-staging -> deploy`` sub-gates, ``deploy -> done`` merge-verify, Phase C
finalize) from re-applying an irreversible effect or producing a contradictory
rollback↔done — incident ORCH-111.
No network / no real git / no docker / no prod: the heavy edge sub-gates and the
finalization handlers are stubbed with call-counters and the DB is driven directly
(the same convention as test_orch113_reaper_finalizer_liveness.py).
The autouse conftest fixture defaults the kill-switch OFF for the whole suite; this
module re-enables it per test (``_enable``) so the feature is scoped ON here.
"""
import inspect
import os
import tempfile
import pytest
# NB: deliberately NO module-level os.environ["ORCH_DB_PATH"] setdefault — pinning the
# process-wide settings.db_path on first import is needless here (the autouse `fresh_db`
# fixture below isolates db_path per-test via monkeypatch). The cross-module settings
# singleton (e.g. ORCH_PROJECTS_JSON) is whoever imports `src` first; test_webhooks now
# pins its own registry per-test rather than relying on import order (ORCH-114 review P2).
os.environ.setdefault("ORCH_REPOS_DIR", tempfile.gettempdir())
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db
from src.db import init_db, get_db, get_job, update_task_stage_cas
import src.transition_lease as tl
import src.stage_engine as se
from src.job_reaper import JobReaper
_REPO = "orchestrator" # self-hosting -> transition_lease.applies(repo) is True
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "orch114.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
# Reset the leaf's in-memory counters between tests (process-local module state).
with tl._LOCK:
for k in tl._COUNTERS:
tl._COUNTERS[k] = 0
yield
def _enable(monkeypatch, repos: str = ""):
"""Turn the ORCH-114 mechanism ON (it is OFF by default via conftest)."""
monkeypatch.setattr(db.settings, "transition_lease_enabled", True, raising=False)
monkeypatch.setattr(db.settings, "transition_lease_repos", repos, raising=False)
def _disable(monkeypatch):
monkeypatch.setattr(db.settings, "transition_lease_enabled", False, raising=False)
# --- helpers ----------------------------------------------------------------
def _make_task(stage="deploy-staging", repo=_REPO, branch="feature/orch114",
work_item_id="ORCH-114"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _task_stage(tid):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (tid,)).fetchone()
conn.close()
return row[0] if row else None
def _make_running_job(agent="deployer", repo=_REPO, task_id=None, pid=None,
age_s=0, attempts=0, max_attempts=2, run_id=None,
exit_code=0, finished_age_s=600):
conn = get_db()
if run_id is None and exit_code is not None:
cur = conn.execute(
"INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
"VALUES (?, ?, datetime('now', ?), ?)",
(task_id, agent, f"-{int(finished_age_s)} seconds", exit_code),
)
run_id = cur.lastrowid
cur = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
"run_id, pid, started_at) "
"VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
(agent, repo, task_id, attempts, max_attempts, run_id, pid,
f"-{int(age_s)} seconds"),
)
job_id = cur.lastrowid
conn.commit()
conn.close()
return job_id
def _stub_side_effects(monkeypatch):
"""Patch the deploy-staging edge sub-gates + Phase A with call-counters.
Each sub-gate returns False (no intervention) so advance_stage proceeds to Phase
A; Phase A is stubbed to a counter that does NOT touch the network/prod. Returns
the shared ``counts`` dict.
"""
counts = {"security": 0, "merge": 0, "coverage": 0, "image": 0, "phase_a": 0}
def _mk(key):
def _fake(task_id, current_stage, repo, work_item_id, branch, agent, result):
counts[key] += 1
return False # no intervention -> advance continues
return _fake
monkeypatch.setattr(se, "_handle_security_gate", _mk("security"))
monkeypatch.setattr(se, "_handle_merge_gate", _mk("merge"))
monkeypatch.setattr(se, "_handle_coverage_gate", _mk("coverage"))
monkeypatch.setattr(se, "_handle_image_freshness", _mk("image"))
def _fake_phase_a(task_id, current_stage, repo, work_item_id, branch, result):
counts["phase_a"] += 1
result.advanced = True
result.to_stage = "deploy"
monkeypatch.setattr(se, "_handle_self_deploy_phase_a", _fake_phase_a)
# The QG (check_staging_status) is the entry gate; force it green so we reach the
# side-effectful sub-gates instead of rolling back.
monkeypatch.setattr(se, "_run_qg", lambda *a, **k: (True, "ok"))
return counts
# ===========================================================================
# TC-01 — MANDATORY regression: no double effect on concurrent entry (AC-1)
# ===========================================================================
def test_tc01_concurrent_entry_no_double_effect(monkeypatch):
_enable(monkeypatch)
counts = _stub_side_effects(monkeypatch)
tid = _make_task(stage="deploy-staging")
# Actor A — a LIVE finalizer — owns the transition (acquired on entry).
assert tl.acquire(tid, "monitor", stage="deploy-staging") is True
# Actor B (reaper/reconciler/webhook re-drive) enters the SAME transition.
res_b = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent=None)
# Busy -> deferred WITHOUT any side effect, stage unchanged.
assert res_b.note == "transition-lease-busy"
assert res_b.advanced is False
assert counts == {"security": 0, "merge": 0, "coverage": 0, "image": 0, "phase_a": 0}
assert _task_stage(tid) == "deploy-staging"
# The owner finishes (release), then drives the transition exactly once.
tl.release(tid, force=True)
res_a = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
# Each side-effectful step ran EXACTLY once (one consistent outcome).
assert counts == {"security": 1, "merge": 1, "coverage": 1, "image": 1, "phase_a": 1}
assert res_a.advanced is True
def test_tc01_red_before_fix_demonstration(monkeypatch):
"""With the kill-switch OFF (== before ORCH-114) the second actor is NOT blocked
and re-runs every sub-gate -> the double-effect bug. This is the RED that the
lease turns GREEN."""
_disable(monkeypatch)
counts = _stub_side_effects(monkeypatch)
tid = _make_task(stage="deploy-staging")
# acquire is a no-op when disabled -> no owner-exclusion.
assert tl.acquire(tid, "monitor", stage="deploy-staging") is True
se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent=None)
# Without the lease the "second" actor ran the side effects (the bug).
assert counts["merge"] == 1 and counts["security"] == 1
# ===========================================================================
# TC-02 — expected-stage CAS on the stage write (AC-2)
# ===========================================================================
def test_tc02_cas_first_wins_second_lost(monkeypatch):
tid = _make_task(stage="review")
# First writer with the correct expectation wins.
assert update_task_stage_cas(tid, "review", "testing") is True
assert _task_stage(tid) == "testing"
# Second writer with the now-stale expectation loses; stage is NOT re-mutated.
assert update_task_stage_cas(tid, "review", "development") is False
assert _task_stage(tid) == "testing"
def test_tc02_commit_cas_killswitch_off_unconditional(monkeypatch):
"""Kill-switch off / repo out of scope -> commit_stage_cas degenerates to the
prior unconditional update_task_stage (byte-for-byte: the expected_stage is
ignored, the write always lands)."""
_disable(monkeypatch)
tid = _make_task(stage="review")
# Even a WRONG expected stage writes unconditionally when the mechanism is off.
assert tl.commit_stage_cas(tid, "totally-wrong", "testing", _REPO) is True
assert _task_stage(tid) == "testing"
def test_tc02_commit_cas_enabled_does_real_cas(monkeypatch):
_enable(monkeypatch)
tid = _make_task(stage="review")
# Wrong expectation -> CAS lost, no write.
assert tl.commit_stage_cas(tid, "wrong", "testing", _REPO) is False
assert _task_stage(tid) == "review"
# Correct expectation -> CAS won.
assert tl.commit_stage_cas(tid, "review", "testing", _REPO) is True
assert _task_stage(tid) == "testing"
# ===========================================================================
# TC-03 — ownership lifecycle: acquire / release / reclaim (AC-3)
# ===========================================================================
def test_tc03_acquire_release_visible_durably(monkeypatch):
_enable(monkeypatch)
tid = _make_task()
assert tl.is_held_by_live_owner(tid) is False
assert tl.acquire(tid, "monitor", run_id=7, stage="deploy-staging") is True
assert tl.is_held_by_live_owner(tid) is True
# Durable: a fresh DB read (snapshot) sees the holder.
snap = tl.snapshot()
assert snap["active"] == 1
assert snap["holders"][0]["task_id"] == tid
assert snap["holders"][0]["owner"] == "monitor"
assert snap["holders"][0]["live"] is True
# A second acquire by another actor is busy while the live owner holds it.
assert tl.acquire(tid, "reaper", stage="deploy-staging") is False
tl.release(tid, force=True)
assert tl.is_held_by_live_owner(tid) is False
def test_tc03_release_in_finally_on_exception(monkeypatch):
"""advance_stage must release the lease even when a sub-gate raises (try/finally)."""
_enable(monkeypatch)
monkeypatch.setattr(se, "_run_qg", lambda *a, **k: (True, "ok"))
def _boom(*a, **k):
raise RuntimeError("sub-gate exploded")
monkeypatch.setattr(se, "_handle_security_gate", _boom)
tid = _make_task(stage="deploy-staging")
res = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
# The outer except swallowed the error; the finally released the lease.
assert res.advanced is False
assert tl.is_held_by_live_owner(tid) is False
# ===========================================================================
# TC-04 — reaper defers on a live lease, cross-path (beyond deploy-staging) (AC-4)
# ===========================================================================
def test_tc04_reaper_defers_on_deploy_edge(monkeypatch):
"""ORCH-114 generalises ORCH-113 beyond Tier-2/deploy-staging: a live lease on the
deploy->done edge also defers the reaper."""
_enable(monkeypatch)
monkeypatch.setattr(JobReaper, "_gate_is_green",
lambda self, stage, job, branch, wid: True)
calls = []
import src.agents.launcher as L
monkeypatch.setattr(L.launcher, "_try_advance_stage",
lambda *a, **k: calls.append(a))
tid = _make_task(stage="deploy") # NOT deploy-staging -> proves generalisation
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
assert tl.acquire(tid, "finalizer", stage="deploy") is True
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "running" # not reaped
assert calls == [] # no second advance
assert r.finalizer_defers_total == 1
# ===========================================================================
# TC-05 — reaper reaps a dead/stale lease in bounded time (Tier-3) (AC-5)
# ===========================================================================
def test_tc05_tier3_backstop_reaps_and_releases_lease(monkeypatch):
_enable(monkeypatch)
monkeypatch.setattr(db.settings, "reaper_max_running_s", 1000)
tid = _make_task(stage="deploy")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=10,
age_s=2000, attempts=0, max_attempts=2)
assert tl.acquire(tid, "finalizer", stage="deploy") is True
r = JobReaper()
r.reap_once()
# Backstop reaps regardless of the marker; the lease is force-released with the job.
assert get_job(jid)["status"] == "queued"
assert tl.is_held_by_live_owner(tid) is False
def test_tc05_reclaim_if_stale_removes_dead_boot_row(monkeypatch):
_enable(monkeypatch)
tid = _make_task()
# A row from a PREVIOUS process boot (a dead owner) is stale.
conn = get_db()
conn.execute(
"INSERT INTO transition_lease (task_id, owner, owner_pid, owner_boot_id) "
"VALUES (?, 'monitor', 1, 'OLD-DEAD-BOOT')",
(tid,),
)
conn.commit()
conn.close()
assert tl.is_held_by_live_owner(tid) is False # stale -> not live
assert tl.reclaim_if_stale(tid) is True
assert tl.snapshot()["active"] == 0
def test_tc05_budget_invariant_preserved():
"""The lease introduced no new TTL; the cross-cutting reaper budget is untouched."""
s = db.settings
assert s.reaper_max_running_s == 5400
assert s.reaper_finalize_grace_s == 300
sigma = s.merge_retest_timeout_s + s.coverage_run_timeout_s
assert s.reaper_max_running_s > sigma + s.reaper_finalize_grace_s
# ===========================================================================
# TC-06 — smart restart recovery (AC-6)
# ===========================================================================
def test_tc06_recover_on_startup_clears_previous_boot_lease(monkeypatch):
_enable(monkeypatch)
tid = _make_task(stage="deploy")
# Simulate a process that died MID-finalization: a lease row with a DIFFERENT boot.
conn = get_db()
conn.execute(
"INSERT INTO transition_lease (task_id, owner, owner_pid, owner_boot_id) "
"VALUES (?, 'finalizer', 999999, 'PREVIOUS-BOOT')",
(tid,),
)
conn.commit()
conn.close()
# Before recovery the row is stale (boot mismatch) -> not a live owner.
assert tl.is_held_by_live_owner(tid) is False
# Startup recovery (after requeue_running_jobs) clears it deterministically.
assert tl.recover_on_startup() == 1
assert tl.snapshot()["active"] == 0
# The requeued job can now re-drive the transition cleanly (no stale owner blocks).
assert tl.acquire(tid, "monitor", stage="deploy") is True
def test_tc06_recovery_does_not_touch_current_boot_lease(monkeypatch):
"""A lease this very process holds must NOT be cleared by recovery (only previous
boots are stale)."""
_enable(monkeypatch)
tid = _make_task()
assert tl.acquire(tid, "monitor", stage="deploy-staging") is True
assert tl.recover_on_startup() == 0 # current-boot lease is live, kept
assert tl.is_held_by_live_owner(tid) is True
# ===========================================================================
# TC-07 — reconciler F-1 defers on an active lease (AC-7)
# ===========================================================================
def test_tc07_reconciler_f1_defers(monkeypatch):
_enable(monkeypatch)
from src.reconciler import Reconciler
import src.reconciler as rec
# Spy on the advance path; it must NOT be called while the lease is held.
advanced = []
monkeypatch.setattr(rec, "advance_if_gate_passed",
lambda *a, **k: advanced.append(a))
# Pass the cheap local guards so we reach the lease check.
monkeypatch.setattr(rec, "has_active_job_for_task", lambda *a, **k: False)
monkeypatch.setattr(rec, "developer_retry_count", lambda *a, **k: 0)
monkeypatch.setattr(rec, "MAX_DEVELOPER_RETRIES", 3, raising=False)
monkeypatch.setattr(rec, "grace_for_stage", lambda *a, **k: 0)
r = Reconciler()
monkeypatch.setattr(r, "_resolve_issue_status", lambda task: ({}, {}, None))
monkeypatch.setattr(r, "_is_terminal_state", lambda *a, **k: False)
monkeypatch.setattr(r, "_is_blocked_or_needs_input", lambda *a, **k: False)
tid = _make_task(stage="review")
assert tl.acquire(tid, "monitor", stage="review") is True
r._reconcile_gate_task({
"id": tid, "stage": "review", "repo": _REPO,
"work_item_id": "ORCH-114", "branch": "feature/orch114", "age_s": 10_000,
})
assert advanced == [] # F-1 deferred
assert r.transition_lease_defers_total == 1
# ===========================================================================
# TC-08 — webhook path defers on an active lease (AC-8)
# ===========================================================================
def test_tc08_plane_webhook_defers(monkeypatch):
_enable(monkeypatch)
import asyncio
from src.webhooks.plane import _try_advance_stage
called = []
monkeypatch.setattr(se, "advance_stage", lambda *a, **k: called.append(a))
tid = _make_task(stage="deploy")
assert tl.acquire(tid, "finalizer", stage="deploy") is True
# Lease held -> the webhook advance is deferred (advance_stage NOT invoked).
asyncio.run(_try_advance_stage(tid, "deploy", _REPO, "ORCH-114", "feature/orch114"))
assert called == []
# The late legitimate signal is not lost: after release it advances.
tl.release(tid, force=True)
asyncio.run(_try_advance_stage(tid, "deploy", _REPO, "ORCH-114", "feature/orch114"))
assert len(called) == 1
# ===========================================================================
# TC-09 — kill-switch off -> byte-for-byte prior (AC-9)
# ===========================================================================
def test_tc09_killswitch_off_inert(monkeypatch):
_disable(monkeypatch)
tid = _make_task(stage="review")
# Lease neither written nor read.
assert tl.acquire(tid, "monitor", stage="review") is True # no-op True
assert tl.is_held_by_live_owner(tid) is False
assert tl.snapshot()["enabled"] is False
assert tl.snapshot()["active"] == 0
# CAS degenerates to the unconditional update (expected ignored).
assert tl.commit_stage_cas(tid, "anything", "testing", _REPO) is True
assert _task_stage(tid) == "testing"
def test_tc09_applies_scope(monkeypatch):
_enable(monkeypatch) # empty repos CSV -> self-hosting only
assert tl.applies("orchestrator") is True
assert tl.applies("enduro-trails") is False
# Explicit CSV scope.
_enable(monkeypatch, repos="enduro-trails")
assert tl.applies("enduro-trails") is True
assert tl.applies("orchestrator") is False
# ===========================================================================
# TC-10 — never-raise + fail-open (hot path) / fail-closed (prod safety) (AC-10)
# ===========================================================================
def test_tc10_never_raise_on_db_error(monkeypatch):
_enable(monkeypatch)
def _boom(*a, **k):
raise RuntimeError("DB exploded")
monkeypatch.setattr(tl.db, "get_db", _boom)
# acquire -> fail-CLOSED (busy) so a side-effectful caller DEFERS (no double effect).
assert tl.acquire(123, "monitor", stage="deploy") is False
# is_held_by_live_owner -> fail-CLOSED (treat as held -> conservative defer).
assert tl.is_held_by_live_owner(123) is True
# release / reclaim / recover / snapshot never raise.
tl.release(123, force=True)
assert tl.reclaim_if_stale(123) is False
assert tl.recover_on_startup() == 0
assert isinstance(tl.snapshot(), dict)
def test_tc10_commit_cas_error_aborts_write(monkeypatch):
_enable(monkeypatch)
monkeypatch.setattr(tl.db, "update_task_stage_cas",
lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")))
# CAS error -> abort the write (never a blind overwrite) -> False, no raise.
assert tl.commit_stage_cas(1, "review", "testing", _REPO) is False
def test_tc10_hot_claim_path_not_touched():
"""AC-8 ORCH-088 intact: the hot claim path does NOT consult the transition-lease,
so a lease bug can never wedge the shared queue (fail-open by construction)."""
src_claim = inspect.getsource(db.claim_next_job)
assert "transition_lease" not in src_claim
# ===========================================================================
# TC-11 — structural audit: pipeline invariants untouched, storage additive (AC-11)
# ===========================================================================
def test_tc11_stage_transitions_and_qg_untouched():
from src.stages import STAGE_TRANSITIONS
from src.qg.checks import QG_CHECKS
# The canonical edge order is intact (no new stages/edges).
assert STAGE_TRANSITIONS["deploy-staging"]["next"] == "deploy"
assert STAGE_TRANSITIONS["deploy-staging"]["qg"] == "check_staging_status"
assert STAGE_TRANSITIONS["deploy"]["next"] == "done"
# The QG registry still carries the machine-verdict gates byte-for-byte.
for name in ("check_staging_status", "check_deploy_status", "check_coverage_gate"):
assert name in QG_CHECKS
def test_tc11_storage_additive_existing_tables_unchanged():
conn = get_db()
# The additive table exists (CREATE TABLE IF NOT EXISTS).
row = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transition_lease'"
).fetchone()
assert row is not None
# `tasks` schema is byte-for-byte: NO epoch/version column was added (ADR D2).
cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
conn.close()
assert "epoch" not in cols and "version" not in cols
assert {"id", "stage", "repo", "branch", "work_item_id"} <= cols
def test_tc11_bypass_paths_use_cas_not_unconditional_write():
"""The 6 bypass writers (gitea x5 + plane rollback) + the main advance write route
through commit_stage_cas; none does an unconditional update_task_stage on the
concurrent path (TR-4)."""
import src.webhooks.gitea as g
import src.webhooks.plane as p
gsrc = inspect.getsource(g)
assert "commit_stage_cas" in gsrc
# The gitea handlers no longer import / call the bare update_task_stage.
assert "update_task_stage(" not in gsrc
psrc = inspect.getsource(p._rollback_stage)
assert "commit_stage_cas" in psrc
assert "update_task_stage(" not in psrc
# The main advance write uses CAS.
asrc = inspect.getsource(se.advance_stage)
assert "commit_stage_cas(task_id, current_stage, next_stage" in asrc
def test_tc11_inregion_rollback_writes_use_cas(monkeypatch):
"""ADR-001 D4: the four side-effectful-edge rollback handlers
(_handle_merge_gate_rollback / _handle_security_gate / _handle_coverage_gate /
_handle_image_freshness) write `development` through the expected-stage CAS
(via _rollback_stage_cas), NOT a bare unconditional update_task_stage. (The
non-side-effectful launcher rollbacks in _handle_qg_failure_rollbacks are out of
scope — no lease is held there.)"""
for fn in (
se._handle_merge_gate_rollback,
se._handle_security_gate,
se._handle_coverage_gate,
se._handle_image_freshness,
):
src = inspect.getsource(fn)
assert "_rollback_stage_cas(task_id, current_stage, repo, result)" in src, (
f"{fn.__name__} must route the rollback write through the CAS helper"
)
assert 'update_task_stage(task_id, "development")' not in src, (
f"{fn.__name__} must not do a bare unconditional rollback write"
)
# The helper itself goes through commit_stage_cas.
assert "commit_stage_cas(task_id, current_stage" in inspect.getsource(
se._rollback_stage_cas
)
def test_tc11_rollback_cas_wins_when_at_expected_stage(monkeypatch):
"""With the mechanism ON, a rollback whose task is STILL at current_stage wins the
CAS -> the stage is written to `development` and the caller proceeds (returns True)."""
_enable(monkeypatch)
tid = _make_task(stage="deploy-staging")
result = se.AdvanceResult()
assert se._rollback_stage_cas(tid, "deploy-staging", _REPO, result) is True
assert _task_stage(tid) == "development"
assert result.note != "rollback-cas-lost"
def test_tc11_rollback_cas_lost_aborts_without_overwriting_done(monkeypatch):
"""BR-6 / ADR-001 D4: if a concurrent winner already advanced the task to `done`,
the stale rollback LOSES the expected-stage CAS -> it must NOT overwrite `done`
with `development`, and the caller aborts the rollback side effects."""
_enable(monkeypatch)
tid = _make_task(stage="deploy-staging")
# Simulate a concurrent winner having advanced the task to terminal `done`.
conn = get_db()
conn.execute("UPDATE tasks SET stage='done' WHERE id=?", (tid,))
conn.commit()
conn.close()
result = se.AdvanceResult()
# The rollback still believes current_stage is deploy-staging (its read-on-entry).
assert se._rollback_stage_cas(tid, "deploy-staging", _REPO, result) is False
assert _task_stage(tid) == "done" # NOT clobbered back to development
assert result.note == "rollback-cas-lost"
def test_tc11_rollback_cas_killswitch_off_unconditional(monkeypatch):
"""Kill-switch off -> _rollback_stage_cas degenerates to the prior unconditional
write (always True, no CAS), so behaviour is byte-for-byte pre-ORCH-114 (AC-9)."""
_disable(monkeypatch)
tid = _make_task(stage="done") # even a mismatched stage writes unconditionally
result = se.AdvanceResult()
assert se._rollback_stage_cas(tid, "deploy-staging", _REPO, result) is True
assert _task_stage(tid) == "development"
# ===========================================================================
# TC-12 — observability (AC-12)
# ===========================================================================
def test_tc12_snapshot_shape_and_counters(monkeypatch):
_enable(monkeypatch)
tid = _make_task(stage="deploy-staging")
tl.acquire(tid, "monitor", run_id=3, stage="deploy-staging")
snap = tl.snapshot()
assert snap["enabled"] is True
assert snap["active"] == 1
assert set(snap.keys()) >= {"enabled", "repos", "boot_id", "active", "holders", "counters"}
h = snap["holders"][0]
assert {"task_id", "owner", "stage", "age_s", "live"} <= set(h.keys())
assert snap["counters"]["acquired_total"] >= 1
def test_tc12_forced_reclaim_emits_telegram(monkeypatch):
_enable(monkeypatch)
sent = []
monkeypatch.setattr("src.notifications.send_telegram",
lambda *a, **k: sent.append(a), raising=False)
tid = _make_task()
# A previous-boot (stale) lease that recovery force-reclaims at startup.
conn = get_db()
conn.execute(
"INSERT INTO transition_lease (task_id, owner, owner_pid, owner_boot_id) "
"VALUES (?, 'finalizer', 1, 'PREV-BOOT')",
(tid,),
)
conn.commit()
conn.close()
assert tl.recover_on_startup() == 1
assert len(sent) == 1 # forced/stale reclaim is observable via Telegram
def test_tc12_queue_block_wired():
"""GET /queue carries the additive transition_lease block (read-only)."""
import src.main as main_mod
qsrc = inspect.getsource(main_mod.queue)
assert '"transition_lease": transition_lease.snapshot()' in qsrc
# ===========================================================================
# TC-13 — self-hosting safety (AC-13)
# ===========================================================================
def _code_only(module) -> str:
"""Return the module source with comments AND string literals stripped, so a
structural audit scans EXECUTABLE code only (not docstring prose). Mirrors the
tokenize approach of tests/test_no_host_hardcodes.py."""
import io
import tokenize
src = inspect.getsource(module)
out = []
for tok in tokenize.generate_tokens(io.StringIO(src).readline):
if tok.type in (tokenize.COMMENT, tokenize.STRING):
continue
out.append(tok.string)
return " ".join(out)
def test_tc13_leaf_has_no_dangerous_side_effects():
"""The ownership mechanism never restarts the prod container, never pushes /
force-pushes main, never spawns a subprocess and never touches the detached
deploy process. Scans EXECUTABLE code only (docstring prose is excluded)."""
code = _code_only(tl)
forbidden = ["subprocess", "system", "docker", "force_push", "Popen",
"os.kill", "restart", "rmtree", "remove"]
for token in forbidden:
assert token not in code, f"transition_lease must not reference {token!r} in code"
def test_tc13_leaf_imports_only_safe_modules():
"""The leaf imports only db + config at module load (lazily merge_gate / qg /
notifications) — it never imports stage_engine / launcher / self_deploy."""
src_tl = inspect.getsource(tl)
assert "import stage_engine" not in src_tl
assert "from .stage_engine" not in src_tl
assert "import launcher" not in src_tl
assert "self_deploy" not in src_tl
# ===========================================================================
# TC-14 — full pipeline happy-path with the mechanism ON (BR-8)
# ===========================================================================
def test_tc14_single_actor_happy_path_one_set_of_effects(monkeypatch):
"""A single advance on deploy-staging with the mechanism ON runs each sub-gate
exactly once and leaves NO lease behind (clean acquire+release)."""
_enable(monkeypatch)
counts = _stub_side_effects(monkeypatch)
tid = _make_task(stage="deploy-staging")
res = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
assert counts == {"security": 1, "merge": 1, "coverage": 1, "image": 1, "phase_a": 1}
assert res.advanced is True
# The lease was released in the finally (no leak).
assert tl.is_held_by_live_owner(tid) is False
def test_tc14_deploy_to_done_finalize_advances_via_cas(monkeypatch):
"""The deploy->done finalize path (Phase C) reaches the terminal write via the CAS
and releases the lease (single consistent done)."""
_enable(monkeypatch)
monkeypatch.setattr(se, "_run_qg", lambda *a, **k: (True, "ok"))
# merge-verify CONFIRMED (no HOLD) so advance proceeds to done.
monkeypatch.setattr(se, "_handle_merge_verify", lambda *a, **k: False)
# Avoid post-deploy / plane side effects on the done write.
monkeypatch.setattr(se.post_deploy, "post_deploy_applies", lambda *a, **k: False)
monkeypatch.setattr(se, "set_issue_done", lambda *a, **k: None, raising=False)
monkeypatch.setattr(se.merge_gate, "release_merge_lease", lambda *a, **k: None)
monkeypatch.setattr(se, "enqueue_job", lambda *a, **k: 1, raising=False)
tid = _make_task(stage="deploy")
res = se.advance_stage(tid, "deploy", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
assert res.advanced is True
assert _task_stage(tid) == "done"
assert tl.is_held_by_live_owner(tid) is False