Files
orchestrator/src/preflight.py
claude-bot 98b47fe021
Some checks failed
CI / test (push) Failing after 14s
CI / test (pull_request) Failing after 13s
feat(preflight): catch logged-out auth and treat empty result as failure
ORCH-044 closes two blind spots that let a single de-authenticated agent
stall the shared queue for all projects:

P1 — preflight auth gate. `claude --version` answers even when logged out,
so version-only preflight was blind to auth. Adds a token-free, network-free
check of <AGENT_HOME>/.claude/.credentials.json: missing/unreadable/no-oauth
or an expired `claudeAiOauth.expiresAt` (epoch ms, vs now + skew) => preflight
FAIL; absent expiry => OK (no false positives). Result is cached on the same
preflight_cache_ttl. Post-factum safety net: launcher detects auth markers
("not logged in" / "/login" / "unauthorized" / 401) in the run log and resets
the preflight cache so the next tick re-evaluates auth. Auth failure is a gate,
not a transient — it does not spin the circuit breaker. Emergency toggle
ORCH_PREFLIGHT_CHECK_AUTH=false restores version-only behaviour.

P3 — empty log / no result-JSON => job failed. exit_code==0 with an empty or
JSON-less run log no longer counts as success: a separate result_ok flag gates
stage advance + usage comments, fires a Telegram alert, and routes the job
through the normal transient/permanent failure path (exit_code integrity in
agent_runs preserved).

Scope: P2 (--effort) is intentionally excluded and tracked in ORCH-50.

New settings: ORCH_PREFLIGHT_CHECK_AUTH, ORCH_CLAUDE_CREDENTIALS_PATH,
ORCH_AUTH_EXPIRY_SKEW_SECONDS. Docs updated (INFRA.md, internals.md, CHANGELOG).

Refs: ORCH-044

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 08:11:27 +00:00

236 lines
8.8 KiB
Python

"""ORCH-1 resilience: cheap preflight check (CLI / network available?).
Goal: before the worker claims a job, confirm the claude CLI binary and runtime
are reachable WITHOUT spending any tokens. We only do local/cheap checks:
1. os.path.exists(CLAUDE_BIN) -- instant
2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API
3. auth check (ORCH-044, P1) -- read the local OAuth credentials file
The result is cached for `preflight_cache_ttl` seconds so we do not re-run
`claude --version` (or re-read the credentials file) on every worker tick.
🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the
rate limit and add latency. Preflight is local-only.
ORCH-044 (P1): `claude --version` answers successfully even when claude is NOT
logged in (the version is local information), so version-only preflight was blind
to auth. We add a token-free auth gate: read <AGENT_HOME>/.claude/.credentials.json
and validate the OAuth token (presence + expiry). Combined with a post-factum
`Not logged in` marker detection (is_auth_failure_text), this stops a logged-out
instance from claiming jobs and silently dying with an empty run log. No network
call is ever made here.
"""
import os
import re
import json
import time
import logging
import subprocess
from .config import settings
logger = logging.getLogger("orchestrator.preflight")
_VERSION_TIMEOUT = 5
# ORCH-044 (P1b): post-factum auth-failure markers. If an agent started under a
# session that died/expired between preflight and spawn, these substrings in the
# run log identify the auth failure so the launcher can invalidate the preflight
# cache (forcing the next tick to re-evaluate auth proactively).
_AUTH_FAIL_RE = re.compile(
r"not logged in|please run\s*/login|invalid api key|unauthorized|\b401\b",
re.IGNORECASE,
)
class _PreflightCache:
def __init__(self):
self.ts: float = 0.0
self.ok: bool = False
self.reason: str = "not checked yet"
_cache = _PreflightCache()
def _claude_bin() -> str:
    """Resolve the claude binary that preflight should check.

    Preflight must look at the same binary the launcher spawns, so the value
    of ``AgentLauncher.CLAUDE_BIN`` wins whenever it is set -- even if that
    path does not exist on disk. Existence is the caller's check, and
    silently substituting a different binary would let preflight pass on a
    path that is never executed. Only when the launcher attribute is unset,
    or the launcher module cannot be imported, do we fall back to
    ``settings.claude_bin`` and finally to a built-in default path.

    Returns:
        The path of the binary to probe.
    """
    launcher_bin = None
    try:
        # Imported lazily; any import problem simply means "launcher value
        # unavailable" and we use the configured/default path instead.
        from .agents.launcher import AgentLauncher

        launcher_bin = getattr(AgentLauncher, "CLAUDE_BIN", None)
    except Exception:
        launcher_bin = None
    return (
        launcher_bin
        or getattr(settings, "claude_bin", None)
        or "/opt/claude-code/bin/claude.exe"
    )
def _run_version(bin_path: str) -> tuple[bool, str]:
    """Run ``<bin_path> --version`` and report whether the CLI starts.

    Args:
        bin_path: Path of the executable to invoke.

    Returns:
        ``(True, banner)`` when the process exits with code 0, where *banner*
        is the first 120 characters of stdout (or stderr), or ``"ok"`` when
        both are empty. ``(False, reason)`` for a non-zero exit, a timeout, a
        missing executable, or any other error. Never raises.
    """
    try:
        proc = subprocess.run(
            [bin_path, "--version"],
            timeout=_VERSION_TIMEOUT,
            text=True,
            capture_output=True,
        )
        if proc.returncode != 0:
            # On failure prefer stderr, which usually carries the diagnostic.
            detail = (proc.stderr or proc.stdout).strip()[:120]
            return False, f"--version exit {proc.returncode}: {detail}"
        banner = (proc.stdout or proc.stderr or "").strip()[:120]
        return True, banner or "ok"
    except subprocess.TimeoutExpired:
        return False, f"--version timed out after {_VERSION_TIMEOUT}s"
    except FileNotFoundError:
        return False, "claude binary not found (FileNotFoundError)"
    except Exception as exc:  # pragma: no cover - defensive
        return False, f"--version error: {exc}"
def _agent_home() -> str:
"""Resolve the HOME the launcher actually spawns claude under (ORCH-044, TR-1.3).
The auth credentials live under the *agent's* HOME (/home/slin), which the
launcher injects into the claude subprocess env — NOT the orchestrator
process HOME. We mirror _claude_bin()'s "follow the genuinely executed path"
approach by reading AgentLauncher.AGENT_HOME. Falls back to the known default
if the launcher cannot be imported (e.g. isolated unit test).
"""
try:
from .agents.launcher import AgentLauncher
home = getattr(AgentLauncher, "AGENT_HOME", None)
if home:
return home
except Exception:
pass
return "/home/slin"
def _credentials_path() -> str:
    """Return the path of claude's OAuth credentials file (ORCH-044, P1).

    A non-blank ``settings.claude_credentials_path`` takes precedence (with
    surrounding whitespace stripped); otherwise the path is
    ``<AGENT_HOME>/.claude/.credentials.json``.
    """
    override = getattr(settings, "claude_credentials_path", "")
    override = override.strip() if override else ""
    return override or os.path.join(_agent_home(), ".claude", ".credentials.json")
def _iso(epoch_ms) -> str:
"""Best-effort epoch-ms -> ISO-8601 UTC string (for human-readable reasons)."""
try:
from datetime import datetime, timezone
return datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc).isoformat()
except Exception:
return str(epoch_ms)
def is_auth_failure_text(text: str) -> bool:
    """ORCH-044 (P1b): report whether `text` holds a claude auth-failure marker.

    Meant to be applied to a run log after the fact, so an auth death can be
    told apart from a generic failure and the preflight cache reset.

    Args:
        text: Log text to scan; empty or ``None`` yields ``False``.

    Returns:
        ``True`` when ``_AUTH_FAIL_RE`` matches anywhere in `text`, otherwise
        ``False``. Errors raised while matching are swallowed and reported
        as ``False``.
    """
    if text:
        try:
            return _AUTH_FAIL_RE.search(text) is not None
        except Exception:
            pass
    return False
def _check_auth() -> tuple[bool, str]:
    """ORCH-044 (P1a): token-free local auth gate. Never raises.

    Steps (ADR-001 §P1):
      1. credentials file missing / unreadable / invalid JSON -> not ok.
      2. no ``claudeAiOauth`` block or no ``accessToken`` -> not ok.
      3. ``claudeAiOauth.expiresAt`` (epoch ms) <= now + skew -> expired,
         not ok.
      4. ``accessToken`` present but ``expiresAt`` absent or unparsable -> OK.
         Expiry cannot be proven, and a false positive here would wedge the
         shared queue (see ADR Risks R-1).

    Fail-safe: any unexpected error returns ``(False, ...)`` so a logged-out
    or broken state never claims a job (BR-2 / TR-3.5). Only a local file is
    read -- no network call, no token spend (BR-1 / AC-5).

    Returns:
        ``(ok, reason)`` where *reason* is a short human-readable explanation.
    """
    try:
        path = _credentials_path()
        if not os.path.exists(path):
            return False, f"claude not logged in: credentials missing ({path})"
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return False, f"claude not logged in: credentials unreadable ({e})"

        oauth = data.get("claudeAiOauth") if isinstance(data, dict) else None
        if not isinstance(oauth, dict) or not oauth.get("accessToken"):
            return False, "claude not logged in: no oauth token"

        expires = oauth.get("expiresAt")
        if expires is None:
            return True, "auth ok (no expiry recorded)"
        try:
            expires_ms = int(expires)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: json.load accepts the non-standard `Infinity`
            # literal, and int(float("inf")) raises OverflowError. That is an
            # unparsable expiry like any other, so it must not fall through
            # to the outer fail-safe and produce a false "not ok".
            return True, "auth ok (unparsable expiry)"

        # The skew treats a token about to expire as already expired.
        skew_ms = int(getattr(settings, "auth_expiry_skew_seconds", 0) or 0) * 1000
        now_ms = int(time.time() * 1000)
        if expires_ms <= now_ms + skew_ms:
            return False, f"OAuth token expired at {_iso(expires_ms)}"
        return True, "auth ok"
    except Exception as e:  # pragma: no cover - defensive fail-safe
        return False, f"auth check error: {e}"
def _compute() -> tuple[bool, str]:
    """Run the uncached preflight sequence and return ``(ok, reason)``.

    Order of checks: the binary exists on disk, ``--version`` succeeds, and
    -- unless ``settings.preflight_check_auth`` is falsy -- the local auth
    gate passes. The first failing step's reason is returned; on success the
    reason is the one reported by the version probe.
    """
    binary = _claude_bin()
    if not os.path.exists(binary):
        return False, f"CLAUDE_BIN not found: {binary}"

    version_ok, version_reason = _run_version(binary)
    if not version_ok:
        return version_ok, version_reason

    # ORCH-044 (P1): `--version` answers even when logged out, so a token-free
    # auth check is layered on top. The setting is an emergency off-switch and
    # defaults to enabled when absent.
    auth_enabled = getattr(settings, "preflight_check_auth", True)
    if auth_enabled:
        auth_ok, auth_reason = _check_auth()
        if not auth_ok:
            return False, auth_reason
    return True, version_reason
def check(force: bool = False) -> tuple[bool, str]:
    """Return the preflight verdict as ``(ok, reason)``.

    The verdict is cached for ``settings.preflight_cache_ttl`` seconds.

    Args:
        force: When true, ignore any cached verdict and re-evaluate (used by
            the breaker half-open probe and by tests).

    Returns:
        The cached verdict while it is still fresh, otherwise a newly
        computed one, which is stored in the cache before returning. A
        failing fresh verdict is also logged at WARNING level.
    """
    started = time.time()
    ttl = settings.preflight_cache_ttl

    if not force:
        # ts == 0 marks an empty/reset cache, which is never considered fresh.
        if _cache.ts > 0 and (started - _cache.ts) < ttl:
            return _cache.ok, _cache.reason

    ok, reason = _compute()
    _cache.ts, _cache.ok, _cache.reason = started, ok, reason
    if not ok:
        logger.warning(f"Preflight FAIL: {reason}")
    return ok, reason
def reset_cache() -> None:
    """Drop the cached verdict so the next check() recomputes it.

    Used by tests and whenever a recheck has to be forced. The timestamp is
    zeroed, the verdict set to ``False`` and the reason to ``"reset"``.
    """
    _cache.ts, _cache.ok, _cache.reason = 0.0, False, "reset"