Files
orchestrator/tests/test_job_reaper.py
claude-bot 720c31393a fix(reaper): Tier-2 finalization grace + claim-before-act (no dup advance)
Tier-2 reaped a LIVE, still-finalizing monitor: _monitor_agent writes
agent_runs.exit_code FIRST, then does git push / PR / Plane comments before
_finalize_job, and the agent pid is already dead in that window — so the old
"exit_code recorded -> reap now" had no grace and could race a healthy job.
Worse, _reap_known_outcome ran the advance (advance_stage -> enqueue_job)
BEFORE the atomic claim, so a reaper that lost the race had already enqueued
the next stage (dup advance / dup enqueue), violating ADR-001 Р-1.

Fix:
- Tier-2 grace: reap only once agent_runs.exit_code has been recorded for
  >= reaper_finalize_grace_s (new setting, default 300s; > max finalization
  window). A live finalizing monitor is never reaped (FR-1.3/AC-3). New
  finished_age_s column computed in get_running_jobs.
- claim-before-act for exit0: evaluate the canonical QG READ-ONLY (the
  reconciler pattern) to choose the terminal status, then atomically claim
  'done' FIRST; only the claim winner runs the advance. A loser performs no
  side effects -> no dup advance / dup enqueue.

Docs (golden source) updated in the same change: ADR-001, global adr-0011,
README, internals, .env.example, CHANGELOG (also fixes the P3 broken adr-0011
link). New tests cover the grace window, lost-claim no-side-effects, and the
already-advanced idempotent path.

Refs: ORCH-065

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-07 16:14:45 +00:00

389 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ORCH-065: job-reaper unit tests (TC-01..TC-08, TC-21).
The reaper never spawns claude; we drive the DB directly (a 'running' jobs row +
optional agent_runs exit_code/pid) and assert the terminal flip + side-effects.
``os.kill`` liveness is monkeypatched so a 'dead'/'alive' pid is deterministic.
"""
import os
import tempfile
import pytest
# Override env before importing app modules (same convention as test_queue.py).
# NOTE(review): presumably the settings object reads the ORCH_* variables when
# the ``src.*`` modules are imported, which is why these assignments sit above
# those imports — confirm against the settings module.
# ORCH_DB_PATH is only a starting value here: the autouse ``fresh_db`` fixture
# repoints ``db.settings.db_path`` at a per-test file before each test runs.
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch_reaper.db")
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import init_db, get_db, enqueue_job, get_job
import src.job_reaper as jr
from src.job_reaper import JobReaper
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Point the app settings at a per-test database file and initialise it.

    Runs automatically for every test in this module. The database lives
    under pytest's ``tmp_path``, and ``monkeypatch`` restores the original
    ``db_path`` setting when the test finishes.
    """
    db_file = tmp_path / "reaper.db"
    monkeypatch.setattr(db.settings, "db_path", str(db_file))
    init_db()
    yield
# --- helpers ----------------------------------------------------------------
def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, age_s=0, attempts=0, max_attempts=2,
                      run_id=None, exit_code=None, finished_age_s=600):
    """Insert a job already in 'running' with the given pid/age/attempts.

    ``started_at`` is back-dated by ``age_s`` seconds so running_age_s reflects
    it. When ``exit_code`` is given (and no ``run_id`` was supplied) an
    agent_runs row is created and linked (Tier-2); its ``finished_at`` is
    back-dated by ``finished_age_s`` seconds so the Tier-2 finalization grace
    (``reaper_finalize_grace_s``, default 300) is satisfied by default — pass a
    small ``finished_age_s`` to exercise the "monitor may still be finalizing"
    deferral.

    Returns:
        The id of the inserted jobs row.
    """
    conn = get_db()
    try:
        if run_id is None and exit_code is not None:
            run_cur = conn.execute(
                "INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
                "VALUES (?, ?, datetime('now', ?), ?)",
                (task_id, agent, f"-{int(finished_age_s)} seconds", exit_code),
            )
            run_id = run_cur.lastrowid
        job_cur = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
            "run_id, pid, started_at) "
            "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
            (agent, repo, task_id, attempts, max_attempts, run_id, pid,
             f"-{int(age_s)} seconds"),
        )
        job_id = job_cur.lastrowid
        conn.commit()
    finally:
        # Close even when an INSERT raises, so a failing helper call does not
        # leave an open connection behind for the rest of the test run.
        conn.close()
    return job_id
def _make_task(repo="orchestrator", branch="feature/x", stage="development",
               work_item_id="ORCH-1"):
    """Insert a tasks row and return its id.

    ``work_item_id`` is stored in both the ``plane_id`` and ``work_item_id``
    columns; ``repo``, ``branch`` and ``stage`` are stored as given.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, branch, stage),
        )
        task_id = cur.lastrowid
        conn.commit()
    finally:
        # Close even when the INSERT raises, so a failing helper call does not
        # leave an open connection behind for the rest of the test run.
        conn.close()
    return task_id
def _dead_pid(monkeypatch):
    """Force merge_gate.pid_alive -> False (process gone) for the reaper."""
    import src.merge_gate as merge_gate

    def _never_alive(pid):
        return False

    monkeypatch.setattr(merge_gate, "pid_alive", _never_alive)
def _alive_pid(monkeypatch):
    """Force merge_gate.pid_alive -> True (process still running)."""
    import src.merge_gate as merge_gate

    def _always_alive(pid):
        return True

    monkeypatch.setattr(merge_gate, "pid_alive", _always_alive)
# --- TC-01: dead executor -> reaped without process restart -----------------
def test_tc01_dead_pid_reaped_to_queued(monkeypatch):
    """TC-01: a running job whose pid is dead is requeued on the second tick."""
    _dead_pid(monkeypatch)
    job_id = _make_running_job(pid=999999, attempts=0, max_attempts=2)
    reaper = JobReaper()

    # Tick 1: streak=1, and dead_ticks defaults to 2 -> not reaped yet.
    reaper.reap_once()
    assert get_job(job_id)["status"] == "running"

    # Tick 2: second consecutive dead observation -> reaped.
    reaper.reap_once()
    assert get_job(job_id)["status"] == "queued"
    assert reaper.reaped_total == 1
    assert reaper.last_reaped["job_id"] == job_id
# --- TC-02: live agent within timeout is NEVER reaped -----------------------
def test_tc02_alive_pid_never_reaped(monkeypatch):
    """TC-02: a job with a live pid stays 'running' across repeated ticks."""
    _alive_pid(monkeypatch)
    job_id = _make_running_job(pid=4321, age_s=10)
    reaper = JobReaper()

    ticks = 5
    for _tick in range(ticks):
        reaper.reap_once()

    assert get_job(job_id)["status"] == "running"
    assert reaper.reaped_total == 0
def test_tc02_alive_within_max_running_not_reaped(monkeypatch):
    """TC-02: a live job younger than ``reaper_max_running_s`` is left alone."""
    _alive_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 3600)

    # 1800s old: below the 3600s ceiling, and the pid is reported alive.
    job_id = _make_running_job(pid=4321, age_s=1800)

    JobReaper().reap_once()
    assert get_job(job_id)["status"] == "running"
# --- TC-03: zombie only after reaper_dead_ticks consecutive ticks -----------
def test_tc03_requires_consecutive_dead_ticks(monkeypatch):
    """TC-03: the job is reaped only after ``reaper_dead_ticks`` dead ticks in a row."""
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 3)
    import src.merge_gate as merge_gate

    # Dead, dead, ALIVE (resets), dead, dead, dead -> reaped only on the 6th tick.
    liveness = iter([False, False, True, False, False, False])

    def _scripted_liveness(pid):
        return next(liveness)

    monkeypatch.setattr(merge_gate, "pid_alive", _scripted_liveness)
    job_id = _make_running_job(pid=999998)
    reaper = JobReaper()

    for _tick in range(5):
        reaper.reap_once()
        assert get_job(job_id)["status"] == "running"

    # 6th tick: third CONSECUTIVE dead -> reaped.
    reaper.reap_once()
    assert get_job(job_id)["status"] == "queued"
# --- TC-04: backstop ceiling reaps even when liveness is unknown ------------
def test_tc04_backstop_ceiling(monkeypatch):
    """TC-04: a job older than ``reaper_max_running_s`` is reaped even if its pid is alive."""
    # Liveness says "alive", but the job's age (500s) exceeds the ceiling (100s).
    _alive_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    job_id = _make_running_job(pid=4321, age_s=500)

    reaper = JobReaper()
    reaper.reap_once()

    assert get_job(job_id)["status"] == "queued"
    assert reaper.reaped_total == 1
def test_tc04_backstop_no_pid(monkeypatch):
    """TC-04: the age ceiling also reaps a job that has no pid recorded."""
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    job_id = _make_running_job(pid=None, age_s=500)

    JobReaper().reap_once()

    assert get_job(job_id)["status"] == "queued"
# --- TC-05: correct outcome by exit_code (Tier-2) ---------------------------
def _gate(monkeypatch, green: bool):
    """Force the reaper's READ-ONLY gate pre-evaluation to green/red.

    Replaces ``JobReaper._gate_is_green`` with a stub that ignores its
    arguments and always returns ``green``.
    """
    def _fixed_verdict(self, stage, job, branch, wid):
        return green

    monkeypatch.setattr(JobReaper, "_gate_is_green", _fixed_verdict)
def test_tc05_exit0_gate_green_done(monkeypatch):
    """TC-05: exit code 0 with a green gate finishes the job as 'done'."""
    # A developer job runs to LEAVE the 'architecture' stage (-> 'development').
    task_id = _make_task(stage="architecture")
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0)
    _gate(monkeypatch, green=True)

    # gate green -> the claim flips 'done' FIRST, then the advance runs.
    import src.agents.launcher as launcher_mod

    def _advance(run_id, agent, repo, branch):
        return db.update_task_stage(task_id, "development")

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage", _advance)

    JobReaper().reap_once()
    assert get_job(job_id)["status"] == "done"
def test_tc05_exit0_gate_red_requeues(monkeypatch):
    """TC-05: exit code 0 with a red gate requeues the job and never advances."""
    task_id = _make_task(stage="architecture")
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0,
                               attempts=0, max_attempts=2)
    # The read-only pre-evaluation reports red.
    _gate(monkeypatch, green=False)

    # The advance path must NEVER run when the gate is red (claim-before-act).
    import src.agents.launcher as launcher_mod
    called = []

    def _record_advance(run_id, agent, repo, branch):
        return called.append(1)

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage",
                        _record_advance)

    JobReaper().reap_once()

    # exit0 but gate red -> not 'done'.
    assert get_job(job_id)["status"] == "queued"
    assert not called, "no advance/side-effects on a red gate"
def test_tc05_exit0_already_advanced_done_no_side_effects(monkeypatch):
    """TC-05: a task already past the agent's stage is closed 'done' without re-advancing."""
    # Stage already past the developer candidate set -> idempotent clean 'done'
    # with NO advance call (the monitor already advanced before dying).
    # The developer's candidate stage is 'architecture'.
    task_id = _make_task(stage="development")
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0)

    import src.agents.launcher as launcher_mod
    called = []

    def _record_advance(run_id, agent, repo, branch):
        return called.append(1)

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage",
                        _record_advance)

    JobReaper().reap_once()

    assert get_job(job_id)["status"] == "done"
    assert not called, "already-advanced reap must not re-advance"
def test_tc05_nonzero_exit_requeue_then_failed(monkeypatch):
    """TC-05: a non-zero exit requeues while budget remains, then fails with an alert.

    First reap: attempts(1) < max_attempts(2) -> the job returns to 'queued'.
    Second reap (a new job with attempts == max_attempts) -> 'failed', and a
    Telegram alert must be sent by that reap specifically.
    """
    sent = []
    tid = _make_task(stage="development")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                            attempts=1, max_attempts=2)
    r = JobReaper()
    import src.notifications as notif
    monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: sent.append(a))

    r.reap_once()  # attempts(1) < max(2) -> queued
    assert get_job(jid)["status"] == "queued"

    # Snapshot the alert count so the final assertion is attributable to the
    # failing reap rather than to anything sent during the requeue above.
    alerts_before_failure = len(sent)

    # Now exhaust the budget.
    jid2 = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                             attempts=2, max_attempts=2)
    r.reap_once()
    assert get_job(jid2)["status"] == "failed"
    assert len(sent) > alerts_before_failure, "failed reap must send a Telegram alert"
# --- TC-05b: Tier-2 finalization grace (live monitor still finalizing) -------
def test_tc05_tier2_within_grace_not_reaped(monkeypatch):
    """A freshly recorded exit_code means a LIVE monitor may still be finalizing.

    Within ``reaper_finalize_grace_s`` the reaper must leave the job alone
    (FR-1.3/AC-3): a live finalizing monitor — git push / PR / Plane comments —
    is never reaped, so there is no dup advance / enqueue.
    """
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
    task_id = _make_task(stage="architecture")

    # exit_code recorded only 5s ago -> still inside the finalization grace.
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0,
                               finished_age_s=5)

    import src.agents.launcher as launcher_mod
    called = []

    def _record_advance(run_id, agent, repo, branch):
        return called.append(1)

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage",
                        _record_advance)

    reaper = JobReaper()
    reaper.reap_once()

    # Deferred, NOT reaped.
    assert get_job(job_id)["status"] == "running"
    assert reaper.reaped_total == 0
    assert not called, "a live finalizing monitor must not be advanced by the reaper"
def test_tc05_tier2_after_grace_reaped(monkeypatch):
    """After the grace has elapsed the monitor is treated as dead and Tier-2 reaps.

    The exit_code was recorded 600s ago against a 300s grace, so the reap
    proceeds and, with a green gate, the job ends up 'done'.
    """
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
    task_id = _make_task(stage="architecture")

    # 600s is well past the grace.
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0,
                               finished_age_s=600)
    _gate(monkeypatch, green=True)

    import src.agents.launcher as launcher_mod

    def _advance(run_id, agent, repo, branch):
        return db.update_task_stage(task_id, "development")

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage", _advance)

    JobReaper().reap_once()
    assert get_job(job_id)["status"] == "done"
def test_tc05_tier2_lost_claim_no_side_effects(monkeypatch):
    """claim-BEFORE-act: a reaper that loses the atomic claim does nothing.

    Another actor (a late monitor / startup requeue) moves the row out of
    'running' AFTER the reaper read it but BEFORE the atomic claim. The
    reaper's claim then loses (rowcount==0) and it performs NO advance side
    effects (no dup advance / dup enqueue) — ADR-001 Р-1.
    """
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 0)
    task_id = _make_task(stage="architecture")
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0,
                               finished_age_s=10)

    import src.agents.launcher as launcher_mod
    called = []

    def _record_advance(run_id, agent, repo, branch):
        return called.append(1)

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage",
                        _record_advance)

    # The read-only gate pre-eval reports green, but the row is concurrently
    # claimed by someone else right before the reaper's atomic claim runs.
    def _steal_row_then_report_green(self, stage, job, branch, wid):
        # Another actor wins the 'running' row first.
        db.requeue_running_jobs()
        return True

    monkeypatch.setattr(JobReaper, "_gate_is_green", _steal_row_then_report_green)

    reaper = JobReaper()
    reaper.reap_once()

    # Reaper lost the atomic claim -> no advance, no double work. The row stays
    # where the winner left it ('queued'), not flipped to 'done' by the reaper.
    assert not called, "reaper that lost the claim must not advance/enqueue"
    assert get_job(job_id)["status"] == "queued"
    assert reaper.reaped_total == 0
# --- TC-06: atomicity — reaper vs requeue_running_jobs (status guard) --------
def test_tc06_atomic_no_double_reap(monkeypatch):
    """TC-06: a row already requeued by someone else is not processed again."""
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    job_id = _make_running_job(pid=999997, attempts=0, max_attempts=2)

    # Simulate the startup requeue winning the row first.
    requeued = db.requeue_running_jobs()
    assert requeued == 1
    assert get_job(job_id)["status"] == "queued"

    # The reaper now scans: the row is no longer 'running' -> reap_running_job's
    # WHERE status='running' guard yields rowcount 0 -> no second processing.
    reaper = JobReaper()
    reaper.reap_once()

    assert get_job(job_id)["status"] == "queued"
    assert reaper.reaped_total == 0
def test_tc06_reap_running_job_guard_returns_false_when_not_running():
    """TC-06: ``reap_running_job`` refuses a row that is not 'running'."""
    # A freshly enqueued job has status 'queued', not 'running'.
    job_id = enqueue_job("developer", "orchestrator")

    claimed = db.reap_running_job(job_id, "done")

    assert claimed is False
    assert get_job(job_id)["status"] == "queued"
# --- TC-07: kill-switch reaper_enabled=False -> no-op -----------------------
def test_tc07_kill_switch(monkeypatch):
    """TC-07: with ``reaper_enabled`` off, ticks are a no-op even for a dead, very old job."""
    _dead_pid(monkeypatch)
    for flag in ("reaper_enabled", "lease_reclaim_enabled"):
        monkeypatch.setattr(db.settings, flag, False)
    job_id = _make_running_job(pid=999996, age_s=99999)

    reaper = JobReaper()
    for _tick in range(3):
        reaper.reap_once()

    assert get_job(job_id)["status"] == "running"
    assert reaper.reaped_total == 0
# --- TC-08: never-raise — a DB/OS error in one tick does not propagate -------
def test_tc08_never_raise_isolates_per_job(monkeypatch):
    """TC-08: an exception while reaping one job does not stop the others."""
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    good_id = _make_running_job(pid=111, attempts=0, max_attempts=2)
    bad_id = _make_running_job(pid=222, attempts=0, max_attempts=2)

    reaper = JobReaper()
    real_reap_job = reaper._reap_job

    def _fail_on_bad_job(job):
        if job["id"] == bad_id:
            raise RuntimeError("simulated per-job failure")
        return real_reap_job(job)

    monkeypatch.setattr(reaper, "_reap_job", _fail_on_bad_job)

    # Must not raise despite the bad job blowing up.
    reaper.reap_once()

    # The good job is still reaped; the bad one is isolated (stays running).
    assert get_job(good_id)["status"] == "queued"
    assert get_job(bad_id)["status"] == "running"
def test_tc08_reap_once_outer_never_raises(monkeypatch):
    """TC-08: a failure while listing running jobs does not escape ``_tick``."""
    def _db_down():
        raise RuntimeError("db down")

    monkeypatch.setattr(jr, "get_running_jobs", _db_down)
    reaper = JobReaper()

    # The failure surfaces where reap_once fetches/iterates the running jobs;
    # the _tick wrapper is what guarantees the loop never dies, so _tick is the
    # call asserted safe here.
    reaper._tick()
    assert reaper.last_run_ts is not None
# --- TC-21: startup lease-reclaim + reaper start/stop smoke -----------------
def test_tc21_reaper_start_stop_smoke():
    """TC-21: the reaper thread starts, reports alive, and stops on request.

    ``stop()`` runs in a ``finally`` so a failing liveness assertion cannot
    leave the background reaper thread running into later tests.
    """
    r = JobReaper(interval_s=0.05)
    r.start()
    try:
        assert r._thread is not None and r._thread.is_alive()
    finally:
        r.stop(timeout=2)
    assert not r._thread.is_alive()
def test_tc21_reclaim_all_stale_leases_callable(monkeypatch):
    """TC-21: ``reclaim_all_stale_leases`` returns 0 when there is nothing to reclaim."""
    # No lease files present -> 0 reclaimed, never raises (registration smoke).
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", True)
    reclaimed = jr.reclaim_all_stale_leases()
    assert reclaimed == 0