Files
orchestrator/src/qg/checks.py
claude-bot 92961d1d32 refactor(frontmatter): unified frontmatter contract + handoff spec (ORCH-52c)
src/frontmatter.py grows from a single-key reader into the full machine
contract: reader (read_frontmatter_value, unchanged), one parse primitive
(parse_frontmatter), writer (render/write_frontmatter), schema validator
(validate_schema/REQUIRED_FIELDS, warning-only by default) and a shared
strip_frontmatter helper. The five verdict gates (check_reviewer_verdict,
_parse_tests_verdict, _parse_deploy_status, _parse_staging_status,
parse_security_status) now read through the single parse_frontmatter point
instead of duplicated ad-hoc YAML logic; review_parse._strip_frontmatter and
security_gate.extract_security_findings reuse the shared helper.

Strictly backward compatible + never-raise: STAGE_TRANSITIONS, the QG_CHECKS
composition, verdict semantics (incl. ORCH-047 three-field tester + negative
token priority), reason-strings and worktree->origin/main fallback are 1:1.
The schema validator never influences a gate verdict by default; hard-fail is
reserved behind the frontmatter_validation_strict kill-switch (default False).

New formal handoff spec docs/_standards/HANDOFF_PROTOCOL.md ("stage -> required
output" + required frontmatter schema), aligned 1:1 with PIPELINE_DOCS.md.

Tests: test_frontmatter.py (TC-01..07), test_qg_verdicts.py (TC-08..15),
test_security_gate.py (TC-12), test_stages_invariants.py (TC-16). Full
tests/ green (1212).

Refs: ORCH-076

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 14:14:30 +03:00

774 lines
34 KiB
Python

"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem."""
import os
import time
import logging
import subprocess
import httpx
from ..config import settings
logger = logging.getLogger("orchestrator.qg")
from ..git_worktree import get_worktree_path, ensure_worktree
def _repo_path(repo: str, branch: str | None = None) -> str:
    """Return the directory that agent artifacts should be read from.

    ORCH-2 / S-4: artifacts live in the per-branch worktree. If ``branch`` is
    given and its worktree directory exists on disk, that directory is returned.
    In every other case (no branch, or worktree missing) the shared
    ``<repos_dir>/<repo>`` clone is returned, which keeps callers that pass only
    two arguments working.

    Args:
        repo: Repository name.
        branch: Optional branch whose worktree should be preferred.

    Returns:
        Path of the worktree when it exists, otherwise the shared clone path.
    """
    if not branch:
        return os.path.join(settings.repos_dir, repo)
    worktree_dir = get_worktree_path(repo, branch)
    if os.path.isdir(worktree_dir):
        return worktree_dir
    # Worktree not materialised on disk: fall back to the shared clone.
    return os.path.join(settings.repos_dir, repo)
# Shared httpx client config for the Gitea-backed checks in this module.
# Both values are built once, at import time, from ``settings``:
#   GITEA_HEADERS - token Authorization header passed to httpx.get
#   GITEA_BASE    - root of the Gitea REST API (v1)
GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"}
GITEA_BASE = f"{settings.gitea_url}/api/v1"
def check_analysis_complete(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Verify that every analysis artifact of a work item exists on disk.

    The following files are required under ``docs/work-items/<work_item_id>/``:
      - 01-brd.md
      - 02-trz.md
      - 03-acceptance-criteria.md
      - 04-test-plan.yaml

    Args:
        repo: Repository name.
        work_item_id: Work item identifier used as the directory name.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(True, "All analysis artifacts present")`` when nothing is missing,
        otherwise ``(False, "Missing files: ...")`` listing the absent paths.
    """
    item_dir = f"docs/work-items/{work_item_id}"
    artifact_names = (
        "01-brd.md",
        "02-trz.md",
        "03-acceptance-criteria.md",
        "04-test-plan.yaml",
    )
    root = _repo_path(repo, branch)
    # Keep the repo-relative paths (not absolute ones) for the reason string.
    absent = [
        rel_path
        for rel_path in (f"{item_dir}/{name}" for name in artifact_names)
        if not os.path.isfile(os.path.join(root, rel_path))
    ]
    if absent:
        return False, f"Missing files: {', '.join(absent)}"
    return True, "All analysis artifacts present"
def check_architecture_done(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Verify that architecture artifacts exist for a work item.

    Either of these satisfies the gate:
      - ``docs/work-items/<work_item_id>/06-adr/`` is a directory with at least
        one entry, or
      - ``docs/work-items/<work_item_id>/07-infra-requirements.md`` is a file.

    Args:
        repo: Repository name.
        work_item_id: Work item identifier used as the directory name.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(True, reason)`` naming which artifact was found, or
        ``(False, "No ADR directory or infra-requirements.md found")``.
    """
    root = _repo_path(repo, branch)
    work_item_docs = f"docs/work-items/{work_item_id}"
    adr_path = os.path.join(root, f"{work_item_docs}/06-adr")
    infra_path = os.path.join(root, f"{work_item_docs}/07-infra-requirements.md")

    # The ADR directory is checked first; an empty one does not count.
    adr_populated = os.path.isdir(adr_path) and bool(os.listdir(adr_path))
    if adr_populated:
        return True, "ADR directory exists with files"
    if os.path.isfile(infra_path):
        return True, "Infra requirements file exists"
    return False, "No ADR directory or infra-requirements.md found"
def check_ci_green(repo: str, branch: str) -> tuple[bool, str]:
    """Check whether CI is green for ``branch`` via the Gitea API.

    Endpoint: ``GET /repos/{owner}/{repo}/commits/{branch}/status``.

    ORCH-045: the status is polled with retries to fix a race condition. The
    gate used to read the status once right after the developer push; if CI was
    still ``pending`` for the first 1-3s (real case ORCH-017: polled 17:58:54 ->
    pending, CI went green 17:58:55) the gate returned False once and the task
    stalled silently.

    Behaviour:
      * ``success``             -> ``(True, "CI green")`` immediately.
      * ``failure`` / ``error`` -> ``(False, "CI state: <state>")`` immediately
        (CI is red, retrying is pointless).
      * ``pending`` / unknown   -> sleep ``ci_poll_interval_s`` and poll again,
        up to ``ci_poll_max_attempts`` times.
      * still non-terminal after all attempts ->
        ``(False, "CI still pending after <T>s")``.
      * HTTP 404 -> ``(False, "Branch '<branch>' not found or no status")``.
      * httpx errors, or a response body that is not valid JSON, are logged and
        retried within the attempt budget; if the LAST attempt ended in such an
        error the result is ``(False, "API error: <e>")``.

    Args:
        repo: Repository name.
        branch: Branch (ref) whose combined commit status is polled.

    Returns:
        ``(passed, reason)``.
    """
    owner = settings.gitea_owner
    url = f"{GITEA_BASE}/repos/{owner}/{repo}/commits/{branch}/status"
    attempts = settings.ci_poll_max_attempts
    interval = settings.ci_poll_interval_s
    last_state = "unknown"
    last_error: Exception | None = None
    for i in range(1, attempts + 1):
        try:
            resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10)
            if resp.status_code == 404:
                return False, f"Branch '{branch}' not found or no status"
            resp.raise_for_status()
            data = resp.json()
            # A payload that is not a JSON object carries no state; treat it as
            # non-terminal instead of crashing on ``.get``.
            last_state = data.get("state", "unknown") if isinstance(data, dict) else "unknown"
            last_error = None
            if last_state == "success":
                return True, "CI green"
            if last_state in ("failure", "error"):
                return False, f"CI state: {last_state}"
            # non-terminal (pending / unknown / other) -> retry below
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers json.JSONDecodeError raised by resp.json() when
            # the body is not JSON; it is handled like a transient API error so
            # the gate never raises.
            last_error = e
            logger.error(f"check_ci_green: attempt {i}/{attempts} API error: {e}")
        if i < attempts:
            if last_error is not None:
                logger.info(
                    f"check_ci_green: attempt {i}/{attempts}, error, retrying in {interval}s"
                )
            else:
                logger.info(
                    f"check_ci_green: attempt {i}/{attempts}, state={last_state}, "
                    f"retrying in {interval}s"
                )
            time.sleep(interval)
    if last_error is not None:
        return False, f"API error: {last_error}"
    return False, f"CI still pending after {attempts * interval}s"
def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]:
    """Check that a PR has at least one approval and no outstanding change request.

    Endpoint: ``GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews``.

    Reviews flagged ``stale`` (invalidated by new commits) or ``dismissed``
    (explicitly dismissed) are ignored, so a withdrawn REQUEST_CHANGES does not
    block the gate and a withdrawn approval does not satisfy it.

    Args:
        repo: Repository name.
        pr_number: Pull request index.

    Returns:
        ``(True, "Approved (<n> reviews)")`` when at least one live approval
        exists and no live change request does;
        ``(False, "Changes requested (<n> reviews)")`` when any live review
        requests changes; ``(False, "No reviews yet")`` when neither applies;
        ``(False, "API error: <e>")`` on an HTTP error or undecodable body.
    """
    owner = settings.gitea_owner
    url = f"{GITEA_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
    try:
        resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10)
        resp.raise_for_status()
        reviews = resp.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        logger.error(f"Gitea API error checking reviews: {e}")
        return False, f"API error: {e}"

    approved = 0
    changes_requested = 0
    for review in reviews or []:
        if not isinstance(review, dict):
            continue
        # Skip reviews that no longer count: stale (superseded by new commits)
        # or explicitly dismissed.
        if review.get("stale", False) or review.get("dismissed", False):
            continue
        # ``state`` may be null in the payload; normalise before upper-casing.
        state = str(review.get("state") or "").upper()
        if state == "APPROVED":
            approved += 1
        elif state == "REQUEST_CHANGES":
            changes_requested += 1
    # A single live change request outranks any number of approvals.
    if changes_requested > 0:
        return False, f"Changes requested ({changes_requested} reviews)"
    if approved > 0:
        return True, f"Approved ({approved} reviews)"
    return False, "No reviews yet"
def check_tests_passed(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Gate the testing -> deploy transition on the tester's machine-readable verdict.

    The verdict is read from the YAML frontmatter of
    ``docs/work-items/<work_item_id>/13-test-report.md``, NOT from a naive
    substring search of the body.

    ET-013 fix: the previous implementation did ``if "PASS" in content``, so a
    report explicitly marked ``verdict: BLOCKED`` / ``status: blocked`` whose
    prose mentioned "23 passed" / "PASS" / "All checks passed" was treated as a
    pass and an unfinished feature reached Done. This mirrors
    check_reviewer_verdict (S-5) and check_deploy_status (BUG 8): only the YAML
    frontmatter is read, never the body.

    ORCH-047: the verdict may come from any of three equal-rank frontmatter
    fields: ``result:`` (canonical, what the tester prompt emits), ``verdict:``
    or ``status:`` (legacy / enduro-trails). See ``_parse_tests_verdict``.

    The report is read as UTF-8. A file that cannot be opened or decoded yields
    ``(False, "Error reading test report: ...")`` instead of raising.

    Args:
        repo: Repository name.
        work_item_id: Work item identifier used as the directory name.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(passed, reason)``.
    """
    repo_path = _repo_path(repo, branch)
    report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md")
    if not os.path.isfile(report_path):
        return False, "Test report not found"
    try:
        # Explicit UTF-8: reports contain non-ASCII text and must not depend on
        # the process locale.
        with open(report_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError; catch it so an
        # undecodable report fails the gate instead of crashing it.
        return False, f"Error reading test report: {e}"
    return _parse_tests_verdict(content)
# Positive / negative verdict tokens, derived from REAL tester reports in
# enduro-trails (ET-001..ET-014). The tester is inconsistent: most write
# `verdict: PASS`, but ET-006 used `verdict: ready-to-deploy` (with `status: PASSED`),
# ET-007 `verdict: PASS — ready-to-deploy`, ET-008 `verdict: stage:ready-to-deploy`
# (with `status: pass`). ET-013 (the bug) used `verdict: BLOCKED` / `status: blocked`.
# We therefore match known positive/negative TOKENS inside the normalized
# verdict/status fields, and treat a negative token as authoritative (a BLOCKED/FAILED
# report never passes, even if another field looks positive).
# NOTE: _parse_tests_verdict tests these with a plain substring check
# (`token in fields`) against the upper-cased field values, not as whole words,
# so a short token such as "RED" or "PASS" also matches inside a longer word.
_TESTS_NEGATIVE_TOKENS = ("BLOCKED", "FAILED", "FAIL", "REQUEST_CHANGES", "REJECT", "RED")
_TESTS_POSITIVE_TOKENS = ("PASSED", "PASS", "READY-TO-DEPLOY", "READY_TO_DEPLOY", "GREEN", "APPROVED")
def _parse_tests_verdict(content: str) -> tuple[bool, str]:
    """Turn the text of a 13-test-report.md into a quality-gate verdict.

    Only the machine-readable YAML frontmatter is consulted, never the prose
    body. Three equal-rank fields are accepted (ORCH-047): ``result:`` (the
    canonical field the tester prompt ``.openclaw/agents/tester.md`` is told to
    emit, ``result: PASS|FAIL``), plus ``verdict:`` and ``status:`` (legacy /
    enduro-trails ET-001..ET-014). Any single non-empty field is sufficient.
    The token sets are frozen for backward compatibility.

    Rules, in order:
      - No frontmatter, malformed block, bad YAML, or none of the three fields
        set -> ``(False, reason)``.
      - A negative token (BLOCKED/FAILED/...) in ANY field -> ``False``. This is
        authoritative (ET-013: ``verdict: BLOCKED`` wins over prose PASS and
        over a positive token in another field).
      - Otherwise a positive token (PASS/PASSED/READY-TO-DEPLOY/...) in ANY
        field -> ``True``.
      - Fields set but no token recognised -> ``(False, reason)``.

    ORCH-52c: YAML parsing is delegated to ``frontmatter.parse_frontmatter``;
    the token logic, upper-casing, three-field set and negative-token priority
    are unchanged (AC-3/AC-6), and reason strings are reproduced from the
    structured parse states.

    Args:
        content: Full text of the test report.

    Returns:
        ``(passed, reason)``.
    """
    from ..frontmatter import parse_frontmatter, maybe_warn_schema

    parsed = parse_frontmatter(content)
    if not parsed.has_block:
        return False, "No YAML frontmatter in test report (cannot read machine verdict)"
    if parsed.malformed:
        return False, "Malformed YAML frontmatter in test report"
    if parsed.yaml_error is not None:
        return False, f"Invalid YAML frontmatter in test report: {parsed.yaml_error}"

    frontmatter = parsed.data
    # Warning-only schema check (FR-2/D3): its outcome is not used for the verdict.
    if frontmatter:
        maybe_warn_schema(content, "test report")

    def _normalized(key: str) -> str:
        # Missing, null and empty values all collapse to "".
        return str(frontmatter.get(key, "") or "").upper().strip()

    verdict = _normalized("verdict")
    status = _normalized("status")
    result = _normalized("result")
    if not (verdict or status or result):
        return False, "No machine-readable verdict/status/result in test report frontmatter"

    # First non-empty field, shown in the reason string.
    shown = verdict or status or result
    haystack = f"{verdict} {status} {result}"

    # Negative tokens are checked first so they override any positive token.
    negative_hit = next((tok for tok in _TESTS_NEGATIVE_TOKENS if tok in haystack), None)
    if negative_hit is not None:
        return False, f"Test verdict: {shown} ({negative_hit})"
    if any(tok in haystack for tok in _TESTS_POSITIVE_TOKENS):
        return True, f"Test verdict: {shown} (PASS)"
    unrecognized = (
        "No recognized PASS verdict in frontmatter "
        f"(verdict={verdict!r}, status={status!r}, result={result!r})"
    )
    return False, unrecognized
def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Check that analysis is complete AND approved by the stakeholder.

    Requirements:
      1. All analysis artifacts exist (BRD, TRZ, AC, TestPlan); see
         ``check_analysis_complete``.
      2. A comment containing ``:approved:`` exists on the Plane issue.

    This gate is designed to be triggered by the ``:approved:`` comment handler,
    so the file-completeness check acts as the safety gate.

    Fail-open by design: if the Plane lookup itself fails (import error, network
    error, bad response) while the files are present, the gate returns True,
    because the ``:approved:`` handler has already seen the comment.

    Args:
        repo: Repository name; also used to resolve the Plane project (ORCH-6).
        work_item_id: Work item identifier.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(passed, reason)``.
    """
    # First check files
    files_ok, files_reason = check_analysis_complete(repo, work_item_id, branch)
    if not files_ok:
        return False, files_reason
    # Check for :approved: comment via Plane API
    try:
        from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
        from ..projects import get_project_by_repo

        # ORCH-6: verify approval in the issue's own Plane project.
        _proj = get_project_by_repo(repo)
        _pid = _proj.plane_project_id if _proj else PROJECT_ID
        issue_id = find_issue_id(work_item_id, _pid)
        if not issue_id:
            return False, "Cannot find Plane issue to verify approval"
        url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/comments/"
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        comments = resp.json()
        # Handle paginated response
        if isinstance(comments, dict):
            comments = comments.get("results", [])
        for comment in comments:
            # Fall back to "" when both fields are empty or null. Without the
            # final ``or ""`` a null ``comment`` made ``in`` raise TypeError,
            # which the broad handler below turned into a fail-open pass for an
            # issue that was never approved.
            body = comment.get("comment_html") or comment.get("comment") or ""
            if ":approved:" in body:
                return True, "Analysis complete and approved by stakeholder"
        return False, "Analysis artifacts present but no :approved: comment found"
    except Exception as e:
        logger.warning(f"Failed to check approval for {work_item_id}: {e}")
        # If we can't reach Plane API but files exist, allow advance
        # (the :approved: handler already verified the comment exists)
        return True, f"Files present; Plane API check skipped ({e})"
def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Check the reviewer agent's verdict from 12-review.md (S-5 fix).

    Only the machine-readable ``verdict:`` field of the YAML frontmatter is
    read, so tables or prose that merely mention APPROVED or REQUEST_CHANGES
    cannot cause false positives or negatives.

    ORCH-52c: YAML parsing is delegated to ``frontmatter.parse_frontmatter``;
    the APPROVED / REQUEST_CHANGES semantics are unchanged (AC-3/AC-6).

    The review is read as UTF-8. A file that cannot be opened or decoded yields
    ``(False, "Error reading review: ...")`` instead of raising.

    Args:
        repo: Repository name.
        work_item_id: Work item identifier used as the directory name.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(True, ...)`` for ``verdict: APPROVED``; ``(False, ...)`` for
        ``verdict: REQUEST_CHANGES``, a missing or unrecognised verdict, no
        frontmatter, invalid YAML, a missing file or a read error.
    """
    from ..frontmatter import parse_frontmatter, maybe_warn_schema

    repo_path = _repo_path(repo, branch)
    review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md")
    if not os.path.isfile(review_path):
        return False, "Review report not found (12-review.md)"
    try:
        # Explicit UTF-8 so the read does not depend on the process locale.
        with open(review_path, "r", encoding="utf-8") as f:
            content = f.read()
        parse = parse_frontmatter(content)
        if parse.yaml_error is not None:
            return False, f"Invalid YAML frontmatter in review: {parse.yaml_error}"
        verdict = None
        if parse.has_block and not parse.malformed:
            # Warning-only schema check; its outcome is not used for the verdict.
            if parse.data:
                maybe_warn_schema(content, "review report")
            verdict = str(parse.data.get("verdict", "")).upper().strip()
        if verdict == "APPROVED":
            return True, "Reviewer verdict: APPROVED"
        if verdict == "REQUEST_CHANGES":
            return False, "Reviewer verdict: REQUEST_CHANGES"
        return False, f"No machine-readable verdict in frontmatter (got: {verdict!r})"
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError; catch it so an
        # undecodable review fails the gate instead of crashing it.
        return False, f"Error reading review: {e}"
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
    """Run the project test suite locally and judge it by exit code.

    DEPRECATED: replaced by ``check_ci_green`` on the development stage (CI is
    now configured). Kept for backward compatibility; not wired to any stage.

    S-1 fix: the suite is run locally instead of depending on Gitea CI, which
    was not configured at the time and therefore always reported false.

    BUG 5 fix: pytest is invoked directly instead of ``make test``. ``make`` is
    not installed in the orchestrator container, so ``["make", "test"]`` raised
    FileNotFoundError. The call reproduces the Makefile test target 1:1
    (``cd src/api && python -m pytest ../../tests/ -v``).

    ORCH-2 / S-4: tests run inside the per-branch worktree obtained from
    ``ensure_worktree``, so concurrent active tasks do not race on a shared
    /repos checkout.

    Args:
        repo: Repository name.
        branch: Branch whose worktree the tests run in.

    Returns:
        ``(True, "Local tests passed")`` on exit code 0; otherwise
        ``(False, reason)`` carrying the last 500 characters of output, a
        timeout notice, or the error text.
    """
    import subprocess

    try:
        worktree = ensure_worktree(repo, branch)
        proc = subprocess.run(
            ["python", "-m", "pytest", "../../tests/", "-v"],
            cwd=os.path.join(worktree, "src", "api"),
            capture_output=True,
            text=True,
            timeout=600,
        )
        if proc.returncode != 0:
            # Only the end of the combined output goes into the reason string.
            output_tail = (proc.stdout + proc.stderr)[-500:]
            return False, f"Local tests failed: ...{output_tail}"
        return True, "Local tests passed"
    except subprocess.TimeoutExpired:
        return False, "Local tests timed out (600s)"
    except Exception as e:
        return False, f"Local test run error: {e}"
def _parse_deploy_status(content: str) -> tuple[bool, str]:
    """Map the ``deploy_status:`` frontmatter of a 14-deploy-log.md to a verdict.

    Only the machine-readable YAML field is read, never prose.

        deploy_status: SUCCESS -> (True,  "Deploy status: SUCCESS")
        deploy_status: FAILED  -> (False, "Deploy status: FAILED")
        missing field / no frontmatter / bad YAML -> (False, <reason>)

    ORCH-52c: parsing is delegated to ``frontmatter.parse_frontmatter``; the
    deploy_status semantics (BUG 8) are unchanged.

    Args:
        content: Full text of the deploy log.

    Returns:
        ``(passed, reason)``.
    """
    from ..frontmatter import parse_frontmatter, maybe_warn_schema

    parsed = parse_frontmatter(content)
    if parsed.yaml_error is not None:
        return False, f"Invalid YAML frontmatter in deploy log: {parsed.yaml_error}"

    deploy_status = None
    block_usable = parsed.has_block and not parsed.malformed
    if block_usable:
        # Warning-only schema check; its outcome is not used for the verdict.
        if parsed.data:
            maybe_warn_schema(content, "deploy log")
        deploy_status = str(parsed.data.get("deploy_status", "")).upper().strip()

    known_outcomes = {
        "SUCCESS": (True, "Deploy status: SUCCESS"),
        "FAILED": (False, "Deploy status: FAILED"),
    }
    if deploy_status in known_outcomes:
        return known_outcomes[deploy_status]
    return False, f"No machine-readable deploy_status in frontmatter (got: {deploy_status!r})"
def _deploy_log_from_main(repo: str, work_item_id: str) -> str | None:
    """Best-effort read of 14-deploy-log.md from origin/main on the shared clone.

    The deployer writes 14-deploy-log.md and merges the deploy artifacts into
    main via a separate PR (see ET-013), so the file lands in origin/main, NOT
    in the feature-branch worktree the gate normally reads. This recovers it
    from main.

    Degrades gracefully: any git failure (no clone, network/fetch error, file
    absent in main) returns None instead of raising, so the caller falls back to
    the plain "not found" verdict. Never raises: output is decoded as UTF-8 with
    ``errors="replace"`` so undecodable bytes cannot raise UnicodeDecodeError.

    Args:
        repo: Repository name; the shared clone is ``<repos_dir>/<repo>``.
        work_item_id: Work item identifier used as the directory name.

    Returns:
        The file content from ``origin/main``, or None when it is unavailable.
    """
    repo_clone = os.path.join(settings.repos_dir, repo)
    if not os.path.isdir(os.path.join(repo_clone, ".git")):
        return None
    rel = f"docs/work-items/{work_item_id}/14-deploy-log.md"
    try:
        # Refresh origin/main so we see freshly-merged deploy artifacts. The
        # result is ignored on purpose: a failed fetch still lets us try the
        # ref that is already present locally.
        subprocess.run(
            ["git", "-C", repo_clone, "fetch", "origin", "main"],
            check=False, capture_output=True, timeout=30,
        )
        # With plain text=True the output is decoded strictly using the locale
        # codec, and a UnicodeDecodeError (a ValueError) would escape the
        # handler below. Decode explicitly and tolerantly instead.
        show = subprocess.run(
            ["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
            check=False, capture_output=True, text=True,
            encoding="utf-8", errors="replace", timeout=15,
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("deploy-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
        return None
    if show.returncode != 0:
        return None
    return show.stdout
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Gate the deploy -> done transition on the deployer's machine-readable verdict.

    BUG 8 fix: the verdict comes from the 14-deploy-log.md frontmatter, NOT from
    the LLM process exit code (which is always 0 on a successful agent session
    even when the deploy failed). Mirrors check_reviewer_verdict (S-5): only
    ``deploy_status:`` is read from the YAML frontmatter.

    ET-013 path-sync fix: the deployer writes 14-deploy-log.md and merges the
    deploy artifacts into main via a SEPARATE PR, so the log lands in
    origin/main, not in the feature-branch worktree read via
    ``_repo_path(repo, branch)``. If the file is absent in the worktree it is
    read from origin/main on the shared clone.
    Lookup order: worktree -> origin/main -> not found.

    The worktree file is read as UTF-8. A file that cannot be opened or decoded
    yields ``(False, "Error reading deploy log: ...")`` instead of raising.

    Args:
        repo: Repository name.
        work_item_id: Work item identifier used as the directory name.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(True, ...)`` for ``deploy_status: SUCCESS``; ``(False, ...)`` for
        ``deploy_status: FAILED``, a missing field, no frontmatter, a read error
        or a missing log.
    """
    repo_path = _repo_path(repo, branch)
    log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
    if os.path.isfile(log_path):
        try:
            # Explicit UTF-8 so the read does not depend on the process locale.
            with open(log_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is a ValueError, not an OSError; catch it so an
            # undecodable log fails the gate instead of crashing it.
            return False, f"Error reading deploy log: {e}"
        return _parse_deploy_status(content)
    # Not in the feature worktree — the deployer may have merged it into main.
    main_content = _deploy_log_from_main(repo, work_item_id)
    if main_content is not None:
        return _parse_deploy_status(main_content)
    return False, "Deploy log not found (14-deploy-log.md)"
# ---------------------------------------------------------------------------
# Self-hosting detection: staging-infra (localhost:8501) exists ONLY for the
# orchestrator repo itself (self-hosting). Other repos have no staging instance
# and their deployer prompts know nothing about it -- the gate must be a no-op
# for them. The repo value is the plain gitea repo name (ProjectConfig.repo),
# matching what _run_qg/advance_stage pass in. See ORCH-35 / PR #31.
# is_self_hosting_repo() compares against this name case-insensitively, after
# stripping surrounding whitespace.
# ---------------------------------------------------------------------------
SELF_HOSTING_REPO = "orchestrator"
def is_self_hosting_repo(repo: str) -> bool:
    """Tell whether ``repo`` is the self-hosted orchestrator (has staging infra).

    The comparison ignores case and surrounding whitespace as a safety measure.
    In practice ``repo`` comes from the gitea webhook payload
    ``.repository.name``, which is always lowercase (confirmed via the
    projects.py registry entry). A falsy ``repo`` is treated as an empty name.

    Args:
        repo: Plain gitea repository name.

    Returns:
        True iff the normalised name equals ``SELF_HOSTING_REPO``.
    """
    normalized = (repo or "").strip().lower()
    return normalized == SELF_HOSTING_REPO.lower()
def _parse_staging_status(content: str) -> tuple[bool, str]:
    """Map the ``staging_status:`` frontmatter of a 15-staging-log.md to a verdict.

    Only the machine-readable YAML field is read, never prose.

        staging_status: SUCCESS -> (True,  "Staging status: SUCCESS")
        staging_status: FAILED  -> (False, "Staging status: FAILED")
        missing field / no frontmatter / bad YAML -> (False, <reason>)

    ORCH-52c: parsing is delegated to ``frontmatter.parse_frontmatter``; the
    staging_status semantics (self-hosting) are unchanged.

    Args:
        content: Full text of the staging log.

    Returns:
        ``(passed, reason)``.
    """
    from ..frontmatter import parse_frontmatter, maybe_warn_schema

    parsed = parse_frontmatter(content)
    if parsed.yaml_error is not None:
        return False, f"Invalid YAML frontmatter in staging log: {parsed.yaml_error}"

    staging_status = None
    block_usable = parsed.has_block and not parsed.malformed
    if block_usable:
        # Warning-only schema check; its outcome is not used for the verdict.
        if parsed.data:
            maybe_warn_schema(content, "staging log")
        staging_status = str(parsed.data.get("staging_status", "")).upper().strip()

    known_outcomes = {
        "SUCCESS": (True, "Staging status: SUCCESS"),
        "FAILED": (False, "Staging status: FAILED"),
    }
    if staging_status in known_outcomes:
        return known_outcomes[staging_status]
    return False, f"No machine-readable staging_status in frontmatter (got: {staging_status!r})"
def _staging_log_from_main(repo: str, work_item_id: str) -> str | None:
    """Best-effort read of 15-staging-log.md from origin/main on the shared clone.

    The deployer writes 15-staging-log.md and merges the staging artifacts into
    main via a separate PR (mirroring the deploy-log pattern), so the file lands
    in origin/main, NOT in the feature-branch worktree the gate normally reads.
    This recovers it from main.

    Degrades gracefully: any git failure (no clone, network/fetch error, file
    absent in main) returns None instead of raising, so the caller falls back to
    the plain "not found" verdict. Never raises: output is decoded as UTF-8 with
    ``errors="replace"`` so undecodable bytes cannot raise UnicodeDecodeError.

    Args:
        repo: Repository name; the shared clone is ``<repos_dir>/<repo>``.
        work_item_id: Work item identifier used as the directory name.

    Returns:
        The file content from ``origin/main``, or None when it is unavailable.
    """
    repo_clone = os.path.join(settings.repos_dir, repo)
    if not os.path.isdir(os.path.join(repo_clone, ".git")):
        return None
    rel = f"docs/work-items/{work_item_id}/15-staging-log.md"
    try:
        # Refresh origin/main so we see freshly-merged staging artifacts. The
        # result is ignored on purpose: a failed fetch still lets us try the
        # ref that is already present locally.
        subprocess.run(
            ["git", "-C", repo_clone, "fetch", "origin", "main"],
            check=False, capture_output=True, timeout=30,
        )
        # With plain text=True the output is decoded strictly using the locale
        # codec, and a UnicodeDecodeError (a ValueError) would escape the
        # handler below. Decode explicitly and tolerantly instead.
        show = subprocess.run(
            ["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
            check=False, capture_output=True, text=True,
            encoding="utf-8", errors="replace", timeout=15,
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("staging-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
        return None
    if show.returncode != 0:
        return None
    return show.stdout
def check_staging_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Gate the deploy-staging -> deploy transition on the staging verdict.

    The verdict is the deployer's machine-readable ``staging_status:
    SUCCESS|FAILED`` in the 15-staging-log.md frontmatter.

    ORCH-35 conditional gate (Variant A):
      - Non-self-hosting repos (anything other than "orchestrator") have no
        staging instance and no deployer knowledge of it -> immediate pass.
      - Self-hosting repo ("orchestrator") -> real check: only the
        ``staging_status:`` field of the YAML frontmatter is read, never body
        prose. Mirrors check_deploy_status (BUG 8) for the self-hosting path.

    Lookup order (self-hosting only): worktree -> origin/main -> not found.

    The worktree file is read as UTF-8. A file that cannot be opened or decoded
    yields ``(False, "Error reading staging log: ...")`` instead of raising.

    Args:
        repo: Repository name.
        work_item_id: Work item identifier used as the directory name.
        branch: Optional branch; selects the worktree via ``_repo_path``.

    Returns:
        ``(True, "Staging gate N/A for <repo>")`` for a non-self-hosting repo;
        ``(True, ...)`` for ``staging_status: SUCCESS``; ``(False, ...)`` for
        ``staging_status: FAILED``, a missing field, no frontmatter, a read
        error or a missing log.
    """
    # Variant A: non-self-hosting repos have no staging infra -- skip entirely.
    if not is_self_hosting_repo(repo):
        return True, f"Staging gate N/A for {repo}"
    # Self-hosting (orchestrator) path: real verdict check.
    repo_path = _repo_path(repo, branch)
    log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/15-staging-log.md")
    if os.path.isfile(log_path):
        try:
            # Explicit UTF-8 so the read does not depend on the process locale.
            with open(log_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is a ValueError, not an OSError; catch it so an
            # undecodable log fails the gate instead of crashing it.
            return False, f"Error reading staging log: {e}"
        return _parse_staging_status(content)
    # Not in the feature worktree -- the deployer may have merged it into main.
    main_content = _staging_log_from_main(repo, work_item_id)
    if main_content is not None:
        return _parse_staging_status(main_content)
    return False, "Staging log not found (15-staging-log.md)"
def _merge_gate_applies(repo: str) -> bool:
    """Decide whether the merge-gate is enforced for ``repo`` (ORCH-043).

    Mirrors the ORCH-35 conditional staging-gate. ``settings.merge_gate_repos``
    is a CSV of repos where the gate is enforced. When it is empty or blank the
    gate is real ONLY for the self-hosting repo (``orchestrator``); for any
    other repo it is a conditional no-op.

    Args:
        repo: Repository name; matched case-insensitively after stripping.

    Returns:
        True when the merge-gate must actually run for this repo.
    """
    configured = (settings.merge_gate_repos or "").strip()
    if not configured:
        # No explicit list: only the self-hosting repo is gated.
        return is_self_hosting_repo(repo)
    enforced = {name.strip().lower() for name in configured.split(",") if name.strip()}
    candidate = (repo or "").strip().lower()
    return candidate in enforced
def check_branch_mergeable(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
    """ORCH-043 merge-gate: validate the branch against the CURRENT origin/main.

    Runs immediately before the deployer merges its PR (deploy-staging -> deploy
    edge). Deterministic, no LLM. Algorithm (ADR-001 §4):

      1. Conditionality: ``merge_gate_enabled`` False ->
         ``(True, "merge-gate disabled")``; a repo where the gate is not real ->
         ``(True, "merge-gate N/A for <repo>")``.
      2. Acquire the per-repo merge lease (NON-blocking). If it is busy the
         lease reason is returned with False: a SIGNAL for the engine to DEFER
         (not a code fault, no rollback).
      3. Re-check "behind origin/main" UNDER the lease (main may have moved in
         the meantime). Not behind ->
         ``(True, "branch up-to-date with main")``; lease HELD.
      4. Behind (or ``premerge_rebase_always`` is on) -> auto_rebase_onto_main:
           - conflict -> release lease -> ``(False, <rebase reason>)``
           - clean -> retest_branch:
               green       -> ``(True, "rebased onto main, re-test green")``;
                              lease HELD
               red/timeout -> release lease -> ``(False, "re-test ...")``
      5. On SUCCESS the lease stays HELD until the actual merge (released on the
         PR-merged webhook / deploy->done / rollback). On any FAILURE it is
         released here.

    Never-raise (AC-9): any error inside the guarded body becomes
    ``(False, "merge-gate error: ...")`` with the lease released; an exception
    never escapes into advance_stage.

    Args:
        repo: Repository name.
        work_item_id: Work item identifier, recorded with the lease.
        branch: Branch to validate.

    Returns:
        ``(mergeable, reason)``.
    """
    # Lazy import: keeps qg.checks importable without the merge_gate deps in
    # minimal/test contexts and avoids an import-cycle surprise.
    from .. import merge_gate

    try:
        if not settings.merge_gate_enabled:
            return True, "merge-gate disabled"
        if not _merge_gate_applies(repo):
            return True, f"merge-gate N/A for {repo}"

        lease_ok, lease_reason = merge_gate.acquire_merge_lease(repo, branch, work_item_id)
        if not lease_ok:
            # "merge-lock busy": the caller defers. We do not hold the lease,
            # so there is nothing to release.
            return False, lease_reason

        try:
            # ORCH-026 (Level A, A-2): proactive pre-merge rebase. With
            # premerge_rebase_always on, the branch is ALWAYS rebased onto the
            # current origin/main under the held lease, even if
            # branch_is_behind_main says "not behind". The ancestor check can
            # miss a divergence (squash/force-push history, ORCH-073
            # phantom-merge class), so the unconditional rebase guarantees B
            # carries A's code before merge. With the switch off this is the
            # ORCH-043 short-circuit: rebase only when behind.
            force_rebase = bool(getattr(settings, "premerge_rebase_always", False))

            # Re-check under the lease: another task may have just merged.
            if not (force_rebase or merge_gate.branch_is_behind_main(repo, branch)):
                logger.info("check_branch_mergeable: %s up-to-date with main", branch)
                return True, "branch up-to-date with main"

            rebased, rebase_reason = merge_gate.auto_rebase_onto_main(repo, branch)
            if not rebased:
                merge_gate.release_merge_lease(repo, branch)
                return False, rebase_reason  # "rebase conflict: ..."

            tests_green, retest_reason = merge_gate.retest_branch(repo, branch)
            if tests_green:
                logger.info("check_branch_mergeable: %s rebased + re-test green", branch)
                return True, "rebased onto main, re-test green"

            # Re-test did not pass: give the lease back before reporting.
            merge_gate.release_merge_lease(repo, branch)
            if "timeout" in retest_reason:
                return False, retest_reason  # "re-test timeout after <T>s" (AC-6)
            failure_detail = retest_reason.removeprefix("re-test failed: ")
            return False, f"re-test failed after rebase: {failure_detail}"
        except Exception as exc:  # noqa: BLE001 - never-raise; always release on error
            merge_gate.release_merge_lease(repo, branch)
            logger.error("check_branch_mergeable inner error for %s/%s: %s", repo, branch, exc)
            return False, f"merge-gate error: {exc}"
    except Exception as exc:  # noqa: BLE001 - outer never-raise guard
        logger.error("check_branch_mergeable error for %s/%s: %s", repo, branch, exc)
        return False, f"merge-gate error: {exc}"
def _check_staging_image_fresh(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
    """ORCH-058 freshness sub-gate (Strategy A) on the deploy-staging -> deploy edge.

    Thin registry wrapper: all work is delegated to
    ``image_freshness.check_staging_image_fresh`` (rebuild the staging image
    from the validated commit + recreate 8501). The real logic lives in
    ``src/image_freshness.py`` (described there as a leaf module, never-raise,
    fail-closed). It is imported lazily to avoid an import cycle, because
    image_freshness imports ``is_self_hosting_repo`` from this module. For
    non-self repos the delegate is documented to return ``(True, "N/A")`` so the
    deploy edge is unchanged for them (AC-5).

    Args:
        repo: Repository name.
        work_item_id: Work item identifier.
        branch: Branch being deployed.

    Returns:
        Whatever the delegate returns: ``(passed, reason)``.
    """
    from ..image_freshness import check_staging_image_fresh as _delegate

    verdict = _delegate(repo, work_item_id, branch)
    return verdict
def check_security_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
    """ORCH-022 security sub-gate on the deploy-staging -> deploy edge.

    Covers secret-scan + dependency audit and is run FIRST, before merge-gate
    and image-freshness. Thin registry wrapper: all work is delegated to
    ``security_gate.check_security_gate`` (gitleaks offline + pip-audit,
    write/read-back of ``17-security-report.md``). The real logic lives in
    ``src/security_gate.py`` (described there as a leaf module, never-raise,
    fail-closed on secrets, fail-open degrade for the dep-audit feed). It is
    imported lazily to avoid an import cycle, because security_gate imports
    ``is_self_hosting_repo`` from this module. For non-self repos with an empty
    scope the delegate is documented to return
    ``(True, "security-gate N/A for <repo>")`` so the deploy edge is unchanged
    for them (AC-13/TC-13).

    Args:
        repo: Repository name.
        work_item_id: Work item identifier.
        branch: Branch being deployed.

    Returns:
        Whatever the delegate returns: ``(passed, reason)``.
    """
    from ..security_gate import check_security_gate as _delegate

    verdict = _delegate(repo, work_item_id, branch)
    return verdict
# Registry for dynamic lookup by name: maps the gate name (string) to the
# callable implementing it. Every key equals the function name except
# "check_staging_image_fresh", which points at the private wrapper
# ``_check_staging_image_fresh`` defined in this module.
QG_CHECKS = {
    "check_analysis_approved": check_analysis_approved,
    "check_analysis_complete": check_analysis_complete,
    "check_architecture_done": check_architecture_done,
    "check_ci_green": check_ci_green,
    "check_review_approved": check_review_approved,
    "check_tests_passed": check_tests_passed,
    "check_reviewer_verdict": check_reviewer_verdict,
    "check_tests_local": check_tests_local,
    "check_deploy_status": check_deploy_status,
    "check_staging_status": check_staging_status,
    "check_branch_mergeable": check_branch_mergeable,
    "check_staging_image_fresh": _check_staging_image_fresh,
    "check_security_gate": check_security_gate,
}