Files
orchestrator/src/merge_gate.py

678 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Merge-gate core (ORCH-043): catch a branch up to the CURRENT origin/main,
re-test it, and serialise merges with a file lease.
Background
----------
The pipeline validates a branch against the ``main`` it was BRANCHED from, not the
``main`` at the moment of merge. Between "branch validated" and "branch merged" a
parallel task may have advanced ``main`` -> a *semantic* merge conflict: git merges
with no textual conflict, yet the combined ``main`` is broken. For the self-hosting
``orchestrator`` repo that means a red ``main`` of the tool serving every project.
This module provides the deterministic (no-LLM) primitives the quality-gate
``check_branch_mergeable`` (src/qg/checks.py) composes on the
``deploy-staging -> deploy`` edge, BEFORE the deployer merges the PR:
* ``branch_is_behind_main`` -> is the branch missing the latest origin/main?
* ``auto_rebase_onto_main`` -> rebase onto origin/main + push --force-with-lease
(ONLY the task branch; NEVER main).
* ``retest_branch`` -> run the project test-suite in the caught-up worktree.
* file lease (``acquire_merge_lease`` / ``release_merge_lease``) -> serialise the
"catch-up + re-test + merge" of ONE repo, held from the gate to the actual merge.
Invariants (self-hosting safety, ТЗ §10):
* NEVER push or force-push ``main`` — the only force op is ``--force-with-lease``
on the task branch.
* All git ops run in the per-branch worktree (ensure_worktree), never the shared clone.
* Every public function honours a strict **never-raise** contract: any git/OS error
-> ``(False, "<reason>")`` (or a safe bool), never a propagated exception.
"""
import json
import logging
import os
import subprocess
import time
from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path
logger = logging.getLogger("orchestrator.merge_gate")
# git sub-command timeouts (seconds). Generous but bounded so a hung git never
# wedges the monitor-thread that runs the gate.
_FETCH_TIMEOUT = 60
_REBASE_TIMEOUT = 120
_PUSH_TIMEOUT = 60
_SHORT_TIMEOUT = 30
# ---------------------------------------------------------------------------
# behind / ancestor detection
# ---------------------------------------------------------------------------
def branch_is_behind_main(repo: str, branch: str) -> bool:
"""Return True iff ``branch`` does NOT already contain the latest origin/main.
A branch is "behind" when ``origin/main`` is **not** an ancestor of the branch
HEAD (``git merge-base --is-ancestor origin/main HEAD`` returns non-zero). All
work happens in the per-branch worktree (ORCH-2 / S-4 isolation).
Never-raise (AC-9 / TC-03): any git/OS failure or an ambiguous result is treated
as "cannot prove the branch is up-to-date" -> return True (force a rebase attempt
rather than merge blindly). It returns a bool, never raises.
"""
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("branch_is_behind_main: worktree error for %s/%s: %s", repo, branch, e)
return True
try:
subprocess.run(
["git", "-C", wt, "fetch", "origin", "main"],
capture_output=True, timeout=_FETCH_TIMEOUT,
)
r = subprocess.run(
["git", "-C", wt, "merge-base", "--is-ancestor", "origin/main", "HEAD"],
capture_output=True, timeout=_SHORT_TIMEOUT,
)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("branch_is_behind_main: git error for %s/%s: %s", repo, branch, e)
return True
if r.returncode == 0:
# origin/main IS an ancestor of HEAD -> branch already up-to-date.
return False
if r.returncode == 1:
# origin/main is NOT an ancestor -> branch is behind.
return True
# Any other code (e.g. bad ref) -> ambiguous; do not merge blindly.
logger.warning(
"branch_is_behind_main: ambiguous merge-base rc=%s for %s/%s (treating as behind)",
r.returncode, repo, branch,
)
return True
def _conflicted_files(wt: str) -> str:
"""Best-effort list of unmerged (conflicting) files in the worktree."""
try:
r = subprocess.run(
["git", "-C", wt, "diff", "--name-only", "--diff-filter=U"],
capture_output=True, text=True, timeout=_SHORT_TIMEOUT,
)
files = r.stdout.strip().replace("\n", ", ")
return files or "unknown"
except (subprocess.SubprocessError, OSError):
return "unknown"
# ---------------------------------------------------------------------------
# auto-rebase onto origin/main
# ---------------------------------------------------------------------------
def auto_rebase_onto_main(repo: str, branch: str) -> tuple[bool, str]:
"""Catch ``branch`` up to ``origin/main`` via rebase, then push it.
Steps (all in the per-branch worktree):
1. ``git fetch origin main``.
2. ``git rebase origin/main``:
- textual conflict (non-zero) -> ``git rebase --abort`` (leave worktree
clean) -> ``(False, "rebase conflict: <files>")`` (AC-3).
3. clean rebase -> ``git push --force-with-lease origin <branch>`` — ONLY the
task branch, NEVER ``main`` (AC-7) -> ``(True, "rebased onto origin/main")``.
Never-raise (AC-9): any git/OS error -> ``(False, "<reason>")``.
"""
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
return False, f"rebase setup error: {e}"
try:
subprocess.run(
["git", "-C", wt, "fetch", "origin", "main"],
capture_output=True, timeout=_FETCH_TIMEOUT,
)
r = subprocess.run(
["git", "-C", wt, "rebase", "origin/main"],
capture_output=True, text=True, timeout=_REBASE_TIMEOUT,
)
if r.returncode != 0:
files = _conflicted_files(wt)
subprocess.run(
["git", "-C", wt, "rebase", "--abort"],
capture_output=True, timeout=_SHORT_TIMEOUT,
)
logger.warning("auto_rebase: conflict on %s/%s: %s", repo, branch, files)
return False, f"rebase conflict: {files}"
# Clean rebase -> push ONLY the task branch with a lease (never main).
p = subprocess.run(
["git", "-C", wt, "push", "--force-with-lease", "origin", branch],
capture_output=True, text=True, timeout=_PUSH_TIMEOUT,
)
if p.returncode != 0:
detail = (p.stderr or p.stdout or "").strip()[:200]
logger.warning("auto_rebase: push failed on %s/%s: %s", repo, branch, detail)
return False, f"push --force-with-lease failed: {detail}"
logger.info("auto_rebase: %s/%s rebased onto origin/main and pushed", repo, branch)
return True, "rebased onto origin/main"
except subprocess.TimeoutExpired:
# Leave no half-finished rebase behind.
try:
subprocess.run(
["git", "-C", wt, "rebase", "--abort"],
capture_output=True, timeout=_SHORT_TIMEOUT,
)
except (subprocess.SubprocessError, OSError):
pass
return False, "rebase timeout"
except (subprocess.SubprocessError, OSError) as e:
return False, f"rebase error: {e}"
# ---------------------------------------------------------------------------
# re-test in the caught-up worktree
# ---------------------------------------------------------------------------
def retest_branch(repo: str, branch: str) -> tuple[bool, str]:
"""Run the project test-suite in the (already caught-up) branch worktree.
Command: ``python -m pytest <merge_retest_target>`` (default ``tests/``),
matching the orchestrator CI / check_tests_local pattern. Bounded by
``settings.merge_retest_timeout_s``.
Returns:
* ``(True, "re-test green")`` — pytest rc == 0
* ``(False, "re-test timeout after <T>s")`` — exceeded the timeout (AC-6)
* ``(False, "re-test failed: ...<tail>")`` — non-zero rc, with output tail
Never-raise (AC-9): any setup/OS error -> ``(False, "<reason>")``.
"""
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
# Caller usually rebased first (worktree exists); ensure as a fallback.
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
return False, f"re-test setup error: {e}"
target = settings.merge_retest_target or "tests/"
timeout = settings.merge_retest_timeout_s
try:
r = subprocess.run(
["python", "-m", "pytest", target, "-q"],
cwd=wt, capture_output=True, text=True, timeout=timeout,
)
except subprocess.TimeoutExpired:
logger.warning("retest_branch: timeout (%ss) on %s/%s", timeout, repo, branch)
return False, f"re-test timeout after {timeout}s"
except (subprocess.SubprocessError, OSError) as e:
return False, f"re-test error: {e}"
if r.returncode == 0:
return True, "re-test green"
tail = ((r.stdout or "") + (r.stderr or ""))[-500:]
logger.warning("retest_branch: red on %s/%s", repo, branch)
return False, f"re-test failed: ...{tail}"
# ---------------------------------------------------------------------------
# merge-lease (serialise catch-up + re-test + merge per repo)
# ---------------------------------------------------------------------------
def _lease_path(repo: str) -> str:
"""Filesystem path of the per-repo merge lease (no schema change, ТЗ §4)."""
return os.path.join(settings.repos_dir, f".merge-lease-{repo}.json")
def _read_lease(path: str) -> dict | None:
"""Read+parse the lease file; None if missing or corrupt (never-raise)."""
try:
with open(path, "r", encoding="utf-8") as f:
return json.loads(f.read())
except FileNotFoundError:
return None
except (OSError, ValueError) as e:
logger.warning("merge-lease read error at %s: %s", path, e)
return None
def _write_lease(path: str, holder: dict) -> None:
"""Atomically (O_CREAT|O_EXCL) write the lease; raises FileExistsError if held."""
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
try:
os.write(fd, json.dumps(holder).encode("utf-8"))
finally:
os.close(fd)
def acquire_merge_lease(
repo: str, branch: str, work_item_id: str | None = None, task_id: int | None = None
) -> tuple[bool, str]:
"""Try to acquire the per-repo merge lease. **Non-blocking** (anti-deadlock).
Holder identity is the task ``branch`` (stable, one branch per task). Outcomes:
* no lease file -> acquire, write metadata -> ``(True, "lease acquired")``
* lease held by self -> idempotent re-acquire (restart/retry) -> ``(True, "lease already held")``
* lease held by other, age < merge_lock_timeout_s -> ``(False, "merge-lock busy")``
* lease held by other, age >= merge_lock_timeout_s -> stale -> reclaim with a
``logger.warning`` (the holder process died without releasing) -> ``(True, ...)``
Never-raise: any unexpected error -> ``(False, "merge-lock busy")`` so the caller
DEFERS and retries rather than burning a developer retry on an infra hiccup.
"""
path = _lease_path(repo)
holder = {
"branch": branch,
"work_item_id": work_item_id,
"task_id": task_id,
"acquired_at": time.time(),
"pid": os.getpid(),
}
try:
try:
_write_lease(path, holder)
logger.info("merge-lease acquired for %s by %s", repo, branch)
return True, "lease acquired"
except FileExistsError:
pass
existing = _read_lease(path)
if existing is None:
# Corrupt/empty lease file — reclaim it.
_force_write_lease(path, holder)
logger.warning("merge-lease for %s was corrupt; reclaimed by %s", repo, branch)
return True, "lease reclaimed (corrupt)"
if existing.get("branch") == branch:
return True, "lease already held"
age = time.time() - float(existing.get("acquired_at") or 0)
if age >= settings.merge_lock_timeout_s:
_force_write_lease(path, holder)
logger.warning(
"merge-lease for %s was stale (age %.0fs >= %ss, holder=%s); reclaimed by %s",
repo, age, settings.merge_lock_timeout_s, existing.get("branch"), branch,
)
return True, "lease reclaimed (stale)"
logger.info(
"merge-lease for %s busy (held by %s, age %.0fs); %s defers",
repo, existing.get("branch"), age, branch,
)
return False, "merge-lock busy"
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("acquire_merge_lease unexpected error for %s/%s: %s", repo, branch, e)
return False, "merge-lock busy"
def _force_write_lease(path: str, holder: dict) -> None:
"""Overwrite the lease (used for stale/corrupt reclaim). Best-effort."""
try:
with open(path, "w", encoding="utf-8") as f:
f.write(json.dumps(holder))
except OSError as e:
logger.warning("merge-lease force-write error at %s: %s", path, e)
def release_merge_lease(repo: str, branch: str | None = None) -> None:
"""Release the per-repo merge lease. **Idempotent** and **holder-aware**.
If ``branch`` is given, the lease is removed ONLY when the current holder's
branch matches (so a delayed release from an already-merged task can never
delete a lease a DIFFERENT task acquired afterwards). With ``branch=None`` the
release is unconditional (best-effort backstop). Never raises.
"""
path = _lease_path(repo)
try:
if branch is not None:
existing = _read_lease(path)
if existing is not None and existing.get("branch") != branch:
logger.info(
"merge-lease release skipped for %s: holder=%s != %s",
repo, existing.get("branch"), branch,
)
return
os.remove(path)
logger.info("merge-lease released for %s (%s)", repo, branch or "force")
except FileNotFoundError:
return
except OSError as e:
logger.warning("merge-lease release error for %s: %s", repo, e)
# ---------------------------------------------------------------------------
# ORCH-065: proactive stale/dead merge-lease reclaim (Problem B)
# ---------------------------------------------------------------------------
def pid_alive(pid) -> bool:
"""Return True iff process ``pid`` is alive (``os.kill(pid, 0)`` liveness probe).
Semantics (ADR-001 Р-2, never-raise):
* ``ProcessLookupError`` -> the process is gone -> ``False`` (reclaimable).
* ``PermissionError`` -> the pid exists but is owned by another user ->
``True`` (alive; conservatively do NOT reclaim).
* missing / invalid pid -> ``True`` (conservative: a lease that predates the
pid field, or a malformed pid, is NOT reclaimed on the liveness signal —
the TTL backstop still catches it).
Never raises; any unexpected OS/type error -> conservative ``True``.
"""
if not pid:
return True
try:
os.kill(int(pid), 0)
return True
except ProcessLookupError:
return False
except PermissionError:
return True
except (OSError, ValueError, TypeError):
return True
def _lease_reclaim_applies(repo: str) -> bool:
"""Whether proactive lease-reclaim is REAL for ``repo`` (same scope as merge-gate).
Reuses ``qg.checks._merge_gate_applies`` (``merge_gate_repos`` CSV, else the
self-hosting ``orchestrator``) so reclaim and the gate share one predicate
(ADR-001 Р-2 / FR-2.4). Imported lazily to avoid an import cycle (qg.checks
imports merge_gate lazily inside ``check_branch_mergeable``). Never raises:
any error -> ``False`` (no-op, the safe default).
"""
try:
from .qg.checks import _merge_gate_applies
return _merge_gate_applies(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("lease-reclaim applicability check failed for %s: %s", repo, e)
return False
def reclaim_stale_lease(repo: str) -> bool:
"""Proactively reclaim a dead/stale merge-lease for ``repo`` (ADR-001 Р-2).
Unlike the lazy TTL reclaim inside ``acquire_merge_lease`` (which only fires
when ANOTHER task tries to acquire), this releases the lease as soon as the
holder is provably gone — without waiting for the TTL or a foreign acquire:
* holder pid is dead (``pid_alive`` is False) -> reclaim, OR
* lease age >= ``merge_lock_timeout_s`` (TTL) -> reclaim (AC-7).
A LIVE holder within its TTL is never touched (AC-8 — protects a legitimate
in-flight merge). Reclaim is holder-aware (``release_merge_lease(repo,
branch=holder)``) so it can never delete a lease a different task acquired in
the meantime. Conditional (FR-2.4): real only for ``merge_gate_repos`` /
self-hosting; other repos -> no-op. Kill-switch ``lease_reclaim_enabled``.
Returns True iff a lease was reclaimed. Never raises (AC-9): any read/remove
error is logged and swallowed so a single bad lease never kills the reaper
thread. Does NOT run any git operation — only the lease file is removed.
"""
try:
if not settings.lease_reclaim_enabled:
return False
if not _lease_reclaim_applies(repo):
return False
path = _lease_path(repo)
existing = _read_lease(path)
if existing is None:
return False # no lease (or unreadable -> _read_lease already logged)
holder = existing.get("branch")
pid = existing.get("pid")
age = time.time() - float(existing.get("acquired_at") or 0)
dead = not pid_alive(pid)
expired = age >= settings.merge_lock_timeout_s
if not (dead or expired):
return False # live holder within TTL -> protect legitimate merge
why = f"dead pid={pid}" if dead else f"stale age={age:.0f}s>=TTL"
release_merge_lease(repo, branch=holder)
logger.warning(
"merge-lease for %s reclaimed proactively (%s, holder=%s)",
repo, why, holder,
)
try:
from .notifications import send_telegram
send_telegram(
f"\U0001f527 merge-lease для {repo} освобождён проактивно "
f"({why}, holder={holder})"
)
except Exception as e: # noqa: BLE001 - telegram best-effort, never fatal
logger.warning("lease-reclaim telegram failed for %s: %s", repo, e)
return True
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("reclaim_stale_lease unexpected error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# ORCH-065: idempotent merge finalization guard (Problem C)
# ---------------------------------------------------------------------------
def pr_already_merged(repo: str, branch: str) -> bool:
"""Return True iff the PR for ``branch`` is ALREADY merged (ADR-001 Р-3, FR-3.2).
A deterministic, read-only guard the merge path consults BEFORE attempting a
(second) merge so a re-driven / reaped task is idempotent: an already-merged
PR -> no-op, never a duplicate merge and never an error. This is the ONLY new
merge-related helper and it does NOT merge — it only READS the PR state via
the existing Gitea client, so it does not introduce duplicate merge logic.
Consultation point: the actual merge actor is the **deployer agent** (it merges
the feature PR at the start of the ``deploy`` stage — see webhooks/gitea.py),
so the wiring lives in the deployer prompt (``.openclaw/agents/deployer.md``),
which runs this exact function before any (re-)merge. The merge-gate quality
check (``qg.checks.check_branch_mergeable``) is intentionally NOT modified
(ORCH-065 AC-13: ``check_*`` behaviour unchanged) — it runs on the FIRST
deploy-staging -> deploy edge and does not re-run on a ``deploy``-stage re-drive,
which is exactly where the second-merge risk lives.
Queries Gitea ``GET /repos/{owner}/{repo}/pulls?state=all&head=<branch>`` and
reports True when any matching PR has ``merged == True``. Never raises (AC-9):
any HTTP/parse error -> ``False`` (conservative: "not known-merged" lets the
normal gate re-evaluate rather than silently skipping a real merge).
"""
try:
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
resp = httpx.get(
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
params={"state": "all", "head": branch},
headers=headers, timeout=_SHORT_TIMEOUT,
)
if resp.status_code != 200:
return False
for pr in resp.json() or []:
if pr.get("merged") is True:
return True
return False
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("pr_already_merged check failed for %s/%s: %s", repo, branch, e)
return False
# ---------------------------------------------------------------------------
# ORCH-071: deterministic merge-actor + post-deploy merge verification.
#
# For the self-hosting repo the `deploy` stage runs the deterministic self-deploy
# path (Phase A/B/C) and the LLM `deployer` agent — historically the ONLY actor
# that merged the feature PR into `main` — never runs. These two helpers close the
# "phantom merge" gap (LESSONS_2026-06-08): a deterministic actor merges the PR via
# the Gitea PR-merge API (NEVER a push/force-push to main, INV-4) and a verifier
# confirms `main` actually received the commit before the pipeline reaches `done`.
# Both wire into the `deploy -> done` under-gate (stage_engine._handle_merge_verify).
# ---------------------------------------------------------------------------
# Lightweight in-process observability counters (D8). Reset only on process start;
# surfaced read-only via `merge_verify_status()` in GET /queue. Never the source of
# truth for any decision — purely informational.
_MERGE_VERIFY_COUNTERS: dict = {
"merge_verified_total": 0,
"not_merged_alerts_total": 0,
"last_alert_wi": None,
}
def note_merge_verified() -> None:
"""Bump the 'merge verified -> done' counter (observability only). Never raises."""
try:
_MERGE_VERIFY_COUNTERS["merge_verified_total"] += 1
except Exception: # noqa: BLE001 - observability must never break a decision
pass
def note_not_merged_alert(work_item_id: str | None) -> None:
"""Bump the 'deploy succeeded but not merged' counter. Never raises."""
try:
_MERGE_VERIFY_COUNTERS["not_merged_alerts_total"] += 1
_MERGE_VERIFY_COUNTERS["last_alert_wi"] = work_item_id
except Exception: # noqa: BLE001 - observability must never break a decision
pass
def merge_verify_status() -> dict:
"""Snapshot of the merge-verify under-gate for GET /queue. Never raises."""
try:
return {
"enabled": bool(settings.merge_verify_enabled),
"repos": settings.merge_verify_repos or "",
"merge_verified_total": _MERGE_VERIFY_COUNTERS["merge_verified_total"],
"not_merged_alerts_total": _MERGE_VERIFY_COUNTERS["not_merged_alerts_total"],
"last_alert_wi": _MERGE_VERIFY_COUNTERS["last_alert_wi"],
}
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("merge_verify_status error: %s", e)
return {"enabled": False}
def merge_verify_applies(repo: str) -> bool:
"""Whether the ORCH-071 merge-verify under-gate is REAL for this repo.
Mirrors ``self_deploy_applies`` / ``image_freshness_applies`` (FR-5 / AC-10):
* ``merge_verify_enabled=False`` -> always False (global kill-switch -> the
pipeline behaves exactly as before ORCH-071 for everyone).
* ``merge_verify_repos`` (CSV) non-empty -> real only for listed repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``); other
repos keep the LLM-``deployer`` merge path unchanged (AC-4b).
Never raises (any error -> False = no-op, the safe default).
"""
try:
if not settings.merge_verify_enabled:
return False
raw = (settings.merge_verify_repos or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
# Lazy import keeps this a leaf-ish module (qg.checks imports merge_gate lazily).
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("merge_verify_applies error for %s: %s", repo, e)
return False
def merge_pr(repo: str, branch: str) -> tuple[bool, str]:
"""Deterministically merge the open PR for ``branch`` via the Gitea PR-merge API.
The self-hosting deterministic merge-actor (FR-1 / D3). NEVER pushes or
force-pushes ``main`` (INV-4/AC-8) — the ONLY mutation is the Gitea
``POST /pulls/{index}/merge`` call, exactly what the LLM ``deployer`` used to do
on non-self repos.
Algorithm:
1. ``pr_already_merged`` -> True -> no-op ``(True, "already-merged")`` (INV-5/AC-9).
2. ``GET /repos/{owner}/{repo}/pulls?state=open`` -> the open PR whose head ref
== ``branch`` -> its index. No open PR -> ``(False, "no open PR")``.
3. ``POST /repos/{owner}/{repo}/pulls/{index}/merge`` (Do: ``merge``) ->
200/201 -> ``(True, "merged PR #<n>")``; otherwise ``(False, "<reason>")``.
Never-raise (INV-1/AC-9 / TC-09): any HTTP/parse error -> ``(False, reason)``.
"""
try:
if pr_already_merged(repo, branch):
logger.info("merge_pr: %s/%s already merged -> no-op", repo, branch)
return True, "already-merged"
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
base = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}"
timeout = settings.merge_pr_timeout_s
resp = httpx.get(
f"{base}/pulls", params={"state": "open"}, headers=headers, timeout=timeout
)
if resp.status_code != 200:
return False, f"list PRs failed: HTTP {resp.status_code}"
index = None
for pr in resp.json() or []:
if pr.get("head", {}).get("ref") == branch:
index = pr.get("number")
break
if index is None:
return False, "no open PR"
m = httpx.post(
f"{base}/pulls/{index}/merge",
json={"Do": "merge"},
headers=headers,
timeout=timeout,
)
if m.status_code in (200, 201):
logger.info("merge_pr: merged PR #%s for %s/%s", index, repo, branch)
return True, f"merged PR #{index}"
detail = (m.text or "").strip()[:200]
logger.warning(
"merge_pr: merge failed for %s/%s PR #%s: HTTP %s %s",
repo, branch, index, m.status_code, detail,
)
return False, f"merge failed: HTTP {m.status_code}"
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("merge_pr unexpected error for %s/%s: %s", repo, branch, e)
return False, f"merge error: {e}"
def verify_merged_to_main(repo: str, branch: str, sha: str) -> bool:
"""Return True iff the deployed commit is confirmed merged into ``origin/main``.
Post-deploy verification (FR-2 / D4): the merge is confirmed when EITHER
* ``pr_already_merged(repo, branch)`` is True (Gitea ``PR.merged == true``), OR
* ``git merge-base --is-ancestor <sha> origin/main`` succeeds in the per-branch
worktree (after ``git fetch origin main``), i.e. the validated SHA is an
ancestor of the current ``origin/main``.
``sha`` is the validated commit (``image_freshness.validated_revision`` =
worktree ``git rev-parse HEAD``). An empty ``sha`` makes the git branch
inconclusive (only the PR-merged branch can then confirm).
Never-raise (INV-1/AC-7 / TC-04): any git/HTTP error -> ``False`` (= "not
confirmed" -> fail-closed for ``done``: alert + HOLD). The exception is NEVER
propagated into ``advance_stage``.
"""
try:
if pr_already_merged(repo, branch):
return True
if not sha:
logger.warning(
"verify_merged_to_main: empty SHA for %s/%s and PR not known-merged",
repo, branch,
)
return False
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning(
"verify_merged_to_main: worktree error for %s/%s: %s", repo, branch, e
)
return False
subprocess.run(
["git", "-C", wt, "fetch", "origin", "main"],
capture_output=True, timeout=settings.merge_verify_timeout_s,
)
r = subprocess.run(
["git", "-C", wt, "merge-base", "--is-ancestor", sha, "origin/main"],
capture_output=True, timeout=settings.merge_verify_timeout_s,
)
return r.returncode == 0
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning(
"verify_merged_to_main unexpected error for %s/%s: %s", repo, branch, e
)
return False