"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem.""" import os import time import logging import subprocess import httpx from ..config import settings logger = logging.getLogger("orchestrator.qg") from ..git_worktree import get_worktree_path, ensure_worktree def _repo_path(repo: str, branch: str | None = None) -> str: """Resolve the working path to read agent artifacts from. ORCH-2 / S-4: artifacts now live in the per-branch worktree. When a branch is given and its worktree exists on disk, read from there; otherwise fall back to the shared /repos/ clone (keeps backward-compat for 2-arg callers/tests). """ if branch: wt = get_worktree_path(repo, branch) if os.path.isdir(wt): return wt return os.path.join(settings.repos_dir, repo) # Shared httpx client config GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"} GITEA_BASE = f"{settings.gitea_url}/api/v1" def check_analysis_complete(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if analysis artifacts exist in the repo branch. Required files: - docs/work-items//01-brd.md - docs/work-items//02-trz.md - docs/work-items//03-acceptance-criteria.md - docs/work-items//04-test-plan.yaml """ required_files = [ f"docs/work-items/{work_item_id}/01-brd.md", f"docs/work-items/{work_item_id}/02-trz.md", f"docs/work-items/{work_item_id}/03-acceptance-criteria.md", f"docs/work-items/{work_item_id}/04-test-plan.yaml", ] repo_path = _repo_path(repo, branch) missing = [] for f in required_files: full_path = os.path.join(repo_path, f) if not os.path.isfile(full_path): missing.append(f) if missing: return False, f"Missing files: {', '.join(missing)}" return True, "All analysis artifacts present" def check_architecture_done(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if architecture artifacts exist. Required: docs/work-items//06-adr/ (at least 1 file) OR: docs/work-items//07-infra-requirements.md """ repo_path = _repo_path(repo, branch) adr_dir = os.path.join(repo_path, f"docs/work-items/{work_item_id}/06-adr") infra_file = os.path.join(repo_path, f"docs/work-items/{work_item_id}/07-infra-requirements.md") if os.path.isdir(adr_dir) and len(os.listdir(adr_dir)) > 0: return True, "ADR directory exists with files" if os.path.isfile(infra_file): return True, "Infra requirements file exists" return False, "No ADR directory or infra-requirements.md found" def check_ci_green(repo: str, branch: str) -> tuple[bool, str]: """ Check if CI status is green for branch via Gitea API. GET /repos/{owner}/{repo}/commits/{branch}/status ORCH-045: polling with retry to fix a race condition. The gate used to do a single status read right after the developer push; if CI was still ``pending`` for the first 1-3s (real case ORCH-017: polled 17:58:54 -> pending, CI went green 17:58:55) the gate returned False once and the task stalled silently. Behaviour now: * ``success`` -> (True, "CI green") immediately. * ``failure`` / ``error`` -> (False, "CI state: ") immediately (CI is red, retrying is pointless). * ``pending`` / unknown -> sleep ``ci_poll_interval_s`` and poll again, up to ``ci_poll_max_attempts`` times. * still pending after all attempts -> (False, "CI still pending after s"). * 404 -> (False, "Branch not found or no status"). * transient httpx errors -> logged and retried within the attempt budget; if every attempt errors -> (False, "API error: "). """ owner = settings.gitea_owner url = f"{GITEA_BASE}/repos/{owner}/{repo}/commits/{branch}/status" attempts = settings.ci_poll_max_attempts interval = settings.ci_poll_interval_s last_state = "unknown" last_error: Exception | None = None for i in range(1, attempts + 1): try: resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) if resp.status_code == 404: return False, f"Branch '{branch}' not found or no status" resp.raise_for_status() data = resp.json() last_state = data.get("state", "unknown") last_error = None if last_state == "success": return True, "CI green" if last_state in ("failure", "error"): return False, f"CI state: {last_state}" # non-terminal (pending / unknown / other) -> retry below except httpx.HTTPError as e: last_error = e logger.error(f"check_ci_green: attempt {i}/{attempts} API error: {e}") if i < attempts: if last_error is not None: logger.info( f"check_ci_green: attempt {i}/{attempts}, error, retrying in {interval}s" ) else: logger.info( f"check_ci_green: attempt {i}/{attempts}, state={last_state}, " f"retrying in {interval}s" ) time.sleep(interval) if last_error is not None: return False, f"API error: {last_error}" return False, f"CI still pending after {attempts * interval}s" def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]: """ Check if PR has at least one approved review and no request_changes. GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews """ owner = settings.gitea_owner url = f"{GITEA_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/reviews" try: resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) resp.raise_for_status() reviews = resp.json() approved = 0 changes_requested = 0 for review in reviews: # Skip stale reviews (dismissed by new commits) if review.get("stale", False): continue state = review.get("state", "").upper() if state == "APPROVED": approved += 1 elif state == "REQUEST_CHANGES": changes_requested += 1 if changes_requested > 0: return False, f"Changes requested ({changes_requested} reviews)" if approved > 0: return True, f"Approved ({approved} reviews)" return False, "No reviews yet" except httpx.HTTPError as e: logger.error(f"Gitea API error checking reviews: {e}") return False, f"API error: {e}" def check_tests_passed(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Gate the testing -> deploy transition on the tester's MACHINE-READABLE verdict in 13-test-report.md frontmatter, NOT on a naive substring search of the body. ET-013 fix: the previous implementation did `if "PASS" in content`, so a report explicitly marked `verdict: BLOCKED` / `status: blocked` but whose prose mentioned "23 passed" / "✅ PASS" / "All checks passed" was treated as a pass, and an unfinished feature reached Done. This mirrors check_reviewer_verdict (S-5) and check_deploy_status (БАГ 8): read ONLY the YAML frontmatter, never the body. ORCH-047: the machine verdict is read from any of three equal-rank frontmatter fields — `result:` (canonical, what the tester prompt emits), `verdict:` or `status:` (legacy / enduro-trails). See _parse_tests_verdict. File: docs/work-items//13-test-report.md """ repo_path = _repo_path(repo, branch) report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md") if not os.path.isfile(report_path): return False, "Test report not found" try: with open(report_path, "r") as f: content = f.read() except OSError as e: return False, f"Error reading test report: {e}" return _parse_tests_verdict(content) # Positive / negative verdict tokens, derived from REAL tester reports in # enduro-trails (ET-001..ET-014). The tester is inconsistent: most write # `verdict: PASS`, but ET-006 used `verdict: ready-to-deploy` (with `status: PASSED`), # ET-007 `verdict: PASS — ready-to-deploy`, ET-008 `verdict: stage:ready-to-deploy` # (with `status: pass`). ET-013 (the bug) used `verdict: BLOCKED` / `status: blocked`. # We therefore match known positive/negative TOKENS inside the normalized # verdict/status fields, and treat a negative token as authoritative (a BLOCKED/FAILED # report never passes, even if another field looks positive). _TESTS_NEGATIVE_TOKENS = ("BLOCKED", "FAILED", "FAIL", "REQUEST_CHANGES", "REJECT", "RED") _TESTS_POSITIVE_TOKENS = ("PASSED", "PASS", "READY-TO-DEPLOY", "READY_TO_DEPLOY", "GREEN", "APPROVED") def _parse_tests_verdict(content: str) -> tuple[bool, str]: """Map a 13-test-report.md body to a quality-gate verdict by reading ONLY the machine-readable YAML frontmatter fields — never the prose body. Three equal-rank fields are accepted (ORCH-047): `result:` (the canonical field the tester prompt `.openclaw/agents/tester.md` is told to emit, `result: PASS|FAIL`), plus `verdict:` and `status:` (legacy / enduro-trails ET-001..ET-014). ANY single non-empty field is sufficient. Token sets are frozen for backward compatibility. Rules: - No frontmatter / bad YAML / none of the three fields present -> (False, reason). - A negative token (BLOCKED/FAILED/...) in ANY field -> (False) and is authoritative (ET-013 main case: verdict BLOCKED wins over any prose PASS, and beats a positive token in another field). - Otherwise a positive token (PASS/PASSED/READY-TO-DEPLOY/...) in ANY field -> (True). - Anything else (fields set but unrecognized) -> (False, reason). """ import yaml if not content.startswith("---"): return False, "No YAML frontmatter in test report (cannot read machine verdict)" parts = content.split("---", 2) if len(parts) < 3: return False, "Malformed YAML frontmatter in test report" try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in test report: {e}" if not isinstance(fm, dict): return False, "Malformed YAML frontmatter in test report (not a mapping)" verdict = str(fm.get("verdict", "") or "").upper().strip() status = str(fm.get("status", "") or "").upper().strip() result = str(fm.get("result", "") or "").upper().strip() if not verdict and not status and not result: return False, "No machine-readable verdict/status/result in test report frontmatter" value = verdict or status or result fields = f"{verdict} {status} {result}" for neg in _TESTS_NEGATIVE_TOKENS: if neg in fields: return False, f"Test verdict: {value} ({neg})" for pos in _TESTS_POSITIVE_TOKENS: if pos in fields: return True, f"Test verdict: {value} (PASS)" return ( False, f"No recognized PASS verdict in frontmatter " f"(verdict={verdict!r}, status={status!r}, result={result!r})", ) def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if analysis is complete AND approved by stakeholder. Requirements: 1. All analysis artifacts exist (BRD, TRZ, AC, TestPlan) 2. Stakeholder has posted :approved: comment on the Plane issue This QG is designed to be triggered by :approved: comment handler, so the approval check verifies file completeness as a safety gate. """ # First check files files_ok, files_reason = check_analysis_complete(repo, work_item_id, branch) if not files_ok: return False, files_reason # Check for :approved: comment via Plane API try: from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID from ..projects import get_project_by_repo # ORCH-6: verify approval in the issue's own Plane project. _proj = get_project_by_repo(repo) _pid = _proj.plane_project_id if _proj else PROJECT_ID issue_id = find_issue_id(work_item_id, _pid) if not issue_id: return False, "Cannot find Plane issue to verify approval" url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/comments/" resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() comments = resp.json() # Handle paginated response if isinstance(comments, dict): comments = comments.get("results", []) for comment in comments: body = comment.get("comment_html", "") or comment.get("comment", "") if ":approved:" in body: return True, "Analysis complete and approved by stakeholder" return False, "Analysis artifacts present but no :approved: comment found" except Exception as e: logger.warning(f"Failed to check approval for {work_item_id}: {e}") # If we can't reach Plane API but files exist, allow advance # (the :approved: handler already verified the comment exists) return True, f"Files present; Plane API check skipped ({e})" def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check reviewer agent verdict from 12-review.md (S-5 fix). Reads ONLY the machine-readable `verdict:` field from the YAML frontmatter, so tables / prose that merely mention APPROVED or REQUEST_CHANGES no longer cause false positives/negatives. Returns: (True, ...) -> verdict: APPROVED (False, ...) -> verdict: REQUEST_CHANGES, missing verdict, or no frontmatter """ import yaml repo_path = _repo_path(repo, branch) review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md") if not os.path.isfile(review_path): return False, "Review report not found (12-review.md)" try: with open(review_path, "r") as f: content = f.read() verdict = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in review: {e}" verdict = str(fm.get("verdict", "")).upper().strip() if verdict == "APPROVED": return True, "Reviewer verdict: APPROVED" if verdict == "REQUEST_CHANGES": return False, "Reviewer verdict: REQUEST_CHANGES" return False, f"No machine-readable verdict in frontmatter (got: {verdict!r})" except OSError as e: return False, f"Error reading review: {e}" def check_tests_local(repo: str, branch: str) -> tuple[bool, str]: """ DEPRECATED: replaced by check_ci_green on the development stage (CI is now configured). Kept for backward-compat; not wired to any stage. S-1 fix: run the project test suite locally and judge by exit code, instead of depending on Gitea CI (which is not configured -> always false). БАГ 5 fix: invoke pytest directly instead of make test. make is not installed in the orchestrator container, so the previous ["make", "test"] call raised FileNotFoundError. This reproduces the Makefile test target 1:1 (cd src/api && python -m pytest ../../tests/ -v). ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this is safe for concurrent active tasks — no shared /repos checkout race. """ import subprocess try: repo_path = ensure_worktree(repo, branch) r = subprocess.run( ["python", "-m", "pytest", "../../tests/", "-v"], cwd=os.path.join(repo_path, "src", "api"), capture_output=True, text=True, timeout=600, ) if r.returncode == 0: return True, "Local tests passed" tail = (r.stdout + r.stderr)[-500:] return False, f"Local tests failed: ...{tail}" except subprocess.TimeoutExpired: return False, "Local tests timed out (600s)" except Exception as e: return False, f"Local test run error: {e}" def _parse_deploy_status(content: str) -> tuple[bool, str]: """Parse a 14-deploy-log.md body and map its `deploy_status:` frontmatter to a quality-gate verdict. Reads ONLY the machine-readable YAML field, never prose. deploy_status: SUCCESS -> (True, "Deploy status: SUCCESS") deploy_status: FAILED -> (False, "Deploy status: FAILED") missing field / no frontmatter / bad YAML -> (False, ) """ import yaml status = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in deploy log: {e}" status = str(fm.get("deploy_status", "")).upper().strip() if status == "SUCCESS": return True, "Deploy status: SUCCESS" if status == "FAILED": return False, "Deploy status: FAILED" return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})" def _deploy_log_from_main(repo: str, work_item_id: str) -> str | None: """Best-effort read of 14-deploy-log.md from origin/main on the shared clone. The deployer writes 14-deploy-log.md and merges the deploy artifacts into main via a separate PR (see ET-013), so the file lands in origin/main, NOT in the feature branch worktree the gate normally reads. This recovers it from main. Degrades gracefully: any git failure (no clone, network/fetch error, file absent in main) returns None instead of raising, so the caller falls back to the plain "not found" verdict. Never raises. """ repo_clone = os.path.join(settings.repos_dir, repo) if not os.path.isdir(os.path.join(repo_clone, ".git")): return None rel = f"docs/work-items/{work_item_id}/14-deploy-log.md" try: # Refresh origin/main so we see freshly-merged deploy artifacts. subprocess.run( ["git", "-C", repo_clone, "fetch", "origin", "main"], check=False, capture_output=True, timeout=30, ) show = subprocess.run( ["git", "-C", repo_clone, "show", f"origin/main:{rel}"], check=False, capture_output=True, text=True, timeout=15, ) except (subprocess.SubprocessError, OSError) as e: logger.warning("deploy-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e) return None if show.returncode != 0: return None return show.stdout def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code (which is always 0 on a successful agent session even when the deploy failed). Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML frontmatter. Returns: (True, ...) -> deploy_status: SUCCESS (False, ...) -> deploy_status: FAILED, missing field, or no frontmatter ET-013 path-sync fix: the deployer writes 14-deploy-log.md and merges the deploy artifacts into main via a SEPARATE PR, so the log lands in origin/main, not in the feature-branch worktree this gate reads via _repo_path(repo, branch). If the file is absent in the worktree we fall back to reading it from origin/main on the shared clone. Lookup order: worktree -> origin/main -> not found. """ repo_path = _repo_path(repo, branch) log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md") if os.path.isfile(log_path): try: with open(log_path, "r") as f: content = f.read() except OSError as e: return False, f"Error reading deploy log: {e}" return _parse_deploy_status(content) # Not in the feature worktree — the deployer may have merged it into main. main_content = _deploy_log_from_main(repo, work_item_id) if main_content is not None: return _parse_deploy_status(main_content) return False, "Deploy log not found (14-deploy-log.md)" # --------------------------------------------------------------------------- # Self-hosting detection: staging-infra (localhost:8501) exists ONLY for the # orchestrator repo itself (self-hosting). Other repos have no staging instance # and their deployer prompts know nothing about it -- the gate must be a no-op # for them. The repo value is the plain gitea repo name (ProjectConfig.repo), # matching what _run_qg/advance_stage pass in. See ORCH-35 / PR #31. # --------------------------------------------------------------------------- SELF_HOSTING_REPO = "orchestrator" def is_self_hosting_repo(repo: str) -> bool: """Return True iff repo is the self-hosted orchestrator (has staging infra). Comparison is case-insensitive and strips whitespace for safety, but in practice repo comes from the gitea webhook payload .repository.name which is always lowercase (confirmed via projects.py registry entry). """ return (repo or "").strip().lower() == SELF_HOSTING_REPO.lower() def _parse_staging_status(content: str) -> tuple[bool, str]: """Parse a 15-staging-log.md body and map its `staging_status:` frontmatter to a quality-gate verdict. Reads ONLY the machine-readable YAML field, never prose. staging_status: SUCCESS -> (True, "Staging status: SUCCESS") staging_status: FAILED -> (False, "Staging status: FAILED") missing field / no frontmatter / bad YAML -> (False, ) """ import yaml status = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in staging log: {e}" status = str(fm.get("staging_status", "")).upper().strip() if status == "SUCCESS": return True, "Staging status: SUCCESS" if status == "FAILED": return False, "Staging status: FAILED" return False, f"No machine-readable staging_status in frontmatter (got: {status!r})" def _staging_log_from_main(repo: str, work_item_id: str) -> str | None: """Best-effort read of 15-staging-log.md from origin/main on the shared clone. The deployer writes 15-staging-log.md and merges the staging artifacts into main via a separate PR (mirroring the deploy-log pattern), so the file lands in origin/main, NOT in the feature branch worktree the gate normally reads. This recovers it from main. Degrades gracefully: any git failure (no clone, network/fetch error, file absent in main) returns None instead of raising, so the caller falls back to the plain "not found" verdict. Never raises. """ repo_clone = os.path.join(settings.repos_dir, repo) if not os.path.isdir(os.path.join(repo_clone, ".git")): return None rel = f"docs/work-items/{work_item_id}/15-staging-log.md" try: # Refresh origin/main so we see freshly-merged staging artifacts. subprocess.run( ["git", "-C", repo_clone, "fetch", "origin", "main"], check=False, capture_output=True, timeout=30, ) show = subprocess.run( ["git", "-C", repo_clone, "show", f"origin/main:{rel}"], check=False, capture_output=True, text=True, timeout=15, ) except (subprocess.SubprocessError, OSError) as e: logger.warning("staging-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e) return None if show.returncode != 0: return None return show.stdout def check_staging_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Gate the deploy-staging -> deploy transition on the deployer's machine-readable verdict in 15-staging-log.md frontmatter (staging_status: SUCCESS|FAILED). ORCH-35 conditional gate (Variant A): - Non-self-hosting repos (anything other than "orchestrator") have no staging instance and no deployer knowledge of it -> gate is an immediate pass. - Self-hosting repo ("orchestrator") -> real check: reads ONLY the machine- readable staging_status: field from YAML frontmatter, never body prose. Mirrors check_deploy_status (БАГ 8) for the self-hosting path. Lookup order (self-hosting only): worktree -> origin/main -> not found. Returns: (True, "Staging gate N/A for ") -> non-self-hosting repo (instant pass) (True, ...) -> staging_status: SUCCESS (self-hosting path) (False, ...) -> staging_status: FAILED, missing field, or no frontmatter """ # Variant A: non-self-hosting repos have no staging infra -- skip entirely. if not is_self_hosting_repo(repo): return True, f"Staging gate N/A for {repo}" # Self-hosting (orchestrator) path: real verdict check. repo_path = _repo_path(repo, branch) log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/15-staging-log.md") if os.path.isfile(log_path): try: with open(log_path, "r") as f: content = f.read() except OSError as e: return False, f"Error reading staging log: {e}" return _parse_staging_status(content) # Not in the feature worktree -- the deployer may have merged it into main. main_content = _staging_log_from_main(repo, work_item_id) if main_content is not None: return _parse_staging_status(main_content) return False, "Staging log not found (15-staging-log.md)" def _merge_gate_applies(repo: str) -> bool: """Whether the merge-gate is REAL for this repo (ORCH-043, conditional rollout). Mirrors the ORCH-35 conditional staging-gate. ``merge_gate_repos`` is a CSV of repos where the gate is enforced; when empty the gate is real ONLY for the self-hosting repo (``orchestrator``). Other repos -> conditional no-op. """ raw = (settings.merge_gate_repos or "").strip() if raw: allowed = {r.strip().lower() for r in raw.split(",") if r.strip()} return (repo or "").strip().lower() in allowed return is_self_hosting_repo(repo) def check_branch_mergeable(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]: """ORCH-043 merge-gate: validate the branch against the CURRENT origin/main immediately before the deployer merges its PR (deploy-staging -> deploy edge). Deterministic, no LLM. Algorithm (ADR-001 §4): 1. Conditionality: merge_gate_enabled=False -> (True, "merge-gate disabled"); repo where the gate is not real -> (True, "merge-gate N/A for "). 2. Acquire the per-repo merge lease (NON-blocking). Busy -> (False, "merge-lock busy") — a SIGNAL for the engine to DEFER (not a code fault, no rollback). 3. Double-check "behind origin/main" UNDER the lease (main may have moved while we waited). Not behind -> (True, "branch up-to-date with main"); lease HELD. 4. Behind -> auto_rebase_onto_main: - conflict -> release lease -> (False, "rebase conflict: ...") - clean -> retest_branch: green -> (True, "rebased onto main, re-test green"); lease HELD red/timeout -> release lease -> (False, "re-test ... after rebase") 5. On SUCCESS the lease is HELD until the actual merge (released on PR-merged webhook / deploy->done / rollback). On any FAILURE the lease is released. Never-raise (AC-9): any internal error -> (False, "") with the lease released; an exception never escapes into advance_stage. """ # Imported lazily so qg.checks stays importable without the merge_gate deps in # minimal/test contexts and to avoid an import cycle surprise. from .. import merge_gate try: if not settings.merge_gate_enabled: return True, "merge-gate disabled" if not _merge_gate_applies(repo): return True, f"merge-gate N/A for {repo}" acquired, reason = merge_gate.acquire_merge_lease(repo, branch, work_item_id) if not acquired: # "merge-lock busy" -> caller defers; lease NOT held by us, nothing to release. return False, reason try: # Double-check under the lease: another task may have just merged. if not merge_gate.branch_is_behind_main(repo, branch): logger.info("check_branch_mergeable: %s up-to-date with main", branch) return True, "branch up-to-date with main" ok, rb_reason = merge_gate.auto_rebase_onto_main(repo, branch) if not ok: merge_gate.release_merge_lease(repo, branch) return False, rb_reason # "rebase conflict: ..." ok_t, t_reason = merge_gate.retest_branch(repo, branch) if ok_t: logger.info("check_branch_mergeable: %s rebased + re-test green", branch) return True, "rebased onto main, re-test green" merge_gate.release_merge_lease(repo, branch) if "timeout" in t_reason: return False, t_reason # "re-test timeout after s" (AC-6) tail = t_reason.removeprefix("re-test failed: ") return False, f"re-test failed after rebase: {tail}" except Exception as e: # noqa: BLE001 - never-raise; always release on error merge_gate.release_merge_lease(repo, branch) logger.error("check_branch_mergeable inner error for %s/%s: %s", repo, branch, e) return False, f"merge-gate error: {e}" except Exception as e: # noqa: BLE001 - outer never-raise guard logger.error("check_branch_mergeable error for %s/%s: %s", repo, branch, e) return False, f"merge-gate error: {e}" # Registry for dynamic lookup by name QG_CHECKS = { "check_analysis_approved": check_analysis_approved, "check_analysis_complete": check_analysis_complete, "check_architecture_done": check_architecture_done, "check_ci_green": check_ci_green, "check_review_approved": check_review_approved, "check_tests_passed": check_tests_passed, "check_reviewer_verdict": check_reviewer_verdict, "check_tests_local": check_tests_local, "check_deploy_status": check_deploy_status, "check_staging_status": check_staging_status, "check_branch_mergeable": check_branch_mergeable, }