test(queue): 19 tests for job queue lifecycle/atomicity/retry/worker (ORCH-1)
Covers enqueue->claim->mark, atomic claim (no double dispatch, 8-thread race), retry fail->queued->failed, requeue_running_jobs, observability, worker max_concurrency. Popen fully mocked (no real agent spawned).
This commit is contained in:
298
tests/test_queue.py
Normal file
298
tests/test_queue.py
Normal file
@@ -0,0 +1,298 @@
|
||||
"""Tests for ORCH-1 (F-2b) persistent job queue.
|
||||
|
||||
Covers:
|
||||
- enqueue_job -> claim_next_job -> mark_job lifecycle
|
||||
- claim_next_job atomicity (no double-dispatch of the same job)
|
||||
- retry: fail -> requeue while attempts < max_attempts, then failed
|
||||
- requeue_running_jobs (queue-recovery)
|
||||
- count_running_jobs / job_status_counts / recent_jobs
|
||||
- QueueWorker respects max_concurrency (Popen / launch fully mocked)
|
||||
|
||||
The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker
|
||||
tests, and the launcher finalize logic is exercised directly via mark_job.
|
||||
"""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
# Override env before importing app modules (same convention as test_qg.py).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
|
||||
import src.db as db
|
||||
from src.db import (
|
||||
init_db,
|
||||
enqueue_job,
|
||||
claim_next_job,
|
||||
mark_job,
|
||||
count_running_jobs,
|
||||
requeue_running_jobs,
|
||||
get_job,
|
||||
job_status_counts,
|
||||
recent_jobs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
"""Point the DB at a fresh per-test sqlite file and init the schema."""
|
||||
dbfile = tmp_path / "queue.db"
|
||||
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# enqueue / claim / mark lifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestLifecycle:
|
||||
def test_enqueue_creates_queued_job(self):
|
||||
jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["agent"] == "analyst"
|
||||
assert job["repo"] == "enduro-trails"
|
||||
assert job["task_content"] == "task body"
|
||||
assert job["task_id"] == 7
|
||||
assert job["attempts"] == 0
|
||||
assert job["max_attempts"] == 2
|
||||
|
||||
def test_claim_marks_running_and_increments_attempts(self):
|
||||
jid = enqueue_job("developer", "repo")
|
||||
claimed = claim_next_job()
|
||||
assert claimed is not None
|
||||
assert claimed["id"] == jid
|
||||
assert claimed["status"] == "running"
|
||||
assert claimed["attempts"] == 1
|
||||
assert count_running_jobs() == 1
|
||||
|
||||
def test_claim_empty_queue_returns_none(self):
|
||||
assert claim_next_job() is None
|
||||
|
||||
def test_claim_is_fifo(self):
|
||||
a = enqueue_job("analyst", "r")
|
||||
b = enqueue_job("developer", "r")
|
||||
assert claim_next_job()["id"] == a
|
||||
assert claim_next_job()["id"] == b
|
||||
|
||||
def test_mark_done(self):
|
||||
jid = enqueue_job("tester", "r")
|
||||
claim_next_job()
|
||||
mark_job(jid, "done", run_id=42)
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "done"
|
||||
assert job["run_id"] == 42
|
||||
assert job["finished_at"] is not None
|
||||
assert count_running_jobs() == 0
|
||||
|
||||
def test_mark_failed_records_error(self):
|
||||
jid = enqueue_job("tester", "r")
|
||||
claim_next_job()
|
||||
mark_job(jid, "failed", run_id=9, error="boom")
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "failed"
|
||||
assert job["error"] == "boom"
|
||||
assert job["finished_at"] is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# claim atomicity — no double dispatch
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestClaimAtomicity:
|
||||
def test_single_job_claimed_once(self):
|
||||
jid = enqueue_job("analyst", "r")
|
||||
first = claim_next_job()
|
||||
second = claim_next_job()
|
||||
assert first["id"] == jid
|
||||
assert second is None # already running, not re-dispatched
|
||||
|
||||
def test_concurrent_claims_no_duplicate(self):
|
||||
"""Many enqueued jobs claimed from parallel threads -> each claimed once."""
|
||||
import threading
|
||||
|
||||
n = 20
|
||||
for _ in range(n):
|
||||
enqueue_job("developer", "r")
|
||||
|
||||
claimed_ids = []
|
||||
lock = threading.Lock()
|
||||
|
||||
def grab():
|
||||
while True:
|
||||
job = claim_next_job()
|
||||
if job is None:
|
||||
return
|
||||
with lock:
|
||||
claimed_ids.append(job["id"])
|
||||
|
||||
threads = [threading.Thread(target=grab) for _ in range(8)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert len(claimed_ids) == n
|
||||
assert len(set(claimed_ids)) == n # no id claimed twice
|
||||
assert count_running_jobs() == n
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# retry semantics (mirrors launcher._finalize_job logic)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestRetry:
|
||||
def test_fail_requeues_while_under_max(self):
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
job = claim_next_job() # attempts=1
|
||||
assert job["attempts"] == 1
|
||||
# attempts(1) < max(2) -> requeue
|
||||
mark_job(jid, "queued", error="exit 1")
|
||||
j = get_job(jid)
|
||||
assert j["status"] == "queued"
|
||||
assert j["error"] == "exit 1"
|
||||
assert j["started_at"] is None # requeue clears started_at
|
||||
|
||||
def test_fail_fails_when_max_reached(self):
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
claim_next_job() # attempts=1 -> requeue
|
||||
mark_job(jid, "queued")
|
||||
job2 = claim_next_job() # attempts=2
|
||||
assert job2["attempts"] == 2
|
||||
# attempts(2) >= max(2) -> failed
|
||||
mark_job(jid, "failed", error="exit 1")
|
||||
assert get_job(jid)["status"] == "failed"
|
||||
|
||||
def test_finalize_job_done(self):
|
||||
"""launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
|
||||
from src.agents.launcher import AgentLauncher
|
||||
jid = enqueue_job("analyst", "r")
|
||||
claim_next_job()
|
||||
AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
|
||||
assert get_job(jid)["status"] == "done"
|
||||
|
||||
def test_finalize_job_requeue_then_fail(self, monkeypatch):
|
||||
from src.agents.launcher import AgentLauncher
|
||||
# Silence telegram side-effect.
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
lr = AgentLauncher()
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
|
||||
claim_next_job() # attempts=1
|
||||
lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
|
||||
assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue
|
||||
|
||||
claim_next_job() # attempts=2
|
||||
lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
|
||||
assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# queue-recovery
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestRequeueRunning:
|
||||
def test_requeue_running_jobs(self):
|
||||
a = enqueue_job("analyst", "r")
|
||||
b = enqueue_job("developer", "r")
|
||||
claim_next_job() # a -> running
|
||||
claim_next_job() # b -> running
|
||||
assert count_running_jobs() == 2
|
||||
n = requeue_running_jobs()
|
||||
assert n == 2
|
||||
assert count_running_jobs() == 0
|
||||
assert get_job(a)["status"] == "queued"
|
||||
assert get_job(b)["status"] == "queued"
|
||||
|
||||
def test_requeue_preserves_attempts(self):
|
||||
jid = enqueue_job("analyst", "r")
|
||||
claim_next_job() # attempts=1
|
||||
requeue_running_jobs()
|
||||
assert get_job(jid)["attempts"] == 1 # not reset
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# observability helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestObservability:
|
||||
def test_status_counts(self):
|
||||
enqueue_job("analyst", "r") # stays queued
|
||||
enqueue_job("developer", "r") # first claimed -> running (FIFO)
|
||||
claim_next_job()
|
||||
counts = job_status_counts()
|
||||
assert counts["running"] == 1
|
||||
assert counts["queued"] == 1
|
||||
assert counts["done"] == 0
|
||||
assert counts["failed"] == 0
|
||||
|
||||
def test_recent_jobs_desc(self):
|
||||
ids = [enqueue_job("analyst", "r") for _ in range(3)]
|
||||
recent = recent_jobs(10)
|
||||
assert [r["id"] for r in recent] == sorted(ids, reverse=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestWorkerConcurrency:
|
||||
def test_worker_respects_max_concurrency(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
launched = []
|
||||
|
||||
def fake_launch_job(job):
|
||||
# Simulate a long-running agent: the job stays 'running' (we do NOT
|
||||
# mark it done), so the slot remains occupied.
|
||||
launched.append(job["id"])
|
||||
return 100 + job["id"]
|
||||
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
|
||||
|
||||
for _ in range(5):
|
||||
enqueue_job("developer", "r")
|
||||
|
||||
w = QueueWorker(max_concurrency=2, poll_interval=0.01)
|
||||
w._drain_once()
|
||||
|
||||
# Only max_concurrency jobs may be launched / running at once.
|
||||
assert len(launched) == 2
|
||||
assert count_running_jobs() == 2
|
||||
|
||||
def test_worker_drains_as_slots_free(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
def fake_launch_job(job):
|
||||
# Immediately complete the job so the slot frees for the next claim.
|
||||
mark_job(job["id"], "done", run_id=job["id"])
|
||||
return job["id"]
|
||||
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
|
||||
|
||||
for _ in range(4):
|
||||
enqueue_job("analyst", "r")
|
||||
|
||||
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
|
||||
w._drain_once()
|
||||
|
||||
# With instant completion and concurrency 1, one drain pass empties the queue.
|
||||
assert job_status_counts()["done"] == 4
|
||||
assert count_running_jobs() == 0
|
||||
|
||||
def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
def boom(job):
|
||||
raise RuntimeError("repo missing")
|
||||
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
|
||||
enqueue_job("developer", "r", max_attempts=1)
|
||||
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
|
||||
w._drain_once()
|
||||
|
||||
# attempts=1 >= max_attempts=1 -> failed, not stuck running.
|
||||
assert count_running_jobs() == 0
|
||||
counts = job_status_counts()
|
||||
assert counts["failed"] == 1
|
||||
Reference in New Issue
Block a user