"""ORCH-065: job-reaper unit tests (TC-01..TC-08, TC-21).

The reaper never spawns claude; we drive the DB directly (a 'running' jobs row
+ optional agent_runs exit_code/pid) and assert the terminal flip +
side-effects. Pid liveness (``merge_gate.pid_alive``) is monkeypatched so a
'dead'/'alive' pid is deterministic.
"""
import os
import tempfile

import pytest

# Override env before importing app modules (same convention as test_queue.py).
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch_reaper.db")
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"

import src.db as db
from src.db import init_db, get_db, enqueue_job, get_job
import src.job_reaper as jr
from src.job_reaper import JobReaper


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Point ``db.settings.db_path`` at a per-test file and run ``init_db()``."""
    dbfile = tmp_path / "reaper.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    init_db()
    yield


# --- helpers ----------------------------------------------------------------

def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, age_s=0, attempts=0, max_attempts=2,
                      run_id=None, exit_code=None, finished_age_s=600):
    """Insert a job already in 'running' with the given pid/age/attempts.

    started_at is back-dated by ``age_s`` seconds so running_age_s reflects it.
    When ``exit_code`` is given an agent_runs row is created and linked
    (Tier-2); its ``finished_at`` is back-dated by ``finished_age_s`` seconds so
    the Tier-2 finalization grace (``reaper_finalize_grace_s``, default 300) is
    satisfied by default — pass a small ``finished_age_s`` to exercise the
    "monitor may still be finalizing" deferral.

    Returns the id of the inserted jobs row.
    """
    conn = get_db()
    try:
        if run_id is None and exit_code is not None:
            cur = conn.execute(
                "INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
                "VALUES (?, ?, datetime('now', ?), ?)",
                (task_id, agent, f"-{int(finished_age_s)} seconds", exit_code),
            )
            run_id = cur.lastrowid
        cur = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
            "run_id, pid, started_at) "
            "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
            (agent, repo, task_id, attempts, max_attempts, run_id, pid,
             f"-{int(age_s)} seconds"),
        )
        job_id = cur.lastrowid
        conn.commit()
    finally:
        # Release the connection even when an INSERT/commit raises, so a
        # failing setup does not leak a handle on the per-test DB file.
        conn.close()
    return job_id


def _make_task(repo="orchestrator", branch="feature/x", stage="development",
               work_item_id="ORCH-1"):
    """Insert a tasks row and return its id.

    ``work_item_id`` is written to both the ``plane_id`` and ``work_item_id``
    columns.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, branch, stage),
        )
        tid = cur.lastrowid
        conn.commit()
    finally:
        # Same guarantee as _make_running_job: never leak the connection.
        conn.close()
    return tid


def _dead_pid(monkeypatch):
    """Force merge_gate.pid_alive -> False (process gone) for the reaper."""
    import src.merge_gate as mg
    monkeypatch.setattr(mg, "pid_alive", lambda pid: False)


def _alive_pid(monkeypatch):
    """Force merge_gate.pid_alive -> True (process still there) for the reaper."""
    import src.merge_gate as mg
    monkeypatch.setattr(mg, "pid_alive", lambda pid: True)


# --- TC-01: dead executor -> reaped without process restart -----------------

def test_tc01_dead_pid_reaped_to_queued(monkeypatch):
    _dead_pid(monkeypatch)
    jid = _make_running_job(pid=999999, attempts=0, max_attempts=2)
    r = JobReaper()
    r.reap_once()  # tick 1 (streak=1, dead_ticks default 2 -> not yet)
    assert get_job(jid)["status"] == "running"
    r.reap_once()  # tick 2 -> reaped
    assert get_job(jid)["status"] == "queued"
    assert r.reaped_total == 1
    assert r.last_reaped["job_id"] == jid


# --- TC-02: live agent within timeout is NEVER reaped -----------------------

def test_tc02_alive_pid_never_reaped(monkeypatch):
    _alive_pid(monkeypatch)
    jid = _make_running_job(pid=4321, age_s=10)
    r = JobReaper()
    for _ in range(5):
        r.reap_once()
        assert get_job(jid)["status"] == "running"
    assert r.reaped_total == 0


def test_tc02_alive_within_max_running_not_reaped(monkeypatch):
    _alive_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 3600)
    jid = _make_running_job(pid=4321, age_s=1800)  # < ceiling, alive
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "running"


# --- TC-03: zombie only after reaper_dead_ticks consecutive ticks -----------

def test_tc03_requires_consecutive_dead_ticks(monkeypatch):
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 3)
    import src.merge_gate as mg
    # Dead, dead, ALIVE (resets), dead, dead, dead -> reaped only on the 6th tick.
    seq = iter([False, False, True, False, False, False])
    monkeypatch.setattr(mg, "pid_alive", lambda pid: next(seq))
    jid = _make_running_job(pid=999998)
    r = JobReaper()
    for _ in range(5):
        r.reap_once()
        assert get_job(jid)["status"] == "running"
    r.reap_once()  # 6th tick: third CONSECUTIVE dead -> reaped
    assert get_job(jid)["status"] == "queued"


# --- TC-04: backstop ceiling reaps even when liveness is unknown ------------

def test_tc04_backstop_ceiling(monkeypatch):
    _alive_pid(monkeypatch)  # liveness says "alive", but age exceeds the ceiling
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    jid = _make_running_job(pid=4321, age_s=500)
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "queued"
    assert r.reaped_total == 1


def test_tc04_backstop_no_pid(monkeypatch):
    monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
    jid = _make_running_job(pid=None, age_s=500)
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "queued"


# --- TC-05: correct outcome by exit_code (Tier-2) ---------------------------

def _gate(monkeypatch, green: bool):
    """Force the reaper's READ-ONLY gate pre-evaluation to green/red."""
    monkeypatch.setattr(
        JobReaper, "_gate_is_green",
        lambda self, stage, job, branch, wid: green,
    )


def test_tc05_exit0_gate_green_done(monkeypatch):
    # A developer job runs to LEAVE the 'architecture' stage (-> 'development').
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0)
    _gate(monkeypatch, green=True)
    # gate green -> the claim flips 'done' FIRST, then the advance runs.
    import src.agents.launcher as L
    monkeypatch.setattr(
        L.launcher, "_try_advance_stage",
        lambda run_id, agent, repo, branch: db.update_task_stage(tid, "development"),
    )
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "done"


def test_tc05_exit0_gate_red_requeues(monkeypatch):
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            attempts=0, max_attempts=2)
    _gate(monkeypatch, green=False)  # read-only pre-eval says red
    # The advance path must NEVER run when the gate is red (claim-before-act).
    import src.agents.launcher as L
    called = []
    monkeypatch.setattr(L.launcher, "_try_advance_stage",
                        lambda run_id, agent, repo, branch: called.append(1))
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "queued"  # exit0 but gate red -> not 'done'
    assert not called, "no advance/side-effects on a red gate"


def test_tc05_exit0_already_advanced_done_no_side_effects(monkeypatch):
    # Stage already past the developer candidate set -> idempotent clean 'done'
    # with NO advance call (the monitor already advanced before dying).
    tid = _make_task(stage="development")  # developer's candidate is 'architecture'
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0)
    import src.agents.launcher as L
    called = []
    monkeypatch.setattr(L.launcher, "_try_advance_stage",
                        lambda run_id, agent, repo, branch: called.append(1))
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "done"
    assert not called, "already-advanced reap must not re-advance"


def test_tc05_nonzero_exit_requeue_then_failed(monkeypatch):
    """Non-zero exit: requeued while attempts remain, 'failed' + alert after."""
    sent = []
    tid = _make_task(stage="development")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                            attempts=1, max_attempts=2)
    r = JobReaper()
    import src.notifications as notif
    # Capture Telegram sends instead of performing them.
    monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: sent.append(a))
    r.reap_once()
    # attempts(1) < max(2) -> queued
    assert get_job(jid)["status"] == "queued"
    # Now exhaust the budget.
    jid2 = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                             attempts=2, max_attempts=2)
    r.reap_once()
    assert get_job(jid2)["status"] == "failed"
    assert sent, "failed reap must send a Telegram alert"


# --- TC-05b: Tier-2 finalization grace (live monitor still finalizing) -------

def test_tc05_tier2_within_grace_not_reaped(monkeypatch):
    """exit_code freshly recorded -> a LIVE monitor may still be finalizing.

    The reaper must NOT reap it within ``reaper_finalize_grace_s`` (FR-1.3/AC-3:
    a live finalizing monitor — git push / PR / Plane comments — is never
    reaped, no dup advance / enqueue).
    """
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
    tid = _make_task(stage="architecture")
    # exit_code recorded only 5s ago -> still inside the finalization grace.
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            finished_age_s=5)
    import src.agents.launcher as L
    called = []
    monkeypatch.setattr(L.launcher, "_try_advance_stage",
                        lambda run_id, agent, repo, branch: called.append(1))
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "running"  # deferred, NOT reaped
    assert r.reaped_total == 0
    assert not called, "a live finalizing monitor must not be advanced by the reaper"


def test_tc05_tier2_after_grace_reaped(monkeypatch):
    """Once exit_code has been recorded longer than the grace, the monitor is
    genuinely dead and the Tier-2 reap proceeds."""
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 300)
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            finished_age_s=600)  # well past the grace
    _gate(monkeypatch, green=True)
    import src.agents.launcher as L
    monkeypatch.setattr(
        L.launcher, "_try_advance_stage",
        lambda run_id, agent, repo, branch: db.update_task_stage(tid, "development"),
    )
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "done"


def test_tc05_tier2_lost_claim_no_side_effects(monkeypatch):
    """claim-BEFORE-act: when another actor (a late monitor / startup requeue)
    moves the row out of 'running' AFTER the reaper read it but BEFORE the
    atomic claim, the reaper's claim loses (rowcount==0) and it performs NO
    advance side effects (no dup advance / dup enqueue) — ADR-001 Р-1."""
    monkeypatch.setattr(db.settings, "reaper_finalize_grace_s", 0)
    tid = _make_task(stage="architecture")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
                            finished_age_s=10)
    import src.agents.launcher as L
    called = []
    monkeypatch.setattr(L.launcher, "_try_advance_stage",
                        lambda run_id, agent, repo, branch: called.append(1))

    # The read-only gate pre-eval reports green, but the row is concurrently
    # claimed by someone else right before the reaper's atomic claim runs.
    def green_then_steal(self, stage, job, branch, wid):
        db.requeue_running_jobs()  # another actor wins the 'running' row first
        return True

    monkeypatch.setattr(JobReaper, "_gate_is_green", green_then_steal)
    r = JobReaper()
    r.reap_once()
    # Reaper lost the atomic claim -> no advance, no double work. The row stays
    # where the winner left it ('queued'), not flipped to 'done' by the reaper.
    assert not called, "reaper that lost the claim must not advance/enqueue"
    assert get_job(jid)["status"] == "queued"
    assert r.reaped_total == 0


# --- TC-06: atomicity — reaper vs requeue_running_jobs (status guard) --------

def test_tc06_atomic_no_double_reap(monkeypatch):
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    jid = _make_running_job(pid=999997, attempts=0, max_attempts=2)
    # Simulate the startup requeue winning the row first.
    n = db.requeue_running_jobs()
    assert n == 1
    assert get_job(jid)["status"] == "queued"
    # The reaper now scans: the row is no longer 'running' -> reap_running_job's
    # WHERE status='running' guard yields rowcount 0 -> no second processing.
    r = JobReaper()
    r.reap_once()
    assert get_job(jid)["status"] == "queued"
    assert r.reaped_total == 0


def test_tc06_reap_running_job_guard_returns_false_when_not_running():
    jid = enqueue_job("developer", "orchestrator")  # status 'queued', not running
    assert db.reap_running_job(jid, "done") is False
    assert get_job(jid)["status"] == "queued"


# --- TC-07: kill-switch reaper_enabled=False -> no-op -----------------------

def test_tc07_kill_switch(monkeypatch):
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_enabled", False)
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", False)
    jid = _make_running_job(pid=999996, age_s=99999)
    r = JobReaper()
    for _ in range(3):
        r.reap_once()
        assert get_job(jid)["status"] == "running"
    assert r.reaped_total == 0


# --- TC-08: never-raise — a DB/OS error in one tick does not propagate -------

def test_tc08_never_raise_isolates_per_job(monkeypatch):
    _dead_pid(monkeypatch)
    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
    good = _make_running_job(pid=111, attempts=0, max_attempts=2)
    bad = _make_running_job(pid=222, attempts=0, max_attempts=2)
    r = JobReaper()
    orig = r._reap_job

    def boom(job):
        # Fail only for the 'bad' job; delegate every other job to the real
        # implementation captured above.
        if job["id"] == bad:
            raise RuntimeError("simulated per-job failure")
        return orig(job)

    monkeypatch.setattr(r, "_reap_job", boom)
    # Must not raise despite the bad job blowing up.
    r.reap_once()
    # The good job is still reaped; the bad one is isolated (stays running).
    assert get_job(good)["status"] == "queued"
    assert get_job(bad)["status"] == "running"


def test_tc08_reap_once_outer_never_raises(monkeypatch):
    def _db_down():
        raise RuntimeError("db down")

    monkeypatch.setattr(jr, "get_running_jobs", _db_down)
    r = JobReaper()
    # The failure surfaces while the running jobs are being fetched; the _tick
    # wrapper is what guarantees the loop never dies, so assert _tick is safe.
    r._tick()
    assert r.last_run_ts is not None


# --- TC-21: startup lease-reclaim + reaper start/stop smoke -----------------

def test_tc21_reaper_start_stop_smoke():
    r = JobReaper(interval_s=0.05)
    r.start()
    try:
        assert r._thread is not None and r._thread.is_alive()
    finally:
        # Always stop the background thread, even when the liveness assert
        # fails, so a failure here cannot leave a reaper running into later
        # tests.
        r.stop(timeout=2)
    assert not r._thread.is_alive()


def test_tc21_reclaim_all_stale_leases_callable(monkeypatch):
    # No lease files present -> 0 reclaimed, never raises (registration smoke).
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", True)
    assert jr.reclaim_all_stale_leases() == 0