feat(resilience): cheap preflight + 429/transient error classifier (ORCH-1)

preflight.py: cached CLAUDE_BIN exists + claude --version (no tokens, no
prompt-ping). error_classifier.py: classify_log_file -> transient|permanent
from log tail + Retry-After parsing.
This commit is contained in:
Dev Agent
2026-06-03 00:12:17 +03:00
parent 0cd9b11fe0
commit 4ef87a3959
2 changed files with 177 additions and 0 deletions

87
src/error_classifier.py Normal file
View File

@@ -0,0 +1,87 @@
"""ORCH-1 resilience: classify an agent failure as transient vs permanent.
Rate limits / overload / network blips cannot be reliably predicted in advance,
so we classify *after the run* by scanning the agent's combined stdout/stderr log
(B-2 sends both to /app/data/runs/<run_id>.log).
- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc.
=> backoff + transient retry (separate counter, larger budget).
- permanent -> a genuine code fault / agent error
=> normal attempts < max_attempts, then 'failed'.
Also extracts a Retry-After hint (seconds) when the server provided one.
"""
import re
# Case-insensitive substrings/patterns that signal a transient/rate-limit issue.
_TRANSIENT_PATTERNS = [
r"\b429\b",
r"rate[\s_-]*limit",
r"rate_limit_error",
r"overloaded",
r"overloaded_error",
r"too many requests",
r"quota",
r"insufficient[_\s-]*quota",
r"retry[\s-]*after",
r"service unavailable",
r"\b503\b",
r"\b529\b",
r"timed out",
r"timeout",
r"connection (reset|refused|error|aborted)",
r"temporarily unavailable",
r"econnreset",
r"etimedout",
]
_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE)
# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or
# "retry after 30 seconds". Returns the integer seconds.
_RETRY_AFTER_RE = re.compile(
r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)",
re.IGNORECASE,
)
def classify_text(text: str) -> str:
"""Return 'transient' or 'permanent' for a chunk of log/stderr text."""
if not text:
return "permanent"
return "transient" if _TRANSIENT_RE.search(text) else "permanent"
def parse_retry_after(text: str) -> int | None:
"""Return Retry-After seconds if present in the text, else None."""
if not text:
return None
m = _RETRY_AFTER_RE.search(text)
if m:
try:
return int(m.group(1))
except (TypeError, ValueError):
return None
return None
def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]:
"""Classify the tail of a log file.
Reads the last `tail_bytes` of the log (rate-limit messages appear near the
end) and returns (classification, retry_after_seconds_or_None).
On any read error, treats it as 'permanent' (no special backoff).
"""
if not path:
return "permanent", None
try:
with open(path, "rb") as f:
try:
f.seek(-tail_bytes, 2)
except OSError:
f.seek(0)
data = f.read()
text = data.decode("utf-8", errors="replace")
except Exception:
return "permanent", None
return classify_text(text), parse_retry_after(text)

90
src/preflight.py Normal file
View File

@@ -0,0 +1,90 @@
"""ORCH-1 resilience: cheap preflight check (CLI / network available?).
Goal: before the worker claims a job, confirm the claude CLI binary and runtime
are reachable WITHOUT spending any tokens. We only do local/cheap checks:
1. os.path.exists(CLAUDE_BIN) -- instant
2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API
The result is cached for `preflight_cache_ttl` seconds so we do not re-run
`claude --version` on every worker tick.
🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the
rate limit and add latency. Preflight is local-only.
"""
import os
import time
import logging
import subprocess
from .config import settings
logger = logging.getLogger("orchestrator.preflight")
_VERSION_TIMEOUT = 5
class _PreflightCache:
def __init__(self):
self.ts: float = 0.0
self.ok: bool = False
self.reason: str = "not checked yet"
_cache = _PreflightCache()
def _claude_bin() -> str:
return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
def _run_version(bin_path: str) -> tuple[bool, str]:
"""`claude --version` — proves the CLI runs without touching the API."""
try:
r = subprocess.run(
[bin_path, "--version"],
capture_output=True,
text=True,
timeout=_VERSION_TIMEOUT,
)
if r.returncode == 0:
return True, (r.stdout or r.stderr or "").strip()[:120] or "ok"
return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}"
except subprocess.TimeoutExpired:
return False, f"--version timed out after {_VERSION_TIMEOUT}s"
except FileNotFoundError:
return False, "claude binary not found (FileNotFoundError)"
except Exception as e: # pragma: no cover - defensive
return False, f"--version error: {e}"
def _compute() -> tuple[bool, str]:
bin_path = _claude_bin()
if not os.path.exists(bin_path):
return False, f"CLAUDE_BIN not found: {bin_path}"
return _run_version(bin_path)
def check(force: bool = False) -> tuple[bool, str]:
"""Return (ok, reason). Cached for preflight_cache_ttl seconds.
force=True bypasses the cache (used by the breaker half-open probe / tests).
"""
now = time.time()
ttl = settings.preflight_cache_ttl
if not force and _cache.ts > 0 and (now - _cache.ts) < ttl:
return _cache.ok, _cache.reason
ok, reason = _compute()
_cache.ts = now
_cache.ok = ok
_cache.reason = reason
if not ok:
logger.warning(f"Preflight FAIL: {reason}")
return ok, reason
def reset_cache() -> None:
"""Invalidate the cache (tests / forced recheck)."""
_cache.ts = 0.0
_cache.ok = False
_cache.reason = "reset"