orchestrator/src/merge_gate.py
claude-bot 00d69d9e27
feat(merge-gate): auto-rebase onto current main + re-test + serialise merges
Deterministic (no-LLM) sub-gate on the deploy-staging -> deploy edge that
catches a feature branch up to the CURRENT origin/main, re-tests the combined
tree, and serialises merges with a per-repo file lease — so two green parallel
branches can no longer break main (self-hosting safety for the orchestrator repo).

- src/merge_gate.py: branch_is_behind_main, auto_rebase_onto_main (push
  --force-with-lease ONLY the task branch, NEVER main), retest_branch, and a
  file merge-lease (atomic O_CREAT|O_EXCL, holder-aware release, stale reclaim).
  Strict never-raise contract; all git ops in the per-branch worktree.
- src/qg/checks.py: check_branch_mergeable composes the primitives under the
  lease; registered in QG_CHECKS. Conditional rollout (merge_gate_enabled /
  merge_gate_repos, default self-hosting only).
- src/stage_engine.py: sub-gate hook on deploy-staging (not a new stage). PASS ->
  advance; "merge-lock busy" -> DEFER (re-queue with available_at, anti-deadlock
  at max_concurrency=1, capped); conflict/red re-test -> rollback to development
  + developer retry (capped by MAX_DEVELOPER_RETRIES). Lease released on
  deploy->done / rollback / PR-merged webhook.
- src/db.py: enqueue_job(available_at_delay_s=...) for the defer (no schema change).
- src/webhooks/gitea.py: holder-aware lease release on PR-merged.
- src/config.py + .env.example: ORCH_MERGE_* settings.
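
The composition described in the list above can be sketched roughly as follows. This is an illustration only, not the code in src/qg/checks.py: `gate_sketch`, `GateResult`, and the injected callables are hypothetical names, and two details are assumptions of the sketch rather than facts from this commit: that a branch already containing origin/main passes without a re-test, and that the gate itself releases the lease on rollback.

```python
from enum import Enum
from typing import Callable

Outcome = tuple[bool, str]


class GateResult(Enum):  # hypothetical names for the three outcomes
    PASS = "pass"          # advance deploy-staging -> deploy
    DEFER = "defer"        # lease busy: re-queue the job with a delay
    ROLLBACK = "rollback"  # conflict or red re-test: back to development


def gate_sketch(
    repo: str,
    branch: str,
    acquire: Callable[[str, str], Outcome],
    is_behind: Callable[[str, str], bool],
    rebase: Callable[[str, str], Outcome],
    retest: Callable[[str, str], Outcome],
    release: Callable[[str, str], None],
) -> tuple[GateResult, str]:
    """Compose the merge-gate primitives under the per-repo lease (sketch)."""
    ok, reason = acquire(repo, branch)
    if not ok:
        # Another task holds the lease: defer, do not burn a developer retry.
        return GateResult.DEFER, reason
    if not is_behind(repo, branch):
        # Assumption: nothing new on main, so the earlier validation still holds.
        return GateResult.PASS, "already up to date"
    for step in (rebase, retest):
        ok, reason = step(repo, branch)
        if not ok:
            # Assumption: the gate frees the lease before rolling back.
            release(repo, branch)
            return GateResult.ROLLBACK, reason
    # On PASS the lease stays held until the actual merge releases it.
    return GateResult.PASS, reason
```

Passing the primitives in as callables keeps the sketch self-contained; in the real module they would be the functions from src/merge_gate.py.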

Docs: README + adr-0006 (architect) already cover the design; CHANGELOG updated.
Tests: test_merge_gate.py, test_qg_merge_gate.py, test_merge_gate_race.py,
test_stage_engine.py::TestMergeGate, test_config.py, QG-registry snapshot.
Full suite: 535 passed.

Refs: ORCH-043

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 17:32:50 +00:00

"""Merge-gate core (ORCH-043): catch a branch up to the CURRENT origin/main,
re-test it, and serialise merges with a file lease.

Background
----------
The pipeline validates a branch against the ``main`` it was BRANCHED from, not the
``main`` at the moment of merge. Between "branch validated" and "branch merged" a
parallel task may have advanced ``main`` -> a *semantic* merge conflict: git merges
with no textual conflict, yet the combined ``main`` is broken. For the self-hosting
``orchestrator`` repo that means a red ``main`` of the tool serving every project.

This module provides the deterministic (no-LLM) primitives the quality-gate
``check_branch_mergeable`` (src/qg/checks.py) composes on the
``deploy-staging -> deploy`` edge, BEFORE the deployer merges the PR:

* ``branch_is_behind_main`` -> is the branch missing the latest origin/main?
* ``auto_rebase_onto_main`` -> rebase onto origin/main + push --force-with-lease
  (ONLY the task branch; NEVER main).
* ``retest_branch`` -> run the project test-suite in the caught-up worktree.
* file lease (``acquire_merge_lease`` / ``release_merge_lease``) -> serialise the
  "catch-up + re-test + merge" of ONE repo, held from the gate to the actual merge.

Invariants (self-hosting safety, tech spec §10):

* NEVER push or force-push ``main``; the only force op is ``--force-with-lease``
  on the task branch.
* All git ops run in the per-branch worktree (ensure_worktree), never the shared clone.
* Every public function honours a strict **never-raise** contract: any git/OS error
  -> ``(False, "<reason>")`` (or a safe bool), never a propagated exception.
"""

import json
import logging
import os
import subprocess
import time

from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path

logger = logging.getLogger("orchestrator.merge_gate")

# git sub-command timeouts (seconds). Generous but bounded so a hung git never
# wedges the monitor-thread that runs the gate.
_FETCH_TIMEOUT = 60
_REBASE_TIMEOUT = 120
_PUSH_TIMEOUT = 60
_SHORT_TIMEOUT = 30


# ---------------------------------------------------------------------------
# behind / ancestor detection
# ---------------------------------------------------------------------------
def branch_is_behind_main(repo: str, branch: str) -> bool:
    """Return True iff ``branch`` does NOT already contain the latest origin/main.

    A branch is "behind" when ``origin/main`` is **not** an ancestor of the branch
    HEAD (``git merge-base --is-ancestor origin/main HEAD`` returns non-zero). All
    work happens in the per-branch worktree (ORCH-2 / S-4 isolation).

    Never-raise (AC-9 / TC-03): any git/OS failure (including a failed fetch) or an
    ambiguous result is treated as "cannot prove the branch is up-to-date" ->
    return True (force a rebase attempt rather than merge blindly). It returns a
    bool, never raises.
    """
    try:
        wt = ensure_worktree(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("branch_is_behind_main: worktree error for %s/%s: %s", repo, branch, e)
        return True
    try:
        f = subprocess.run(
            ["git", "-C", wt, "fetch", "origin", "main"],
            capture_output=True, timeout=_FETCH_TIMEOUT,
        )
        if f.returncode != 0:
            # A failed fetch may leave origin/main stale, so the ancestor test
            # below could wrongly report "up-to-date". Treat it as behind.
            logger.warning(
                "branch_is_behind_main: fetch failed rc=%s for %s/%s (treating as behind)",
                f.returncode, repo, branch,
            )
            return True
        r = subprocess.run(
            ["git", "-C", wt, "merge-base", "--is-ancestor", "origin/main", "HEAD"],
            capture_output=True, timeout=_SHORT_TIMEOUT,
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("branch_is_behind_main: git error for %s/%s: %s", repo, branch, e)
        return True
    if r.returncode == 0:
        # origin/main IS an ancestor of HEAD -> branch already up-to-date.
        return False
    if r.returncode == 1:
        # origin/main is NOT an ancestor -> branch is behind.
        return True
    # Any other code (e.g. bad ref) -> ambiguous; do not merge blindly.
    logger.warning(
        "branch_is_behind_main: ambiguous merge-base rc=%s for %s/%s (treating as behind)",
        r.returncode, repo, branch,
    )
    return True


def _conflicted_files(wt: str) -> str:
    """Best-effort list of unmerged (conflicting) files in the worktree."""
    try:
        r = subprocess.run(
            ["git", "-C", wt, "diff", "--name-only", "--diff-filter=U"],
            capture_output=True, text=True, timeout=_SHORT_TIMEOUT,
        )
        files = r.stdout.strip().replace("\n", ", ")
        return files or "unknown"
    except (subprocess.SubprocessError, OSError):
        return "unknown"


# ---------------------------------------------------------------------------
# auto-rebase onto origin/main
# ---------------------------------------------------------------------------
def auto_rebase_onto_main(repo: str, branch: str) -> tuple[bool, str]:
    """Catch ``branch`` up to ``origin/main`` via rebase, then push it.

    Steps (all in the per-branch worktree):

    1. ``git fetch origin main``; a failed fetch -> ``(False, "fetch failed: ...")``
       so the branch is never rebased onto a stale ``origin/main``.
    2. ``git rebase origin/main``:
       - textual conflict (non-zero) -> ``git rebase --abort`` (leave worktree
         clean) -> ``(False, "rebase conflict: <files>")`` (AC-3).
    3. clean rebase -> ``git push --force-with-lease origin <branch>`` (ONLY the
       task branch, NEVER ``main``; AC-7) -> ``(True, "rebased onto origin/main")``.

    Never-raise (AC-9): any git/OS error -> ``(False, "<reason>")``.
    """
    try:
        wt = ensure_worktree(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        return False, f"rebase setup error: {e}"
    try:
        f = subprocess.run(
            ["git", "-C", wt, "fetch", "origin", "main"],
            capture_output=True, text=True, timeout=_FETCH_TIMEOUT,
        )
        if f.returncode != 0:
            detail = (f.stderr or f.stdout or "").strip()[:200]
            logger.warning("auto_rebase: fetch failed on %s/%s: %s", repo, branch, detail)
            return False, f"fetch failed: {detail}"
        r = subprocess.run(
            ["git", "-C", wt, "rebase", "origin/main"],
            capture_output=True, text=True, timeout=_REBASE_TIMEOUT,
        )
        if r.returncode != 0:
            # Collect the conflicting files BEFORE aborting; the abort clears them.
            files = _conflicted_files(wt)
            subprocess.run(
                ["git", "-C", wt, "rebase", "--abort"],
                capture_output=True, timeout=_SHORT_TIMEOUT,
            )
            logger.warning("auto_rebase: conflict on %s/%s: %s", repo, branch, files)
            return False, f"rebase conflict: {files}"
        # Clean rebase -> push ONLY the task branch with a lease (never main).
        p = subprocess.run(
            ["git", "-C", wt, "push", "--force-with-lease", "origin", branch],
            capture_output=True, text=True, timeout=_PUSH_TIMEOUT,
        )
        if p.returncode != 0:
            detail = (p.stderr or p.stdout or "").strip()[:200]
            logger.warning("auto_rebase: push failed on %s/%s: %s", repo, branch, detail)
            return False, f"push --force-with-lease failed: {detail}"
        logger.info("auto_rebase: %s/%s rebased onto origin/main and pushed", repo, branch)
        return True, "rebased onto origin/main"
    except subprocess.TimeoutExpired:
        # Leave no half-finished rebase behind.
        try:
            subprocess.run(
                ["git", "-C", wt, "rebase", "--abort"],
                capture_output=True, timeout=_SHORT_TIMEOUT,
            )
        except (subprocess.SubprocessError, OSError):
            pass
        return False, "rebase timeout"
    except (subprocess.SubprocessError, OSError) as e:
        return False, f"rebase error: {e}"


# ---------------------------------------------------------------------------
# re-test in the caught-up worktree
# ---------------------------------------------------------------------------
def retest_branch(repo: str, branch: str) -> tuple[bool, str]:
    """Run the project test-suite in the (already caught-up) branch worktree.

    Command: ``python -m pytest <merge_retest_target> -q`` (default ``tests/``),
    matching the orchestrator CI / check_tests_local pattern. Bounded by
    ``settings.merge_retest_timeout_s``.

    Returns:

    * ``(True, "re-test green")`` when pytest rc == 0
    * ``(False, "re-test timeout after <T>s")`` when the timeout is exceeded (AC-6)
    * ``(False, "re-test failed: ...<tail>")`` on a non-zero rc, with output tail

    Never-raise (AC-9): any setup/OS error -> ``(False, "<reason>")``.
    """
    try:
        # The path lookup sits inside the try so it also honours never-raise.
        wt = get_worktree_path(repo, branch)
        if not os.path.isdir(wt):
            # Caller usually rebased first (worktree exists); ensure as a fallback.
            wt = ensure_worktree(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        return False, f"re-test setup error: {e}"
    target = settings.merge_retest_target or "tests/"
    timeout = settings.merge_retest_timeout_s
    try:
        r = subprocess.run(
            ["python", "-m", "pytest", target, "-q"],
            cwd=wt, capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.warning("retest_branch: timeout (%ss) on %s/%s", timeout, repo, branch)
        return False, f"re-test timeout after {timeout}s"
    except (subprocess.SubprocessError, OSError) as e:
        return False, f"re-test error: {e}"
    if r.returncode == 0:
        return True, "re-test green"
    tail = ((r.stdout or "") + (r.stderr or ""))[-500:]
    logger.warning("retest_branch: red on %s/%s", repo, branch)
    return False, f"re-test failed: ...{tail}"


# ---------------------------------------------------------------------------
# merge-lease (serialise catch-up + re-test + merge per repo)
# ---------------------------------------------------------------------------
def _lease_path(repo: str) -> str:
    """Filesystem path of the per-repo merge lease (no schema change, tech spec §4)."""
    return os.path.join(settings.repos_dir, f".merge-lease-{repo}.json")


def _read_lease(path: str) -> dict | None:
    """Read+parse the lease file; None if missing or corrupt (never-raise)."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.loads(f.read())
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        logger.warning("merge-lease read error at %s: %s", path, e)
        return None
    # A non-object payload (e.g. a JSON list) is as unusable as a corrupt file,
    # and callers rely on ``.get`` being available.
    return data if isinstance(data, dict) else None


def _write_lease(path: str, holder: dict) -> None:
    """Atomically (O_CREAT|O_EXCL) write the lease; raises FileExistsError if held."""
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    try:
        os.write(fd, json.dumps(holder).encode("utf-8"))
    finally:
        os.close(fd)


def acquire_merge_lease(
    repo: str, branch: str, work_item_id: str | None = None, task_id: int | None = None
) -> tuple[bool, str]:
    """Try to acquire the per-repo merge lease. **Non-blocking** (anti-deadlock).

    Holder identity is the task ``branch`` (stable, one branch per task). Outcomes:

    * no lease file -> acquire, write metadata -> ``(True, "lease acquired")``
    * lease held by self -> idempotent re-acquire (restart/retry) -> ``(True, "lease already held")``
    * lease held by other, age < merge_lock_timeout_s -> ``(False, "merge-lock busy")``
    * lease held by other, age >= merge_lock_timeout_s -> stale -> reclaim with a
      ``logger.warning`` (the holder process died without releasing) -> ``(True, ...)``

    Never-raise: any unexpected error -> ``(False, "merge-lock busy")`` so the caller
    DEFERS and retries rather than burning a developer retry on an infra hiccup.
    """
    path = _lease_path(repo)
    holder = {
        "branch": branch,
        "work_item_id": work_item_id,
        "task_id": task_id,
        "acquired_at": time.time(),
        "pid": os.getpid(),
    }
    try:
        try:
            _write_lease(path, holder)
            logger.info("merge-lease acquired for %s by %s", repo, branch)
            return True, "lease acquired"
        except FileExistsError:
            pass
        existing = _read_lease(path)
        if existing is None:
            # Corrupt/empty lease file: reclaim it.
            _force_write_lease(path, holder)
            logger.warning("merge-lease for %s was corrupt; reclaimed by %s", repo, branch)
            return True, "lease reclaimed (corrupt)"
        if existing.get("branch") == branch:
            return True, "lease already held"
        age = time.time() - float(existing.get("acquired_at") or 0)
        if age >= settings.merge_lock_timeout_s:
            _force_write_lease(path, holder)
            logger.warning(
                "merge-lease for %s was stale (age %.0fs >= %ss, holder=%s); reclaimed by %s",
                repo, age, settings.merge_lock_timeout_s, existing.get("branch"), branch,
            )
            return True, "lease reclaimed (stale)"
        logger.info(
            "merge-lease for %s busy (held by %s, age %.0fs); %s defers",
            repo, existing.get("branch"), age, branch,
        )
        return False, "merge-lock busy"
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("acquire_merge_lease unexpected error for %s/%s: %s", repo, branch, e)
        return False, "merge-lock busy"


def _force_write_lease(path: str, holder: dict) -> None:
    """Overwrite the lease (used for stale/corrupt reclaim). Best-effort."""
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.write(json.dumps(holder))
    except OSError as e:
        logger.warning("merge-lease force-write error at %s: %s", path, e)


def release_merge_lease(repo: str, branch: str | None = None) -> None:
    """Release the per-repo merge lease. **Idempotent** and **holder-aware**.

    If ``branch`` is given, the lease is removed ONLY when the current holder's
    branch matches (so a delayed release from an already-merged task can never
    delete a lease a DIFFERENT task acquired afterwards). With ``branch=None`` the
    release is unconditional (best-effort backstop). Never raises.
    """
    path = _lease_path(repo)
    try:
        if branch is not None:
            existing = _read_lease(path)
            if existing is not None and existing.get("branch") != branch:
                logger.info(
                    "merge-lease release skipped for %s: holder=%s != %s",
                    repo, existing.get("branch"), branch,
                )
                return
        os.remove(path)
        logger.info("merge-lease released for %s (%s)", repo, branch or "force")
    except FileNotFoundError:
        return
    except OSError as e:
        logger.warning("merge-lease release error for %s: %s", repo, e)
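
The lease behaviour implemented above (exclusive create, holder-aware release) can be tried in isolation with a small standalone sketch. It is a simplified illustration in a temporary directory, not a drop-in replacement for the functions in this module: `try_acquire` and `release_if_holder` are hypothetical helper names, and the stale/corrupt reclaim paths are left out.

```python
import json
import os
import tempfile
import time


def try_acquire(path: str, branch: str) -> bool:
    """Create the lease file only if it does not exist yet (O_CREAT|O_EXCL)."""
    try:
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    except FileExistsError:
        return False  # someone else holds the lease
    try:
        payload = {"branch": branch, "acquired_at": time.time()}
        os.write(fd, json.dumps(payload).encode("utf-8"))
    finally:
        os.close(fd)
    return True


def release_if_holder(path: str, branch: str) -> bool:
    """Remove the lease only when ``branch`` is the recorded holder."""
    try:
        with open(path, encoding="utf-8") as f:
            holder = json.load(f)
    except (OSError, ValueError):
        return False
    if holder.get("branch") != branch:
        return False  # a different task owns the lease; leave it alone
    os.remove(path)
    return True


with tempfile.TemporaryDirectory() as d:
    lease = os.path.join(d, ".merge-lease-demo.json")
    first = try_acquire(lease, "feature/a")        # wins the lease
    second = try_acquire(lease, "feature/b")       # file exists, must defer
    wrong = release_if_holder(lease, "feature/b")  # not the holder, lease stays
    right = release_if_holder(lease, "feature/a")  # the holder releases
    third = try_acquire(lease, "feature/b")        # lease is free again
    print(first, second, wrong, right, third)  # True False False True True
```

The exclusive-create flag is what makes acquisition a single atomic step: the existence check and the creation cannot be interleaved by a second process, so at most one caller sees success.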