Files
orchestrator/tests/test_resilience.py
Dev Agent a613fd8180 test(resilience): 34 tests for preflight/classifier/backoff/breaker (ORCH-1)
Covers preflight FAIL->queued + cache, transient/permanent classifier +
Retry-After, exp backoff + available_at gating, launcher transient vs permanent
finalize, circuit breaker open/half-open/closed. test_queue worker tests stub
preflight OK. Popen never spawned.
2026-06-03 00:12:17 +03:00

296 lines
12 KiB
Python

"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.
No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job
are mocked. DB is a fresh per-test sqlite file.
"""
import os
import tempfile
import pytest
# Environment for the orchestrator settings. These assignments sit before the
# `src.*` imports below, presumably because the settings object reads the
# environment at import time (TODO confirm against src.db).
# NOTE(review): the autouse `fresh_db` fixture repoints `db.settings.db_path`
# at a per-test file, so this shared temp-dir path looks like an import-time
# default only; verify nothing writes to it.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
# Placeholder credentials; this file never sets real tokens.
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import (
init_db, enqueue_job, claim_next_job, get_job, count_running_jobs,
mark_job_transient,
)
from src import preflight, error_classifier
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
from src.queue_worker import QueueWorker, CircuitBreaker
from src.agents.launcher import AgentLauncher
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Point the DB settings at a per-test sqlite file and clear the preflight cache.

    Applied automatically to every test in this module (``autouse=True``).
    """
    per_test_db = tmp_path / "res.db"
    monkeypatch.setattr(db.settings, "db_path", str(per_test_db))
    # init_db() runs only after the path has been redirected.
    init_db()
    # Drop any preflight result cached by a previous test.
    preflight.reset_cache()
    yield
# ---------------------------------------------------------------------------
# A. Preflight
# ---------------------------------------------------------------------------
class TestPreflight:
    """Preflight: binary lookup, version probe, result caching and worker gating.

    ``preflight._claude_bin`` and ``preflight._run_version`` are always
    monkeypatched, so no real subprocess is started by these tests.
    """

    def test_fail_when_bin_missing(self, monkeypatch):
        """A binary path that does not exist yields (False, '...not found...')."""
        monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
        ok, reason = preflight.check(force=True)
        assert ok is False
        assert "not found" in reason.lower()

    def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
        """An existing binary plus a successful version probe yields ok=True."""
        fake_bin = tmp_path / "claude"
        fake_bin.write_text("#!/bin/sh\necho v1\n")
        monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
        monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
        ok, _reason = preflight.check(force=True)
        assert ok is True

    def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
        """Repeated check() calls inside the TTL run the version probe only once."""
        fake_bin = tmp_path / "claude"
        fake_bin.write_text("x")
        monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
        monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
        calls = {"n": 0}

        def counting_version(b):
            calls["n"] += 1
            return True, "ok"

        monkeypatch.setattr(preflight, "_run_version", counting_version)
        preflight.reset_cache()
        preflight.check()  # first -> runs version
        preflight.check()  # cached -> no extra version call
        preflight.check()
        assert calls["n"] == 1

    def test_force_bypasses_cache(self, monkeypatch, tmp_path):
        """check(force=True) re-runs the version probe even while the cache is live."""
        fake_bin = tmp_path / "claude"
        fake_bin.write_text("x")
        monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
        # Pin a long TTL so the cache is definitely live for the second call;
        # without it the test could pass merely because the entry had expired.
        monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
        calls = {"n": 0}

        def counting_version(b):
            calls["n"] += 1
            return True, "ok"

        monkeypatch.setattr(preflight, "_run_version", counting_version)
        preflight.reset_cache()
        preflight.check()
        assert calls["n"] == 1
        preflight.check(force=True)
        assert calls["n"] == 2

    def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
        """Preflight FAIL -> job stays queued, launch_job never called."""
        monkeypatch.setattr("src.queue_worker.preflight.check",
                            lambda *a, **k: (False, "down"))
        called = {"launch": False}

        def fake_launch(job):
            called["launch"] = True

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch)
        jid = enqueue_job("analyst", "r")
        QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
        assert called["launch"] is False
        assert get_job(jid)["status"] == "queued"
        assert count_running_jobs() == 0
# ---------------------------------------------------------------------------
# B. Error classifier
# ---------------------------------------------------------------------------
class TestClassifier:
    """Error classifier: transient vs permanent text, Retry-After parsing, log files."""

    @pytest.mark.parametrize(
        "text",
        [
            "Error: 429 Too Many Requests",
            "anthropic rate limit exceeded",
            "overloaded_error: server is overloaded",
            "API quota exhausted",
            "503 Service Unavailable",
            "connection reset by peer",
        ],
    )
    def test_transient_patterns(self, text):
        """Rate-limit, overload, quota and connection messages classify as transient."""
        verdict = classify_text(text)
        assert verdict == "transient"

    @pytest.mark.parametrize(
        "text",
        [
            "Traceback: KeyError 'foo'",
            "SyntaxError: invalid syntax",
            "assertion failed in test",
            "",
        ],
    )
    def test_permanent_patterns(self, text):
        """Code-fault messages and the empty string classify as permanent."""
        verdict = classify_text(text)
        assert verdict == "permanent"

    def test_retry_after_header(self):
        """An HTTP-style Retry-After header is parsed to its integer value."""
        raw = "HTTP/1.1 429\nRetry-After: 42\n"
        assert parse_retry_after(raw) == 42

    def test_retry_after_json(self):
        """A JSON "retry_after" field is parsed to its integer value."""
        raw = '{"error":{"type":"rate_limit","retry_after": 7}}'
        assert parse_retry_after(raw) == 7

    def test_retry_after_absent(self):
        """Text with no retry hint yields None."""
        raw = "just an error"
        assert parse_retry_after(raw) is None

    def test_classify_log_file(self, tmp_path):
        """A log containing a 429 with Retry-After is transient with that delay."""
        log_path = tmp_path / "run.log"
        log_path.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
        verdict, delay = classify_log_file(str(log_path))
        assert verdict == "transient"
        assert delay == 30

    def test_classify_missing_file_is_permanent(self):
        """A log path that does not exist is permanent with no retry hint."""
        verdict, delay = classify_log_file("/no/such/log")
        assert verdict == "permanent"
        assert delay is None
# ---------------------------------------------------------------------------
# C. Backoff + available_at gating
# ---------------------------------------------------------------------------
class TestBackoff:
    """Backoff arithmetic on the launcher and available_at gating in the queue."""

    def test_backoff_grows_exponentially(self):
        """Attempts 1, 2, 3 give 20, 40, 80 seconds."""
        launcher = AgentLauncher()
        # base=10, cap=600 (defaults)
        first, second, third = (launcher._backoff_seconds(n) for n in (1, 2, 3))
        assert first == 20  # 2^1*10
        assert second == 40  # 2^2*10
        assert third == 80  # 2^3*10
        assert first < second < third

    def test_backoff_capped(self):
        """A very high attempt count returns 600."""
        launcher = AgentLauncher()
        capped = launcher._backoff_seconds(20)
        assert capped == 600  # capped at backoff_max_seconds

    def test_retry_after_respected_when_larger(self):
        """A Retry-After above the computed backoff wins."""
        launcher = AgentLauncher()
        # transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
        delay = launcher._backoff_seconds(1, retry_after=120)
        assert delay == 120

    def test_retry_after_ignored_when_smaller(self):
        """A Retry-After below the computed backoff is ignored."""
        launcher = AgentLauncher()
        delay = launcher._backoff_seconds(3, retry_after=5)
        assert delay == 80  # backoff bigger

    def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
        """A job requeued with a long delay is queued but not claimable yet."""
        job_id = enqueue_job("developer", "r")
        claim_next_job()
        # Big backoff -> available_at far in the future.
        mark_job_transient(job_id, 3600, error="429")
        row = get_job(job_id)
        assert row["status"] == "queued"
        assert row["transient_attempts"] == 1
        assert row["available_at"] is not None
        # claim must NOT pick it up while available_at is in the future.
        next_claim = claim_next_job()
        assert next_claim is None

    def test_transient_requeue_claimable_when_due(self):
        """A job requeued with a negative delay can be claimed immediately."""
        job_id = enqueue_job("developer", "r")
        claim_next_job()
        mark_job_transient(job_id, -5, error="429")  # available_at in the past
        claimed = claim_next_job()
        assert claimed is not None and claimed["id"] == job_id
# ---------------------------------------------------------------------------
# D. Launcher transient/permanent finalize (no Popen)
# ---------------------------------------------------------------------------
class TestFinalizeClassified:
    """AgentLauncher._finalize_job on transient vs permanent failures (no Popen)."""

    @staticmethod
    def _mute_telegram(monkeypatch):
        # Replace the telegram sender with a no-op for the duration of the test.
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)

    @staticmethod
    def _write_log(tmp_path, name, text):
        # Create the agent output file that _finalize_job is pointed at.
        log_file = tmp_path / name
        log_file.write_text(text)
        return str(log_file)

    def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
        """A 429 in the log requeues the job with available_at set."""
        self._mute_telegram(monkeypatch)
        log_path = self._write_log(tmp_path, "1.log", "Error 429 rate limit exceeded\n")
        job_id = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()
        AgentLauncher()._finalize_job(
            job_id, "developer", run_id=1, exit_code=1, output_path=log_path
        )
        row = get_job(job_id)
        assert row["status"] == "queued"
        assert row["transient_attempts"] == 1
        assert row["available_at"] is not None  # backoff-gated
        assert row["attempts"] == 1  # code-fault budget NOT burned

    def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
        """A traceback in the log requeues without transient count or backoff."""
        self._mute_telegram(monkeypatch)
        log_path = self._write_log(tmp_path, "2.log", "Traceback: ValueError\n")
        job_id = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()
        AgentLauncher()._finalize_job(
            job_id, "developer", run_id=2, exit_code=1, output_path=log_path
        )
        row = get_job(job_id)
        assert row["status"] == "queued"
        assert row["transient_attempts"] == 0  # not transient
        assert row["available_at"] is None  # no backoff for code-fault

    def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
        """With transient_max_attempts=2, the second transient finalize fails the job."""
        self._mute_telegram(monkeypatch)
        monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
        log_path = self._write_log(tmp_path, "3.log", "overloaded_error\n")
        launcher = AgentLauncher()
        job_id = enqueue_job("developer", "r")
        claim_next_job()
        launcher._finalize_job(job_id, "developer", 1, exit_code=1, output_path=log_path)
        assert get_job(job_id)["status"] == "queued"  # transient 1 -> requeue
        # force claimable and retry
        mark_job_transient(job_id, -1)  # makes it due; transient=2 now
        claim_next_job()
        launcher._finalize_job(job_id, "developer", 2, exit_code=1, output_path=log_path)
        assert get_job(job_id)["status"] == "failed"  # transient budget exhausted
# ---------------------------------------------------------------------------
# E. Circuit breaker
# ---------------------------------------------------------------------------
class TestCircuitBreaker:
    """CircuitBreaker state machine (closed / open / half-open) and worker gating."""

    def test_opens_after_threshold(self):
        """The breaker stays closed below the threshold and opens on reaching it."""
        cb = CircuitBreaker(threshold=3, pause_seconds=300)
        assert cb.allow_claim() is True
        cb.record_transient()
        cb.record_transient()
        assert cb.state == "closed"
        cb.record_transient()  # 3rd -> open
        assert cb.state == "open"
        assert cb.allow_claim() is False  # paused, no CLI calls

    def test_recovered_resets_streak(self):
        """record_recovered() zeroes the consecutive-transient counter."""
        cb = CircuitBreaker(threshold=3)
        cb.record_transient()
        cb.record_transient()
        cb.record_recovered()
        assert cb.consecutive_transient == 0
        assert cb.state == "closed"

    def test_half_open_after_pause_then_closed_on_success(self):
        """After the pause, one probe is allowed; a recovery closes the breaker."""
        cb = CircuitBreaker(threshold=2, pause_seconds=300)
        cb.record_transient()
        cb.record_transient()  # open
        assert cb.state == "open"
        # Simulate the pause elapsing by back-dating the open timestamp.
        cb.opened_at -= 301
        assert cb.allow_claim() is True  # -> half-open (probe)
        assert cb.state == "half-open"
        cb.record_recovered()  # probe succeeded
        assert cb.state == "closed"

    def test_half_open_reopens_on_transient(self):
        """A transient failure during the half-open probe re-opens the breaker."""
        cb = CircuitBreaker(threshold=2, pause_seconds=300)
        cb.record_transient()
        cb.record_transient()  # open
        cb.opened_at -= 301
        cb.allow_claim()  # half-open
        assert cb.state == "half-open"
        cb.record_transient()  # probe failed -> re-open
        assert cb.state == "open"

    def test_breaker_blocks_worker_claim(self, monkeypatch):
        """An open breaker stops the worker before it claims or launches a job."""
        monkeypatch.setattr("src.queue_worker.preflight.check",
                            lambda *a, **k: (True, "ok"))
        called = {"launch": False}

        def fake_launch(job):
            called["launch"] = True

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch)
        cb = CircuitBreaker(threshold=1, pause_seconds=300)
        cb.record_transient()  # open immediately
        w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb)
        jid = enqueue_job("analyst", "r")
        w._drain_once()
        assert called["launch"] is False  # breaker open -> no claim, no CLI
        # "No claim" also means the job was not moved out of the queue; checking
        # only the launch flag would miss a job claimed and then left stuck.
        assert get_job(jid)["status"] == "queued"
        assert count_running_jobs() == 0