"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.

No real claude/Popen is ever spawned: preflight subprocess and
launcher.launch_job are mocked. DB is a fresh per-test sqlite file.
"""
import os
import tempfile

import pytest

# The ORCH_* variables are exported before the ``src`` imports below run.
# NOTE(review): presumably the project settings are read from the environment
# at import time -- confirm against src.db before reordering these lines.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"

import src.db as db
from src.db import (
    init_db,
    enqueue_job,
    claim_next_job,
    get_job,
    count_running_jobs,
    mark_job_transient,
)
from src import preflight, error_classifier
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
from src.queue_worker import QueueWorker, CircuitBreaker
from src.agents.launcher import AgentLauncher


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Point the DB at a per-test sqlite file and reset the preflight cache."""
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db"))
    init_db()
    preflight.reset_cache()
    yield


# ---------------------------------------------------------------------------
# A. Preflight
# ---------------------------------------------------------------------------

class TestPreflight:
    """preflight.check() results, caching, and the worker's reaction to a failure."""

    @pytest.fixture(autouse=True)
    def _isolate_auth_gate(self, monkeypatch):
        # ORCH-044: preflight.check() also runs a token-free auth gate reading
        # .claude/.credentials.json under AgentLauncher.AGENT_HOME (not the
        # process HOME). In a clean CI runner those creds are absent, so the
        # gate returns (False, ...) and version-branch assertions would fail
        # for purely environmental reasons. Stub the gate green; auth is
        # covered by tests/test_preflight_auth.py. Production default
        # (preflight_check_auth=True) is unchanged.
        monkeypatch.setattr(preflight, "_check_auth", lambda: (True, "auth ok (test stub)"))

    def test_fail_when_bin_missing(self, monkeypatch):
        """A non-existent binary path yields (False, reason containing 'not found')."""
        monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
        ok, reason = preflight.check(force=True)
        assert ok is False
        assert "not found" in reason.lower()

    def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
        """An existing binary plus a successful version probe yields ok=True."""
        fake_bin = tmp_path / "claude"
        fake_bin.write_text("#!/bin/sh\necho v1\n")
        monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
        monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
        # Only the boolean verdict is asserted; the reason text is not pinned.
        ok, _reason = preflight.check(force=True)
        assert ok is True

    def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
        """Repeated check() calls inside the TTL run the version probe once."""
        fake_bin = tmp_path / "claude"
        fake_bin.write_text("x")
        monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
        monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
        calls = {"n": 0}

        def counting_version(b):
            calls["n"] += 1
            return True, "ok"

        monkeypatch.setattr(preflight, "_run_version", counting_version)
        preflight.reset_cache()
        preflight.check()  # first -> runs version
        preflight.check()  # cached -> no extra version call
        preflight.check()
        assert calls["n"] == 1

    def test_force_bypasses_cache(self, monkeypatch, tmp_path):
        """check(force=True) runs the version probe again after a plain check()."""
        fake_bin = tmp_path / "claude"
        fake_bin.write_text("x")
        monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
        calls = {"n": 0}

        # Same counting stub shape as test_cache_does_not_recheck_within_ttl,
        # instead of a side-effecting tuple-indexing lambda.
        def counting_version(b):
            calls["n"] += 1
            return True, "ok"

        monkeypatch.setattr(preflight, "_run_version", counting_version)
        preflight.reset_cache()
        preflight.check()
        preflight.check(force=True)
        assert calls["n"] == 2

    def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
        # Preflight FAIL -> job stays queued, launch_job never called.
        monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (False, "down"))
        called = {"launch": False}

        def fake_launch(job):
            called["launch"] = True

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch)
        jid = enqueue_job("analyst", "r")
        QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
        assert called["launch"] is False
        assert get_job(jid)["status"] == "queued"
        assert count_running_jobs() == 0


# ---------------------------------------------------------------------------
# B. Error classifier
# ---------------------------------------------------------------------------

class TestClassifier:
    """classify_text / parse_retry_after / classify_log_file behaviour."""

    @pytest.mark.parametrize("text", [
        "Error: 429 Too Many Requests",
        "anthropic rate limit exceeded",
        "overloaded_error: server is overloaded",
        "API quota exhausted",
        "503 Service Unavailable",
        "connection reset by peer",
    ])
    def test_transient_patterns(self, text):
        """Rate-limit, overload, quota, 503 and connection-reset texts are transient."""
        assert classify_text(text) == "transient"

    @pytest.mark.parametrize("text", [
        "Traceback: KeyError 'foo'",
        "SyntaxError: invalid syntax",
        "assertion failed in test",
        "",
    ])
    def test_permanent_patterns(self, text):
        """Code-fault texts and the empty string are permanent."""
        assert classify_text(text) == "permanent"

    def test_retry_after_header(self):
        """A 'Retry-After: N' header line is parsed to the integer N."""
        assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42

    def test_retry_after_json(self):
        """A JSON 'retry_after' field is parsed to its integer value."""
        assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7

    def test_retry_after_absent(self):
        """Text without any retry-after hint yields None."""
        assert parse_retry_after("just an error") is None

    def test_classify_log_file(self, tmp_path):
        """A log file containing a 429 and Retry-After yields ('transient', 30)."""
        p = tmp_path / "run.log"
        p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
        kind, ra = classify_log_file(str(p))
        assert kind == "transient"
        assert ra == 30

    def test_classify_missing_file_is_permanent(self):
        """A missing log file is classified as ('permanent', None)."""
        kind, ra = classify_log_file("/no/such/log")
        assert kind == "permanent"
        assert ra is None
# ---------------------------------------------------------------------------
# C. Backoff + available_at gating
# ---------------------------------------------------------------------------

class TestBackoff:
    """AgentLauncher._backoff_seconds values and available_at gating of claims."""

    def test_backoff_grows_exponentially(self):
        """Attempts 1, 2, 3 give 20, 40, 80 seconds with the default settings."""
        agent = AgentLauncher()
        # base=10, cap=600 (defaults)
        first, second, third = (agent._backoff_seconds(n) for n in (1, 2, 3))
        assert first == 20  # 2^1*10
        assert second == 40  # 2^2*10
        assert third == 80  # 2^3*10
        assert first < second < third

    def test_backoff_capped(self):
        """A large attempt number is clamped to 600 seconds."""
        agent = AgentLauncher()
        capped = agent._backoff_seconds(20)
        assert capped == 600  # capped at backoff_max_seconds

    def test_retry_after_respected_when_larger(self):
        """A Retry-After above the computed backoff is returned instead."""
        agent = AgentLauncher()
        # transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
        delay = agent._backoff_seconds(1, retry_after=120)
        assert delay == 120

    def test_retry_after_ignored_when_smaller(self):
        """A Retry-After below the computed backoff does not shorten it."""
        agent = AgentLauncher()
        delay = agent._backoff_seconds(3, retry_after=5)
        assert delay == 80  # backoff bigger

    def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
        """A transient requeue with a long delay leaves the job queued but unclaimable."""
        job_id = enqueue_job("developer", "r")
        claim_next_job()
        # Big backoff -> available_at far in the future.
        mark_job_transient(job_id, 3600, error="429")
        row = get_job(job_id)
        assert row["status"] == "queued"
        assert row["transient_attempts"] == 1
        assert row["available_at"] is not None
        # claim must NOT pick it up while available_at is in the future.
        next_claim = claim_next_job()
        assert next_claim is None

    def test_transient_requeue_claimable_when_due(self):
        """A transient requeue with a negative delay is immediately claimable."""
        job_id = enqueue_job("developer", "r")
        claim_next_job()
        mark_job_transient(job_id, -5, error="429")  # available_at in the past
        claimed = claim_next_job()
        assert claimed is not None
        assert claimed["id"] == job_id
# ---------------------------------------------------------------------------
# D. Launcher transient/permanent finalize (no Popen)
# ---------------------------------------------------------------------------

def _mute_telegram(monkeypatch):
    """Replace src.notifications.send_telegram with a no-op for the test."""
    monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)


def _write_log(directory, filename, text):
    """Write *text* to directory/filename and return the path as a string."""
    log_file = directory / filename
    log_file.write_text(text)
    return str(log_file)


class TestFinalizeClassified:
    """AgentLauncher._finalize_job outcomes for transient vs permanent failures."""

    def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
        """A 429 log requeues the job with a backoff and transient_attempts == 1."""
        _mute_telegram(monkeypatch)
        log_path = _write_log(tmp_path, "1.log", "Error 429 rate limit exceeded\n")
        job_id = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()
        AgentLauncher()._finalize_job(
            job_id, "developer", run_id=1, exit_code=1, output_path=log_path
        )
        row = get_job(job_id)
        assert row["status"] == "queued"
        assert row["transient_attempts"] == 1
        assert row["available_at"] is not None  # backoff-gated
        assert row["attempts"] == 1  # code-fault budget NOT burned

    def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
        """A traceback log requeues without backoff and without a transient count."""
        _mute_telegram(monkeypatch)
        log_path = _write_log(tmp_path, "2.log", "Traceback: ValueError\n")
        job_id = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()
        AgentLauncher()._finalize_job(
            job_id, "developer", run_id=2, exit_code=1, output_path=log_path
        )
        row = get_job(job_id)
        assert row["status"] == "queued"
        assert row["transient_attempts"] == 0  # not transient
        assert row["available_at"] is None  # no backoff for code-fault

    def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
        """With transient_max_attempts=2 the job ends up 'failed' on the second finalize."""
        _mute_telegram(monkeypatch)
        monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
        log_path = _write_log(tmp_path, "3.log", "overloaded_error\n")
        agent = AgentLauncher()
        job_id = enqueue_job("developer", "r")
        claim_next_job()
        agent._finalize_job(job_id, "developer", 1, exit_code=1, output_path=log_path)
        status_after_first = get_job(job_id)["status"]
        assert status_after_first == "queued"  # transient 1 -> requeue
        # force claimable and retry
        mark_job_transient(job_id, -1)  # makes it due; transient=2 now
        claim_next_job()
        agent._finalize_job(job_id, "developer", 2, exit_code=1, output_path=log_path)
        status_after_second = get_job(job_id)["status"]
        assert status_after_second == "failed"  # transient budget exhausted


# ---------------------------------------------------------------------------
# E. Circuit breaker
# ---------------------------------------------------------------------------

class TestCircuitBreaker:
    """CircuitBreaker state transitions and its effect on QueueWorker claims."""

    def test_opens_after_threshold(self):
        """The breaker stays closed below the threshold and opens on reaching it."""
        breaker = CircuitBreaker(threshold=3, pause_seconds=300)
        assert breaker.allow_claim() is True
        for _ in range(2):
            breaker.record_transient()
        assert breaker.state == "closed"
        breaker.record_transient()  # 3rd -> open
        assert breaker.state == "open"
        assert breaker.allow_claim() is False  # paused, no CLI calls

    def test_recovered_resets_streak(self):
        """record_recovered() zeroes the transient streak and keeps the breaker closed."""
        breaker = CircuitBreaker(threshold=3)
        for _ in range(2):
            breaker.record_transient()
        breaker.record_recovered()
        assert breaker.consecutive_transient == 0
        assert breaker.state == "closed"

    def test_half_open_after_pause_then_closed_on_success(self, monkeypatch):
        """After the pause a probe is allowed; a recovery closes the breaker."""
        # NOTE: the monkeypatch fixture is requested but not used by this test.
        breaker = CircuitBreaker(threshold=2, pause_seconds=300)
        breaker.record_transient()
        breaker.record_transient()  # open
        assert breaker.state == "open"
        # Simulate the pause elapsing.
        breaker.opened_at -= 301
        assert breaker.allow_claim() is True  # -> half-open (probe)
        assert breaker.state == "half-open"
        breaker.record_recovered()  # probe succeeded
        assert breaker.state == "closed"

    def test_half_open_reopens_on_transient(self):
        """A transient failure during the half-open probe re-opens the breaker."""
        breaker = CircuitBreaker(threshold=2, pause_seconds=300)
        breaker.record_transient()
        breaker.record_transient()  # open
        breaker.opened_at -= 301
        breaker.allow_claim()  # half-open
        assert breaker.state == "half-open"
        breaker.record_transient()  # probe failed -> re-open
        assert breaker.state == "open"

    def test_breaker_blocks_worker_claim(self, monkeypatch):
        """An open breaker keeps the worker from launching a queued job."""
        monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))
        called = {"launch": False}

        def fake_launch(job):
            called["launch"] = True

        monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch)
        breaker = CircuitBreaker(threshold=1, pause_seconds=300)
        breaker.record_transient()  # open immediately
        worker = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=breaker)
        enqueue_job("analyst", "r")
        worker._drain_once()
        assert called["launch"] is False  # breaker open -> no claim, no CLI