From a613fd81808b5aa8bab4b86ee8440ca8f59028e7 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH] test(resilience): 34 tests for preflight/classifier/backoff/breaker (ORCH-1) Covers preflight FAIL->queued + cache, transient/permanent classifier + Retry-After, exp backoff + available_at gating, launcher transient vs permanent finalize, circuit breaker open/half-open/closed. test_queue worker tests stub preflight OK. Popen never spawned. --- tests/test_queue.py | 6 + tests/test_resilience.py | 295 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 301 insertions(+) create mode 100644 tests/test_resilience.py diff --git a/tests/test_queue.py b/tests/test_queue.py index d3f1536..f6342e8 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -236,6 +236,12 @@ class TestObservability: # QueueWorker max_concurrency (launch_job fully mocked — no real Popen) # --------------------------------------------------------------------------- class TestWorkerConcurrency: + @pytest.fixture(autouse=True) + def _ok_preflight(self, monkeypatch): + # ORCH-1 resilience: the worker gates claims behind preflight; in tests there + # is no claude binary, so stub preflight OK to exercise pure queue/concurrency. + monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok")) + def test_worker_respects_max_concurrency(self, monkeypatch): from src.queue_worker import QueueWorker diff --git a/tests/test_resilience.py b/tests/test_resilience.py new file mode 100644 index 0000000..1d72117 --- /dev/null +++ b/tests/test_resilience.py @@ -0,0 +1,295 @@ +"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker. + +No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job +are mocked. DB is a fresh per-test sqlite file. +""" +import os +import tempfile + +import pytest + +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" + +import src.db as db +from src.db import ( + init_db, enqueue_job, claim_next_job, get_job, count_running_jobs, + mark_job_transient, +) +from src import preflight, error_classifier +from src.error_classifier import classify_text, parse_retry_after, classify_log_file +from src.queue_worker import QueueWorker, CircuitBreaker +from src.agents.launcher import AgentLauncher + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db")) + init_db() + preflight.reset_cache() + yield + + +# --------------------------------------------------------------------------- +# A. Preflight +# --------------------------------------------------------------------------- +class TestPreflight: + def test_fail_when_bin_missing(self, monkeypatch): + monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude") + ok, reason = preflight.check(force=True) + assert ok is False + assert "not found" in reason.lower() + + def test_ok_when_version_succeeds(self, monkeypatch, tmp_path): + fake_bin = tmp_path / "claude" + fake_bin.write_text("#!/bin/sh\necho v1\n") + monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin)) + monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3")) + ok, reason = preflight.check(force=True) + assert ok is True + + def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path): + fake_bin = tmp_path / "claude" + fake_bin.write_text("x") + monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin)) + monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999) + + calls = {"n": 0} + + def counting_version(b): + calls["n"] += 1 + return True, "ok" + + monkeypatch.setattr(preflight, "_run_version", counting_version) + preflight.reset_cache() + preflight.check() # first -> runs version + preflight.check() # cached -> no extra version call + preflight.check() + assert calls["n"] == 1 + + def test_force_bypasses_cache(self, monkeypatch, tmp_path): + fake_bin = tmp_path / "claude" + fake_bin.write_text("x") + monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin)) + calls = {"n": 0} + monkeypatch.setattr(preflight, "_run_version", + lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1]) + preflight.reset_cache() + preflight.check() + preflight.check(force=True) + assert calls["n"] == 2 + + def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch): + # Preflight FAIL -> job stays queued, launch_job never called. + monkeypatch.setattr("src.queue_worker.preflight.check", + lambda *a, **k: (False, "down")) + called = {"launch": False} + monkeypatch.setattr("src.queue_worker.launcher.launch_job", + lambda job: called.__setitem__("launch", True)) + jid = enqueue_job("analyst", "r") + QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once() + assert called["launch"] is False + assert get_job(jid)["status"] == "queued" + assert count_running_jobs() == 0 + + +# --------------------------------------------------------------------------- +# B. Error classifier +# --------------------------------------------------------------------------- +class TestClassifier: + @pytest.mark.parametrize("text", [ + "Error: 429 Too Many Requests", + "anthropic rate limit exceeded", + "overloaded_error: server is overloaded", + "API quota exhausted", + "503 Service Unavailable", + "connection reset by peer", + ]) + def test_transient_patterns(self, text): + assert classify_text(text) == "transient" + + @pytest.mark.parametrize("text", [ + "Traceback: KeyError 'foo'", + "SyntaxError: invalid syntax", + "assertion failed in test", + "", + ]) + def test_permanent_patterns(self, text): + assert classify_text(text) == "permanent" + + def test_retry_after_header(self): + assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42 + + def test_retry_after_json(self): + assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7 + + def test_retry_after_absent(self): + assert parse_retry_after("just an error") is None + + def test_classify_log_file(self, tmp_path): + p = tmp_path / "run.log" + p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n") + kind, ra = classify_log_file(str(p)) + assert kind == "transient" + assert ra == 30 + + def test_classify_missing_file_is_permanent(self): + kind, ra = classify_log_file("/no/such/log") + assert kind == "permanent" + assert ra is None + + +# --------------------------------------------------------------------------- +# C. Backoff + available_at gating +# --------------------------------------------------------------------------- +class TestBackoff: + def test_backoff_grows_exponentially(self): + lr = AgentLauncher() + # base=10, cap=600 (defaults) + b1 = lr._backoff_seconds(1) + b2 = lr._backoff_seconds(2) + b3 = lr._backoff_seconds(3) + assert b1 == 20 # 2^1*10 + assert b2 == 40 # 2^2*10 + assert b3 == 80 # 2^3*10 + assert b2 > b1 and b3 > b2 + + def test_backoff_capped(self): + lr = AgentLauncher() + assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds + + def test_retry_after_respected_when_larger(self): + lr = AgentLauncher() + # transient_attempts=1 -> base backoff 20; Retry-After=120 wins. + assert lr._backoff_seconds(1, retry_after=120) == 120 + + def test_retry_after_ignored_when_smaller(self): + lr = AgentLauncher() + assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger + + def test_transient_requeue_sets_future_available_at_and_claim_skips(self): + jid = enqueue_job("developer", "r") + claim_next_job() + # Big backoff -> available_at far in the future. + mark_job_transient(jid, 3600, error="429") + job = get_job(jid) + assert job["status"] == "queued" + assert job["transient_attempts"] == 1 + assert job["available_at"] is not None + # claim must NOT pick it up while available_at is in the future. + assert claim_next_job() is None + + def test_transient_requeue_claimable_when_due(self): + jid = enqueue_job("developer", "r") + claim_next_job() + mark_job_transient(jid, -5, error="429") # available_at in the past + c = claim_next_job() + assert c is not None and c["id"] == jid + + +# --------------------------------------------------------------------------- +# D. Launcher transient/permanent finalize (no Popen) +# --------------------------------------------------------------------------- +class TestFinalizeClassified: + def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + log = tmp_path / "1.log" + log.write_text("Error 429 rate limit exceeded\n") + jid = enqueue_job("developer", "r", max_attempts=2) + claim_next_job() + AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1, + output_path=str(log)) + job = get_job(jid) + assert job["status"] == "queued" + assert job["transient_attempts"] == 1 + assert job["available_at"] is not None # backoff-gated + assert job["attempts"] == 1 # code-fault budget NOT burned + + def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + log = tmp_path / "2.log" + log.write_text("Traceback: ValueError\n") + jid = enqueue_job("developer", "r", max_attempts=2) + claim_next_job() + AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1, + output_path=str(log)) + job = get_job(jid) + assert job["status"] == "queued" + assert job["transient_attempts"] == 0 # not transient + assert job["available_at"] is None # no backoff for code-fault + + def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + monkeypatch.setattr(db.settings, "transient_max_attempts", 2) + log = tmp_path / "3.log" + log.write_text("overloaded_error\n") + lr = AgentLauncher() + jid = enqueue_job("developer", "r") + claim_next_job() + lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log)) + assert get_job(jid)["status"] == "queued" # transient 1 -> requeue + # force claimable and retry + mark_job_transient(jid, -1) # makes it due; transient=2 now + claim_next_job() + lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log)) + assert get_job(jid)["status"] == "failed" # transient budget exhausted + + +# --------------------------------------------------------------------------- +# E. Circuit breaker +# --------------------------------------------------------------------------- +class TestCircuitBreaker: + def test_opens_after_threshold(self): + cb = CircuitBreaker(threshold=3, pause_seconds=300) + assert cb.allow_claim() is True + cb.record_transient() + cb.record_transient() + assert cb.state == "closed" + cb.record_transient() # 3rd -> open + assert cb.state == "open" + assert cb.allow_claim() is False # paused, no CLI calls + + def test_recovered_resets_streak(self): + cb = CircuitBreaker(threshold=3) + cb.record_transient() + cb.record_transient() + cb.record_recovered() + assert cb.consecutive_transient == 0 + assert cb.state == "closed" + + def test_half_open_after_pause_then_closed_on_success(self, monkeypatch): + cb = CircuitBreaker(threshold=2, pause_seconds=300) + cb.record_transient() + cb.record_transient() # open + assert cb.state == "open" + # Simulate the pause elapsing. + cb.opened_at -= 301 + assert cb.allow_claim() is True # -> half-open (probe) + assert cb.state == "half-open" + cb.record_recovered() # probe succeeded + assert cb.state == "closed" + + def test_half_open_reopens_on_transient(self): + cb = CircuitBreaker(threshold=2, pause_seconds=300) + cb.record_transient(); cb.record_transient() # open + cb.opened_at -= 301 + cb.allow_claim() # half-open + assert cb.state == "half-open" + cb.record_transient() # probe failed -> re-open + assert cb.state == "open" + + def test_breaker_blocks_worker_claim(self, monkeypatch): + monkeypatch.setattr("src.queue_worker.preflight.check", + lambda *a, **k: (True, "ok")) + called = {"launch": False} + monkeypatch.setattr("src.queue_worker.launcher.launch_job", + lambda job: called.__setitem__("launch", True)) + cb = CircuitBreaker(threshold=1, pause_seconds=300) + cb.record_transient() # open immediately + w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb) + enqueue_job("analyst", "r") + w._drain_once() + assert called["launch"] is False # breaker open -> no claim, no CLI