feat(merge-gate): auto-rebase onto current main + re-test + serialise merges
All checks were successful
CI / test (push) Successful in 15s
CI / test (pull_request) Successful in 17s

Deterministic (no-LLM) sub-gate on the deploy-staging -> deploy edge that
catches a feature branch up to the CURRENT origin/main, re-tests the combined
tree, and serialises merges with a per-repo file lease — so two green parallel
branches can no longer break main (self-hosting safety for the orchestrator repo).

- src/merge_gate.py: branch_is_behind_main, auto_rebase_onto_main (push
  --force-with-lease ONLY the task branch, NEVER main), retest_branch, and a
  file merge-lease (atomic O_CREAT|O_EXCL, holder-aware release, stale reclaim).
  Strict never-raise contract; all git ops in the per-branch worktree.
- src/qg/checks.py: check_branch_mergeable composes the primitives under the
  lease; registered in QG_CHECKS. Conditional rollout (merge_gate_enabled /
  merge_gate_repos, default self-hosting only).
- src/stage_engine.py: sub-gate hook on deploy-staging (not a new stage). PASS ->
  advance; "merge-lock busy" -> DEFER (re-queue with available_at, anti-deadlock
  at max_concurrency=1, capped); conflict/red re-test -> rollback to development
  + developer retry (capped by MAX_DEVELOPER_RETRIES). Lease released on
  deploy->done / rollback / PR-merged webhook.
- src/db.py: enqueue_job(available_at_delay_s=...) for the defer (no schema change).
- src/webhooks/gitea.py: holder-aware lease release on PR-merged.
- src/config.py + .env.example: ORCH_MERGE_* settings.

Docs: README + adr-0006 (architect) already cover the design; CHANGELOG updated.
Tests: test_merge_gate.py, test_qg_merge_gate.py, test_merge_gate_race.py,
test_stage_engine.py::TestMergeGate, test_config.py, QG-registry snapshot.
Full suite: 535 passed.

Refs: ORCH-043

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 17:32:50 +00:00
parent ad1589084b
commit 00d69d9e27
14 changed files with 1565 additions and 5 deletions

340
src/merge_gate.py Normal file
View File

@@ -0,0 +1,340 @@
"""Merge-gate core (ORCH-043): catch a branch up to the CURRENT origin/main,
re-test it, and serialise merges with a file lease.
Background
----------
The pipeline validates a branch against the ``main`` it was BRANCHED from, not the
``main`` at the moment of merge. Between "branch validated" and "branch merged" a
parallel task may have advanced ``main`` -> a *semantic* merge conflict: git merges
with no textual conflict, yet the combined ``main`` is broken. For the self-hosting
``orchestrator`` repo that means a red ``main`` of the tool serving every project.
This module provides the deterministic (no-LLM) primitives the quality-gate
``check_branch_mergeable`` (src/qg/checks.py) composes on the
``deploy-staging -> deploy`` edge, BEFORE the deployer merges the PR:
* ``branch_is_behind_main`` -> is the branch missing the latest origin/main?
* ``auto_rebase_onto_main`` -> rebase onto origin/main + push --force-with-lease
(ONLY the task branch; NEVER main).
* ``retest_branch`` -> run the project test-suite in the caught-up worktree.
* file lease (``acquire_merge_lease`` / ``release_merge_lease``) -> serialise the
"catch-up + re-test + merge" of ONE repo, held from the gate to the actual merge.
Invariants (self-hosting safety, ТЗ §10):
* NEVER push or force-push ``main`` — the only force op is ``--force-with-lease``
on the task branch.
* All git ops run in the per-branch worktree (ensure_worktree), never the shared clone.
* Every public function honours a strict **never-raise** contract: any git/OS error
-> ``(False, "<reason>")`` (or a safe bool), never a propagated exception.
"""
import json
import logging
import os
import subprocess
import time
from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path
logger = logging.getLogger("orchestrator.merge_gate")
# git sub-command timeouts (seconds). Generous but bounded so a hung git never
# wedges the monitor-thread that runs the gate.
_FETCH_TIMEOUT = 60
_REBASE_TIMEOUT = 120
_PUSH_TIMEOUT = 60
_SHORT_TIMEOUT = 30
# ---------------------------------------------------------------------------
# behind / ancestor detection
# ---------------------------------------------------------------------------
def branch_is_behind_main(repo: str, branch: str) -> bool:
"""Return True iff ``branch`` does NOT already contain the latest origin/main.
A branch is "behind" when ``origin/main`` is **not** an ancestor of the branch
HEAD (``git merge-base --is-ancestor origin/main HEAD`` returns non-zero). All
work happens in the per-branch worktree (ORCH-2 / S-4 isolation).
Never-raise (AC-9 / TC-03): any git/OS failure or an ambiguous result is treated
as "cannot prove the branch is up-to-date" -> return True (force a rebase attempt
rather than merge blindly). It returns a bool, never raises.
"""
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("branch_is_behind_main: worktree error for %s/%s: %s", repo, branch, e)
return True
try:
subprocess.run(
["git", "-C", wt, "fetch", "origin", "main"],
capture_output=True, timeout=_FETCH_TIMEOUT,
)
r = subprocess.run(
["git", "-C", wt, "merge-base", "--is-ancestor", "origin/main", "HEAD"],
capture_output=True, timeout=_SHORT_TIMEOUT,
)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("branch_is_behind_main: git error for %s/%s: %s", repo, branch, e)
return True
if r.returncode == 0:
# origin/main IS an ancestor of HEAD -> branch already up-to-date.
return False
if r.returncode == 1:
# origin/main is NOT an ancestor -> branch is behind.
return True
# Any other code (e.g. bad ref) -> ambiguous; do not merge blindly.
logger.warning(
"branch_is_behind_main: ambiguous merge-base rc=%s for %s/%s (treating as behind)",
r.returncode, repo, branch,
)
return True
def _conflicted_files(wt: str) -> str:
"""Best-effort list of unmerged (conflicting) files in the worktree."""
try:
r = subprocess.run(
["git", "-C", wt, "diff", "--name-only", "--diff-filter=U"],
capture_output=True, text=True, timeout=_SHORT_TIMEOUT,
)
files = r.stdout.strip().replace("\n", ", ")
return files or "unknown"
except (subprocess.SubprocessError, OSError):
return "unknown"
# ---------------------------------------------------------------------------
# auto-rebase onto origin/main
# ---------------------------------------------------------------------------
def auto_rebase_onto_main(repo: str, branch: str) -> tuple[bool, str]:
"""Catch ``branch`` up to ``origin/main`` via rebase, then push it.
Steps (all in the per-branch worktree):
1. ``git fetch origin main``.
2. ``git rebase origin/main``:
- textual conflict (non-zero) -> ``git rebase --abort`` (leave worktree
clean) -> ``(False, "rebase conflict: <files>")`` (AC-3).
3. clean rebase -> ``git push --force-with-lease origin <branch>`` — ONLY the
task branch, NEVER ``main`` (AC-7) -> ``(True, "rebased onto origin/main")``.
Never-raise (AC-9): any git/OS error -> ``(False, "<reason>")``.
"""
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
return False, f"rebase setup error: {e}"
try:
subprocess.run(
["git", "-C", wt, "fetch", "origin", "main"],
capture_output=True, timeout=_FETCH_TIMEOUT,
)
r = subprocess.run(
["git", "-C", wt, "rebase", "origin/main"],
capture_output=True, text=True, timeout=_REBASE_TIMEOUT,
)
if r.returncode != 0:
files = _conflicted_files(wt)
subprocess.run(
["git", "-C", wt, "rebase", "--abort"],
capture_output=True, timeout=_SHORT_TIMEOUT,
)
logger.warning("auto_rebase: conflict on %s/%s: %s", repo, branch, files)
return False, f"rebase conflict: {files}"
# Clean rebase -> push ONLY the task branch with a lease (never main).
p = subprocess.run(
["git", "-C", wt, "push", "--force-with-lease", "origin", branch],
capture_output=True, text=True, timeout=_PUSH_TIMEOUT,
)
if p.returncode != 0:
detail = (p.stderr or p.stdout or "").strip()[:200]
logger.warning("auto_rebase: push failed on %s/%s: %s", repo, branch, detail)
return False, f"push --force-with-lease failed: {detail}"
logger.info("auto_rebase: %s/%s rebased onto origin/main and pushed", repo, branch)
return True, "rebased onto origin/main"
except subprocess.TimeoutExpired:
# Leave no half-finished rebase behind.
try:
subprocess.run(
["git", "-C", wt, "rebase", "--abort"],
capture_output=True, timeout=_SHORT_TIMEOUT,
)
except (subprocess.SubprocessError, OSError):
pass
return False, "rebase timeout"
except (subprocess.SubprocessError, OSError) as e:
return False, f"rebase error: {e}"
# ---------------------------------------------------------------------------
# re-test in the caught-up worktree
# ---------------------------------------------------------------------------
def retest_branch(repo: str, branch: str) -> tuple[bool, str]:
"""Run the project test-suite in the (already caught-up) branch worktree.
Command: ``python -m pytest <merge_retest_target>`` (default ``tests/``),
matching the orchestrator CI / check_tests_local pattern. Bounded by
``settings.merge_retest_timeout_s``.
Returns:
* ``(True, "re-test green")`` — pytest rc == 0
* ``(False, "re-test timeout after <T>s")`` — exceeded the timeout (AC-6)
* ``(False, "re-test failed: ...<tail>")`` — non-zero rc, with output tail
Never-raise (AC-9): any setup/OS error -> ``(False, "<reason>")``.
"""
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
# Caller usually rebased first (worktree exists); ensure as a fallback.
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
return False, f"re-test setup error: {e}"
target = settings.merge_retest_target or "tests/"
timeout = settings.merge_retest_timeout_s
try:
r = subprocess.run(
["python", "-m", "pytest", target, "-q"],
cwd=wt, capture_output=True, text=True, timeout=timeout,
)
except subprocess.TimeoutExpired:
logger.warning("retest_branch: timeout (%ss) on %s/%s", timeout, repo, branch)
return False, f"re-test timeout after {timeout}s"
except (subprocess.SubprocessError, OSError) as e:
return False, f"re-test error: {e}"
if r.returncode == 0:
return True, "re-test green"
tail = ((r.stdout or "") + (r.stderr or ""))[-500:]
logger.warning("retest_branch: red on %s/%s", repo, branch)
return False, f"re-test failed: ...{tail}"
# ---------------------------------------------------------------------------
# merge-lease (serialise catch-up + re-test + merge per repo)
# ---------------------------------------------------------------------------
def _lease_path(repo: str) -> str:
"""Filesystem path of the per-repo merge lease (no schema change, ТЗ §4)."""
return os.path.join(settings.repos_dir, f".merge-lease-{repo}.json")
def _read_lease(path: str) -> dict | None:
"""Read+parse the lease file; None if missing or corrupt (never-raise)."""
try:
with open(path, "r", encoding="utf-8") as f:
return json.loads(f.read())
except FileNotFoundError:
return None
except (OSError, ValueError) as e:
logger.warning("merge-lease read error at %s: %s", path, e)
return None
def _write_lease(path: str, holder: dict) -> None:
"""Atomically (O_CREAT|O_EXCL) write the lease; raises FileExistsError if held."""
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
try:
os.write(fd, json.dumps(holder).encode("utf-8"))
finally:
os.close(fd)
def acquire_merge_lease(
repo: str, branch: str, work_item_id: str | None = None, task_id: int | None = None
) -> tuple[bool, str]:
"""Try to acquire the per-repo merge lease. **Non-blocking** (anti-deadlock).
Holder identity is the task ``branch`` (stable, one branch per task). Outcomes:
* no lease file -> acquire, write metadata -> ``(True, "lease acquired")``
* lease held by self -> idempotent re-acquire (restart/retry) -> ``(True, "lease already held")``
* lease held by other, age < merge_lock_timeout_s -> ``(False, "merge-lock busy")``
* lease held by other, age >= merge_lock_timeout_s -> stale -> reclaim with a
``logger.warning`` (the holder process died without releasing) -> ``(True, ...)``
Never-raise: any unexpected error -> ``(False, "merge-lock busy")`` so the caller
DEFERS and retries rather than burning a developer retry on an infra hiccup.
"""
path = _lease_path(repo)
holder = {
"branch": branch,
"work_item_id": work_item_id,
"task_id": task_id,
"acquired_at": time.time(),
"pid": os.getpid(),
}
try:
try:
_write_lease(path, holder)
logger.info("merge-lease acquired for %s by %s", repo, branch)
return True, "lease acquired"
except FileExistsError:
pass
existing = _read_lease(path)
if existing is None:
# Corrupt/empty lease file — reclaim it.
_force_write_lease(path, holder)
logger.warning("merge-lease for %s was corrupt; reclaimed by %s", repo, branch)
return True, "lease reclaimed (corrupt)"
if existing.get("branch") == branch:
return True, "lease already held"
age = time.time() - float(existing.get("acquired_at") or 0)
if age >= settings.merge_lock_timeout_s:
_force_write_lease(path, holder)
logger.warning(
"merge-lease for %s was stale (age %.0fs >= %ss, holder=%s); reclaimed by %s",
repo, age, settings.merge_lock_timeout_s, existing.get("branch"), branch,
)
return True, "lease reclaimed (stale)"
logger.info(
"merge-lease for %s busy (held by %s, age %.0fs); %s defers",
repo, existing.get("branch"), age, branch,
)
return False, "merge-lock busy"
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("acquire_merge_lease unexpected error for %s/%s: %s", repo, branch, e)
return False, "merge-lock busy"
def _force_write_lease(path: str, holder: dict) -> None:
"""Overwrite the lease (used for stale/corrupt reclaim). Best-effort."""
try:
with open(path, "w", encoding="utf-8") as f:
f.write(json.dumps(holder))
except OSError as e:
logger.warning("merge-lease force-write error at %s: %s", path, e)
def release_merge_lease(repo: str, branch: str | None = None) -> None:
"""Release the per-repo merge lease. **Idempotent** and **holder-aware**.
If ``branch`` is given, the lease is removed ONLY when the current holder's
branch matches (so a delayed release from an already-merged task can never
delete a lease a DIFFERENT task acquired afterwards). With ``branch=None`` the
release is unconditional (best-effort backstop). Never raises.
"""
path = _lease_path(repo)
try:
if branch is not None:
existing = _read_lease(path)
if existing is not None and existing.get("branch") != branch:
logger.info(
"merge-lease release skipped for %s: holder=%s != %s",
repo, existing.get("branch"), branch,
)
return
os.remove(path)
logger.info("merge-lease released for %s (%s)", repo, branch or "force")
except FileNotFoundError:
return
except OSError as e:
logger.warning("merge-lease release error for %s: %s", repo, e)