diff --git a/src/error_classifier.py b/src/error_classifier.py new file mode 100644 index 0000000..78fd5a4 --- /dev/null +++ b/src/error_classifier.py @@ -0,0 +1,87 @@ +"""ORCH-1 resilience: classify an agent failure as transient vs permanent. + +Rate limits / overload / network blips cannot be reliably predicted in advance, +so we classify *after the run* by scanning the agent's combined stdout/stderr log +(B-2 sends both to /app/data/runs/.log). + +- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc. + => backoff + transient retry (separate counter, larger budget). +- permanent -> a genuine code fault / agent error + => normal attempts < max_attempts, then 'failed'. + +Also extracts a Retry-After hint (seconds) when the server provided one. +""" +import re + +# Case-insensitive substrings/patterns that signal a transient/rate-limit issue. +_TRANSIENT_PATTERNS = [ + r"\b429\b", + r"rate[\s_-]*limit", + r"rate_limit_error", + r"overloaded", + r"overloaded_error", + r"too many requests", + r"quota", + r"insufficient[_\s-]*quota", + r"retry[\s-]*after", + r"service unavailable", + r"\b503\b", + r"\b529\b", + r"timed out", + r"timeout", + r"connection (reset|refused|error|aborted)", + r"temporarily unavailable", + r"econnreset", + r"etimedout", +] + +_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE) + +# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or +# "retry after 30 seconds". Returns the integer seconds. +_RETRY_AFTER_RE = re.compile( + r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)", + re.IGNORECASE, +) + + +def classify_text(text: str) -> str: + """Return 'transient' or 'permanent' for a chunk of log/stderr text.""" + if not text: + return "permanent" + return "transient" if _TRANSIENT_RE.search(text) else "permanent" + + +def parse_retry_after(text: str) -> int | None: + """Return Retry-After seconds if present in the text, else None.""" + if not text: + return None + m = _RETRY_AFTER_RE.search(text) + if m: + try: + return int(m.group(1)) + except (TypeError, ValueError): + return None + return None + + +def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]: + """Classify the tail of a log file. + + Reads the last `tail_bytes` of the log (rate-limit messages appear near the + end) and returns (classification, retry_after_seconds_or_None). + On any read error, treats it as 'permanent' (no special backoff). + """ + if not path: + return "permanent", None + try: + with open(path, "rb") as f: + try: + f.seek(-tail_bytes, 2) + except OSError: + f.seek(0) + data = f.read() + text = data.decode("utf-8", errors="replace") + except Exception: + return "permanent", None + return classify_text(text), parse_retry_after(text) diff --git a/src/preflight.py b/src/preflight.py new file mode 100644 index 0000000..717cee2 --- /dev/null +++ b/src/preflight.py @@ -0,0 +1,90 @@ +"""ORCH-1 resilience: cheap preflight check (CLI / network available?). + +Goal: before the worker claims a job, confirm the claude CLI binary and runtime +are reachable WITHOUT spending any tokens. We only do local/cheap checks: + + 1. os.path.exists(CLAUDE_BIN) -- instant + 2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API + +The result is cached for `preflight_cache_ttl` seconds so we do not re-run +`claude --version` on every worker tick. + +🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the +rate limit and add latency. Preflight is local-only. +""" +import os +import time +import logging +import subprocess + +from .config import settings + +logger = logging.getLogger("orchestrator.preflight") + +_VERSION_TIMEOUT = 5 + + +class _PreflightCache: + def __init__(self): + self.ts: float = 0.0 + self.ok: bool = False + self.reason: str = "not checked yet" + + +_cache = _PreflightCache() + + +def _claude_bin() -> str: + return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe" + + +def _run_version(bin_path: str) -> tuple[bool, str]: + """`claude --version` — proves the CLI runs without touching the API.""" + try: + r = subprocess.run( + [bin_path, "--version"], + capture_output=True, + text=True, + timeout=_VERSION_TIMEOUT, + ) + if r.returncode == 0: + return True, (r.stdout or r.stderr or "").strip()[:120] or "ok" + return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}" + except subprocess.TimeoutExpired: + return False, f"--version timed out after {_VERSION_TIMEOUT}s" + except FileNotFoundError: + return False, "claude binary not found (FileNotFoundError)" + except Exception as e: # pragma: no cover - defensive + return False, f"--version error: {e}" + + +def _compute() -> tuple[bool, str]: + bin_path = _claude_bin() + if not os.path.exists(bin_path): + return False, f"CLAUDE_BIN not found: {bin_path}" + return _run_version(bin_path) + + +def check(force: bool = False) -> tuple[bool, str]: + """Return (ok, reason). Cached for preflight_cache_ttl seconds. + + force=True bypasses the cache (used by the breaker half-open probe / tests). + """ + now = time.time() + ttl = settings.preflight_cache_ttl + if not force and _cache.ts > 0 and (now - _cache.ts) < ttl: + return _cache.ok, _cache.reason + ok, reason = _compute() + _cache.ts = now + _cache.ok = ok + _cache.reason = reason + if not ok: + logger.warning(f"Preflight FAIL: {reason}") + return ok, reason + + +def reset_cache() -> None: + """Invalidate the cache (tests / forced recheck).""" + _cache.ts = 0.0 + _cache.ok = False + _cache.reason = "reset"