"""Merge-gate core (ORCH-043): catch a branch up to the CURRENT origin/main,
re-test it, and serialise merges with a file lease.

Background
----------
The pipeline validates a branch against the ``main`` it was BRANCHED from, not
the ``main`` at the moment of merge. Between "branch validated" and "branch
merged" a parallel task may have advanced ``main`` -> a *semantic* merge
conflict: git merges with no textual conflict, yet the combined ``main`` is
broken. For the self-hosting ``orchestrator`` repo that means a red ``main`` of
the tool serving every project.

This module provides the deterministic (no-LLM) primitives the quality-gate
``check_branch_mergeable`` (src/qg/checks.py) composes on the
``deploy-staging -> deploy`` edge, BEFORE the deployer merges the PR:

* ``branch_is_behind_main``  -> is the branch missing the latest origin/main?
* ``auto_rebase_onto_main``  -> rebase onto origin/main + push --force-with-lease
  (ONLY the task branch; NEVER main).
* ``retest_branch``          -> run the project test-suite in the caught-up
  worktree.
* file lease (``acquire_merge_lease`` / ``release_merge_lease``) -> serialise
  the "catch-up + re-test + merge" of ONE repo, held from the gate to the
  actual merge.

Invariants (self-hosting safety, tech spec §10):

* NEVER push or force-push ``main`` — the only force op is
  ``--force-with-lease`` on the task branch.
* All git ops run in the per-branch worktree (ensure_worktree), never the
  shared clone.
* Every public function honours a strict **never-raise** contract: any git/OS
  error -> ``(False, "<reason>")`` (or a safe bool), never a propagated
  exception.
"""

import json
import logging
import os
import subprocess
import time

from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path

logger = logging.getLogger("orchestrator.merge_gate")

# git sub-command timeouts (seconds). Generous but bounded so a hung git never
# wedges the monitor-thread that runs the gate.
_FETCH_TIMEOUT = 60 _REBASE_TIMEOUT = 120 _PUSH_TIMEOUT = 60 _SHORT_TIMEOUT = 30 # --------------------------------------------------------------------------- # behind / ancestor detection # --------------------------------------------------------------------------- def branch_is_behind_main(repo: str, branch: str) -> bool: """Return True iff ``branch`` does NOT already contain the latest origin/main. A branch is "behind" when ``origin/main`` is **not** an ancestor of the branch HEAD (``git merge-base --is-ancestor origin/main HEAD`` returns non-zero). All work happens in the per-branch worktree (ORCH-2 / S-4 isolation). Never-raise (AC-9 / TC-03): any git/OS failure or an ambiguous result is treated as "cannot prove the branch is up-to-date" -> return True (force a rebase attempt rather than merge blindly). It returns a bool, never raises. """ try: wt = ensure_worktree(repo, branch) except Exception as e: # noqa: BLE001 - never-raise contract logger.warning("branch_is_behind_main: worktree error for %s/%s: %s", repo, branch, e) return True try: subprocess.run( ["git", "-C", wt, "fetch", "origin", "main"], capture_output=True, timeout=_FETCH_TIMEOUT, ) r = subprocess.run( ["git", "-C", wt, "merge-base", "--is-ancestor", "origin/main", "HEAD"], capture_output=True, timeout=_SHORT_TIMEOUT, ) except (subprocess.SubprocessError, OSError) as e: logger.warning("branch_is_behind_main: git error for %s/%s: %s", repo, branch, e) return True if r.returncode == 0: # origin/main IS an ancestor of HEAD -> branch already up-to-date. return False if r.returncode == 1: # origin/main is NOT an ancestor -> branch is behind. return True # Any other code (e.g. bad ref) -> ambiguous; do not merge blindly. 
logger.warning( "branch_is_behind_main: ambiguous merge-base rc=%s for %s/%s (treating as behind)", r.returncode, repo, branch, ) return True def _conflicted_files(wt: str) -> str: """Best-effort list of unmerged (conflicting) files in the worktree.""" try: r = subprocess.run( ["git", "-C", wt, "diff", "--name-only", "--diff-filter=U"], capture_output=True, text=True, timeout=_SHORT_TIMEOUT, ) files = r.stdout.strip().replace("\n", ", ") return files or "unknown" except (subprocess.SubprocessError, OSError): return "unknown" # --------------------------------------------------------------------------- # auto-rebase onto origin/main # --------------------------------------------------------------------------- def auto_rebase_onto_main(repo: str, branch: str) -> tuple[bool, str]: """Catch ``branch`` up to ``origin/main`` via rebase, then push it. Steps (all in the per-branch worktree): 1. ``git fetch origin main``. 2. ``git rebase origin/main``: - textual conflict (non-zero) -> ``git rebase --abort`` (leave worktree clean) -> ``(False, "rebase conflict: ")`` (AC-3). 3. clean rebase -> ``git push --force-with-lease origin `` — ONLY the task branch, NEVER ``main`` (AC-7) -> ``(True, "rebased onto origin/main")``. Never-raise (AC-9): any git/OS error -> ``(False, "")``. 
""" try: wt = ensure_worktree(repo, branch) except Exception as e: # noqa: BLE001 - never-raise contract return False, f"rebase setup error: {e}" try: subprocess.run( ["git", "-C", wt, "fetch", "origin", "main"], capture_output=True, timeout=_FETCH_TIMEOUT, ) r = subprocess.run( ["git", "-C", wt, "rebase", "origin/main"], capture_output=True, text=True, timeout=_REBASE_TIMEOUT, ) if r.returncode != 0: files = _conflicted_files(wt) subprocess.run( ["git", "-C", wt, "rebase", "--abort"], capture_output=True, timeout=_SHORT_TIMEOUT, ) logger.warning("auto_rebase: conflict on %s/%s: %s", repo, branch, files) return False, f"rebase conflict: {files}" # Clean rebase -> push ONLY the task branch with a lease (never main). p = subprocess.run( ["git", "-C", wt, "push", "--force-with-lease", "origin", branch], capture_output=True, text=True, timeout=_PUSH_TIMEOUT, ) if p.returncode != 0: detail = (p.stderr or p.stdout or "").strip()[:200] logger.warning("auto_rebase: push failed on %s/%s: %s", repo, branch, detail) return False, f"push --force-with-lease failed: {detail}" logger.info("auto_rebase: %s/%s rebased onto origin/main and pushed", repo, branch) return True, "rebased onto origin/main" except subprocess.TimeoutExpired: # Leave no half-finished rebase behind. try: subprocess.run( ["git", "-C", wt, "rebase", "--abort"], capture_output=True, timeout=_SHORT_TIMEOUT, ) except (subprocess.SubprocessError, OSError): pass return False, "rebase timeout" except (subprocess.SubprocessError, OSError) as e: return False, f"rebase error: {e}" # --------------------------------------------------------------------------- # re-test in the caught-up worktree # --------------------------------------------------------------------------- def retest_branch(repo: str, branch: str) -> tuple[bool, str]: """Run the project test-suite in the (already caught-up) branch worktree. Command: ``python -m pytest `` (default ``tests/``), matching the orchestrator CI / check_tests_local pattern. 
Bounded by ``settings.merge_retest_timeout_s``. Returns: * ``(True, "re-test green")`` — pytest rc == 0 * ``(False, "re-test timeout after s")`` — exceeded the timeout (AC-6) * ``(False, "re-test failed: ...")`` — non-zero rc, with output tail Never-raise (AC-9): any setup/OS error -> ``(False, "")``. """ wt = get_worktree_path(repo, branch) if not os.path.isdir(wt): # Caller usually rebased first (worktree exists); ensure as a fallback. try: wt = ensure_worktree(repo, branch) except Exception as e: # noqa: BLE001 - never-raise contract return False, f"re-test setup error: {e}" target = settings.merge_retest_target or "tests/" timeout = settings.merge_retest_timeout_s try: r = subprocess.run( ["python", "-m", "pytest", target, "-q"], cwd=wt, capture_output=True, text=True, timeout=timeout, ) except subprocess.TimeoutExpired: logger.warning("retest_branch: timeout (%ss) on %s/%s", timeout, repo, branch) return False, f"re-test timeout after {timeout}s" except (subprocess.SubprocessError, OSError) as e: return False, f"re-test error: {e}" if r.returncode == 0: return True, "re-test green" tail = ((r.stdout or "") + (r.stderr or ""))[-500:] logger.warning("retest_branch: red on %s/%s", repo, branch) return False, f"re-test failed: ...{tail}" # --------------------------------------------------------------------------- # merge-lease (serialise catch-up + re-test + merge per repo) # --------------------------------------------------------------------------- def _lease_path(repo: str) -> str: """Filesystem path of the per-repo merge lease (no schema change, ТЗ §4).""" return os.path.join(settings.repos_dir, f".merge-lease-{repo}.json") def _read_lease(path: str) -> dict | None: """Read+parse the lease file; None if missing or corrupt (never-raise).""" try: with open(path, "r", encoding="utf-8") as f: return json.loads(f.read()) except FileNotFoundError: return None except (OSError, ValueError) as e: logger.warning("merge-lease read error at %s: %s", path, e) return None 
def _write_lease(path: str, holder: dict) -> None: """Atomically (O_CREAT|O_EXCL) write the lease; raises FileExistsError if held.""" fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644) try: os.write(fd, json.dumps(holder).encode("utf-8")) finally: os.close(fd) def acquire_merge_lease( repo: str, branch: str, work_item_id: str | None = None, task_id: int | None = None ) -> tuple[bool, str]: """Try to acquire the per-repo merge lease. **Non-blocking** (anti-deadlock). Holder identity is the task ``branch`` (stable, one branch per task). Outcomes: * no lease file -> acquire, write metadata -> ``(True, "lease acquired")`` * lease held by self -> idempotent re-acquire (restart/retry) -> ``(True, "lease already held")`` * lease held by other, age < merge_lock_timeout_s -> ``(False, "merge-lock busy")`` * lease held by other, age >= merge_lock_timeout_s -> stale -> reclaim with a ``logger.warning`` (the holder process died without releasing) -> ``(True, ...)`` Never-raise: any unexpected error -> ``(False, "merge-lock busy")`` so the caller DEFERS and retries rather than burning a developer retry on an infra hiccup. """ path = _lease_path(repo) holder = { "branch": branch, "work_item_id": work_item_id, "task_id": task_id, "acquired_at": time.time(), "pid": os.getpid(), } try: try: _write_lease(path, holder) logger.info("merge-lease acquired for %s by %s", repo, branch) return True, "lease acquired" except FileExistsError: pass existing = _read_lease(path) if existing is None: # Corrupt/empty lease file — reclaim it. 
_force_write_lease(path, holder) logger.warning("merge-lease for %s was corrupt; reclaimed by %s", repo, branch) return True, "lease reclaimed (corrupt)" if existing.get("branch") == branch: return True, "lease already held" age = time.time() - float(existing.get("acquired_at") or 0) if age >= settings.merge_lock_timeout_s: _force_write_lease(path, holder) logger.warning( "merge-lease for %s was stale (age %.0fs >= %ss, holder=%s); reclaimed by %s", repo, age, settings.merge_lock_timeout_s, existing.get("branch"), branch, ) return True, "lease reclaimed (stale)" logger.info( "merge-lease for %s busy (held by %s, age %.0fs); %s defers", repo, existing.get("branch"), age, branch, ) return False, "merge-lock busy" except Exception as e: # noqa: BLE001 - never-raise contract logger.warning("acquire_merge_lease unexpected error for %s/%s: %s", repo, branch, e) return False, "merge-lock busy" def _force_write_lease(path: str, holder: dict) -> None: """Overwrite the lease (used for stale/corrupt reclaim). Best-effort.""" try: with open(path, "w", encoding="utf-8") as f: f.write(json.dumps(holder)) except OSError as e: logger.warning("merge-lease force-write error at %s: %s", path, e) def release_merge_lease(repo: str, branch: str | None = None) -> None: """Release the per-repo merge lease. **Idempotent** and **holder-aware**. If ``branch`` is given, the lease is removed ONLY when the current holder's branch matches (so a delayed release from an already-merged task can never delete a lease a DIFFERENT task acquired afterwards). With ``branch=None`` the release is unconditional (best-effort backstop). Never raises. 
""" path = _lease_path(repo) try: if branch is not None: existing = _read_lease(path) if existing is not None and existing.get("branch") != branch: logger.info( "merge-lease release skipped for %s: holder=%s != %s", repo, existing.get("branch"), branch, ) return os.remove(path) logger.info("merge-lease released for %s (%s)", repo, branch or "force") except FileNotFoundError: return except OSError as e: logger.warning("merge-lease release error for %s: %s", repo, e)