Files
orchestrator/src/qg/checks.py
claude-bot 51a76e8169
All checks were successful
CI / test (push) Successful in 12s
CI / test (pull_request) Successful in 11s
fix(qg): read result: alongside verdict:/status: in tests gate
_parse_tests_verdict now accepts three equal-rank machine-readable
frontmatter fields in 13-test-report.md — result: (canonical tester
output), verdict: and status: (legacy/enduro-trails). Any one non-empty
field suffices; a negative token in any field stays authoritative.

Fixes the producer/consumer contract mismatch where the tester emits
`result: PASS` (per .openclaw/agents/tester.md) but the gate only read
verdict:/status:, causing a testing->development rollback loop until
MAX_DEVELOPER_RETRIES (observed on ORCH-17). Token sets frozen and gate
signature/QG_CHECKS unchanged for full backward compatibility.

Refs: ORCH-047
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-05 21:03:32 +00:00

637 lines
26 KiB
Python

"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem."""
import os
import time
import logging
import subprocess
import httpx
from ..config import settings
logger = logging.getLogger("orchestrator.qg")
from ..git_worktree import get_worktree_path, ensure_worktree
def _repo_path(repo: str, branch: str | None = None) -> str:
    """Return the directory that agent artifacts should be read from.

    ORCH-2 / S-4: artifacts live in a per-branch worktree. The worktree path is
    used only when ``branch`` is truthy AND that directory exists on disk. In
    every other case the shared clone under ``settings.repos_dir`` is returned,
    which keeps 2-argument callers and tests working.
    """
    # The walrus keeps the worktree lookup lazy: it only runs when a branch is given.
    if branch and os.path.isdir(candidate := get_worktree_path(repo, branch)):
        return candidate
    return os.path.join(settings.repos_dir, repo)
# Shared httpx request configuration for the Gitea API, built once at import
# time from `settings`. Used by check_ci_green and check_review_approved.
# GITEA_HEADERS carries the token-based Authorization header; GITEA_BASE is the
# API root that endpoint paths are appended to.
GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"}
GITEA_BASE = f"{settings.gitea_url}/api/v1"
def check_analysis_complete(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Verify that every analysis artifact for a work item exists on disk.

    The following files (relative to the path resolved by ``_repo_path``) must
    all be regular files:
      - docs/work-items/{work_item_id}/01-brd.md
      - docs/work-items/{work_item_id}/02-trz.md
      - docs/work-items/{work_item_id}/03-acceptance-criteria.md
      - docs/work-items/{work_item_id}/04-test-plan.yaml

    Returns:
        (True, "All analysis artifacts present") when nothing is missing,
        otherwise (False, "Missing files: ...") listing the absent relative
        paths in the order shown above.
    """
    expected = [
        f"docs/work-items/{work_item_id}/01-brd.md",
        f"docs/work-items/{work_item_id}/02-trz.md",
        f"docs/work-items/{work_item_id}/03-acceptance-criteria.md",
        f"docs/work-items/{work_item_id}/04-test-plan.yaml",
    ]
    root = _repo_path(repo, branch)
    absent = [rel for rel in expected if not os.path.isfile(os.path.join(root, rel))]
    if not absent:
        return True, "All analysis artifacts present"
    return False, f"Missing files: {', '.join(absent)}"
def check_architecture_done(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Verify that at least one architecture artifact exists for a work item.

    Either of the following is sufficient (checked in this order):
      - docs/work-items/{work_item_id}/06-adr/ is a directory with at least one entry
      - docs/work-items/{work_item_id}/07-infra-requirements.md is a regular file

    Returns:
        (True, reason) naming which artifact was found, or
        (False, "No ADR directory or infra-requirements.md found").
    """
    root = _repo_path(repo, branch)
    adr_dir = os.path.join(root, f"docs/work-items/{work_item_id}/06-adr")
    infra_file = os.path.join(root, f"docs/work-items/{work_item_id}/07-infra-requirements.md")

    # An empty ADR directory does not count; listdir is only reached for real directories.
    has_adr = os.path.isdir(adr_dir) and bool(os.listdir(adr_dir))
    if has_adr:
        return True, "ADR directory exists with files"
    if os.path.isfile(infra_file):
        return True, "Infra requirements file exists"
    return False, "No ADR directory or infra-requirements.md found"
def check_ci_green(repo: str, branch: str) -> tuple[bool, str]:
    """Poll the Gitea commit status of ``branch`` until it reaches a terminal state.

    Endpoint: GET /repos/{owner}/{repo}/commits/{branch}/status

    ORCH-045: a single read right after the developer push raced with CI start-up
    (real case ORCH-017: polled at 17:58:54 -> pending, CI went green at 17:58:55),
    so the gate returned False once and the task stalled silently. The status is
    therefore polled up to ``settings.ci_poll_max_attempts`` times, sleeping
    ``settings.ci_poll_interval_s`` seconds between attempts.

    Outcomes:
      * state ``success``             -> (True, "CI green"), immediately.
      * state ``failure`` / ``error`` -> (False, "CI state: ..."), immediately;
        CI is red, so retrying is pointless.
      * HTTP 404                      -> (False, "Branch '...' not found or no status").
      * any other state               -> non-terminal: sleep, then poll again.
      * ``httpx.HTTPError``           -> logged and retried within the same budget.
      * budget exhausted              -> (False, "API error: ...") when the final
        attempt errored, otherwise (False, "CI still pending after ...s") where
        the figure is attempts * interval.
    """
    status_url = f"{GITEA_BASE}/repos/{settings.gitea_owner}/{repo}/commits/{branch}/status"
    attempts = settings.ci_poll_max_attempts
    interval = settings.ci_poll_interval_s
    last_state = "unknown"
    last_error: Exception | None = None

    for i in range(1, attempts + 1):
        try:
            resp = httpx.get(status_url, headers=GITEA_HEADERS, timeout=10)
            if resp.status_code == 404:
                return False, f"Branch '{branch}' not found or no status"
            resp.raise_for_status()
            last_state = resp.json().get("state", "unknown")
            # A successful read clears any error left over from an earlier attempt.
            last_error = None
            if last_state == "success":
                return True, "CI green"
            if last_state in ("failure", "error"):
                return False, f"CI state: {last_state}"
            # Anything else (pending / unknown / other) falls through to the retry.
        except httpx.HTTPError as e:
            last_error = e
            logger.error(f"check_ci_green: attempt {i}/{attempts} API error: {e}")

        if i == attempts:
            # Budget spent: no point sleeping after the final attempt.
            break
        outcome = "error" if last_error is not None else f"state={last_state}"
        logger.info(f"check_ci_green: attempt {i}/{attempts}, {outcome}, retrying in {interval}s")
        time.sleep(interval)

    if last_error is None:
        return False, f"CI still pending after {attempts * interval}s"
    return False, f"API error: {last_error}"
def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]:
    """Check that a PR has at least one approval and no outstanding change request.

    Endpoint: GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews

    Reviews flagged ``stale`` are ignored. Among the remaining reviews a single
    REQUEST_CHANGES blocks the gate even when approvals exist.

    Returns:
        (False, "Changes requested (N reviews)") if any non-stale review requests changes,
        (True, "Approved (N reviews)") if at least one non-stale review approves,
        (False, "No reviews yet") otherwise,
        (False, "API error: ...") when the request raises ``httpx.HTTPError``.
    """
    url = f"{GITEA_BASE}/repos/{settings.gitea_owner}/{repo}/pulls/{pr_number}/reviews"
    try:
        resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10)
        resp.raise_for_status()
        # Upper-cased states of all reviews that are still current (not stale).
        live_states = [
            review.get("state", "").upper()
            for review in resp.json()
            if not review.get("stale", False)
        ]
    except httpx.HTTPError as e:
        logger.error(f"Gitea API error checking reviews: {e}")
        return False, f"API error: {e}"

    changes_requested = live_states.count("REQUEST_CHANGES")
    if changes_requested > 0:
        return False, f"Changes requested ({changes_requested} reviews)"
    approved = live_states.count("APPROVED")
    if approved > 0:
        return True, f"Approved ({approved} reviews)"
    return False, "No reviews yet"
def check_tests_passed(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Gate the testing -> deploy transition on the tester's machine-readable verdict.

    File: docs/work-items/{work_item_id}/13-test-report.md (resolved via ``_repo_path``).

    ET-013 fix: the previous implementation did ``if "PASS" in content``, so a
    report explicitly marked ``verdict: BLOCKED`` / ``status: blocked`` whose prose
    mentioned "23 passed" / "PASS" / "All checks passed" was treated as a pass and
    an unfinished feature reached Done. Like check_reviewer_verdict (S-5) and
    check_deploy_status (BUG 8), only the YAML frontmatter is consulted, never the
    body.

    ORCH-047: the verdict is read from any of three equal-rank frontmatter fields:
    ``result:`` (canonical, what the tester prompt emits), ``verdict:`` or
    ``status:`` (legacy / enduro-trails). The parsing rules live in
    ``_parse_tests_verdict``.

    Returns:
        (False, "Test report not found") when the file is absent,
        (False, "Error reading test report: ...") when it cannot be read or decoded,
        otherwise whatever ``_parse_tests_verdict`` returns for its content.
    """
    repo_path = _repo_path(repo, branch)
    report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md")
    if not os.path.isfile(report_path):
        return False, "Test report not found"
    try:
        # Reports routinely contain non-ASCII prose, so decode explicitly as UTF-8
        # instead of depending on the process locale.
        with open(report_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError; without catching it an undecodable
        # report would escape the gate as an exception instead of failing it.
        return False, f"Error reading test report: {e}"
    return _parse_tests_verdict(content)
# Verdict tokens recognised in test-report frontmatter, collected from REAL tester
# reports in enduro-trails (ET-001..ET-014). Testers are inconsistent: most write
# `verdict: PASS`, but ET-006 wrote `verdict: ready-to-deploy` (with
# `status: PASSED`), ET-007 `verdict: PASS — ready-to-deploy`, ET-008
# `verdict: stage:ready-to-deploy` (with `status: pass`), and ET-013 (the bug)
# `verdict: BLOCKED` / `status: blocked`.
# Matching is therefore by known TOKEN contained in the upper-cased field text,
# and a negative token is authoritative: a BLOCKED/FAILED report never passes,
# even when another field looks positive.
_TESTS_NEGATIVE_TOKENS = ("BLOCKED", "FAILED", "FAIL", "REQUEST_CHANGES", "REJECT", "RED")
_TESTS_POSITIVE_TOKENS = ("PASSED", "PASS", "READY-TO-DEPLOY", "READY_TO_DEPLOY", "GREEN", "APPROVED")


def _parse_tests_verdict(content: str) -> tuple[bool, str]:
    """Translate the text of a 13-test-report.md into a quality-gate verdict.

    Only the machine-readable YAML frontmatter is consulted; the prose body is
    never searched.

    Three equal-rank fields are accepted (ORCH-047): ``result:`` (the canonical
    field the tester prompt ``.openclaw/agents/tester.md`` is told to emit,
    ``result: PASS|FAIL``), plus ``verdict:`` and ``status:`` (legacy /
    enduro-trails ET-001..ET-014). Any single non-empty field is sufficient.
    The token sets are frozen for backward compatibility.

    Rules, in evaluation order:
      - No frontmatter, unterminated frontmatter, invalid YAML, a non-mapping
        document, or none of the three fields set -> (False, reason).
      - A negative token contained in ANY field -> (False, ...). This is
        authoritative: it beats a positive token in another field (ET-013).
      - Otherwise a positive token contained in ANY field -> (True, ...).
      - Fields set but no known token -> (False, reason).

    The value echoed in the message is the first non-empty field in the order
    verdict, status, result.
    """
    import yaml

    if not content.startswith("---"):
        return False, "No YAML frontmatter in test report (cannot read machine verdict)"
    pieces = content.split("---", 2)
    if len(pieces) < 3:
        return False, "Malformed YAML frontmatter in test report"
    try:
        fm = yaml.safe_load(pieces[1]) or {}
    except yaml.YAMLError as e:
        return False, f"Invalid YAML frontmatter in test report: {e}"
    if not isinstance(fm, dict):
        return False, "Malformed YAML frontmatter in test report (not a mapping)"

    def _normalized(key: str) -> str:
        # Missing / null / empty values all collapse to "".
        return str(fm.get(key, "") or "").upper().strip()

    verdict, status, result = (_normalized(key) for key in ("verdict", "status", "result"))
    if not (verdict or status or result):
        return False, "No machine-readable verdict/status/result in test report frontmatter"

    value = verdict or status or result
    # Space-joined so a token can never match across a field boundary.
    fields = f"{verdict} {status} {result}"

    neg = next((token for token in _TESTS_NEGATIVE_TOKENS if token in fields), None)
    if neg is not None:
        return False, f"Test verdict: {value} ({neg})"
    if any(token in fields for token in _TESTS_POSITIVE_TOKENS):
        return True, f"Test verdict: {value} (PASS)"
    return (
        False,
        f"No recognized PASS verdict in frontmatter "
        f"(verdict={verdict!r}, status={status!r}, result={result!r})",
    )
def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Check that analysis artifacts exist AND the stakeholder approved them.

    Steps:
      1. ``check_analysis_complete`` must pass (BRD, TRZ, AC, test plan present);
         otherwise its reason is returned unchanged.
      2. The Plane issue for ``work_item_id`` must carry a comment whose text
         contains ``:approved:``.

    This gate is designed to be triggered by the ``:approved:`` comment handler,
    so the file check acts as the safety gate and the Plane lookup deliberately
    fails OPEN: if the lookup itself raises, the gate passes on the strength of
    the files being present.

    Returns:
        (True, reason) when an ``:approved:`` comment is found, or when the Plane
        check raised and was skipped;
        (False, reason) when files are missing, the Plane issue cannot be found,
        or no ``:approved:`` comment exists.
    """
    # Files first: nothing else matters if the artifacts are incomplete.
    files_ok, files_reason = check_analysis_complete(repo, work_item_id, branch)
    if not files_ok:
        return False, files_reason

    # Then look for an :approved: comment via the Plane API.
    try:
        from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
        from ..projects import get_project_by_repo

        # ORCH-6: verify approval in the issue's own Plane project, falling back
        # to the default project when the repo is not in the registry.
        _proj = get_project_by_repo(repo)
        _pid = _proj.plane_project_id if _proj else PROJECT_ID
        issue_id = find_issue_id(work_item_id, _pid)
        if not issue_id:
            return False, "Cannot find Plane issue to verify approval"
        url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/comments/"
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        resp.raise_for_status()
        comments = resp.json()
        # Handle paginated response
        if isinstance(comments, dict):
            comments = comments.get("results", [])
        for comment in comments or []:
            if not isinstance(comment, dict):
                continue
            body = comment.get("comment_html") or comment.get("comment") or ""
            # Only string bodies are inspected. A null or otherwise non-string body
            # used to raise TypeError here, which the fail-open handler below
            # turned into an approval.
            if isinstance(body, str) and ":approved:" in body:
                return True, "Analysis complete and approved by stakeholder"
        return False, "Analysis artifacts present but no :approved: comment found"
    except Exception as e:
        logger.warning(f"Failed to check approval for {work_item_id}: {e}")
        # Deliberate fail-open (original design note): if the Plane API cannot be
        # reached but the files exist, allow the advance, because the :approved:
        # handler that triggers this gate has already seen the comment.
        return True, f"Files present; Plane API check skipped ({e})"
def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Read the reviewer agent's verdict from 12-review.md (S-5 fix).

    File: docs/work-items/{work_item_id}/12-review.md (resolved via ``_repo_path``).

    Only the machine-readable ``verdict:`` field of the YAML frontmatter is read,
    so tables or prose that merely mention APPROVED / REQUEST_CHANGES cannot cause
    false positives or negatives.

    Returns:
        (True, "Reviewer verdict: APPROVED") for ``verdict: APPROVED``
        (case-insensitive, surrounding whitespace ignored);
        (False, reason) for ``verdict: REQUEST_CHANGES``, a missing or unrecognised
        verdict, absent / invalid / non-mapping frontmatter, a missing file, or a
        file that cannot be read or decoded.
    """
    import yaml

    repo_path = _repo_path(repo, branch)
    review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md")
    if not os.path.isfile(review_path):
        return False, "Review report not found (12-review.md)"
    try:
        # Decode explicitly as UTF-8 rather than with the process locale.
        with open(review_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError and would otherwise escape the gate.
        return False, f"Error reading review: {e}"

    verdict = None
    if content.startswith("---"):
        parts = content.split("---", 2)
        if len(parts) >= 3:
            try:
                fm = yaml.safe_load(parts[1]) or {}
            except yaml.YAMLError as e:
                return False, f"Invalid YAML frontmatter in review: {e}"
            if not isinstance(fm, dict):
                # A list or scalar document has no .get(); fail the gate instead of
                # crashing with AttributeError (mirrors _parse_tests_verdict).
                return False, "Malformed YAML frontmatter in review (not a mapping)"
            verdict = str(fm.get("verdict", "")).upper().strip()
    if verdict == "APPROVED":
        return True, "Reviewer verdict: APPROVED"
    if verdict == "REQUEST_CHANGES":
        return False, "Reviewer verdict: REQUEST_CHANGES"
    return False, f"No machine-readable verdict in frontmatter (got: {verdict!r})"
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
    """Run the project's pytest suite locally and judge it by exit code.

    DEPRECATED: superseded by check_ci_green on the development stage now that CI
    is configured. Kept for backward compatibility; not wired to any stage.

    History:
      - S-1 fix: judge by the local test exit code instead of Gitea CI, which at
        the time was not configured and therefore always reported false.
      - BUG 5 fix: pytest is invoked directly because ``make`` is not installed in
        the orchestrator container (``["make", "test"]`` raised FileNotFoundError).
        The command reproduces the Makefile test target 1:1
        (``cd src/api && python -m pytest ../../tests/ -v``).
      - ORCH-2 / S-4: tests run inside the per-branch worktree returned by
        ``ensure_worktree``, so concurrent tasks do not race on a shared checkout.

    Returns:
        (True, "Local tests passed") on exit code 0,
        (False, "Local tests failed: ...") with the last 500 characters of
        stdout+stderr on any other exit code,
        (False, "Local tests timed out (600s)") on timeout,
        (False, "Local test run error: ...") on any other exception.
    """
    import subprocess

    pytest_cmd = ["python", "-m", "pytest", "../../tests/", "-v"]
    try:
        worktree = ensure_worktree(repo, branch)
        proc = subprocess.run(
            pytest_cmd,
            cwd=os.path.join(worktree, "src", "api"),
            capture_output=True,
            text=True,
            timeout=600,
        )
        if proc.returncode != 0:
            tail = (proc.stdout + proc.stderr)[-500:]
            return False, f"Local tests failed: ...{tail}"
        return True, "Local tests passed"
    except subprocess.TimeoutExpired:
        return False, "Local tests timed out (600s)"
    except Exception as e:
        return False, f"Local test run error: {e}"
def _parse_deploy_status(content: str) -> tuple[bool, str]:
    """Map the ``deploy_status:`` frontmatter field of a 14-deploy-log.md to a verdict.

    Only the machine-readable YAML field is read, never the prose body. The value
    is compared case-insensitively with surrounding whitespace ignored.

    Returns:
        (True, "Deploy status: SUCCESS")  for ``deploy_status: SUCCESS``
        (False, "Deploy status: FAILED")  for ``deploy_status: FAILED``
        (False, reason)                   for invalid YAML, frontmatter that is not
                                          a mapping, a missing or unrecognised
                                          field, or absent frontmatter.
    """
    import yaml

    status = None
    if content.startswith("---"):
        parts = content.split("---", 2)
        if len(parts) >= 3:
            try:
                fm = yaml.safe_load(parts[1]) or {}
            except yaml.YAMLError as e:
                return False, f"Invalid YAML frontmatter in deploy log: {e}"
            if not isinstance(fm, dict):
                # A list or scalar document has no .get(); fail the gate instead of
                # crashing with AttributeError (mirrors _parse_tests_verdict).
                return False, "Malformed YAML frontmatter in deploy log (not a mapping)"
            status = str(fm.get("deploy_status", "")).upper().strip()
    if status == "SUCCESS":
        return True, "Deploy status: SUCCESS"
    if status == "FAILED":
        return False, "Deploy status: FAILED"
    return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
def _deploy_log_from_main(repo: str, work_item_id: str) -> str | None:
    """Best-effort read of 14-deploy-log.md from origin/main on the shared clone.

    The deployer writes 14-deploy-log.md and merges the deploy artifacts into main
    via a separate PR (see ET-013), so the file lands in origin/main, NOT in the
    feature-branch worktree the gate normally reads. This helper recovers it.

    Degrades gracefully and never raises: a missing clone, a subprocess or OS
    error, or a non-zero ``git show`` exit (e.g. file absent in main) all yield
    None so the caller can fall back to its plain "not found" verdict. A failed
    fetch is ignored; ``git show`` then reads whatever origin/main the clone
    already has.

    Returns:
        The file content as text, or None when it cannot be obtained.
    """
    repo_clone = os.path.join(settings.repos_dir, repo)
    if not os.path.isdir(os.path.join(repo_clone, ".git")):
        return None
    rel = f"docs/work-items/{work_item_id}/14-deploy-log.md"
    try:
        # Refresh origin/main so we see freshly-merged deploy artifacts.
        subprocess.run(
            ["git", "-C", repo_clone, "fetch", "origin", "main"],
            check=False, capture_output=True, timeout=30,
        )
        # Decode explicitly as UTF-8 with replacement: with a bare text=True the
        # locale codec is used and an undecodable byte raises UnicodeDecodeError,
        # which is not covered by the except clause below and would break the
        # "never raises" contract.
        show = subprocess.run(
            ["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
            check=False, capture_output=True, text=True,
            encoding="utf-8", errors="replace", timeout=15,
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("deploy-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
        return None
    if show.returncode != 0:
        return None
    return show.stdout
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Gate the deploy -> done transition on the deployer's machine-readable verdict.

    BUG 8 fix: the verdict comes from the ``deploy_status:`` frontmatter field of
    14-deploy-log.md, NOT from the LLM process exit code (which is always 0 for a
    successful agent session even when the deploy itself failed). Mirrors
    check_reviewer_verdict (S-5): only the YAML frontmatter is read.

    ET-013 path-sync fix: the deployer merges the deploy artifacts into main via a
    SEPARATE PR, so the log lands in origin/main rather than in the feature-branch
    worktree resolved by ``_repo_path(repo, branch)``. When the file is absent
    there, it is read from origin/main on the shared clone instead.
    Lookup order: worktree -> origin/main -> not found.

    Returns:
        (True, ...)  for ``deploy_status: SUCCESS``;
        (False, ...) for ``deploy_status: FAILED``, a missing field, absent
        frontmatter, a log that cannot be read or decoded, or no log at all.
    """
    repo_path = _repo_path(repo, branch)
    log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
    if os.path.isfile(log_path):
        try:
            # Decode explicitly as UTF-8 rather than with the process locale.
            with open(log_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is not an OSError and would otherwise escape the gate.
            return False, f"Error reading deploy log: {e}"
        return _parse_deploy_status(content)
    # Not in the feature worktree: the deployer may have merged it into main.
    main_content = _deploy_log_from_main(repo, work_item_id)
    if main_content is not None:
        return _parse_deploy_status(main_content)
    return False, "Deploy log not found (14-deploy-log.md)"
# ---------------------------------------------------------------------------
# Self-hosting detection.
# The staging infrastructure (localhost:8501) exists ONLY for the orchestrator
# repo itself (self-hosting). Other repos have no staging instance and their
# deployer prompts know nothing about it, so the staging gate must be a no-op
# for them. The repo value is the plain gitea repo name (ProjectConfig.repo),
# i.e. what _run_qg/advance_stage pass in. See ORCH-35 / PR #31.
# ---------------------------------------------------------------------------
SELF_HOSTING_REPO = "orchestrator"


def is_self_hosting_repo(repo: str) -> bool:
    """Return True iff ``repo`` names the self-hosted orchestrator repository.

    The comparison ignores case and surrounding whitespace, and a falsy value
    (None or an empty string) is treated as "not self-hosting". Original author's
    note: in practice the name comes from the gitea webhook payload
    ``.repository.name`` and is already lowercase.
    """
    normalized = repo.strip().lower() if repo else ""
    return normalized == SELF_HOSTING_REPO.lower()
def _parse_staging_status(content: str) -> tuple[bool, str]:
    """Map the ``staging_status:`` frontmatter field of a 15-staging-log.md to a verdict.

    Only the machine-readable YAML field is read, never the prose body. The value
    is compared case-insensitively with surrounding whitespace ignored.

    Returns:
        (True, "Staging status: SUCCESS")  for ``staging_status: SUCCESS``
        (False, "Staging status: FAILED")  for ``staging_status: FAILED``
        (False, reason)                    for invalid YAML, frontmatter that is not
                                           a mapping, a missing or unrecognised
                                           field, or absent frontmatter.
    """
    import yaml

    status = None
    if content.startswith("---"):
        parts = content.split("---", 2)
        if len(parts) >= 3:
            try:
                fm = yaml.safe_load(parts[1]) or {}
            except yaml.YAMLError as e:
                return False, f"Invalid YAML frontmatter in staging log: {e}"
            if not isinstance(fm, dict):
                # A list or scalar document has no .get(); fail the gate instead of
                # crashing with AttributeError (mirrors _parse_tests_verdict).
                return False, "Malformed YAML frontmatter in staging log (not a mapping)"
            status = str(fm.get("staging_status", "")).upper().strip()
    if status == "SUCCESS":
        return True, "Staging status: SUCCESS"
    if status == "FAILED":
        return False, "Staging status: FAILED"
    return False, f"No machine-readable staging_status in frontmatter (got: {status!r})"
def _staging_log_from_main(repo: str, work_item_id: str) -> str | None:
    """Best-effort read of 15-staging-log.md from origin/main on the shared clone.

    The deployer writes 15-staging-log.md and merges the staging artifacts into
    main via a separate PR (mirroring the deploy-log pattern), so the file lands
    in origin/main, NOT in the feature-branch worktree the gate normally reads.
    This helper recovers it.

    Degrades gracefully and never raises: a missing clone, a subprocess or OS
    error, or a non-zero ``git show`` exit (e.g. file absent in main) all yield
    None so the caller can fall back to its plain "not found" verdict. A failed
    fetch is ignored; ``git show`` then reads whatever origin/main the clone
    already has.

    Returns:
        The file content as text, or None when it cannot be obtained.
    """
    repo_clone = os.path.join(settings.repos_dir, repo)
    if not os.path.isdir(os.path.join(repo_clone, ".git")):
        return None
    rel = f"docs/work-items/{work_item_id}/15-staging-log.md"
    try:
        # Refresh origin/main so we see freshly-merged staging artifacts.
        subprocess.run(
            ["git", "-C", repo_clone, "fetch", "origin", "main"],
            check=False, capture_output=True, timeout=30,
        )
        # Decode explicitly as UTF-8 with replacement: with a bare text=True the
        # locale codec is used and an undecodable byte raises UnicodeDecodeError,
        # which is not covered by the except clause below and would break the
        # "never raises" contract.
        show = subprocess.run(
            ["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
            check=False, capture_output=True, text=True,
            encoding="utf-8", errors="replace", timeout=15,
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("staging-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
        return None
    if show.returncode != 0:
        return None
    return show.stdout
def check_staging_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Gate the deploy-staging -> deploy transition on the staging verdict.

    The verdict is the ``staging_status: SUCCESS|FAILED`` frontmatter field of
    15-staging-log.md.

    ORCH-35 conditional gate (Variant A):
      - Non-self-hosting repos (anything other than "orchestrator") have no staging
        instance and no deployer knowledge of it, so the gate passes immediately.
      - The self-hosting repo ("orchestrator") gets the real check: only the
        machine-readable ``staging_status:`` field is read, never the body prose.
        Mirrors check_deploy_status (BUG 8).

    Lookup order (self-hosting only): worktree -> origin/main -> not found.

    Returns:
        (True, "Staging gate N/A for REPO") for a non-self-hosting repo;
        (True, ...)  for ``staging_status: SUCCESS`` on the self-hosting path;
        (False, ...) for ``staging_status: FAILED``, a missing field, absent
        frontmatter, a log that cannot be read or decoded, or no log at all.
    """
    # Variant A: non-self-hosting repos have no staging infra -- skip entirely.
    if not is_self_hosting_repo(repo):
        return True, f"Staging gate N/A for {repo}"

    # Self-hosting (orchestrator) path: real verdict check.
    repo_path = _repo_path(repo, branch)
    log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/15-staging-log.md")
    if os.path.isfile(log_path):
        try:
            # Decode explicitly as UTF-8 rather than with the process locale.
            with open(log_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is not an OSError and would otherwise escape the gate.
            return False, f"Error reading staging log: {e}"
        return _parse_staging_status(content)
    # Not in the feature worktree -- the deployer may have merged it into main.
    main_content = _staging_log_from_main(repo, work_item_id)
    if main_content is not None:
        return _parse_staging_status(main_content)
    return False, "Staging log not found (15-staging-log.md)"
# Registry for dynamic lookup by name: maps each gate's name to its callable.
# Every gate returns a (passed, reason) tuple, but the callables do NOT share
# one signature:
#   - (repo, work_item_id, branch=None): check_analysis_approved,
#     check_analysis_complete, check_architecture_done, check_tests_passed,
#     check_reviewer_verdict, check_deploy_status, check_staging_status
#   - (repo, branch): check_ci_green, check_tests_local
#   - (repo, pr_number): check_review_approved
QG_CHECKS = {
    "check_analysis_approved": check_analysis_approved,
    "check_analysis_complete": check_analysis_complete,
    "check_architecture_done": check_architecture_done,
    "check_ci_green": check_ci_green,
    "check_review_approved": check_review_approved,
    "check_tests_passed": check_tests_passed,
    "check_reviewer_verdict": check_reviewer_verdict,
    "check_tests_local": check_tests_local,
    "check_deploy_status": check_deploy_status,
    "check_staging_status": check_staging_status,
}