Files
orchestrator/tests/test_job_reaper.py
claude-bot 4bebb921ff feat(reaper): job-reaper + stale merge-lease reclaim + idempotent merge finalization
Closes the "zombie jobs" incident class: job status was set only inside
the live launcher process, so a process death left jobs.status='running'
forever; at max_concurrency=1 one zombie blocked ALL projects' queue
(self-hosting risk). Adds a background daemon (src/job_reaper.py) with
three-tier liveness (dead-pid streak / known exit_code / max-running
backstop) whose only mutating write is an atomic terminal flip guarded by
WHERE status='running' (no double-process). For exit0 the canonical QG is
the source of truth via gate-driven advance, not "exit0".

Also proactively reclaims stale merge-lease (dead pid OR TTL) via file
delete only (no git ops), and makes merge finalization idempotent
(pr_already_merged guard + up-to-date short-circuit on re-drive).

New jobs.pid column via idempotent _ensure_column (no migration); pid
stamped in launcher._spawn after Popen. Reaper start/stop in lifespan;
"reaper" snapshot in GET /queue. Kill-switches: ORCH_REAPER_ENABLED,
ORCH_REAPER_INTERVAL_S, ORCH_REAPER_DEAD_TICKS, ORCH_REAPER_MAX_RUNNING_S,
ORCH_LEASE_RECLAIM_ENABLED.

Invariants unchanged (AC-13): STAGE_TRANSITIONS, QG_CHECKS registry,
check_branch_mergeable signature/behaviour, BUG-8 rollback, hook exit
codes. Restart-safe; never-raise per unit of background work.

Docs: docs/architecture/README.md, CHANGELOG.md, .env.example.
Tests: tests/test_job_reaper.py, tests/test_merge_lease_reclaim.py,
tests/test_merge_gate.py (TC-16), tests/test_merge_gate_race.py (TC-17),
tests/test_queue.py, tests/test_config.py (TC-19/TC-20). 742 passed.

Refs: ORCH-065

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-07 16:14:45 +00:00

286 lines
11 KiB
Python

"""ORCH-065: job-reaper unit tests (TC-01..TC-08, TC-21).
The reaper never spawns claude; we drive the DB directly (a 'running' jobs row +
optional agent_runs exit_code/pid) and assert the terminal flip + side-effects.
``os.kill`` liveness is monkeypatched so a 'dead'/'alive' pid is deterministic.
"""
import os
import tempfile
import pytest
# Override env before importing app modules (same convention as test_queue.py).
# NOTE(review): presumably the app settings are read from the environment when
# ``src.*`` is first imported, so these assignments must stay above the
# ``src`` imports below — confirm against the settings module.
# Database path under the system temp dir.
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch_reaper.db")
# Repos directory: the system temp dir itself.
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
# Placeholder token values for the Gitea / Plane settings.
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import init_db, get_db, enqueue_job, get_job
import src.job_reaper as jr
from src.job_reaper import JobReaper
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test its own database file under pytest's ``tmp_path``.

    Points ``db.settings.db_path`` at ``<tmp_path>/reaper.db`` and calls
    ``init_db()`` before handing control to the test. There is no teardown
    step after the ``yield``.
    """
    db_location = str(tmp_path / "reaper.db")
    monkeypatch.setattr(db.settings, "db_path", db_location)
    init_db()
    yield
# --- helpers ----------------------------------------------------------------
def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, age_s=0, attempts=0, max_attempts=2,
                      run_id=None, exit_code=None):
    """Insert a job already in 'running' with the given pid/age/attempts.

    ``started_at`` is back-dated by ``age_s`` seconds so running_age_s reflects
    it. When ``exit_code`` is given and no ``run_id`` was passed, an agent_runs
    row carrying that exit code is created first and linked to the job
    (Tier-2).

    Returns:
        The id of the inserted ``jobs`` row.

    The connection is closed even when an INSERT or the commit raises, so a
    failing helper does not leave an open handle behind.
    """
    conn = get_db()
    try:
        if run_id is None and exit_code is not None:
            # Create the finished agent run that the job row will point at.
            run_cur = conn.execute(
                "INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
                "VALUES (?, ?, datetime('now'), ?)",
                (task_id, agent, exit_code),
            )
            run_id = run_cur.lastrowid
        # The last parameter is an SQLite datetime modifier ("-N seconds")
        # that back-dates started_at.
        job_cur = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
            "run_id, pid, started_at) "
            "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
            (agent, repo, task_id, attempts, max_attempts, run_id, pid,
             f"-{int(age_s)} seconds"),
        )
        job_id = job_cur.lastrowid
        conn.commit()
    finally:
        conn.close()
    return job_id
def _make_task(repo="orchestrator", branch="feature/x", stage="development",
               work_item_id="ORCH-1"):
    """Insert a ``tasks`` row and return its id.

    ``work_item_id`` is stored in both the ``plane_id`` and ``work_item_id``
    columns. The connection is closed even when the INSERT or the commit
    raises, so a failing helper does not leave an open handle behind.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, branch, stage),
        )
        tid = cur.lastrowid
        conn.commit()
    finally:
        conn.close()
    return tid
def _dead_pid(monkeypatch):
    """Patch ``merge_gate.pid_alive`` so every pid is reported as gone."""
    import src.merge_gate as merge_gate

    def _always_dead(pid):
        return False

    monkeypatch.setattr(merge_gate, "pid_alive", _always_dead)
def _alive_pid(monkeypatch):
    """Patch ``merge_gate.pid_alive`` so every pid is reported as alive."""
    import src.merge_gate as merge_gate

    def _always_alive(pid):
        return True

    monkeypatch.setattr(merge_gate, "pid_alive", _always_alive)
# --- TC-01: dead executor -> reaped without process restart -----------------
def test_tc01_dead_pid_reaped_to_queued(monkeypatch):
    """TC-01: a running job with a dead pid is requeued on the second tick."""
    _dead_pid(monkeypatch)
    job_id = _make_running_job(pid=999999, attempts=0, max_attempts=2)
    reaper = JobReaper()

    # Tick 1 only starts the dead streak (dead_ticks default 2 -> not yet).
    reaper.reap_once()
    status_after_first = get_job(job_id)["status"]
    assert status_after_first == "running"

    # Tick 2 completes the streak and the job is reaped.
    reaper.reap_once()
    status_after_second = get_job(job_id)["status"]
    assert status_after_second == "queued"
    assert reaper.reaped_total == 1
    assert reaper.last_reaped["job_id"] == job_id
# --- TC-02: live agent within timeout is NEVER reaped -----------------------
def test_tc02_alive_pid_never_reaped(monkeypatch):
    """TC-02: a job whose pid is alive stays 'running' across five ticks."""
    _alive_pid(monkeypatch)
    job_id = _make_running_job(pid=4321, age_s=10)
    reaper = JobReaper()

    ticks_left = 5
    while ticks_left > 0:
        reaper.reap_once()
        ticks_left -= 1

    assert get_job(job_id)["status"] == "running"
    assert reaper.reaped_total == 0
def test_tc02_alive_within_max_running_not_reaped(monkeypatch):
    """TC-02: an alive job younger than ``reaper_max_running_s`` is kept."""
    _alive_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 3600)
    # 1800 s old: below the 3600 s ceiling, and the pid reports alive.
    job_id = _make_running_job(pid=4321, age_s=1800)

    reaper = JobReaper()
    reaper.reap_once()

    current = get_job(job_id)
    assert current["status"] == "running"
# --- TC-03: zombie only after reaper_dead_ticks consecutive ticks -----------
def test_tc03_requires_consecutive_dead_ticks(monkeypatch):
    """TC-03: the job is reaped only after three CONSECUTIVE dead ticks."""
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 3)
    import src.merge_gate as merge_gate

    # Dead, dead, ALIVE (resets), dead, dead, dead -> reaped only on the 6th tick.
    liveness_answers = iter([False, False, True, False, False, False])

    def _scripted_pid_alive(pid):
        return next(liveness_answers)

    monkeypatch.setattr(merge_gate, "pid_alive", _scripted_pid_alive)
    job_id = _make_running_job(pid=999998)
    reaper = JobReaper()

    for _tick in range(5):
        reaper.reap_once()
    assert get_job(job_id)["status"] == "running"

    # 6th tick: third CONSECUTIVE dead -> reaped.
    reaper.reap_once()
    assert get_job(job_id)["status"] == "queued"
# --- TC-04: backstop ceiling reaps even when liveness is unknown ------------
def test_tc04_backstop_ceiling(monkeypatch):
    """TC-04: a job older than ``reaper_max_running_s`` is reaped even if alive."""
    # Liveness says "alive", but the job's age (500 s) exceeds the ceiling (100 s).
    _alive_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    job_id = _make_running_job(pid=4321, age_s=500)

    reaper = JobReaper()
    reaper.reap_once()

    current = get_job(job_id)
    assert current["status"] == "queued"
    assert reaper.reaped_total == 1
def test_tc04_backstop_no_pid(monkeypatch):
    """TC-04: the age ceiling also reaps a job that has no pid recorded."""
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    job_id = _make_running_job(pid=None, age_s=500)

    reaper = JobReaper()
    reaper.reap_once()

    current = get_job(job_id)
    assert current["status"] == "queued"
# --- TC-05: correct outcome by exit_code (Tier-2) ---------------------------
def test_tc05_exit0_gate_green_done(monkeypatch):
    """TC-05: exit code 0 plus a successful stage advance marks the job 'done'."""
    # A developer job runs to LEAVE the 'architecture' stage (-> 'development').
    task_id = _make_task(stage="architecture")
    job_id = _make_running_job(agent="developer", task_id=task_id, exit_code=0)

    import src.agents.launcher as launcher_mod

    def _advance_to_development(run_id, agent, repo, branch):
        # Gate green: the advance succeeds and moves the task to 'development'.
        return db.update_task_stage(task_id, "development")

    monkeypatch.setattr(
        launcher_mod.launcher, "_try_advance_stage", _advance_to_development
    )

    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(job_id)["status"] == "done"
def test_tc05_exit0_gate_red_requeues(monkeypatch):
    """TC-05: exit code 0 with no stage advance requeues instead of 'done'."""
    task_id = _make_task(stage="architecture")
    job_id = _make_running_job(
        agent="developer", task_id=task_id, exit_code=0,
        attempts=0, max_attempts=2,
    )

    import src.agents.launcher as launcher_mod

    def _advance_noop(run_id, agent, repo, branch):
        # Gate red: nothing happens, so the stage stays 'architecture'.
        return None

    monkeypatch.setattr(launcher_mod.launcher, "_try_advance_stage", _advance_noop)

    reaper = JobReaper()
    reaper.reap_once()
    # exit0 but gate red -> not 'done'.
    assert get_job(job_id)["status"] == "queued"
def test_tc05_nonzero_exit_requeue_then_failed(monkeypatch):
    """TC-05: a non-zero exit requeues while attempts remain, then fails.

    The first job has attempts(1) < max_attempts(2) and must end up 'queued'.
    The second job has attempts == max_attempts and must end up 'failed', with
    at least one call to the patched ``send_telegram``.
    """
    # Positional args of every send_telegram call made during the test.
    sent = []
    tid = _make_task(stage="development")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                            attempts=1, max_attempts=2)
    r = JobReaper()
    import src.notifications as notif
    monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: sent.append(a))

    r.reap_once()  # attempts(1) < max(2) -> queued
    assert get_job(jid)["status"] == "queued"

    # Now exhaust the budget.
    jid2 = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                             attempts=2, max_attempts=2)
    r.reap_once()
    assert get_job(jid2)["status"] == "failed"
    assert sent, "failed reap must send a Telegram alert"
# --- TC-06: atomicity — reaper vs requeue_running_jobs (status guard) --------
def test_tc06_atomic_no_double_reap(monkeypatch):
    """TC-06: a row already requeued at startup is not processed by the reaper."""
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    job_id = _make_running_job(pid=999997, attempts=0, max_attempts=2)

    # Simulate the startup requeue winning the row first.
    requeued_count = db.requeue_running_jobs()
    assert requeued_count == 1
    assert get_job(job_id)["status"] == "queued"

    # The reaper scans next: the row is no longer 'running', so the status
    # guard must leave it alone and nothing is counted as reaped.
    reaper = JobReaper()
    reaper.reap_once()
    assert get_job(job_id)["status"] == "queued"
    assert reaper.reaped_total == 0
def test_tc06_reap_running_job_guard_returns_false_when_not_running():
    """TC-06: ``reap_running_job`` returns False and leaves a queued row alone."""
    # enqueue_job yields a row in status 'queued', not 'running'.
    job_id = enqueue_job("developer", "orchestrator")

    flipped = db.reap_running_job(job_id, "done")

    assert flipped is False
    assert get_job(job_id)["status"] == "queued"
# --- TC-07: kill-switch reaper_enabled=False -> no-op -----------------------
def test_tc07_kill_switch(monkeypatch):
    """TC-07: with ``reaper_enabled`` False the reaper leaves the job alone."""
    _dead_pid(monkeypatch)
    for flag in ("reaper_enabled", "lease_reclaim_enabled"):
        monkeypatch.setattr(db.settings, flag, False)
    # Dead pid and a very old job: it would be reaped if the reaper were enabled.
    job_id = _make_running_job(pid=999996, age_s=99999)

    reaper = JobReaper()
    for _tick in range(3):
        reaper.reap_once()

    assert get_job(job_id)["status"] == "running"
    assert reaper.reaped_total == 0
# --- TC-08: never-raise — a DB/OS error in one tick does not propagate -------
def test_tc08_never_raise_isolates_per_job(monkeypatch):
    """TC-08: an exception while reaping one job does not stop the others."""
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    good_id = _make_running_job(pid=111, attempts=0, max_attempts=2)
    bad_id = _make_running_job(pid=222, attempts=0, max_attempts=2)

    reaper = JobReaper()
    real_reap_job = reaper._reap_job

    def _explode_on_bad(job):
        # Fail only for the designated job; delegate everything else.
        if job["id"] != bad_id:
            return real_reap_job(job)
        raise RuntimeError("simulated per-job failure")

    monkeypatch.setattr(reaper, "_reap_job", _explode_on_bad)

    # Must not raise despite the bad job blowing up.
    reaper.reap_once()

    # The good job is still reaped; the bad one is isolated (stays running).
    assert get_job(good_id)["status"] == "queued"
    assert get_job(bad_id)["status"] == "running"
def test_tc08_reap_once_outer_never_raises(monkeypatch):
    """TC-08: ``_tick`` does not propagate a failure of ``get_running_jobs``."""
    def _db_down():
        raise RuntimeError("db down")

    monkeypatch.setattr(jr, "get_running_jobs", _db_down)
    reaper = JobReaper()

    # The error is raised while fetching the running jobs, i.e. before any
    # per-job handling; the _tick wrapper is what keeps the loop from dying,
    # so _tick (not reap_once) is the call exercised here.
    reaper._tick()
    assert reaper.last_run_ts is not None
# --- TC-21: startup lease-reclaim + reaper start/stop smoke -----------------
def test_tc21_reaper_start_stop_smoke():
    """TC-21: ``start()`` spawns a live thread and ``stop()`` ends it.

    ``stop()`` runs in a ``finally`` so the background thread is shut down
    even when the liveness assertion fails, instead of being left running for
    the rest of the test session.
    """
    r = JobReaper(interval_s=0.05)
    r.start()
    try:
        assert r._thread is not None and r._thread.is_alive()
    finally:
        r.stop(timeout=2)
    assert not r._thread.is_alive()
def test_tc21_reclaim_all_stale_leases_callable(monkeypatch):
    """TC-21: ``reclaim_all_stale_leases`` returns 0 when nothing is reclaimable."""
    # No lease files present -> 0 reclaimed, never raises (registration smoke).
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", True)
    reclaimed = jr.reclaim_all_stale_leases()
    assert reclaimed == 0