From 2283b8898b5e652ea2f4aa7d469e0b70d3eb1303 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH] test(queue): 19 tests for job queue lifecycle/atomicity/retry/worker (ORCH-1) Covers enqueue->claim->mark, atomic claim (no double dispatch, 8-thread race), retry fail->queued->failed, requeue_running_jobs, observability, worker max_concurrency. Popen fully mocked (no real agent spawned). --- tests/test_queue.py | 298 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 tests/test_queue.py diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..d3f1536 --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,298 @@ +"""Tests for ORCH-1 (F-2b) persistent job queue. + +Covers: + - enqueue_job -> claim_next_job -> mark_job lifecycle + - claim_next_job atomicity (no double-dispatch of the same job) + - retry: fail -> requeue while attempts < max_attempts, then failed + - requeue_running_jobs (queue-recovery) + - count_running_jobs / job_status_counts / recent_jobs + - QueueWorker respects max_concurrency (Popen / launch fully mocked) + +The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker +tests, and the launcher finalize logic is exercised directly via mark_job. +""" +import os +import tempfile + +import pytest + +# Override env before importing app modules (same convention as test_qg.py). +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" + +import src.db as db +from src.db import ( + init_db, + enqueue_job, + claim_next_job, + mark_job, + count_running_jobs, + requeue_running_jobs, + get_job, + job_status_counts, + recent_jobs, +) + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + """Point the DB at a fresh per-test sqlite file and init the schema.""" + dbfile = tmp_path / "queue.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + init_db() + yield + + +# --------------------------------------------------------------------------- +# enqueue / claim / mark lifecycle +# --------------------------------------------------------------------------- +class TestLifecycle: + def test_enqueue_creates_queued_job(self): + jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7) + job = get_job(jid) + assert job["status"] == "queued" + assert job["agent"] == "analyst" + assert job["repo"] == "enduro-trails" + assert job["task_content"] == "task body" + assert job["task_id"] == 7 + assert job["attempts"] == 0 + assert job["max_attempts"] == 2 + + def test_claim_marks_running_and_increments_attempts(self): + jid = enqueue_job("developer", "repo") + claimed = claim_next_job() + assert claimed is not None + assert claimed["id"] == jid + assert claimed["status"] == "running" + assert claimed["attempts"] == 1 + assert count_running_jobs() == 1 + + def test_claim_empty_queue_returns_none(self): + assert claim_next_job() is None + + def test_claim_is_fifo(self): + a = enqueue_job("analyst", "r") + b = enqueue_job("developer", "r") + assert claim_next_job()["id"] == a + assert claim_next_job()["id"] == b + + def test_mark_done(self): + jid = enqueue_job("tester", "r") + claim_next_job() + mark_job(jid, "done", run_id=42) + job = get_job(jid) + assert job["status"] == "done" + assert job["run_id"] == 42 + assert job["finished_at"] is not None + assert count_running_jobs() == 0 + + def test_mark_failed_records_error(self): + jid = enqueue_job("tester", "r") + claim_next_job() + mark_job(jid, "failed", run_id=9, error="boom") + job = get_job(jid) + assert job["status"] == "failed" + assert job["error"] == "boom" + assert job["finished_at"] is not None + + +# --------------------------------------------------------------------------- +# claim atomicity — no double dispatch +# --------------------------------------------------------------------------- +class TestClaimAtomicity: + def test_single_job_claimed_once(self): + jid = enqueue_job("analyst", "r") + first = claim_next_job() + second = claim_next_job() + assert first["id"] == jid + assert second is None # already running, not re-dispatched + + def test_concurrent_claims_no_duplicate(self): + """Many enqueued jobs claimed from parallel threads -> each claimed once.""" + import threading + + n = 20 + for _ in range(n): + enqueue_job("developer", "r") + + claimed_ids = [] + lock = threading.Lock() + + def grab(): + while True: + job = claim_next_job() + if job is None: + return + with lock: + claimed_ids.append(job["id"]) + + threads = [threading.Thread(target=grab) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(claimed_ids) == n + assert len(set(claimed_ids)) == n # no id claimed twice + assert count_running_jobs() == n + + +# --------------------------------------------------------------------------- +# retry semantics (mirrors launcher._finalize_job logic) +# --------------------------------------------------------------------------- +class TestRetry: + def test_fail_requeues_while_under_max(self): + jid = enqueue_job("developer", "r", max_attempts=2) + job = claim_next_job() # attempts=1 + assert job["attempts"] == 1 + # attempts(1) < max(2) -> requeue + mark_job(jid, "queued", error="exit 1") + j = get_job(jid) + assert j["status"] == "queued" + assert j["error"] == "exit 1" + assert j["started_at"] is None # requeue clears started_at + + def test_fail_fails_when_max_reached(self): + jid = enqueue_job("developer", "r", max_attempts=2) + claim_next_job() # attempts=1 -> requeue + mark_job(jid, "queued") + job2 = claim_next_job() # attempts=2 + assert job2["attempts"] == 2 + # attempts(2) >= max(2) -> failed + mark_job(jid, "failed", error="exit 1") + assert get_job(jid)["status"] == "failed" + + def test_finalize_job_done(self): + """launcher._finalize_job marks done on exit_code 0 (no Popen needed).""" + from src.agents.launcher import AgentLauncher + jid = enqueue_job("analyst", "r") + claim_next_job() + AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0) + assert get_job(jid)["status"] == "done" + + def test_finalize_job_requeue_then_fail(self, monkeypatch): + from src.agents.launcher import AgentLauncher + # Silence telegram side-effect. + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + lr = AgentLauncher() + jid = enqueue_job("developer", "r", max_attempts=2) + + claim_next_job() # attempts=1 + lr._finalize_job(jid, "developer", run_id=1, exit_code=2) + assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue + + claim_next_job() # attempts=2 + lr._finalize_job(jid, "developer", run_id=2, exit_code=2) + assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed + + +# --------------------------------------------------------------------------- +# queue-recovery +# --------------------------------------------------------------------------- +class TestRequeueRunning: + def test_requeue_running_jobs(self): + a = enqueue_job("analyst", "r") + b = enqueue_job("developer", "r") + claim_next_job() # a -> running + claim_next_job() # b -> running + assert count_running_jobs() == 2 + n = requeue_running_jobs() + assert n == 2 + assert count_running_jobs() == 0 + assert get_job(a)["status"] == "queued" + assert get_job(b)["status"] == "queued" + + def test_requeue_preserves_attempts(self): + jid = enqueue_job("analyst", "r") + claim_next_job() # attempts=1 + requeue_running_jobs() + assert get_job(jid)["attempts"] == 1 # not reset + + +# --------------------------------------------------------------------------- +# observability helpers +# --------------------------------------------------------------------------- +class TestObservability: + def test_status_counts(self): + enqueue_job("analyst", "r") # stays queued + enqueue_job("developer", "r") # first claimed -> running (FIFO) + claim_next_job() + counts = job_status_counts() + assert counts["running"] == 1 + assert counts["queued"] == 1 + assert counts["done"] == 0 + assert counts["failed"] == 0 + + def test_recent_jobs_desc(self): + ids = [enqueue_job("analyst", "r") for _ in range(3)] + recent = recent_jobs(10) + assert [r["id"] for r in recent] == sorted(ids, reverse=True) + + +# --------------------------------------------------------------------------- +# QueueWorker max_concurrency (launch_job fully mocked — no real Popen) +# --------------------------------------------------------------------------- +class TestWorkerConcurrency: + def test_worker_respects_max_concurrency(self, monkeypatch): + from src.queue_worker import QueueWorker + + launched = [] + + def fake_launch_job(job): + # Simulate a long-running agent: the job stays 'running' (we do NOT + # mark it done), so the slot remains occupied. + launched.append(job["id"]) + return 100 + job["id"] + + monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job) + + for _ in range(5): + enqueue_job("developer", "r") + + w = QueueWorker(max_concurrency=2, poll_interval=0.01) + w._drain_once() + + # Only max_concurrency jobs may be launched / running at once. + assert len(launched) == 2 + assert count_running_jobs() == 2 + + def test_worker_drains_as_slots_free(self, monkeypatch): + from src.queue_worker import QueueWorker + + def fake_launch_job(job): + # Immediately complete the job so the slot frees for the next claim. + mark_job(job["id"], "done", run_id=job["id"]) + return job["id"] + + monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job) + + for _ in range(4): + enqueue_job("analyst", "r") + + w = QueueWorker(max_concurrency=1, poll_interval=0.01) + w._drain_once() + + # With instant completion and concurrency 1, one drain pass empties the queue. + assert job_status_counts()["done"] == 4 + assert count_running_jobs() == 0 + + def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch): + from src.queue_worker import QueueWorker + + def boom(job): + raise RuntimeError("repo missing") + + monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom) + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + + enqueue_job("developer", "r", max_attempts=1) + w = QueueWorker(max_concurrency=1, poll_interval=0.01) + w._drain_once() + + # attempts=1 >= max_attempts=1 -> failed, not stuck running. + assert count_running_jobs() == 0 + counts = job_status_counts() + assert counts["failed"] == 1