"""ORCH-044 (P1): token-free preflight auth gate. `claude --version` answers even when claude is logged OUT, so version-only preflight was blind to auth. These tests cover the new local credentials check: missing / expired / valid token, broken JSON fail-safe, no network, caching, HOME-correct path resolution, and the queue-worker claim gate. No real claude/Popen is spawned: `_run_version` is stubbed and credentials live in tmp files. DB is a fresh per-test sqlite (mirrors tests/test_resilience.py). """ import os import json import socket import tempfile import pytest _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_preflight_auth.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() os.environ["ORCH_GITEA_TOKEN"] = "test-token" os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" import src.db as db from src.db import init_db, enqueue_job, get_job, count_running_jobs from src import preflight from src.queue_worker import QueueWorker from src.agents.launcher import AgentLauncher @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db")) init_db() preflight.reset_cache() # auth check on by default; large TTL unless a test overrides it. monkeypatch.setattr(preflight.settings, "preflight_check_auth", True) yield def _fake_bin(monkeypatch, tmp_path): """A bin path that exists + a --version that always succeeds (auth-agnostic).""" b = tmp_path / "claude" b.write_text("#!/bin/sh\necho v1\n") monkeypatch.setattr(preflight, "_claude_bin", lambda: str(b)) monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3")) def _write_creds(tmp_path, *, expires_ms=None, access_token="tok", oauth=True, raw=None): path = tmp_path / ".credentials.json" if raw is not None: path.write_text(raw) return path body = {} if oauth: oa = {"accessToken": access_token} if expires_ms is not None: oa["expiresAt"] = expires_ms body["claudeAiOauth"] = oa path.write_text(json.dumps(body)) return path # --------------------------------------------------------------------------- # TC-01 / AC-1: not logged in (no credentials file) -> FAIL # --------------------------------------------------------------------------- def test_missing_credentials_fails(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(tmp_path / "nope.json")) ok, reason = preflight.check(force=True) assert ok is False assert "logged in" in reason.lower() or "credentials" in reason.lower() # --------------------------------------------------------------------------- # TC-02 / AC-2: expired OAuth token -> FAIL # --------------------------------------------------------------------------- def test_expired_token_fails(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) past = (int(__import__("time").time()) - 3600) * 1000 # 1h ago, epoch ms creds = _write_creds(tmp_path, expires_ms=past) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) ok, reason = preflight.check(force=True) assert ok is False assert "expired" in reason.lower() # --------------------------------------------------------------------------- # TC-03 / AC-3: valid login -> OK (no regression) # --------------------------------------------------------------------------- def test_valid_login_ok(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) future = (int(__import__("time").time()) + 3600) * 1000 # 1h ahead creds = _write_creds(tmp_path, expires_ms=future) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) ok, reason = preflight.check(force=True) assert ok is True def test_token_without_expiry_is_ok(monkeypatch, tmp_path): # accessToken present but no expiresAt -> cannot prove expiry -> OK (ADR §P1.5). _fake_bin(monkeypatch, tmp_path) creds = _write_creds(tmp_path, expires_ms=None) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) ok, _ = preflight.check(force=True) assert ok is True # --------------------------------------------------------------------------- # TC-04 / AC-1: broken / unreadable credentials JSON -> FAIL (no exception) # --------------------------------------------------------------------------- def test_broken_json_fails_without_raising(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) creds = _write_creds(tmp_path, raw="{ this is not valid json ") monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) ok, reason = preflight.check(force=True) # must not raise assert ok is False assert "logged in" in reason.lower() or "unreadable" in reason.lower() def test_no_oauth_block_fails(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) creds = _write_creds(tmp_path, oauth=False) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) ok, reason = preflight.check(force=True) assert ok is False assert "oauth" in reason.lower() or "logged in" in reason.lower() # --------------------------------------------------------------------------- # TC-05 / AC-5: token-free — no network call in the auth path # --------------------------------------------------------------------------- def test_auth_check_makes_no_network_call(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) future = (int(__import__("time").time()) + 3600) * 1000 creds = _write_creds(tmp_path, expires_ms=future) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) def _no_net(*a, **k): raise AssertionError("token-free auth check must not open a socket") monkeypatch.setattr(socket, "socket", _no_net) ok, _ = preflight.check(force=True) assert ok is True # --------------------------------------------------------------------------- # TC-06 / AC-6: auth result cached within preflight_cache_ttl # --------------------------------------------------------------------------- def test_auth_result_cached_within_ttl(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) monkeypatch.setattr(preflight.settings, "preflight_cache_ttl", 999) calls = {"n": 0} real = preflight._check_auth future = (int(__import__("time").time()) + 3600) * 1000 creds = _write_creds(tmp_path, expires_ms=future) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(creds)) def counting(): calls["n"] += 1 return real() monkeypatch.setattr(preflight, "_check_auth", counting) preflight.reset_cache() preflight.check() # miss -> reads creds preflight.check() # cached -> no re-read preflight.check() assert calls["n"] == 1 # --------------------------------------------------------------------------- # TC-07 / TR-1.3: credentials path resolves from AGENT_HOME, not process env # --------------------------------------------------------------------------- def test_credentials_path_follows_agent_home(monkeypatch, tmp_path): agent_home = tmp_path / "agent_home" agent_home.mkdir() monkeypatch.setattr(AgentLauncher, "AGENT_HOME", str(agent_home)) monkeypatch.setattr(preflight.settings, "claude_credentials_path", "") # The orchestrator process HOME points somewhere else entirely. monkeypatch.setenv("HOME", str(tmp_path / "orchestrator_home")) resolved = preflight._credentials_path() assert resolved == str(agent_home / ".claude" / ".credentials.json") assert str(tmp_path / "orchestrator_home") not in resolved def test_explicit_credentials_path_wins(monkeypatch, tmp_path): monkeypatch.setattr(preflight.settings, "claude_credentials_path", str(tmp_path / "explicit.json")) assert preflight._credentials_path() == str(tmp_path / "explicit.json") # --------------------------------------------------------------------------- # TC-08 / AC-4: auth-fail blocks the queue-worker claim # --------------------------------------------------------------------------- def test_worker_does_not_claim_when_auth_fails(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(tmp_path / "missing.json")) # not logged in called = {"launch": False} monkeypatch.setattr("src.queue_worker.launcher.launch_job", lambda job: called.__setitem__("launch", True)) jid = enqueue_job("analyst", "r") w = QueueWorker(max_concurrency=1, poll_interval=0.01) w._drain_once() assert called["launch"] is False assert get_job(jid)["status"] == "queued" assert count_running_jobs() == 0 assert w.last_preflight_ok is False assert "logged in" in w.last_preflight_reason.lower() \ or "credentials" in w.last_preflight_reason.lower() # --------------------------------------------------------------------------- # Toggle off: preflight_check_auth=False keeps the old version-only behaviour # --------------------------------------------------------------------------- def test_auth_toggle_off_skips_check(monkeypatch, tmp_path): _fake_bin(monkeypatch, tmp_path) monkeypatch.setattr(preflight.settings, "preflight_check_auth", False) monkeypatch.setattr(preflight, "_credentials_path", lambda: str(tmp_path / "missing.json")) ok, _ = preflight.check(force=True) assert ok is True # auth not consulted -> version-only pass # --------------------------------------------------------------------------- # is_auth_failure_text: post-factum marker detection (P1b) # --------------------------------------------------------------------------- @pytest.mark.parametrize("text", [ "Error: Not logged in. Please run /login", "401 Unauthorized", "invalid api key provided", ]) def test_is_auth_failure_text_positive(text): assert preflight.is_auth_failure_text(text) is True @pytest.mark.parametrize("text", ["", "429 rate limit", "Traceback ValueError"]) def test_is_auth_failure_text_negative(text): assert preflight.is_auth_failure_text(text) is False