Tier-2 reaped a LIVE, still-finalizing monitor: _monitor_agent writes agent_runs.exit_code FIRST, then does git push / PR / Plane comments before _finalize_job, and the agent pid is already dead in that window — so the old "exit_code recorded -> reap now" had no grace and could race a healthy job. Worse, _reap_known_outcome ran the advance (advance_stage -> enqueue_job) BEFORE the atomic claim, so a reaper that lost the race had already enqueued the next stage (dup advance / dup enqueue), violating ADR-001 Р-1. Fix: - Tier-2 grace: reap only once agent_runs.exit_code has been recorded for >= reaper_finalize_grace_s (new setting, default 300s; > max finalization window). A live finalizing monitor is never reaped (FR-1.3/AC-3). New finished_age_s column computed in get_running_jobs. - claim-before-act for exit0: evaluate the canonical QG READ-ONLY (the reconciler pattern) to choose the terminal status, then atomically claim 'done' FIRST; only the claim winner runs the advance. A loser performs no side effects -> no dup advance / dup enqueue. Docs (golden source) updated in the same change: ADR-001, global adr-0011, README, internals, .env.example, CHANGELOG (also fixes the P3 broken adr-0011 link). New tests cover the grace window, lost-claim no-side-effects, and the already-advanced idempotent path. Refs: ORCH-065 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
389 lines
15 KiB
Python
389 lines
15 KiB
Python
"""ORCH-065: job-reaper unit tests (TC-01..TC-08, TC-21).
|
||
|
||
The reaper never spawns claude; we drive the DB directly (a 'running' jobs row with
an optional pid, plus an optional linked agent_runs row carrying exit_code) and
assert the terminal flip + side-effects.
Liveness (``src.merge_gate.pid_alive``) is monkeypatched so a 'dead'/'alive' pid
is deterministic.
|
||
"""
|
||
import os
|
||
import tempfile
|
||
|
||
import pytest
|
||
|
||
# Override env before importing app modules (same convention as test_queue.py).
# These assignments must stay above the ``src.*`` imports further down.
os.environ.update(
    ORCH_DB_PATH=os.path.join(tempfile.gettempdir(), "test_orch_reaper.db"),
    ORCH_REPOS_DIR=tempfile.gettempdir(),
    ORCH_GITEA_TOKEN="test-token",
    ORCH_PLANE_API_TOKEN="test-token",
)
|
||
|
||
import src.db as db
|
||
from src.db import init_db, get_db, enqueue_job, get_job
|
||
import src.job_reaper as jr
|
||
from src.job_reaper import JobReaper
|
||
|
||
|
||
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test its own SQLite file and run ``init_db()`` against it.

    ``db.settings.db_path`` is redirected via monkeypatch, so the override is
    undone automatically when the test finishes.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "reaper.db"))
    init_db()
    yield
|
||
|
||
|
||
# --- helpers ----------------------------------------------------------------
|
||
def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, age_s=0, attempts=0, max_attempts=2,
                      run_id=None, exit_code=None, finished_age_s=600):
    """Insert a job already in 'running' with the given pid/age/attempts.

    ``started_at`` is back-dated by ``age_s`` seconds so running_age_s reflects it.

    When ``exit_code`` is given and no ``run_id`` is passed, an agent_runs row is
    created and linked (Tier-2). Its ``finished_at`` is back-dated by
    ``finished_age_s`` seconds, so by default the Tier-2 finalization grace
    (``reaper_finalize_grace_s``, default 300) is satisfied. Pass a small
    ``finished_age_s`` to exercise the "monitor may still be finalizing"
    deferral.

    Returns the new ``jobs.id``. The connection is closed even if an INSERT
    fails, so a broken setup does not leave a handle open on the temp DB.
    """
    conn = get_db()
    try:
        if run_id is None and exit_code is not None:
            cur = conn.execute(
                "INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
                "VALUES (?, ?, datetime('now', ?), ?)",
                (task_id, agent, f"-{int(finished_age_s)} seconds", exit_code),
            )
            run_id = cur.lastrowid
        cur = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
            "run_id, pid, started_at) "
            "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
            (agent, repo, task_id, attempts, max_attempts, run_id, pid,
             f"-{int(age_s)} seconds"),
        )
        job_id = cur.lastrowid
        conn.commit()
    finally:
        conn.close()
    return job_id
|
||
|
||
|
||
def _make_task(repo="orchestrator", branch="feature/x", stage="development",
               work_item_id="ORCH-1"):
    """Insert a tasks row at ``stage`` and return its id.

    ``work_item_id`` is used for both ``plane_id`` and ``work_item_id``. The
    connection is closed even if the INSERT fails.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, branch, stage),
        )
        tid = cur.lastrowid
        conn.commit()
    finally:
        conn.close()
    return tid
|
||
|
||
|
||
def _dead_pid(monkeypatch):
    """Make every pid look gone to the reaper (merge_gate.pid_alive -> False)."""
    import src.merge_gate as merge_gate

    def _never_alive(pid):
        return False

    monkeypatch.setattr(merge_gate, "pid_alive", _never_alive)
|
||
|
||
|
||
def _alive_pid(monkeypatch):
    """Make every pid look alive to the reaper (merge_gate.pid_alive -> True)."""
    import src.merge_gate as merge_gate

    def _always_alive(pid):
        return True

    monkeypatch.setattr(merge_gate, "pid_alive", _always_alive)
|
||
|
||
|
||
# --- TC-01: dead executor -> reaped without process restart -----------------
|
||
def test_tc01_dead_pid_reaped_to_queued(monkeypatch):
    """TC-01: a job whose executor pid is dead is requeued without a restart.

    ``reaper_dead_ticks`` is pinned to 2 so the two-tick expectation below does
    not silently depend on the setting's default value.
    """
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 2)
    jid = _make_running_job(pid=999999, attempts=0, max_attempts=2)
    r = JobReaper()
    r.reap_once()  # tick 1: dead streak = 1 < dead_ticks (2) -> not yet
    assert get_job(jid)["status"] == "running"
    r.reap_once()  # tick 2: dead streak reaches 2 -> reaped
    assert get_job(jid)["status"] == "queued"
    assert r.reaped_total == 1
    assert r.last_reaped["job_id"] == jid
|
||
|
||
|
||
# --- TC-02: live agent within timeout is NEVER reaped -----------------------
|
||
def test_tc02_alive_pid_never_reaped(monkeypatch):
    """TC-02: an executor that is still alive is left alone across many ticks."""
    _alive_pid(monkeypatch)
    job = _make_running_job(pid=4321, age_s=10)
    reaper = JobReaper()
    for _tick in range(5):
        reaper.reap_once()
    assert get_job(job)["status"] == "running"
    assert reaper.reaped_total == 0
|
||
|
||
|
||
def test_tc02_alive_within_max_running_not_reaped(monkeypatch):
    """TC-02: a live executor younger than the max-running ceiling is not reaped."""
    _alive_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 3600)
    job = _make_running_job(pid=4321, age_s=1800)  # alive, below the 3600s ceiling
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(job)["status"] == "running"
|
||
|
||
|
||
# --- TC-03: zombie only after reaper_dead_ticks consecutive ticks -----------
|
||
def test_tc03_requires_consecutive_dead_ticks(monkeypatch):
    """TC-03: a zombie is reaped only after ``reaper_dead_ticks`` CONSECUTIVE dead ticks."""
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 3)
    import src.merge_gate as merge_gate

    # Liveness per tick: dead, dead, ALIVE (streak resets), dead, dead, dead.
    liveness = iter((False, False, True, False, False, False))
    monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: next(liveness))
    job = _make_running_job(pid=999998)
    reaper = JobReaper()
    for _tick in range(5):
        reaper.reap_once()
    assert get_job(job)["status"] == "running"
    reaper.reap_once()  # 6th tick: third consecutive dead -> reaped
    assert get_job(job)["status"] == "queued"
|
||
|
||
|
||
# --- TC-04: backstop ceiling reaps even when liveness is unknown ------------
|
||
def test_tc04_backstop_ceiling(monkeypatch):
    """TC-04: past ``reaper_max_running_s`` a job is reaped even if it looks alive."""
    _alive_pid(monkeypatch)  # liveness reports "alive"...
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    job = _make_running_job(pid=4321, age_s=500)  # ...but 500s exceeds the 100s ceiling
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(job)["status"] == "queued"
    assert reaper.reaped_total == 1
|
||
|
||
|
||
def test_tc04_backstop_no_pid(monkeypatch):
    """TC-04: a job with no recorded pid is still reaped once it exceeds the ceiling."""
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    job = _make_running_job(pid=None, age_s=500)
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(job)["status"] == "queued"
|
||
|
||
|
||
# --- TC-05: correct outcome by exit_code (Tier-2) ---------------------------
|
||
def _gate(monkeypatch, green: bool):
    """Pin JobReaper's READ-ONLY gate pre-evaluation to a fixed green/red verdict."""
    def _fixed_verdict(self, stage, job, branch, wid):
        return green

    monkeypatch.setattr(JobReaper, "_gate_is_green", _fixed_verdict)
|
||
|
||
|
||
def test_tc05_exit0_gate_green_done(monkeypatch):
    """TC-05: exit 0 + green gate -> the row is claimed 'done' and the advance runs.

    A developer job runs to LEAVE the 'architecture' stage (-> 'development').
    The advance stub records each call, so the test proves the claim winner
    actually advanced exactly once instead of only flipping the row.
    """
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0)
    _gate(monkeypatch, green=True)
    # gate green -> the claim flips 'done' FIRST, then the advance runs.
    import src.agents.launcher as L
    advanced = []

    def fake_advance(run_id, agent, repo, branch):
        advanced.append(run_id)
        return db.update_task_stage(tid, "development")

    monkeypatch.setattr(L.launcher, "_try_advance_stage", fake_advance)
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "done"
    assert len(advanced) == 1, "the claim winner must run the advance exactly once"
|
||
|
||
|
||
def test_tc05_exit0_gate_red_requeues(monkeypatch):
    """TC-05: exit 0 with a red gate is requeued, and the advance never runs."""
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            attempts=0, max_attempts=2)
    _gate(monkeypatch, green=False)  # read-only pre-eval says red

    # claim-before-act: a red gate must never reach the advance path.
    import src.agents.launcher as L
    advance_calls = []

    def record_advance(run_id, agent, repo, branch):
        advance_calls.append(1)

    monkeypatch.setattr(L.launcher, "_try_advance_stage", record_advance)
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(jid)["status"] == "queued"  # exit0 but gate red -> not 'done'
    assert not advance_calls, "no advance/side-effects on a red gate"
|
||
|
||
|
||
def test_tc05_exit0_already_advanced_done_no_side_effects(monkeypatch):
    """TC-05: a task already past the developer's candidate stage closes idempotently.

    The monitor advanced before dying, so the reap marks the job 'done' without
    calling the advance again.
    """
    tid = _make_task(stage="development")  # developer's candidate is 'architecture'
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0)
    import src.agents.launcher as L
    advance_calls = []

    def record_advance(run_id, agent, repo, branch):
        advance_calls.append(1)

    monkeypatch.setattr(L.launcher, "_try_advance_stage", record_advance)
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(jid)["status"] == "done"
    assert not advance_calls, "already-advanced reap must not re-advance"
|
||
|
||
|
||
def test_tc05_nonzero_exit_requeue_then_failed(monkeypatch):
    """TC-05: non-zero exit requeues while budget remains, then fails with an alert.

    The Telegram sender is stubbed BEFORE the first tick, so no reap here can
    reach the real notifier. The capture list is cleared before the final tick,
    so the alert assertion is attributable to the 'failed' reap alone.
    """
    import src.notifications as notif
    sent = []
    monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: sent.append(a))

    tid = _make_task(stage="development")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                            attempts=1, max_attempts=2)
    r = JobReaper()
    r.reap_once()  # attempts(1) < max(2) -> queued
    assert get_job(jid)["status"] == "queued"

    # Now exhaust the budget.
    jid2 = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                             attempts=2, max_attempts=2)
    sent.clear()
    r.reap_once()
    assert get_job(jid2)["status"] == "failed"
    assert sent, "failed reap must send a Telegram alert"
|
||
|
||
|
||
# --- TC-05b: Tier-2 finalization grace (live monitor still finalizing) -------
|
||
def test_tc05_tier2_within_grace_not_reaped(monkeypatch):
    """exit_code freshly recorded -> a LIVE monitor may still be finalizing.

    Within ``reaper_finalize_grace_s`` the reaper must leave the job alone
    (FR-1.3/AC-3). A live finalizing monitor (git push / PR / Plane comments) is
    never reaped, so there is no dup advance / enqueue.
    """
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
    tid = _make_task(stage="architecture")
    # exit_code recorded only 5s ago -> still inside the 300s finalization grace.
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            finished_age_s=5)
    import src.agents.launcher as L
    advance_calls = []

    def record_advance(run_id, agent, repo, branch):
        advance_calls.append(1)

    monkeypatch.setattr(L.launcher, "_try_advance_stage", record_advance)
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(jid)["status"] == "running"  # deferred, NOT reaped
    assert reaper.reaped_total == 0
    assert not advance_calls, "a live finalizing monitor must not be advanced by the reaper"
|
||
|
||
|
||
def test_tc05_tier2_after_grace_reaped(monkeypatch):
    """Once exit_code has been recorded longer than the grace, the monitor is
    genuinely dead and the Tier-2 reap proceeds.

    The row is claimed 'done', and the advance stub must be called exactly once.
    """
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            finished_age_s=600)  # well past the grace
    _gate(monkeypatch, green=True)
    import src.agents.launcher as L
    advanced = []

    def fake_advance(run_id, agent, repo, branch):
        advanced.append(run_id)
        return db.update_task_stage(tid, "development")

    monkeypatch.setattr(L.launcher, "_try_advance_stage", fake_advance)
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "done"
    assert len(advanced) == 1, "the claim winner must run the advance exactly once"
|
||
|
||
|
||
def test_tc05_tier2_lost_claim_no_side_effects(monkeypatch):
    """claim-BEFORE-act: the reaper reads the row, then another actor (a late
    monitor / startup requeue) moves it out of 'running' before the reaper's
    atomic claim.

    The reaper's claim loses (rowcount==0), and it must perform NO advance side
    effects (no dup advance / dup enqueue) — ADR-001 Р-1."""
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 0)
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            finished_age_s=10)
    import src.agents.launcher as L
    advance_calls = []
    monkeypatch.setattr(L.launcher, "_try_advance_stage",
                        lambda run_id, agent, repo, branch: advance_calls.append(1))

    def steal_row_then_report_green(self, stage, job, branch, wid):
        # The pre-eval reports green, but another actor wins the 'running' row
        # right before the reaper's atomic claim runs.
        db.requeue_running_jobs()
        return True

    monkeypatch.setattr(JobReaper, "_gate_is_green", steal_row_then_report_green)
    reaper = JobReaper()
    reaper.reap_once()

    # Lost claim -> no advance, no double work. The row stays where the winner
    # left it ('queued'); the reaper does not flip it to 'done'.
    assert not advance_calls, "reaper that lost the claim must not advance/enqueue"
    assert get_job(jid)["status"] == "queued"
    assert reaper.reaped_total == 0
|
||
|
||
|
||
# --- TC-06: atomicity — reaper vs requeue_running_jobs (status guard) --------
|
||
def test_tc06_atomic_no_double_reap(monkeypatch):
    """TC-06: after the startup requeue takes the row, the reaper does nothing.

    reap_running_job's ``WHERE status='running'`` guard yields rowcount 0 for a
    row that is already 'queued', so the row is not processed a second time.
    """
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    job = _make_running_job(pid=999997, attempts=0, max_attempts=2)

    # Simulate the startup requeue winning the row first.
    requeued = db.requeue_running_jobs()
    assert requeued == 1
    assert get_job(job)["status"] == "queued"

    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(job)["status"] == "queued"
    assert reaper.reaped_total == 0
|
||
|
||
|
||
def test_tc06_reap_running_job_guard_returns_false_when_not_running():
    """TC-06: reap_running_job returns False for a row that is not 'running'."""
    queued_id = enqueue_job("developer", "orchestrator")  # status 'queued', not running
    claimed = db.reap_running_job(queued_id, "done")
    assert claimed is False
    assert get_job(queued_id)["status"] == "queued"
|
||
|
||
|
||
# --- TC-07: kill-switch reaper_enabled=False -> no-op -----------------------
|
||
def test_tc07_kill_switch(monkeypatch):
    """TC-07: with reaper_enabled=False the reaper is a no-op, even for an old dead job."""
    _dead_pid(monkeypatch)
    for flag in ("reaper_enabled", "lease_reclaim_enabled"):
        monkeypatch.setattr(db.settings, flag, False)
    job = _make_running_job(pid=999996, age_s=99999)
    reaper = JobReaper()
    for _tick in range(3):
        reaper.reap_once()
    assert get_job(job)["status"] == "running"
    assert reaper.reaped_total == 0
|
||
|
||
|
||
# --- TC-08: never-raise — a DB/OS error in one tick does not propagate -------
|
||
def test_tc08_never_raise_isolates_per_job(monkeypatch):
    """TC-08: an exception while reaping one job does not stop the other jobs."""
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    good = _make_running_job(pid=111, attempts=0, max_attempts=2)
    bad = _make_running_job(pid=222, attempts=0, max_attempts=2)

    reaper = JobReaper()
    real_reap_job = reaper._reap_job

    def failing_for_bad(job):
        if job["id"] != bad:
            return real_reap_job(job)
        raise RuntimeError("simulated per-job failure")

    monkeypatch.setattr(reaper, "_reap_job", failing_for_bad)
    reaper.reap_once()  # must not raise despite the bad job blowing up

    # The good job is still reaped; the bad one is isolated (stays running).
    assert get_job(good)["status"] == "queued"
    assert get_job(bad)["status"] == "running"
|
||
|
||
|
||
def test_tc08_reap_once_outer_never_raises(monkeypatch):
    """TC-08: a failure fetching the running set must not escape the tick wrapper.

    ``get_running_jobs`` is made to raise. ``_tick`` must swallow the error and
    still record that a tick ran; nothing is reaped.
    """
    def db_down():
        raise RuntimeError("db down")

    monkeypatch.setattr(jr, "get_running_jobs", db_down)
    r = JobReaper()
    r._tick()  # must not raise
    assert r.last_run_ts is not None
    assert r.reaped_total == 0
|
||
|
||
|
||
# --- TC-21: startup lease-reclaim + reaper start/stop smoke -----------------
|
||
def test_tc21_reaper_start_stop_smoke():
    """TC-21: the background reaper thread starts and stops cleanly.

    ``stop`` runs in ``finally``, so a failed start assertion cannot leave the
    polling thread running for the rest of the test session.
    """
    r = JobReaper(interval_s=0.05)
    r.start()
    try:
        assert r._thread is not None and r._thread.is_alive()
    finally:
        r.stop(timeout=2)
    assert not r._thread.is_alive()
|
||
|
||
|
||
def test_tc21_reclaim_all_stale_leases_callable(monkeypatch):
    """TC-21: startup lease reclaim is callable and reclaims 0 when no lease files exist."""
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", True)
    reclaimed = jr.reclaim_all_stale_leases()
    assert reclaimed == 0
|