Files
orchestrator/src/qg/checks.py
Dev Agent a87c633003 refactor(plane_sync): parameterize project_id (backward compatible)
ORCH-6: sync functions resolve the issue PROJECT_ID via the registry
(get_project_by_repo) and accept project_id; default stays enduro so
existing ET callers keep working.
2026-06-02 22:30:42 +03:00

286 lines
11 KiB
Python

"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem."""
import os
import logging
import httpx
from ..config import settings
logger = logging.getLogger("orchestrator.qg")
from ..git_worktree import get_worktree_path, ensure_worktree
def _repo_path(repo: str, branch: str | None = None) -> str:
    """Return the directory from which agent artifacts for *repo* are read.

    ORCH-2 / S-4: artifacts live in the per-branch worktree. When ``branch``
    is given and its worktree directory exists on disk, that directory is
    returned. In every other case (no branch, or the worktree is not present)
    the shared ``<settings.repos_dir>/<repo>`` clone is returned, which keeps
    two-argument callers and tests working.
    """
    if not branch:
        return os.path.join(settings.repos_dir, repo)

    worktree_dir = get_worktree_path(repo, branch)
    if not os.path.isdir(worktree_dir):
        # No worktree on disk for this branch: fall back to the shared clone.
        return os.path.join(settings.repos_dir, repo)
    return worktree_dir
# Shared Gitea request configuration used by the httpx calls in this module.
# Both values are built once, at import time, from `settings`:
#   GITEA_HEADERS - token-based Authorization header sent with each Gitea request
#   GITEA_BASE    - root of the Gitea REST API (v1)
GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"}
GITEA_BASE = f"{settings.gitea_url}/api/v1"
def check_analysis_complete(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Verify that every analysis artifact of a work item exists on disk.

    The files are looked up under the directory chosen by
    ``_repo_path(repo, branch)``. All of these must exist:
      - docs/work-items/<work_item_id>/01-brd.md
      - docs/work-items/<work_item_id>/02-trz.md
      - docs/work-items/<work_item_id>/03-acceptance-criteria.md
      - docs/work-items/<work_item_id>/04-test-plan.yaml

    Returns:
        ``(True, message)`` when all files are present, otherwise
        ``(False, message)`` where the message lists the missing relative paths.
    """
    expected_artifacts = (
        f"docs/work-items/{work_item_id}/01-brd.md",
        f"docs/work-items/{work_item_id}/02-trz.md",
        f"docs/work-items/{work_item_id}/03-acceptance-criteria.md",
        f"docs/work-items/{work_item_id}/04-test-plan.yaml",
    )
    base_dir = _repo_path(repo, branch)

    # Collect the relative paths that are not regular files, in declaration order.
    absent = [
        rel_path
        for rel_path in expected_artifacts
        if not os.path.isfile(os.path.join(base_dir, rel_path))
    ]
    if not absent:
        return True, "All analysis artifacts present"
    return False, f"Missing files: {', '.join(absent)}"
def check_architecture_done(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Verify that architecture artifacts exist for a work item.

    The check passes when either of the following is found under the
    directory chosen by ``_repo_path(repo, branch)``:
      - docs/work-items/<work_item_id>/06-adr/ containing at least one entry
      - docs/work-items/<work_item_id>/07-infra-requirements.md

    Returns:
        ``(True, message)`` naming which artifact satisfied the check, or
        ``(False, message)`` when neither is present.
    """
    base_dir = _repo_path(repo, branch)
    adr_path = os.path.join(base_dir, f"docs/work-items/{work_item_id}/06-adr")
    infra_path = os.path.join(base_dir, f"docs/work-items/{work_item_id}/07-infra-requirements.md")

    # The ADR directory only counts when it is non-empty; any entry qualifies.
    adr_has_entries = os.path.isdir(adr_path) and bool(os.listdir(adr_path))
    if adr_has_entries:
        return True, "ADR directory exists with files"

    if not os.path.isfile(infra_path):
        return False, "No ADR directory or infra-requirements.md found"
    return True, "Infra requirements file exists"
def check_ci_green(repo: str, branch: str) -> tuple[bool, str]:
    """Check whether the combined CI status of a branch is green in Gitea.

    Calls ``GET /repos/{owner}/{repo}/commits/{branch}/status`` and inspects
    the ``state`` field of the JSON response.

    Args:
        repo: Repository name under ``settings.gitea_owner``.
        branch: Branch (ref) whose combined status is queried.

    Returns:
        ``(True, "CI green")`` when the state is ``"success"``; otherwise
        ``(False, reason)``. Transport errors, non-2xx responses and
        malformed response bodies never raise: they are reported as a
        failing check with an explanatory reason.
    """
    owner = settings.gitea_owner
    url = f"{GITEA_BASE}/repos/{owner}/{repo}/commits/{branch}/status"
    try:
        resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10)
        if resp.status_code == 404:
            return False, f"Branch '{branch}' not found or no status"
        resp.raise_for_status()
        data = resp.json()
    except httpx.HTTPError as e:
        logger.error("Gitea API error checking CI: %s", e)
        return False, f"API error: {e}"
    except ValueError as e:
        # resp.json() raises a ValueError subclass on a non-JSON body (for
        # example an HTML error page from a proxy); fail closed, do not crash.
        logger.error("Gitea returned a non-JSON CI status response: %s", e)
        return False, f"API error: invalid JSON in CI status response ({e})"

    # A payload that is not a JSON object carries no state; treat as unknown.
    state = data.get("state", "unknown") if isinstance(data, dict) else "unknown"
    if state == "success":
        return True, "CI green"
    return False, f"CI state: {state}"
def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]:
    """Check that a pull request is approved and has no open change requests.

    Calls ``GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews`` and counts
    the reviews that are still in effect. Reviews flagged ``stale`` or
    ``dismissed`` are ignored.

    Args:
        repo: Repository name under ``settings.gitea_owner``.
        pr_number: Index of the pull request.

    Returns:
        ``(True, reason)`` when at least one effective review is ``APPROVED``
        and none is ``REQUEST_CHANGES``; otherwise ``(False, reason)``.
        Transport errors, non-2xx responses and malformed payloads never
        raise: they are reported as a failing check.
    """
    owner = settings.gitea_owner
    url = f"{GITEA_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
    try:
        resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10)
        resp.raise_for_status()
        reviews = resp.json()
    except httpx.HTTPError as e:
        logger.error("Gitea API error checking reviews: %s", e)
        return False, f"API error: {e}"
    except ValueError as e:
        # resp.json() raises a ValueError subclass on a non-JSON body.
        logger.error("Gitea returned a non-JSON reviews response: %s", e)
        return False, f"API error: invalid JSON in reviews response ({e})"

    if not isinstance(reviews, list):
        # The endpoint is expected to return a JSON array of reviews.
        return False, "API error: unexpected reviews payload"

    approved = 0
    changes_requested = 0
    for review in reviews:
        if not isinstance(review, dict):
            continue
        # Skip reviews that no longer apply: stale ones (superseded by new
        # commits) and ones that were explicitly dismissed.
        if review.get("stale", False) or review.get("dismissed", False):
            continue
        # `state` may be absent or JSON null; normalise both to "".
        state = str(review.get("state") or "").upper()
        if state == "APPROVED":
            approved += 1
        elif state == "REQUEST_CHANGES":
            changes_requested += 1

    if changes_requested > 0:
        return False, f"Changes requested ({changes_requested} reviews)"
    if approved > 0:
        return True, f"Approved ({approved} reviews)"
    return False, "No reviews yet"
def check_tests_passed(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Check that the test report exists and contains a PASS indicator.

    Reads ``docs/work-items/<work_item_id>/13-test-report.md`` from the
    directory chosen by ``_repo_path(repo, branch)`` and looks for the
    substring ``PASS`` or ``All tests passed``.

    Returns:
        ``(True, message)`` when an indicator is found; ``(False, message)``
        when the report is missing, unreadable, or has no indicator.
    """
    repo_path = _repo_path(repo, branch)
    report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md")
    if not os.path.isfile(report_path):
        return False, "Test report not found"

    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # process locale. errors="replace" keeps undecodable bytes from
        # raising UnicodeDecodeError; the ASCII markers searched for below
        # are unaffected by replacement characters.
        with open(report_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError as e:
        return False, f"Error reading test report: {e}"

    if "PASS" in content or "All tests passed" in content:
        return True, "Test report indicates PASS"
    return False, "Test report exists but no PASS indicator found"
def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Check that analysis is complete AND approved by the stakeholder.

    Two conditions are verified, in this order:
      1. All analysis artifacts exist (delegated to ``check_analysis_complete``).
      2. A comment containing ``:approved:`` exists on the Plane issue.

    This gate is designed to be triggered by the ``:approved:`` comment
    handler, so the file check acts as a safety gate. If the Plane lookup
    raises for any reason, the gate passes on the strength of the file check
    alone and says so in the returned reason.

    Returns:
        ``(True, reason)`` when approved (or when the Plane check had to be
        skipped), ``(False, reason)`` otherwise.
    """
    artifacts_ok, artifacts_reason = check_analysis_complete(repo, work_item_id, branch)
    if not artifacts_ok:
        return False, artifacts_reason

    # Look for an :approved: comment through the Plane API.
    try:
        from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
        from ..projects import get_project_by_repo

        # ORCH-6: verify approval in the issue's own Plane project; fall back
        # to the default PROJECT_ID when the repo is not in the registry.
        project = get_project_by_repo(repo)
        plane_project_id = project.plane_project_id if project else PROJECT_ID

        issue_id = find_issue_id(work_item_id, plane_project_id)
        if not issue_id:
            return False, "Cannot find Plane issue to verify approval"

        comments_url = (
            f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{issue_id}/comments/"
        )
        response = httpx.get(comments_url, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()

        # A paginated response wraps the comment list in a "results" key.
        comments = payload.get("results", []) if isinstance(payload, dict) else payload

        has_approval = any(
            ":approved:" in (entry.get("comment_html", "") or entry.get("comment", ""))
            for entry in comments
        )
        if has_approval:
            return True, "Analysis complete and approved by stakeholder"
        return False, "Analysis artifacts present but no :approved: comment found"
    except Exception as e:
        logger.warning(f"Failed to check approval for {work_item_id}: {e}")
        # Plane could not be checked but the files exist: allow the advance
        # (the :approved: handler already verified the comment exists).
        return True, f"Files present; Plane API check skipped ({e})"
def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Check the reviewer agent verdict recorded in 12-review.md (S-5 fix).

    Reads ONLY the machine-readable ``verdict:`` field from the YAML
    frontmatter of ``docs/work-items/<work_item_id>/12-review.md``, so tables
    or prose that merely mention APPROVED / REQUEST_CHANGES cannot cause
    false positives or negatives.

    Returns:
        ``(True, reason)``  -> verdict: APPROVED
        ``(False, reason)`` -> verdict: REQUEST_CHANGES, missing or unknown
        verdict, no frontmatter, frontmatter that is not a YAML mapping,
        invalid YAML, or an unreadable file.
    """
    import yaml

    repo_path = _repo_path(repo, branch)
    review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md")
    if not os.path.isfile(review_path):
        return False, "Review report not found (12-review.md)"

    try:
        # "utf-8-sig" decodes independently of the process locale and drops a
        # leading BOM, which would otherwise hide the opening "---" marker.
        with open(review_path, "r", encoding="utf-8-sig") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        return False, f"Error reading review: {e}"

    verdict = None
    if content.startswith("---"):
        # parts[0] is the text before the opening marker, parts[1] the
        # frontmatter, parts[2] the remainder of the document.
        parts = content.split("---", 2)
        if len(parts) >= 3:
            try:
                fm = yaml.safe_load(parts[1]) or {}
            except yaml.YAMLError as e:
                return False, f"Invalid YAML frontmatter in review: {e}"
            if not isinstance(fm, dict):
                # A scalar or list frontmatter has no `verdict` key to read.
                return False, "Invalid YAML frontmatter in review: expected a mapping"
            raw_verdict = fm.get("verdict", "")
            # `verdict:` with no value parses to None; treat it as missing.
            verdict = "" if raw_verdict is None else str(raw_verdict).upper().strip()

    if verdict == "APPROVED":
        return True, "Reviewer verdict: APPROVED"
    if verdict == "REQUEST_CHANGES":
        return False, "Reviewer verdict: REQUEST_CHANGES"
    return False, f"No machine-readable verdict in frontmatter (got: {verdict!r})"
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
    """Run the project's test suite locally and judge it by exit code.

    S-1 fix: replaces the dependency on Gitea CI status with a local
    ``make test`` run.
    ORCH-2 / S-4: the command runs in the per-branch worktree returned by
    ``ensure_worktree`` rather than in the shared /repos checkout.

    Returns:
        ``(True, "Local tests passed")`` on exit code 0. Otherwise
        ``(False, reason)``, where the reason carries the last 500 characters
        of combined stdout+stderr, a timeout notice (600 s limit), or the
        text of any other error raised while setting up or running the tests.
    """
    import subprocess

    try:
        worktree_dir = ensure_worktree(repo, branch)
        proc = subprocess.run(
            ["make", "test"],
            cwd=worktree_dir,
            capture_output=True,
            text=True,
            timeout=600,
        )
        if proc.returncode != 0:
            # Only the end of the output is reported, to keep the reason short.
            tail = (proc.stdout + proc.stderr)[-500:]
            return False, f"Local tests failed: ...{tail}"
        return True, "Local tests passed"
    except subprocess.TimeoutExpired:
        return False, "Local tests timed out (600s)"
    except Exception as e:
        return False, f"Local test run error: {e}"
# Registry for dynamic lookup by name. Each check is registered under its own
# function name, so the key and the callable cannot drift apart.
QG_CHECKS = {
    check.__name__: check
    for check in (
        check_analysis_approved,
        check_analysis_complete,
        check_architecture_done,
        check_ci_green,
        check_review_approved,
        check_tests_passed,
        check_reviewer_verdict,
        check_tests_local,
    )
}