"""Legacy root-owned ownership detect + actionable worktree error (ORCH-057). Background ---------- ORCH-040 moved both containers to ``user: "1000:1000"`` by editing ONLY ``docker-compose.yml``. Changing ``user:`` does NOT change the owner of files that the previous root container already created. The bind-mount ``/home/slin/repos -> /repos`` therefore still held ``root:root`` directories (``_wt/``, old worktrees, ``.git/objects``, ``data/runs``). Under uid 1000 (no root) ``git_worktree. ensure_worktree`` could not create a worktree next to a ``root:root`` ``/repos/_wt`` and failed with a RAW ``fatal: could not create leading directories … Permission denied`` — the agent never started and the operator had no diagnosis. The container runs as numeric uid 1000 WITHOUT root, so it physically cannot ``chown`` foreign (root-owned) files — only DETECT + DIAGNOSE. The real fix is the documented operator procedure (INFRA.md «Миграция uid»), run once on the host. This leaf (ADR-001) provides three additive, kill-switch-reversible primitives: * ``classify_worktree_error`` / ``build_worktree_help`` — the pure classifier + actionable message used by ``git_worktree.ensure_worktree`` (D1 / FR-1). * ``scan_ownership`` — a cheap, TTL-cached, never-raise walk of the infra roots that reports whether any file has ``uid != target_uid`` (D2 / FR-2), used by the startup hook (D3 / FR-3) and the ``GET /queue`` ``fs_ownership`` block. * ``normalize`` — an opt-in ``chown`` that runs ONLY when the process is privileged (root / CAP_CHOWN); under uid 1000 it is a no-op + honest log, NOT an error (D4 / FR-4). Invariants (never broken): * **never-raise** (NFR-3): every public function degrades to a conservative, non-blocking default and NEVER propagates into the worker / lifespan / worktree path. A detect error -> WARNING + ``mismatch=False`` (do not block / panic). * **applies() first** (NFR-2): the expensive walk runs only when the layer is REAL for the repo (``fs_normalize_enabled`` + scope; empty CSV -> self-hosting only), so enduro-trails is never scanned at the default config. * **kill-switch reversible** (D6): ``fs_normalize_enabled=False`` -> all code inert, behaviour 1:1 as before ORCH-057 (the actionable error contract too). * **no chown without privilege** (NFR-1): the code only reads / detects / diagnoses; a real ``chown`` happens only when privileged and ``fs_normalize_auto=True``. Leaf: imports only ``config`` / ``logging`` / ``os`` / ``time`` (+ lazily ``qg.checks.is_self_hosting_repo`` / ``notifications`` for scope / observability). It never imports ``git_worktree`` / ``stage_engine`` / ``launcher`` (``git_worktree`` imports THIS module, so the dependency is one-way). """ from __future__ import annotations import errno import logging import os import time from dataclasses import dataclass, field from .config import settings logger = logging.getLogger("orchestrator.fs_normalize") # Permission-class markers in a git stderr / OSError string (D1 / TR-1). Narrow on # purpose — a non-permission error (real branch conflict, missing origin/main, # timeout) must NOT be reclassified (AC-2 FAIL-condition), so we match only the # unambiguous "no permission to create the file/object" phrases. _PERM_MARKERS = ( "permission denied", "could not create leading directories", "insufficient permission for adding an object", "operation not permitted", ) # --------------------------------------------------------------------------- # Resolution helpers (target uid, scope, roots) # --------------------------------------------------------------------------- def _resolve_target_uid(target_uid: int | None = None) -> int: """The uid the scan compares against (the subject that "cannot create files"). Resolution order (D2 / TR-7): explicit ``target_uid`` arg > ``os.getuid()`` (the uid the process really runs as) > ``settings.fs_target_uid`` fallback (default 1000) when ``os.getuid()`` is unavailable. Never raises. """ if target_uid is not None: return int(target_uid) try: return os.getuid() except (AttributeError, OSError): # pragma: no cover - non-POSIX fallback try: return int(settings.fs_target_uid) except (TypeError, ValueError): return 1000 def _scope_repos() -> list[str]: """Repos the layer is REAL for (used to build the default ``.git`` roots). Non-empty ``fs_normalize_repos`` CSV -> those repos; empty -> self-hosting only (``orchestrator``), mirroring ``coverage_gate``. Never raises -> [] on error. """ try: raw = (settings.fs_normalize_repos or "").strip() except Exception: # noqa: BLE001 - never-raise return [] if raw: return [r.strip() for r in raw.split(",") if r.strip()] try: from .qg.checks import SELF_HOSTING_REPO return [SELF_HOSTING_REPO] except Exception: # noqa: BLE001 return ["orchestrator"] def _runs_root() -> str: """``data/runs`` root (per ADR: ``os.path.dirname(db_path)/runs``).""" try: rd = getattr(settings, "runs_dir", None) if rd: return rd except Exception: # noqa: BLE001 pass try: return os.path.join(os.path.dirname(settings.db_path), "runs") except Exception: # noqa: BLE001 return "/app/data/runs" def _default_roots() -> list[str]: """The default scan roots (D2): ``/repos/_wt``, ``data/runs`` and each in-scope repo's ``.git/objects`` + ``.git/worktrees``. Never raises -> [] on error. """ roots: list[str] = [] try: wt = getattr(settings, "worktrees_dir", None) if wt: roots.append(wt) roots.append(_runs_root()) repos_dir = getattr(settings, "repos_dir", "/repos") for repo in _scope_repos(): base = os.path.join(repos_dir, repo, ".git") roots.append(os.path.join(base, "objects")) roots.append(os.path.join(base, "worktrees")) except Exception as e: # noqa: BLE001 - never-raise logger.warning("fs_normalize._default_roots error: %s", e) return roots def _resolve_roots(roots: list[str] | None = None) -> list[str]: """Resolve scan roots: explicit arg > ``fs_scan_roots`` CSV > the default set.""" if roots is not None: return list(roots) try: raw = (settings.fs_scan_roots or "").strip() except Exception: # noqa: BLE001 raw = "" if raw: return [r.strip() for r in raw.split(",") if r.strip()] return _default_roots() # --------------------------------------------------------------------------- # Conditionality (mirrors coverage_gate_applies) # --------------------------------------------------------------------------- def applies(repo: str) -> bool: """Whether the ORCH-057 layer is REAL for this repo (D6 / NFR-2). * ``fs_normalize_enabled=False`` -> always False (kill-switch). * ``fs_normalize_repos`` (CSV) non-empty -> real only for the listed repos. * empty CSV -> real ONLY for the self-hosting repo (``orchestrator``). Never raises -> False (the safe no-op default). """ try: if not settings.fs_normalize_enabled: return False raw = (settings.fs_normalize_repos or "").strip() if raw: allowed = {r.strip().lower() for r in raw.split(",") if r.strip()} return (repo or "").strip().lower() in allowed from .qg.checks import is_self_hosting_repo return is_self_hosting_repo(repo) except Exception as e: # noqa: BLE001 - never-raise contract logger.warning("fs_normalize.applies error for %s: %s", repo, e) return False # --------------------------------------------------------------------------- # D1: actionable worktree error (pure classifier + message) # --------------------------------------------------------------------------- def classify_worktree_error(text: str | None) -> bool: """Pure: True iff ``text`` looks like a "no permission to create" failure. Matches only the narrow ``_PERM_MARKERS`` so a non-permission git error keeps its original contract (AC-2). Never raises -> False on bad input. """ try: t = (text or "").lower() return any(m in t for m in _PERM_MARKERS) except Exception: # noqa: BLE001 return False def is_permission_failure(*, stderr: str | None = None, exc: BaseException | None = None) -> bool: """True iff a worktree failure is the legacy-ownership permission class. Considers both a git ``stderr`` string (marker match) and an ``OSError`` (``PermissionError`` or ``errno`` in ``EACCES``/``EPERM``). Never raises. """ try: if isinstance(exc, PermissionError): return True if isinstance(exc, OSError) and exc.errno in (errno.EACCES, errno.EPERM): return True if classify_worktree_error(stderr): return True if exc is not None and classify_worktree_error(str(exc)): return True except Exception: # noqa: BLE001 return False return False def build_worktree_help(repo: str, branch: str, target_uid: int | None = None, raw: str = "") -> str: """Build the actionable RuntimeError message for a permission-class worktree failure (D1): names the root cause + the healing command + the INFRA.md procedure, instead of a raw git stderr (AC-2). Never raises. """ try: tuid = _resolve_target_uid(target_uid) wt_dir = getattr(settings, "worktrees_dir", "/repos/_wt") git_dir = os.path.join(getattr(settings, "repos_dir", "/repos"), repo, ".git") msg = ( f"Cannot create git worktree for {repo}:{branch} — permission denied. " f"Likely cause: legacy root-owned files in {wt_dir} or {git_dir} left over " f"from before the uid migration (ORCH-040). This container runs as uid " f"{tuid} without root and cannot chown foreign files itself. Fix (run once " f"on the host as root): `sudo chown -R {tuid}:{tuid} {wt_dir}` and " f"`sudo chown -R {tuid}:{tuid} {git_dir}`. See docs/operations/INFRA.md " f"section «Миграция uid: обязательная нормализация legacy root-файлов»." ) if raw: msg += f" (underlying error: {raw.strip()})" return msg except Exception: # noqa: BLE001 - never-raise; degrade to a minimal hint return ( f"Cannot create git worktree for {repo}:{branch} — permission denied " f"(legacy root-owned files; see docs/operations/INFRA.md «Миграция uid»)." ) # --------------------------------------------------------------------------- # D2: ownership scan (TTL-cached, never-raise, early-exit per root) # --------------------------------------------------------------------------- @dataclass class OwnershipScan: """Result of an ownership scan (D2). ``mismatch`` is the boolean verdict.""" mismatch: bool target_uid: int roots_checked: list[str] = field(default_factory=list) roots_mismatch: list[str] = field(default_factory=list) sample_path: str | None = None count: int | None = None checked_at: float = 0.0 enabled: bool = True def to_dict(self) -> dict: return { "enabled": self.enabled, "mismatch": self.mismatch, "target_uid": self.target_uid, "roots_checked": self.roots_checked, "roots_mismatch": self.roots_mismatch, "sample_path": self.sample_path, "count": self.count, "checked_at": self.checked_at, } class _ScanCache: def __init__(self): self.ts: float = 0.0 self.key: tuple | None = None self.result: OwnershipScan | None = None _cache = _ScanCache() def reset_cache() -> None: """Invalidate the TTL detect cache (tests / forced recheck).""" _cache.ts = 0.0 _cache.key = None _cache.result = None def _first_mismatch(root: str, target_uid: int) -> str | None: """Return the first path under ``root`` whose ``st_uid != target_uid`` (early exit), else None. ``os.lstat`` (not ``stat``) so a symlink's own ownership is judged, never its target. Never raises -> None on any walk error. """ try: if not os.path.exists(root): return None try: if os.lstat(root).st_uid != target_uid: return root except OSError: return None for dirpath, dirnames, filenames in os.walk(root, onerror=None): for name in dirnames: p = os.path.join(dirpath, name) try: if os.lstat(p).st_uid != target_uid: return p except OSError: continue for name in filenames: p = os.path.join(dirpath, name) try: if os.lstat(p).st_uid != target_uid: return p except OSError: continue except Exception as e: # noqa: BLE001 - never-raise logger.warning("fs_normalize._first_mismatch error for %s: %s", root, e) return None return None def _scan(roots: list[str], target_uid: int) -> OwnershipScan: """Walk each root, early-exiting per root at its first mismatch. The clean case (no mismatch) walks fully; the dirty case stops fast per root (TR-2 cost). Lists every affected root (informative verdict). Never raises -> conservative ``mismatch=False`` on a wholesale error. """ roots_checked: list[str] = [] roots_mismatch: list[str] = [] sample_path: str | None = None try: for root in roots: if not os.path.exists(root): continue roots_checked.append(root) hit = _first_mismatch(root, target_uid) if hit is not None: roots_mismatch.append(root) if sample_path is None: sample_path = hit except Exception as e: # noqa: BLE001 - never-raise -> conservative verdict logger.warning("fs_normalize._scan error -> mismatch=False: %s", e) return OwnershipScan( mismatch=False, target_uid=target_uid, roots_checked=roots_checked, roots_mismatch=[], checked_at=time.time(), ) return OwnershipScan( mismatch=bool(roots_mismatch), target_uid=target_uid, roots_checked=roots_checked, roots_mismatch=roots_mismatch, sample_path=sample_path, checked_at=time.time(), ) def scan_ownership( roots: list[str] | None = None, target_uid: int | None = None, force: bool = False, ) -> OwnershipScan: """Detect files with ``uid != target_uid`` across the infra roots (D2 / FR-2). TTL-cached (``fs_scan_cache_ttl_s``, mirrors ``preflight._cache``): a repeat call inside the window with the SAME (roots, target_uid) returns the cached result without re-walking; ``force=True`` (or ``reset_cache()``) re-scans. Kill-switch off -> an inert ``mismatch=False`` result (``enabled=False``). Never raises. """ try: if not settings.fs_normalize_enabled: return OwnershipScan( mismatch=False, target_uid=_resolve_target_uid(target_uid), checked_at=time.time(), enabled=False, ) resolved_roots = _resolve_roots(roots) tuid = _resolve_target_uid(target_uid) key = (tuple(resolved_roots), tuid) now = time.time() try: ttl = float(settings.fs_scan_cache_ttl_s) except (TypeError, ValueError): ttl = 300.0 if ( not force and _cache.result is not None and _cache.key == key and (now - _cache.ts) < ttl ): return _cache.result result = _scan(resolved_roots, tuid) _cache.ts = now _cache.key = key _cache.result = result return result except Exception as e: # noqa: BLE001 - never-raise -> conservative verdict logger.warning("fs_normalize.scan_ownership error -> mismatch=False: %s", e) return OwnershipScan( mismatch=False, target_uid=_resolve_target_uid(target_uid), checked_at=time.time(), ) # --------------------------------------------------------------------------- # D4: opt-in normalize (chown ONLY when privileged) — never init-container # --------------------------------------------------------------------------- def _is_privileged() -> bool: """True iff the process can chown foreign files (root). Under uid 1000 -> False. A practical check: ``os.geteuid() == 0``. A CAP_CHOWN-without-root environment still degrades to the honest no-op (a chown attempt would simply fail and be swallowed). Never raises -> False (the safe "not privileged" default). """ try: return os.geteuid() == 0 except (AttributeError, OSError): # pragma: no cover - non-POSIX return False def normalize(roots: list[str] | None = None, target_uid: int | None = None) -> dict: """Opt-in ``chown -R target_uid:target_uid`` over the roots, ONLY when the process is privileged (D4 / FR-4). Under uid 1000 (the prod-self case) it is a no-op + honest log "operator procedure required" — NOT an error. Gated by ``fs_normalize_auto`` at the call site; this function additionally self-guards on ``_is_privileged()``. Never raises. Returns a result dict ``{attempted, privileged, changed, errors, note}``. """ result = {"attempted": False, "privileged": False, "changed": 0, "errors": [], "note": ""} try: if not settings.fs_normalize_enabled: result["note"] = "disabled (fs_normalize_enabled=False)" return result tuid = _resolve_target_uid(target_uid) privileged = _is_privileged() result["privileged"] = privileged if not privileged: result["note"] = ( "not privileged (process runs as non-root) — chown of legacy " "root-owned files needs the operator procedure (docs/operations/" "INFRA.md «Миграция uid»)." ) logger.warning("fs_normalize.normalize: %s", result["note"]) return result result["attempted"] = True resolved_roots = _resolve_roots(roots) changed = 0 for root in resolved_roots: if not os.path.exists(root): continue for path in _iter_paths(root): try: if os.lstat(path).st_uid != tuid: os.chown(path, tuid, tuid, follow_symlinks=False) changed += 1 except OSError as e: result["errors"].append(f"{path}: {e}") result["changed"] = changed result["note"] = f"chown applied to {changed} path(s) over {len(resolved_roots)} root(s)" logger.info("fs_normalize.normalize: %s", result["note"]) return result except Exception as e: # noqa: BLE001 - never-raise logger.error("fs_normalize.normalize error: %s", e) result["note"] = f"error: {e}" return result def _iter_paths(root: str): """Yield ``root`` and every path beneath it (never raises per item).""" try: yield root for dirpath, dirnames, filenames in os.walk(root, onerror=None): for name in dirnames + filenames: yield os.path.join(dirpath, name) except Exception as e: # noqa: BLE001 logger.warning("fs_normalize._iter_paths error for %s: %s", root, e) # --------------------------------------------------------------------------- # Observability snapshot for GET /queue (D6 / AC-4) # --------------------------------------------------------------------------- def snapshot() -> dict: """Read-only ownership summary for GET /queue (``fs_ownership`` block, AC-4). Additive; uses the TTL-cached scan (no expensive walk on every /queue hit). never-raise: any error -> a minimal dict carrying the flags. """ try: enabled = bool(settings.fs_normalize_enabled) except Exception: # noqa: BLE001 enabled = False try: auto = bool(getattr(settings, "fs_normalize_auto", False)) except Exception: # noqa: BLE001 auto = False try: repos_cfg = getattr(settings, "fs_normalize_repos", "") or "" except Exception: # noqa: BLE001 repos_cfg = "" out = { "enabled": enabled, "auto": auto, "repos": repos_cfg, "target_uid": _resolve_target_uid(), "mismatch": False, "roots_checked": [], "roots_mismatch": [], "sample_path": None, "checked_at": None, } try: if enabled: scan = scan_ownership() out["mismatch"] = scan.mismatch out["target_uid"] = scan.target_uid out["roots_checked"] = scan.roots_checked out["roots_mismatch"] = scan.roots_mismatch out["sample_path"] = scan.sample_path out["checked_at"] = scan.checked_at or None except Exception as e: # noqa: BLE001 - never-raise -> minimal dict logger.warning("fs_normalize.snapshot error: %s", e) return out def healing_command(target_uid: int | None = None) -> str: """The one-line operator healing hint (startup WARNING / Telegram). Never raises.""" try: tuid = _resolve_target_uid(target_uid) wt_dir = getattr(settings, "worktrees_dir", "/repos/_wt") return ( f"sudo chown -R {tuid}:{tuid} {wt_dir} /.git data/runs " f"(см. docs/operations/INFRA.md «Миграция uid»)" ) except Exception: # noqa: BLE001 return "sudo chown -R 1000:1000 /repos/_wt (см. docs/operations/INFRA.md «Миграция uid»)"