"""Tests for ORCH-1 (F-2b) persistent job queue.

Covers:
- enqueue_job -> claim_next_job -> mark_job lifecycle
- claim_next_job atomicity (no double-dispatch of the same job)
- retry: fail -> requeue while attempts < max_attempts, then failed
- requeue_running_jobs (queue-recovery)
- count_running_jobs / job_status_counts / recent_jobs
- QueueWorker respects max_concurrency (Popen / launch fully mocked)
- ORCH-065: the job reaper unblocks the queue; GET /queue exposes a reaper block

The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in the
worker tests, and the launcher finalize logic is exercised directly through
AgentLauncher._finalize_job and mark_job (no process involved).
"""

import os
import tempfile
import threading

import pytest

# Override env before importing app modules (same convention as test_qg.py).
# NOTE(review): presumably these are read when src.* settings load at import
# time; the autouse ``fresh_db`` fixture re-points db_path per test anyway.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"

import src.db as db  # noqa: E402  (must follow the env overrides above)
from src.db import (  # noqa: E402
    init_db,
    enqueue_job,
    claim_next_job,
    mark_job,
    count_running_jobs,
    requeue_running_jobs,
    get_job,
    job_status_counts,
    recent_jobs,
)


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Point the DB at a fresh per-test sqlite file and init the schema."""
    dbfile = tmp_path / "queue.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    init_db()
    yield


# ---------------------------------------------------------------------------
# enqueue / claim / mark lifecycle
# ---------------------------------------------------------------------------


class TestLifecycle:
    def test_enqueue_creates_queued_job(self):
        jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
        job = get_job(jid)
        assert job["status"] == "queued"
        assert job["agent"] == "analyst"
        assert job["repo"] == "enduro-trails"
        assert job["task_content"] == "task body"
        assert job["task_id"] == 7
        assert job["attempts"] == 0
        # max_attempts was not passed, so this pins enqueue_job's default.
        assert job["max_attempts"] == 2

    def test_claim_marks_running_and_increments_attempts(self):
        jid = enqueue_job("developer", "repo")
        claimed = claim_next_job()
        assert claimed is not None
        assert claimed["id"] == jid
        assert claimed["status"] == "running"
        assert claimed["attempts"] == 1
        assert count_running_jobs() == 1

    def test_claim_empty_queue_returns_none(self):
        assert claim_next_job() is None

    def test_claim_is_fifo(self):
        a = enqueue_job("analyst", "r")
        b = enqueue_job("developer", "r")
        assert claim_next_job()["id"] == a
        assert claim_next_job()["id"] == b

    def test_mark_done(self):
        jid = enqueue_job("tester", "r")
        claim_next_job()
        mark_job(jid, "done", run_id=42)
        job = get_job(jid)
        assert job["status"] == "done"
        assert job["run_id"] == 42
        assert job["finished_at"] is not None
        assert count_running_jobs() == 0

    def test_mark_failed_records_error(self):
        jid = enqueue_job("tester", "r")
        claim_next_job()
        mark_job(jid, "failed", run_id=9, error="boom")
        job = get_job(jid)
        assert job["status"] == "failed"
        assert job["error"] == "boom"
        assert job["finished_at"] is not None


# ---------------------------------------------------------------------------
# claim atomicity — no double dispatch
# ---------------------------------------------------------------------------


class TestClaimAtomicity:
    def test_single_job_claimed_once(self):
        jid = enqueue_job("analyst", "r")
        first = claim_next_job()
        second = claim_next_job()
        assert first["id"] == jid
        assert second is None  # already running, not re-dispatched

    def test_concurrent_claims_no_duplicate(self):
        """Many enqueued jobs claimed from parallel threads -> each claimed once."""
        n = 20
        for _ in range(n):
            enqueue_job("developer", "r")

        claimed_ids = []
        errors = []
        lock = threading.Lock()

        def grab():
            # An exception raised in a Thread target is only reported by
            # threading.excepthook and never reaches this test, so record it
            # and fail explicitly below instead of on a confusing count mismatch.
            try:
                while True:
                    job = claim_next_job()
                    if job is None:
                        return
                    with lock:
                        claimed_ids.append(job["id"])
            except Exception as exc:  # noqa: BLE001 - surfaced by the assert below
                with lock:
                    errors.append(exc)

        threads = [threading.Thread(target=grab) for _ in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"claim_next_job raised in a worker thread: {errors!r}"
        assert len(claimed_ids) == n
        assert len(set(claimed_ids)) == n  # no id claimed twice
        assert count_running_jobs() == n


# ---------------------------------------------------------------------------
# retry semantics (mirrors launcher._finalize_job logic)
# ---------------------------------------------------------------------------


class TestRetry:
    def test_fail_requeues_while_under_max(self):
        jid = enqueue_job("developer", "r", max_attempts=2)
        job = claim_next_job()  # attempts=1
        assert job["attempts"] == 1
        # attempts(1) < max(2) -> requeue
        mark_job(jid, "queued", error="exit 1")
        j = get_job(jid)
        assert j["status"] == "queued"
        assert j["error"] == "exit 1"
        assert j["started_at"] is None  # requeue clears started_at

    def test_fail_fails_when_max_reached(self):
        jid = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()  # attempts=1 -> requeue
        mark_job(jid, "queued")
        job2 = claim_next_job()  # attempts=2
        assert job2["attempts"] == 2
        # attempts(2) >= max(2) -> failed
        mark_job(jid, "failed", error="exit 1")
        assert get_job(jid)["status"] == "failed"

    def test_finalize_job_done(self):
        """launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
        from src.agents.launcher import AgentLauncher

        jid = enqueue_job("analyst", "r")
        claim_next_job()
        AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
        assert get_job(jid)["status"] == "done"

    def test_finalize_job_requeue_then_fail(self, monkeypatch):
        from src.agents.launcher import AgentLauncher

        # Silence telegram side-effect.
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        lr = AgentLauncher()
        jid = enqueue_job("developer", "r", max_attempts=2)

        claim_next_job()  # attempts=1
        lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
        assert get_job(jid)["status"] == "queued"  # 1 < 2 -> requeue

        claim_next_job()  # attempts=2
        lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
        assert get_job(jid)["status"] == "failed"  # 2 >= 2 -> failed


# ---------------------------------------------------------------------------
# queue-recovery
# ---------------------------------------------------------------------------


class TestRequeueRunning:
    def test_requeue_running_jobs(self):
        a = enqueue_job("analyst", "r")
        b = enqueue_job("developer", "r")
        claim_next_job()  # a -> running
        claim_next_job()  # b -> running
        assert count_running_jobs() == 2

        n = requeue_running_jobs()
        assert n == 2
        assert count_running_jobs() == 0
        assert get_job(a)["status"] == "queued"
        assert get_job(b)["status"] == "queued"

    def test_requeue_preserves_attempts(self):
        jid = enqueue_job("analyst", "r")
        claim_next_job()  # attempts=1
        requeue_running_jobs()
        assert get_job(jid)["attempts"] == 1  # not reset


# ---------------------------------------------------------------------------
# observability helpers
# ---------------------------------------------------------------------------


class TestObservability:
    def test_status_counts(self):
        first = enqueue_job("analyst", "r")
        enqueue_job("developer", "r")
        # FIFO: the first-enqueued (analyst) job is claimed -> running;
        # the developer job stays queued.
        claimed = claim_next_job()
        assert claimed["id"] == first

        counts = job_status_counts()
        assert counts["running"] == 1
        assert counts["queued"] == 1
        assert counts["done"] == 0
        assert counts["failed"] == 0

    def test_recent_jobs_desc(self):
        ids = [enqueue_job("analyst", "r") for _ in range(3)]
        recent = recent_jobs(10)
        assert [r["id"] for r in recent] == sorted(ids, reverse=True)
# ---------------------------------------------------------------------------
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
# ---------------------------------------------------------------------------


class TestWorkerConcurrency:
    @pytest.fixture(autouse=True)
    def _ok_preflight(self, monkeypatch):
        # ORCH-1 resilience: the worker gates claims behind preflight; in tests there
        # is no claude binary, so stub preflight OK to exercise pure queue/concurrency.
        monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))

    def test_worker_respects_max_concurrency(self, monkeypatch):
        from src.queue_worker import QueueWorker

        launched = []

        def fake_launch_job(job):
            # Simulate a long-running agent: the job stays 'running' (we do NOT
            # mark it done), so the slot remains occupied.
            launched.append(job["id"])
            return 100 + job["id"]

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
        for _ in range(5):
            enqueue_job("developer", "r")

        w = QueueWorker(max_concurrency=2, poll_interval=0.01)
        w._drain_once()

        # Only max_concurrency jobs may be launched / running at once.
        assert len(launched) == 2
        assert count_running_jobs() == 2

    def test_worker_drains_as_slots_free(self, monkeypatch):
        from src.queue_worker import QueueWorker

        def fake_launch_job(job):
            # Immediately complete the job so the slot frees for the next claim.
            mark_job(job["id"], "done", run_id=job["id"])
            return job["id"]

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
        for _ in range(4):
            enqueue_job("analyst", "r")

        w = QueueWorker(max_concurrency=1, poll_interval=0.01)
        w._drain_once()

        # With instant completion and concurrency 1, one drain pass empties the queue.
        assert job_status_counts()["done"] == 4
        assert count_running_jobs() == 0

    def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
        from src.queue_worker import QueueWorker

        def boom(job):
            raise RuntimeError("repo missing")

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        enqueue_job("developer", "r", max_attempts=1)

        w = QueueWorker(max_concurrency=1, poll_interval=0.01)
        w._drain_once()

        # attempts=1 >= max_attempts=1 -> failed, not stuck running.
        assert count_running_jobs() == 0
        counts = job_status_counts()
        assert counts["failed"] == 1


# ---------------------------------------------------------------------------
# ORCH-065: job-reaper unblocks the shared queue (TC-09) + /queue block (TC-18)
# ---------------------------------------------------------------------------


class TestReaperUnblocksQueue:
    def test_tc09_reap_unblocks_claim_at_concurrency_1(self, monkeypatch):
        """A zombie 'running' row at max_concurrency=1 blocks every claim; once
        the reaper reaps it the next queued job can be claimed (AC-2)."""
        import src.merge_gate as mg
        from src.job_reaper import JobReaper

        monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
        monkeypatch.setattr(mg, "pid_alive", lambda pid: False)  # zombie pid dead
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)

        # A zombie row stuck 'running' with a dead pid and attempts already at max.
        # The connection is closed in ``finally`` so a failing INSERT cannot leak it.
        conn = db.get_db()
        try:
            cur = conn.execute(
                "INSERT INTO jobs (agent, repo, status, attempts, max_attempts, pid, "
                "started_at) VALUES ('developer','r','running',2,2,999999,datetime('now'))"
            )
            zombie = cur.lastrowid
            conn.commit()
        finally:
            conn.close()

        # A second job waits in the queue behind it.
        nxt = enqueue_job("analyst", "r")
        # At concurrency 1 the slot is fully occupied -> nothing else can run.
        assert count_running_jobs() == 1

        JobReaper().reap_once()  # dead pid, attempts>=max -> failed
        assert get_job(zombie)["status"] == "failed"
        assert count_running_jobs() == 0

        # Queue is unblocked: the next job claims successfully.
        claimed = claim_next_job()
        assert claimed is not None
        assert claimed["id"] == nxt

    def test_tc18_queue_endpoint_has_reaper_block(self):
        """GET /queue exposes the reaper observability block (AC-15).

        Calls the endpoint coroutine directly (no lifespan / no background
        threads / no network) so the test stays hermetic.
        """
        import asyncio

        import src.main as main

        body = asyncio.run(main.queue())
        assert "reaper" in body
        reaper = body["reaper"]
        for key in (
            "enabled",
            "interval",
            "last_run_ts",
            "reaped_total",
            "last_reaped",
            "lease_reclaimed_total",
        ):
            assert key in reaper