Files
orchestrator/tests/test_orch113_reaper_finalizer_liveness.py
claude-bot 7cb1f83f6c fix(reaper): do not re-run deploy-staging finalization while finalizer is alive
On the deploy-staging -> deploy edge the live monitor stamps
agent_runs.finished_at FIRST, then runs the heavy edge sub-gates
(security/merge-gate re-test/coverage/image-freshness) in-thread for MINUTES
and only THEN _finalize_job. Reaper Tier-2 measures finished_age_s from
finished_at, so past reaper_finalize_grace_s it treated the live, long
finalizer as dead and independently re-ran the advance -> a second re-test
went red -> false rollback deploy-staging -> development while the original
finalizer concurrently merged the PR (incident ORCH-111, job 1914).

Add a process-local finalizer-ownership registry (src/finalizer_liveness.py,
never-raise): the monitor mark()s ownership right after the exit_code stamp and
clear()s it in a try/finally around the (verbatim-extracted) finalization tail,
so an exception in the monitor thread still releases ownership and a genuinely
dead finalizer is reaped. The reaper Tier-2 consults the marker only when the
kill-switch is on AND the task stage == deploy-staging AND ownership is active
-> DEFER (no second advance) and fall through to the Tier-3 backstop, which
ignores the marker (a stuck/dead finalizer is still reaped in bounded time).
In-memory is authoritative (monitor + reaper are daemon threads of one uvicorn
process); restart is covered by the startup requeue_running_jobs.

Additive, global kill-switch reaper_finalizer_liveness_enabled (default True;
false -> reaper byte-for-byte prior). STAGE_TRANSITIONS / QG_CHECKS / every
check_* / machine-verdict keys / DB schema unchanged; grace/ceiling and the
ORCH-065/109/110 budget invariant untouched; never restarts prod, never pushes
main. Observability: finalizer_defers_total + finalizer_owned in GET /queue.
Tests: tests/test_orch113_reaper_finalizer_liveness.py (TC-01..TC-08, incl. the
mandatory ORCH-111 regression: red before the fix, green after).

Refs: ORCH-113

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 13:08:41 +03:00

387 lines
16 KiB
Python

"""ORCH-113 (adr-0043): reaper must not re-run deploy-staging finalization while
the original finalizer is alive — finalizer-liveness ownership tests (TC-01..TC-08).
Covers the bug from incident ORCH-111 (deployer job 1914): on the
``deploy-staging -> deploy`` edge the live monitor runs the heavy edge sub-gates
(security/merge-gate re-test/coverage/image-freshness) in-thread for MINUTES AFTER
stamping ``agent_runs.finished_at`` and BEFORE ``_finalize_job``. Reaper Tier-2
measures ``finished_age_s`` from ``finished_at``, so past ``reaper_finalize_grace_s``
it treated the live, long-finalizing monitor as dead and independently re-ran the
advance -> a second re-test went red -> false rollback ``deploy-staging ->
development`` while the original finalizer concurrently merged the PR. State diverged.
The reaper never spawns claude / pytest / docker; we drive the DB directly (a
'running' jobs row + a linked agent_runs exit_code) and the process-local
``finalizer_liveness`` marker, then assert the reaper's terminal flip / deferral.
No network, no subprocess — every external is mocked.
"""
import os
import tempfile
import pytest
# Override env before importing app modules (same convention as test_job_reaper.py).
os.environ["ORCH_DB_PATH"] = os.path.join(
tempfile.gettempdir(), "test_orch113_reaper.db"
)
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import init_db, get_db, get_job
import src.finalizer_liveness as fl
import src.job_reaper as jr
from src.job_reaper import JobReaper
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "reaper113.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
# Each test starts with a clean ownership registry (process-local module state).
with fl._LOCK:
fl._OWNED.clear()
# Default: kill-switch ON (the fix is active) unless a test flips it.
monkeypatch.setattr(db.settings, "reaper_finalizer_liveness_enabled", True)
yield
with fl._LOCK:
fl._OWNED.clear()
# --- helpers ----------------------------------------------------------------
def _make_task(repo="orchestrator", branch="feature/orch113",
stage="deploy-staging", work_item_id="ORCH-113"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _make_running_job(agent="deployer", repo="orchestrator", task_id=None,
pid=None, age_s=0, attempts=0, max_attempts=2,
run_id=None, exit_code=0, finished_age_s=600):
"""Insert a job already in 'running' with a linked agent_runs row.
``finished_at`` is back-dated by ``finished_age_s`` so the Tier-2 grace
(default 300) is satisfied by default; pass a small value to stay within grace.
"""
conn = get_db()
if run_id is None and exit_code is not None:
cur = conn.execute(
"INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
"VALUES (?, ?, datetime('now', ?), ?)",
(task_id, agent, f"-{int(finished_age_s)} seconds", exit_code),
)
run_id = cur.lastrowid
cur = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
"run_id, pid, started_at) "
"VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
(agent, repo, task_id, attempts, max_attempts, run_id, pid,
f"-{int(age_s)} seconds"),
)
job_id = cur.lastrowid
conn.commit()
conn.close()
return job_id
def _spy_advance(monkeypatch, side_effect=None):
"""Patch launcher._try_advance_stage with a call recorder.
Returns a mutable ``calls`` list. ``side_effect(run_id, agent, repo, branch)``
runs on each call (e.g. to simulate the false rollback to development).
"""
import src.agents.launcher as L
calls: list = []
def _fake(run_id, agent, repo, branch):
calls.append((run_id, agent, repo, branch))
if side_effect is not None:
side_effect(run_id, agent, repo, branch)
monkeypatch.setattr(L.launcher, "_try_advance_stage", _fake)
return calls
def _green_gate(monkeypatch):
"""Force the read-only canonical-QG pre-eval green (staging SUCCESS)."""
monkeypatch.setattr(JobReaper, "_gate_is_green",
lambda self, stage, job, branch, wid: True)
# --- TC-01: live finalizer on deploy-staging is NOT reaped (AC-1/FR-1) ------
def test_tc01_live_finalizer_deploy_staging_not_reaped(monkeypatch):
"""exit_code=0 and finished_age_s >= grace, but the finalizer is ALIVE (marked)
-> reaper does NOT call _try_advance_stage; the row stays running; defer logged."""
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
# A live monitor owns this finalization.
fl.mark(jid, run_id=1, stage="deploy-staging")
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "running" # not reaped
assert calls == [] # no second advance
assert r.finalizer_defers_total == 1
assert r.reaped_total == 0
# --- TC-02: strict ownership — non-owner runs zero side effects (AC-2/FR-2) --
def test_tc02_non_owner_runs_no_edge_gates(monkeypatch):
"""While a live finalizer owns the (job, stage), a racing reaper tick performs
NO side-effectful advance/merge-gate/re-test (zero side effects)."""
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=900)
fl.mark(jid, run_id=7, stage="deploy-staging")
r = JobReaper()
# Several ticks while ownership is held — still zero advances, still running.
for _ in range(3):
r.reap_once()
assert calls == []
assert get_job(jid)["status"] == "running"
assert r.finalizer_defers_total == 3
# --- TC-03: a genuinely dead finalizer is still reaped (AC-3/FR-4) ----------
def test_tc03_dead_finalizer_still_reaped_tier2(monkeypatch):
"""No ownership marker (finalizer dead) -> reaper reaps via Tier-2 as before."""
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
# No fl.mark() -> ownership absent (finalizer dead).
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "done" # reaped to done (gate green)
assert len(calls) == 1 # advance driven exactly once
assert r.finalizer_defers_total == 0
def test_tc03_tier3_backstop_ignores_marker(monkeypatch):
"""Even with an active ownership marker, a job past reaper_max_running_s is
reaped by the Tier-3 backstop (a stuck finalizer is never immortal)."""
monkeypatch.setattr(db.settings, "reaper_max_running_s", 1000)
tid = _make_task(stage="deploy-staging")
# age beyond the backstop ceiling; exit_code=0 within grace so Tier-2 defers,
# but Tier-3 must still fire.
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=10,
age_s=2000, attempts=0, max_attempts=2)
fl.mark(jid, run_id=3, stage="deploy-staging")
r = JobReaper()
r.reap_once()
# Backstop reaps to a retry (attempts<max) -> queued, regardless of the marker.
assert get_job(jid)["status"] == "queued"
assert r.reaped_total == 1
# --- TC-04: idempotency under race — exactly-once advance (AC-2/AC-4/FR-2) ---
def test_tc04_idempotent_no_second_advance_under_race(monkeypatch):
"""Monitor finalizing (owns the job) + concurrent reaper ticks -> the heavy
edge-gate advance is NEVER duplicated by the reaper; no false rollback."""
rolled_back = {"hit": False}
def _rollback(run_id, agent, repo, branch):
# Simulate the incident: a second advance rolls back to development.
conn = get_db()
conn.execute("UPDATE tasks SET stage='development' WHERE branch=?", (branch,))
conn.commit()
conn.close()
rolled_back["hit"] = True
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch, side_effect=_rollback)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=1200)
fl.mark(jid, run_id=9, stage="deploy-staging")
r = JobReaper()
for _ in range(5):
r.reap_once()
assert calls == [] # reaper never re-ran the advance
assert rolled_back["hit"] is False
# The task is NOT rolled back; the live finalizer remains the sole driver.
conn = get_db()
stage = conn.execute("SELECT stage FROM tasks WHERE id=?", (tid,)).fetchone()[0]
conn.close()
assert stage == "deploy-staging"
# --- TC-05: MANDATORY regression for incident ORCH-111 (AC-4/FR-5) ----------
def test_tc05_orch111_no_false_rollback_no_retry_increment(monkeypatch):
"""Long (> grace) deploy-staging finalization at staging_status=SUCCESS while
the deploy finalizer concurrently reaches success/merge -> reaper does NOT roll
back deploy-staging -> development and does NOT increment developer-retry; the
task keeps a single consistent state. RED before the fix, GREEN after."""
def _rollback(run_id, agent, repo, branch):
# Simulate the incident: a second advance rolls the task back to development
# and spawns a fresh developer run (the developer-retry count is derived from
# agent_runs — stage_engine.developer_retry_count).
conn = get_db()
conn.execute("UPDATE tasks SET stage='development' WHERE branch=?", (branch,))
_t = conn.execute("SELECT id FROM tasks WHERE branch=?", (branch,)).fetchone()
conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
(_t[0],),
)
conn.commit()
conn.close()
from src.stage_engine import developer_retry_count
_green_gate(monkeypatch) # staging_status SUCCESS
calls = _spy_advance(monkeypatch, side_effect=_rollback)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=1500)
# The original finalizer is still alive (running the heavy edge sub-gates).
fl.mark(jid, run_id=11, stage="deploy-staging")
r = JobReaper()
r.reap_once()
# No second advance => no false rollback, no developer-retry increment.
assert calls == []
conn = get_db()
stage = conn.execute("SELECT stage FROM tasks WHERE id=?", (tid,)).fetchone()[0]
conn.close()
assert stage == "deploy-staging" # NOT rolled back to development
assert developer_retry_count(tid) == 0 # developer-retry NOT incremented
assert get_job(jid)["status"] == "running"
assert r.finalizer_defers_total == 1
# --- TC-06: compatibility guard — kill-switch / non-deploy-staging (AC-5) ----
def test_tc06_killswitch_off_byte_for_byte_prior(monkeypatch):
"""Kill-switch OFF -> the marker is ignored; a deploy-staging exit0/past-grace
job is reaped exactly as before the fix (advance driven once)."""
monkeypatch.setattr(db.settings, "reaper_finalizer_liveness_enabled", False)
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
fl.mark(jid, run_id=5, stage="deploy-staging") # marked, but ignored
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "done"
assert len(calls) == 1
assert r.finalizer_defers_total == 0
def test_tc06_non_deploy_staging_stage_not_consulted(monkeypatch):
"""A non-deploy-staging stage is never consulted -> reaped as before even when
an (irrelevant) marker happens to be present."""
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="testing") # deployer also owns 'testing'
jid = _make_running_job(task_id=tid, agent="deployer", exit_code=0,
finished_age_s=600)
fl.mark(jid, run_id=6, stage="testing")
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "done"
assert len(calls) == 1
assert r.finalizer_defers_total == 0
def test_tc06_within_grace_unchanged(monkeypatch):
"""Within the finalization grace the Tier-2 path is unchanged (deferred, not
reaped) regardless of the marker — the fix only acts PAST the grace."""
monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=5) # < grace
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "running"
assert calls == []
# Within-grace deferral is the legacy path, not a finalizer-liveness defer.
assert r.finalizer_defers_total == 0
# --- TC-07: cross-cutting budget invariant (NFR-6/AC-5) ---------------------
def test_tc07_budget_invariant_preserved():
"""reaper_max_running_s (5400) > Σ(deploy-staging gate-work) + grace; the fix
changed neither the grace nor the ceiling (ORCH-065/109/110 invariant)."""
s = jr.settings
# The fix did not touch these (zero schema/budget change).
assert s.reaper_finalize_grace_s == 300
assert s.reaper_max_running_s == 5400
# Conservative Σ of the heavy deploy-staging gate-work + grace must fit.
sigma = s.merge_retest_timeout_s + s.coverage_run_timeout_s
assert s.reaper_max_running_s > sigma + s.reaper_finalize_grace_s
# --- TC-08: never-raise — a fault in the liveness path degrades safely -------
def test_tc08_liveness_error_never_breaks_tick(monkeypatch):
"""An exception inside the ownership consultation must not crash the tick; the
job is still processed (conservatively reaped, never blocked)."""
def _boom(job_id):
raise RuntimeError("registry exploded")
monkeypatch.setattr(fl, "is_active", _boom)
_green_gate(monkeypatch)
calls = _spy_advance(monkeypatch)
tid = _make_task(stage="deploy-staging")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
fl.mark(jid, run_id=2, stage="deploy-staging")
r = JobReaper()
r.reap_once() # must not raise
# is_active raised -> _finalizer_owns conservatively returns False -> reaped.
assert get_job(jid)["status"] == "done"
assert len(calls) == 1
def test_tc08_reap_once_isolates_and_never_raises(monkeypatch):
"""A fault while resolving one job's metadata is isolated; reap_once never
raises and other jobs are still processed."""
def _boom(self, job):
raise RuntimeError("meta exploded")
monkeypatch.setattr(JobReaper, "_task_meta", _boom)
tid = _make_task(stage="deploy-staging")
_make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
r = JobReaper()
r.reap_once() # outer + per-job never-raise -> no exception propagates
def test_tc08_finalizer_liveness_leaf_never_raises():
"""The leaf degrades safely on bad input / None job_id."""
fl.mark(None, None, None) # no-op, no raise
fl.clear(None) # no-op, no raise
assert fl.is_active(None) is False
fl.mark(1234, 1, "deploy-staging")
assert fl.is_active(1234) is True
snap = fl.snapshot()
assert snap["active"] >= 1 and 1234 in snap["jobs"]
fl.clear(1234)
assert fl.is_active(1234) is False