Covers: preflight FAIL -> queued (plus cache); transient/permanent error classifier with Retry-After; exponential backoff with available_at gating; launcher transient-vs-permanent finalize; circuit breaker open/half-open/closed. The test_queue worker tests stub preflight as OK. Popen is never spawned.
305 lines · 11 KiB · Python
"""Tests for ORCH-1 (F-2b) persistent job queue.

Covers:
- enqueue_job -> claim_next_job -> mark_job lifecycle
- claim_next_job atomicity (no double-dispatch of the same job)
- retry: fail -> requeue while attempts < max_attempts, then failed
- requeue_running_jobs (queue-recovery)
- count_running_jobs / job_status_counts / recent_jobs
- QueueWorker respects max_concurrency and does not leave a slot wedged when a
  launch fails (launch_job fully mocked, preflight stubbed OK)

The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker
tests, and the launcher finalize logic is exercised directly via mark_job and
AgentLauncher._finalize_job.
"""
import os
import tempfile

import pytest

# Override env before importing app modules (same convention as test_qg.py).
# The src.* imports below must therefore stay after this block; presumably the
# app settings object reads these variables when src is first imported.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
os.environ.update(
    {
        "ORCH_DB_PATH": _test_db,
        "ORCH_REPOS_DIR": tempfile.gettempdir(),
        "ORCH_GITEA_TOKEN": "test-token",
        "ORCH_PLANE_API_TOKEN": "test-token",
    }
)

import src.db as db  # noqa: E402  (must follow the env overrides above)
from src.db import (  # noqa: E402
    claim_next_job,
    count_running_jobs,
    enqueue_job,
    get_job,
    init_db,
    job_status_counts,
    mark_job,
    recent_jobs,
    requeue_running_jobs,
)


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test its own freshly initialised sqlite database.

    ``db.settings.db_path`` is redirected, via ``monkeypatch`` so the change is
    undone after the test, to ``queue.db`` inside pytest's per-test
    ``tmp_path``; ``init_db()`` then creates the schema in that file.
    """
    per_test_db = tmp_path.joinpath("queue.db")
    monkeypatch.setattr(db.settings, "db_path", os.fspath(per_test_db))
    init_db()
    yield


# ---------------------------------------------------------------------------
# enqueue / claim / mark lifecycle
# ---------------------------------------------------------------------------
class TestLifecycle:
    """Single-job path through enqueue_job -> claim_next_job -> mark_job."""

    def test_enqueue_creates_queued_job(self):
        jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
        job = get_job(jid)
        assert job["status"] == "queued"
        assert job["agent"] == "analyst"
        assert job["repo"] == "enduro-trails"
        assert job["task_content"] == "task body"
        assert job["task_id"] == 7
        assert job["attempts"] == 0
        # No max_attempts passed -> the default retry budget applies.
        assert job["max_attempts"] == 2

    def test_claim_marks_running_and_increments_attempts(self):
        jid = enqueue_job("developer", "repo")
        claimed = claim_next_job()
        assert claimed is not None
        assert claimed["id"] == jid
        assert claimed["status"] == "running"
        assert claimed["attempts"] == 1
        assert count_running_jobs() == 1

    def test_claim_empty_queue_returns_none(self):
        assert claim_next_job() is None

    def test_claim_is_fifo(self):
        a = enqueue_job("analyst", "r")
        b = enqueue_job("developer", "r")
        assert claim_next_job()["id"] == a
        assert claim_next_job()["id"] == b

    def test_mark_done(self):
        jid = enqueue_job("tester", "r")
        claim_next_job()
        mark_job(jid, "done", run_id=42)
        job = get_job(jid)
        assert job["status"] == "done"
        assert job["run_id"] == 42
        assert job["finished_at"] is not None
        assert count_running_jobs() == 0

    def test_mark_failed_records_error(self):
        jid = enqueue_job("tester", "r")
        claim_next_job()
        mark_job(jid, "failed", run_id=9, error="boom")
        job = get_job(jid)
        assert job["status"] == "failed"
        assert job["error"] == "boom"
        # Mirror test_mark_done: the run link is persisted and the job no
        # longer occupies a running slot.
        assert job["run_id"] == 9
        assert job["finished_at"] is not None
        assert count_running_jobs() == 0


# ---------------------------------------------------------------------------
# claim atomicity — no double dispatch
# ---------------------------------------------------------------------------
class TestClaimAtomicity:
    """claim_next_job must hand each queued job to exactly one caller."""

    def test_single_job_claimed_once(self):
        jid = enqueue_job("analyst", "r")
        first = claim_next_job()
        second = claim_next_job()
        assert first["id"] == jid
        assert second is None  # already running, not re-dispatched

    def test_concurrent_claims_no_duplicate(self):
        """Many enqueued jobs claimed from parallel threads -> each claimed once.

        Exceptions raised inside worker threads are collected and asserted on
        the main thread. An unhandled exception in a background thread may only
        surface as a pytest warning, so without this a failing claim (e.g. a
        sqlite locking error) in one worker could be hidden while the other
        workers drain the queue and the count assertions still pass.
        """
        import threading

        n = 20
        n_threads = 8
        for _ in range(n):
            enqueue_job("developer", "r")

        claimed_ids = []
        errors = []
        lock = threading.Lock()
        # Release all workers together so their claims genuinely contend; the
        # timeout turns a missing participant into BrokenBarrierError, not a hang.
        start = threading.Barrier(n_threads, timeout=10)

        def grab():
            try:
                start.wait()
                while True:
                    job = claim_next_job()
                    if job is None:
                        return
                    with lock:
                        claimed_ids.append(job["id"])
            except Exception as exc:  # reported on the main thread below
                with lock:
                    errors.append(exc)

        # daemon=True so a wedged worker can never keep the test process alive.
        threads = [threading.Thread(target=grab, daemon=True) for _ in range(n_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=30)

        assert not any(t.is_alive() for t in threads), "claim worker hung"
        assert not errors, f"claim worker raised: {errors!r}"
        assert len(claimed_ids) == n
        assert len(set(claimed_ids)) == n  # no id claimed twice
        assert count_running_jobs() == n


# ---------------------------------------------------------------------------
# retry semantics (mirrors launcher._finalize_job logic)
# ---------------------------------------------------------------------------
class TestRetry:
    """Requeue-vs-fail decisions, via mark_job and AgentLauncher._finalize_job."""

    def test_fail_requeues_while_under_max(self):
        jid = enqueue_job("developer", "r", max_attempts=2)
        job = claim_next_job()  # attempts=1
        assert job["attempts"] == 1
        # attempts(1) < max(2) -> requeue
        mark_job(jid, "queued", error="exit 1")
        j = get_job(jid)
        assert j["status"] == "queued"
        assert j["error"] == "exit 1"
        assert j["started_at"] is None  # requeue clears started_at

    def test_fail_fails_when_max_reached(self):
        jid = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()  # attempts=1 -> requeue
        mark_job(jid, "queued")
        job2 = claim_next_job()  # attempts=2
        assert job2["attempts"] == 2
        # attempts(2) >= max(2) -> failed
        mark_job(jid, "failed", error="exit 1")
        assert get_job(jid)["status"] == "failed"

    def test_finalize_job_done(self, monkeypatch):
        """launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
        from src.agents.launcher import AgentLauncher

        # Silence telegram side-effect exactly as the failure-path test below
        # does, so any notification finalize may send on success stays local.
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        jid = enqueue_job("analyst", "r")
        claim_next_job()
        AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
        assert get_job(jid)["status"] == "done"

    def test_finalize_job_requeue_then_fail(self, monkeypatch):
        from src.agents.launcher import AgentLauncher

        # Silence telegram side-effect.
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        lr = AgentLauncher()
        jid = enqueue_job("developer", "r", max_attempts=2)

        claim_next_job()  # attempts=1
        lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
        assert get_job(jid)["status"] == "queued"  # 1 < 2 -> requeue

        claim_next_job()  # attempts=2
        lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
        assert get_job(jid)["status"] == "failed"  # 2 >= 2 -> failed


# ---------------------------------------------------------------------------
# queue-recovery
# ---------------------------------------------------------------------------
class TestRequeueRunning:
    """requeue_running_jobs moves every 'running' job back to 'queued'."""

    def test_requeue_running_jobs(self):
        job_ids = [enqueue_job("analyst", "r"), enqueue_job("developer", "r")]
        for _ in job_ids:
            claim_next_job()  # next job in FIFO order -> running
        assert count_running_jobs() == 2

        requeued = requeue_running_jobs()

        assert requeued == 2
        assert count_running_jobs() == 0
        for job_id in job_ids:
            assert get_job(job_id)["status"] == "queued"

    def test_requeue_preserves_attempts(self):
        job_id = enqueue_job("analyst", "r")
        claim_next_job()  # consumes one attempt
        requeue_running_jobs()
        # Recovery must keep the attempt already spent rather than reset it.
        assert get_job(job_id)["attempts"] == 1


# ---------------------------------------------------------------------------
# observability helpers
# ---------------------------------------------------------------------------
class TestObservability:
    """job_status_counts / recent_jobs reporting helpers."""

    def test_status_counts(self):
        # Claims are FIFO (see TestLifecycle.test_claim_is_fifo), so the job
        # enqueued first is the one claimed.
        enqueue_job("analyst", "r")  # enqueued first -> claimed -> running
        enqueue_job("developer", "r")  # enqueued second -> stays queued
        claim_next_job()
        counts = job_status_counts()
        assert counts["running"] == 1
        assert counts["queued"] == 1
        assert counts["done"] == 0
        assert counts["failed"] == 0

    def test_recent_jobs_desc(self):
        ids = [enqueue_job("analyst", "r") for _ in range(3)]
        recent = recent_jobs(10)
        # Newest (highest id) first.
        assert [r["id"] for r in recent] == sorted(ids, reverse=True)


# ---------------------------------------------------------------------------
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
# ---------------------------------------------------------------------------
class TestWorkerConcurrency:
    """One QueueWorker._drain_once pass against stubbed preflight and launcher."""

    @pytest.fixture(autouse=True)
    def _ok_preflight(self, monkeypatch):
        # ORCH-1 resilience: the worker gates claims behind preflight. Tests have
        # no claude binary, so report preflight as passing and exercise only the
        # queue / concurrency behaviour.
        def _always_ok(*_args, **_kwargs):
            return True, "ok"

        monkeypatch.setattr("src.queue_worker.preflight.check", _always_ok)

    def test_worker_respects_max_concurrency(self, monkeypatch):
        from src.queue_worker import QueueWorker

        started = []

        def long_running_launch(job):
            # Never finalize: the job stays 'running' and keeps holding its slot.
            started.append(job["id"])
            return job["id"] + 100

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", long_running_launch)

        for _ in range(5):
            enqueue_job("developer", "r")

        worker = QueueWorker(max_concurrency=2, poll_interval=0.01)
        worker._drain_once()

        # Two slots -> exactly two launches, both still occupying their slot.
        assert len(started) == 2
        assert count_running_jobs() == 2

    def test_worker_drains_as_slots_free(self, monkeypatch):
        from src.queue_worker import QueueWorker

        def instant_launch(job):
            # Complete at once so the single slot is free for the next claim.
            mark_job(job["id"], "done", run_id=job["id"])
            return job["id"]

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", instant_launch)

        for _ in range(4):
            enqueue_job("analyst", "r")

        worker = QueueWorker(max_concurrency=1, poll_interval=0.01)
        worker._drain_once()

        # Instant completion + one slot: a single drain pass empties the queue.
        assert job_status_counts()["done"] == 4
        assert count_running_jobs() == 0

    def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
        from src.queue_worker import QueueWorker

        def boom(job):
            raise RuntimeError("repo missing")

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)

        enqueue_job("developer", "r", max_attempts=1)
        worker = QueueWorker(max_concurrency=1, poll_interval=0.01)
        worker._drain_once()

        # attempts=1 >= max_attempts=1 -> failed, not stuck running.
        assert count_running_jobs() == 0
        assert job_status_counts()["failed"] == 1