test(resilience): 34 tests for preflight/classifier/backoff/breaker (ORCH-1)
Covers preflight FAIL->queued + cache, transient/permanent classifier + Retry-After, exp backoff + available_at gating, launcher transient vs permanent finalize, circuit breaker open/half-open/closed. test_queue worker tests stub preflight OK. Popen never spawned.
This commit is contained in:
@@ -236,6 +236,12 @@ class TestObservability:
|
||||
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestWorkerConcurrency:
|
||||
@pytest.fixture(autouse=True)
|
||||
def _ok_preflight(self, monkeypatch):
|
||||
# ORCH-1 resilience: the worker gates claims behind preflight; in tests there
|
||||
# is no claude binary, so stub preflight OK to exercise pure queue/concurrency.
|
||||
monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))
|
||||
|
||||
def test_worker_respects_max_concurrency(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
|
||||
295
tests/test_resilience.py
Normal file
295
tests/test_resilience.py
Normal file
@@ -0,0 +1,295 @@
|
||||
"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.
|
||||
|
||||
No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job
|
||||
are mocked. DB is a fresh per-test sqlite file.
|
||||
"""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
|
||||
import src.db as db
|
||||
from src.db import (
|
||||
init_db, enqueue_job, claim_next_job, get_job, count_running_jobs,
|
||||
mark_job_transient,
|
||||
)
|
||||
from src import preflight, error_classifier
|
||||
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
|
||||
from src.queue_worker import QueueWorker, CircuitBreaker
|
||||
from src.agents.launcher import AgentLauncher
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db"))
|
||||
init_db()
|
||||
preflight.reset_cache()
|
||||
yield
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# A. Preflight
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestPreflight:
|
||||
def test_fail_when_bin_missing(self, monkeypatch):
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
|
||||
ok, reason = preflight.check(force=True)
|
||||
assert ok is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
|
||||
fake_bin = tmp_path / "claude"
|
||||
fake_bin.write_text("#!/bin/sh\necho v1\n")
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||
monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
|
||||
ok, reason = preflight.check(force=True)
|
||||
assert ok is True
|
||||
|
||||
def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
|
||||
fake_bin = tmp_path / "claude"
|
||||
fake_bin.write_text("x")
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||
monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
|
||||
|
||||
calls = {"n": 0}
|
||||
|
||||
def counting_version(b):
|
||||
calls["n"] += 1
|
||||
return True, "ok"
|
||||
|
||||
monkeypatch.setattr(preflight, "_run_version", counting_version)
|
||||
preflight.reset_cache()
|
||||
preflight.check() # first -> runs version
|
||||
preflight.check() # cached -> no extra version call
|
||||
preflight.check()
|
||||
assert calls["n"] == 1
|
||||
|
||||
def test_force_bypasses_cache(self, monkeypatch, tmp_path):
|
||||
fake_bin = tmp_path / "claude"
|
||||
fake_bin.write_text("x")
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||
calls = {"n": 0}
|
||||
monkeypatch.setattr(preflight, "_run_version",
|
||||
lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1])
|
||||
preflight.reset_cache()
|
||||
preflight.check()
|
||||
preflight.check(force=True)
|
||||
assert calls["n"] == 2
|
||||
|
||||
def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
|
||||
# Preflight FAIL -> job stays queued, launch_job never called.
|
||||
monkeypatch.setattr("src.queue_worker.preflight.check",
|
||||
lambda *a, **k: (False, "down"))
|
||||
called = {"launch": False}
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
|
||||
lambda job: called.__setitem__("launch", True))
|
||||
jid = enqueue_job("analyst", "r")
|
||||
QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
|
||||
assert called["launch"] is False
|
||||
assert get_job(jid)["status"] == "queued"
|
||||
assert count_running_jobs() == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# B. Error classifier
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestClassifier:
|
||||
@pytest.mark.parametrize("text", [
|
||||
"Error: 429 Too Many Requests",
|
||||
"anthropic rate limit exceeded",
|
||||
"overloaded_error: server is overloaded",
|
||||
"API quota exhausted",
|
||||
"503 Service Unavailable",
|
||||
"connection reset by peer",
|
||||
])
|
||||
def test_transient_patterns(self, text):
|
||||
assert classify_text(text) == "transient"
|
||||
|
||||
@pytest.mark.parametrize("text", [
|
||||
"Traceback: KeyError 'foo'",
|
||||
"SyntaxError: invalid syntax",
|
||||
"assertion failed in test",
|
||||
"",
|
||||
])
|
||||
def test_permanent_patterns(self, text):
|
||||
assert classify_text(text) == "permanent"
|
||||
|
||||
def test_retry_after_header(self):
|
||||
assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42
|
||||
|
||||
def test_retry_after_json(self):
|
||||
assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7
|
||||
|
||||
def test_retry_after_absent(self):
|
||||
assert parse_retry_after("just an error") is None
|
||||
|
||||
def test_classify_log_file(self, tmp_path):
|
||||
p = tmp_path / "run.log"
|
||||
p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
|
||||
kind, ra = classify_log_file(str(p))
|
||||
assert kind == "transient"
|
||||
assert ra == 30
|
||||
|
||||
def test_classify_missing_file_is_permanent(self):
|
||||
kind, ra = classify_log_file("/no/such/log")
|
||||
assert kind == "permanent"
|
||||
assert ra is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# C. Backoff + available_at gating
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestBackoff:
|
||||
def test_backoff_grows_exponentially(self):
|
||||
lr = AgentLauncher()
|
||||
# base=10, cap=600 (defaults)
|
||||
b1 = lr._backoff_seconds(1)
|
||||
b2 = lr._backoff_seconds(2)
|
||||
b3 = lr._backoff_seconds(3)
|
||||
assert b1 == 20 # 2^1*10
|
||||
assert b2 == 40 # 2^2*10
|
||||
assert b3 == 80 # 2^3*10
|
||||
assert b2 > b1 and b3 > b2
|
||||
|
||||
def test_backoff_capped(self):
|
||||
lr = AgentLauncher()
|
||||
assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds
|
||||
|
||||
def test_retry_after_respected_when_larger(self):
|
||||
lr = AgentLauncher()
|
||||
# transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
|
||||
assert lr._backoff_seconds(1, retry_after=120) == 120
|
||||
|
||||
def test_retry_after_ignored_when_smaller(self):
|
||||
lr = AgentLauncher()
|
||||
assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger
|
||||
|
||||
def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
|
||||
jid = enqueue_job("developer", "r")
|
||||
claim_next_job()
|
||||
# Big backoff -> available_at far in the future.
|
||||
mark_job_transient(jid, 3600, error="429")
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["transient_attempts"] == 1
|
||||
assert job["available_at"] is not None
|
||||
# claim must NOT pick it up while available_at is in the future.
|
||||
assert claim_next_job() is None
|
||||
|
||||
def test_transient_requeue_claimable_when_due(self):
|
||||
jid = enqueue_job("developer", "r")
|
||||
claim_next_job()
|
||||
mark_job_transient(jid, -5, error="429") # available_at in the past
|
||||
c = claim_next_job()
|
||||
assert c is not None and c["id"] == jid
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# D. Launcher transient/permanent finalize (no Popen)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestFinalizeClassified:
|
||||
def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
log = tmp_path / "1.log"
|
||||
log.write_text("Error 429 rate limit exceeded\n")
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
claim_next_job()
|
||||
AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1,
|
||||
output_path=str(log))
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["transient_attempts"] == 1
|
||||
assert job["available_at"] is not None # backoff-gated
|
||||
assert job["attempts"] == 1 # code-fault budget NOT burned
|
||||
|
||||
def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
log = tmp_path / "2.log"
|
||||
log.write_text("Traceback: ValueError\n")
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
claim_next_job()
|
||||
AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1,
|
||||
output_path=str(log))
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["transient_attempts"] == 0 # not transient
|
||||
assert job["available_at"] is None # no backoff for code-fault
|
||||
|
||||
def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
|
||||
log = tmp_path / "3.log"
|
||||
log.write_text("overloaded_error\n")
|
||||
lr = AgentLauncher()
|
||||
jid = enqueue_job("developer", "r")
|
||||
claim_next_job()
|
||||
lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log))
|
||||
assert get_job(jid)["status"] == "queued" # transient 1 -> requeue
|
||||
# force claimable and retry
|
||||
mark_job_transient(jid, -1) # makes it due; transient=2 now
|
||||
claim_next_job()
|
||||
lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log))
|
||||
assert get_job(jid)["status"] == "failed" # transient budget exhausted
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# E. Circuit breaker
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestCircuitBreaker:
|
||||
def test_opens_after_threshold(self):
|
||||
cb = CircuitBreaker(threshold=3, pause_seconds=300)
|
||||
assert cb.allow_claim() is True
|
||||
cb.record_transient()
|
||||
cb.record_transient()
|
||||
assert cb.state == "closed"
|
||||
cb.record_transient() # 3rd -> open
|
||||
assert cb.state == "open"
|
||||
assert cb.allow_claim() is False # paused, no CLI calls
|
||||
|
||||
def test_recovered_resets_streak(self):
|
||||
cb = CircuitBreaker(threshold=3)
|
||||
cb.record_transient()
|
||||
cb.record_transient()
|
||||
cb.record_recovered()
|
||||
assert cb.consecutive_transient == 0
|
||||
assert cb.state == "closed"
|
||||
|
||||
def test_half_open_after_pause_then_closed_on_success(self, monkeypatch):
|
||||
cb = CircuitBreaker(threshold=2, pause_seconds=300)
|
||||
cb.record_transient()
|
||||
cb.record_transient() # open
|
||||
assert cb.state == "open"
|
||||
# Simulate the pause elapsing.
|
||||
cb.opened_at -= 301
|
||||
assert cb.allow_claim() is True # -> half-open (probe)
|
||||
assert cb.state == "half-open"
|
||||
cb.record_recovered() # probe succeeded
|
||||
assert cb.state == "closed"
|
||||
|
||||
def test_half_open_reopens_on_transient(self):
|
||||
cb = CircuitBreaker(threshold=2, pause_seconds=300)
|
||||
cb.record_transient(); cb.record_transient() # open
|
||||
cb.opened_at -= 301
|
||||
cb.allow_claim() # half-open
|
||||
assert cb.state == "half-open"
|
||||
cb.record_transient() # probe failed -> re-open
|
||||
assert cb.state == "open"
|
||||
|
||||
def test_breaker_blocks_worker_claim(self, monkeypatch):
|
||||
monkeypatch.setattr("src.queue_worker.preflight.check",
|
||||
lambda *a, **k: (True, "ok"))
|
||||
called = {"launch": False}
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
|
||||
lambda job: called.__setitem__("launch", True))
|
||||
cb = CircuitBreaker(threshold=1, pause_seconds=300)
|
||||
cb.record_transient() # open immediately
|
||||
w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb)
|
||||
enqueue_job("analyst", "r")
|
||||
w._drain_once()
|
||||
assert called["launch"] is False # breaker open -> no claim, no CLI
|
||||
Reference in New Issue
Block a user