"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem.""" import os import logging import subprocess import httpx from ..config import settings logger = logging.getLogger("orchestrator.qg") from ..git_worktree import get_worktree_path, ensure_worktree def _repo_path(repo: str, branch: str | None = None) -> str: """Resolve the working path to read agent artifacts from. ORCH-2 / S-4: artifacts now live in the per-branch worktree. When a branch is given and its worktree exists on disk, read from there; otherwise fall back to the shared /repos/ clone (keeps backward-compat for 2-arg callers/tests). """ if branch: wt = get_worktree_path(repo, branch) if os.path.isdir(wt): return wt return os.path.join(settings.repos_dir, repo) # Shared httpx client config GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"} GITEA_BASE = f"{settings.gitea_url}/api/v1" def check_analysis_complete(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if analysis artifacts exist in the repo branch. Required files: - docs/work-items//01-brd.md - docs/work-items//02-trz.md - docs/work-items//03-acceptance-criteria.md - docs/work-items//04-test-plan.yaml """ required_files = [ f"docs/work-items/{work_item_id}/01-brd.md", f"docs/work-items/{work_item_id}/02-trz.md", f"docs/work-items/{work_item_id}/03-acceptance-criteria.md", f"docs/work-items/{work_item_id}/04-test-plan.yaml", ] repo_path = _repo_path(repo, branch) missing = [] for f in required_files: full_path = os.path.join(repo_path, f) if not os.path.isfile(full_path): missing.append(f) if missing: return False, f"Missing files: {', '.join(missing)}" return True, "All analysis artifacts present" def check_architecture_done(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if architecture artifacts exist. Required: docs/work-items//06-adr/ (at least 1 file) OR: docs/work-items//07-infra-requirements.md """ repo_path = _repo_path(repo, branch) adr_dir = os.path.join(repo_path, f"docs/work-items/{work_item_id}/06-adr") infra_file = os.path.join(repo_path, f"docs/work-items/{work_item_id}/07-infra-requirements.md") if os.path.isdir(adr_dir) and len(os.listdir(adr_dir)) > 0: return True, "ADR directory exists with files" if os.path.isfile(infra_file): return True, "Infra requirements file exists" return False, "No ADR directory or infra-requirements.md found" def check_ci_green(repo: str, branch: str) -> tuple[bool, str]: """ Check if CI status is green for branch via Gitea API. GET /repos/{owner}/{repo}/commits/{branch}/status """ owner = settings.gitea_owner url = f"{GITEA_BASE}/repos/{owner}/{repo}/commits/{branch}/status" try: resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) if resp.status_code == 404: return False, f"Branch '{branch}' not found or no status" resp.raise_for_status() data = resp.json() state = data.get("state", "unknown") if state == "success": return True, "CI green" return False, f"CI state: {state}" except httpx.HTTPError as e: logger.error(f"Gitea API error checking CI: {e}") return False, f"API error: {e}" def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]: """ Check if PR has at least one approved review and no request_changes. GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews """ owner = settings.gitea_owner url = f"{GITEA_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/reviews" try: resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) resp.raise_for_status() reviews = resp.json() approved = 0 changes_requested = 0 for review in reviews: # Skip stale reviews (dismissed by new commits) if review.get("stale", False): continue state = review.get("state", "").upper() if state == "APPROVED": approved += 1 elif state == "REQUEST_CHANGES": changes_requested += 1 if changes_requested > 0: return False, f"Changes requested ({changes_requested} reviews)" if approved > 0: return True, f"Approved ({approved} reviews)" return False, "No reviews yet" except httpx.HTTPError as e: logger.error(f"Gitea API error checking reviews: {e}") return False, f"API error: {e}" def check_tests_passed(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Gate the testing -> deploy transition on the tester's MACHINE-READABLE verdict in 13-test-report.md frontmatter, NOT on a naive substring search of the body. ET-013 fix: the previous implementation did `if "PASS" in content`, so a report explicitly marked `verdict: BLOCKED` / `status: blocked` but whose prose mentioned "23 passed" / "✅ PASS" / "All checks passed" was treated as a pass, and an unfinished feature reached Done. This mirrors check_reviewer_verdict (S-5) and check_deploy_status (БАГ 8): read ONLY the YAML frontmatter `verdict:` / `status:` fields, never the body. File: docs/work-items//13-test-report.md """ repo_path = _repo_path(repo, branch) report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md") if not os.path.isfile(report_path): return False, "Test report not found" try: with open(report_path, "r") as f: content = f.read() except OSError as e: return False, f"Error reading test report: {e}" return _parse_tests_verdict(content) # Positive / negative verdict tokens, derived from REAL tester reports in # enduro-trails (ET-001..ET-014). The tester is inconsistent: most write # `verdict: PASS`, but ET-006 used `verdict: ready-to-deploy` (with `status: PASSED`), # ET-007 `verdict: PASS — ready-to-deploy`, ET-008 `verdict: stage:ready-to-deploy` # (with `status: pass`). ET-013 (the bug) used `verdict: BLOCKED` / `status: blocked`. # We therefore match known positive/negative TOKENS inside the normalized # verdict/status fields, and treat a negative token as authoritative (a BLOCKED/FAILED # report never passes, even if another field looks positive). _TESTS_NEGATIVE_TOKENS = ("BLOCKED", "FAILED", "FAIL", "REQUEST_CHANGES", "REJECT", "RED") _TESTS_POSITIVE_TOKENS = ("PASSED", "PASS", "READY-TO-DEPLOY", "READY_TO_DEPLOY", "GREEN", "APPROVED") def _parse_tests_verdict(content: str) -> tuple[bool, str]: """Map a 13-test-report.md body to a quality-gate verdict by reading ONLY the machine-readable `verdict:` (and corroborating `status:`) YAML frontmatter fields. Rules: - No frontmatter / bad YAML / neither field present -> (False, reason). - A negative token (BLOCKED/FAILED/...) in verdict OR status -> (False) and is authoritative (ET-013 main case: verdict BLOCKED wins over any prose PASS). - Otherwise a positive token (PASS/PASSED/READY-TO-DEPLOY/...) in verdict OR status -> (True). - Anything else (unrecognized / empty verdict) -> (False, reason). """ import yaml if not content.startswith("---"): return False, "No YAML frontmatter in test report (cannot read machine verdict)" parts = content.split("---", 2) if len(parts) < 3: return False, "Malformed YAML frontmatter in test report" try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in test report: {e}" if not isinstance(fm, dict): return False, "Malformed YAML frontmatter in test report (not a mapping)" verdict = str(fm.get("verdict", "") or "").upper().strip() status = str(fm.get("status", "") or "").upper().strip() if not verdict and not status: return False, "No machine-readable verdict/status in test report frontmatter" fields = f"{verdict} {status}" for neg in _TESTS_NEGATIVE_TOKENS: if neg in fields: return False, f"Test verdict: {verdict or status} ({neg})" for pos in _TESTS_POSITIVE_TOKENS: if pos in fields: return True, f"Test verdict: {verdict or status} (PASS)" return False, f"No recognized PASS verdict in frontmatter (verdict={verdict!r}, status={status!r})" def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if analysis is complete AND approved by stakeholder. Requirements: 1. All analysis artifacts exist (BRD, TRZ, AC, TestPlan) 2. Stakeholder has posted :approved: comment on the Plane issue This QG is designed to be triggered by :approved: comment handler, so the approval check verifies file completeness as a safety gate. """ # First check files files_ok, files_reason = check_analysis_complete(repo, work_item_id, branch) if not files_ok: return False, files_reason # Check for :approved: comment via Plane API try: from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID from ..projects import get_project_by_repo # ORCH-6: verify approval in the issue's own Plane project. _proj = get_project_by_repo(repo) _pid = _proj.plane_project_id if _proj else PROJECT_ID issue_id = find_issue_id(work_item_id, _pid) if not issue_id: return False, "Cannot find Plane issue to verify approval" url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/comments/" resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() comments = resp.json() # Handle paginated response if isinstance(comments, dict): comments = comments.get("results", []) for comment in comments: body = comment.get("comment_html", "") or comment.get("comment", "") if ":approved:" in body: return True, "Analysis complete and approved by stakeholder" return False, "Analysis artifacts present but no :approved: comment found" except Exception as e: logger.warning(f"Failed to check approval for {work_item_id}: {e}") # If we can't reach Plane API but files exist, allow advance # (the :approved: handler already verified the comment exists) return True, f"Files present; Plane API check skipped ({e})" def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check reviewer agent verdict from 12-review.md (S-5 fix). Reads ONLY the machine-readable `verdict:` field from the YAML frontmatter, so tables / prose that merely mention APPROVED or REQUEST_CHANGES no longer cause false positives/negatives. Returns: (True, ...) -> verdict: APPROVED (False, ...) -> verdict: REQUEST_CHANGES, missing verdict, or no frontmatter """ import yaml repo_path = _repo_path(repo, branch) review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md") if not os.path.isfile(review_path): return False, "Review report not found (12-review.md)" try: with open(review_path, "r") as f: content = f.read() verdict = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in review: {e}" verdict = str(fm.get("verdict", "")).upper().strip() if verdict == "APPROVED": return True, "Reviewer verdict: APPROVED" if verdict == "REQUEST_CHANGES": return False, "Reviewer verdict: REQUEST_CHANGES" return False, f"No machine-readable verdict in frontmatter (got: {verdict!r})" except OSError as e: return False, f"Error reading review: {e}" def check_tests_local(repo: str, branch: str) -> tuple[bool, str]: """ DEPRECATED: replaced by check_ci_green on the development stage (CI is now configured). Kept for backward-compat; not wired to any stage. S-1 fix: run the project test suite locally and judge by exit code, instead of depending on Gitea CI (which is not configured -> always false). БАГ 5 fix: invoke pytest directly instead of make test. make is not installed in the orchestrator container, so the previous ["make", "test"] call raised FileNotFoundError. This reproduces the Makefile test target 1:1 (cd src/api && python -m pytest ../../tests/ -v). ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this is safe for concurrent active tasks — no shared /repos checkout race. """ import subprocess try: repo_path = ensure_worktree(repo, branch) r = subprocess.run( ["python", "-m", "pytest", "../../tests/", "-v"], cwd=os.path.join(repo_path, "src", "api"), capture_output=True, text=True, timeout=600, ) if r.returncode == 0: return True, "Local tests passed" tail = (r.stdout + r.stderr)[-500:] return False, f"Local tests failed: ...{tail}" except subprocess.TimeoutExpired: return False, "Local tests timed out (600s)" except Exception as e: return False, f"Local test run error: {e}" def _parse_deploy_status(content: str) -> tuple[bool, str]: """Parse a 14-deploy-log.md body and map its `deploy_status:` frontmatter to a quality-gate verdict. Reads ONLY the machine-readable YAML field, never prose. deploy_status: SUCCESS -> (True, "Deploy status: SUCCESS") deploy_status: FAILED -> (False, "Deploy status: FAILED") missing field / no frontmatter / bad YAML -> (False, ) """ import yaml status = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in deploy log: {e}" status = str(fm.get("deploy_status", "")).upper().strip() if status == "SUCCESS": return True, "Deploy status: SUCCESS" if status == "FAILED": return False, "Deploy status: FAILED" return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})" def _deploy_log_from_main(repo: str, work_item_id: str) -> str | None: """Best-effort read of 14-deploy-log.md from origin/main on the shared clone. The deployer writes 14-deploy-log.md and merges the deploy artifacts into main via a separate PR (see ET-013), so the file lands in origin/main, NOT in the feature branch worktree the gate normally reads. This recovers it from main. Degrades gracefully: any git failure (no clone, network/fetch error, file absent in main) returns None instead of raising, so the caller falls back to the plain "not found" verdict. Never raises. """ repo_clone = os.path.join(settings.repos_dir, repo) if not os.path.isdir(os.path.join(repo_clone, ".git")): return None rel = f"docs/work-items/{work_item_id}/14-deploy-log.md" try: # Refresh origin/main so we see freshly-merged deploy artifacts. subprocess.run( ["git", "-C", repo_clone, "fetch", "origin", "main"], check=False, capture_output=True, timeout=30, ) show = subprocess.run( ["git", "-C", repo_clone, "show", f"origin/main:{rel}"], check=False, capture_output=True, text=True, timeout=15, ) except (subprocess.SubprocessError, OSError) as e: logger.warning("deploy-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e) return None if show.returncode != 0: return None return show.stdout def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code (which is always 0 on a successful agent session even when the deploy failed). Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML frontmatter. Returns: (True, ...) -> deploy_status: SUCCESS (False, ...) -> deploy_status: FAILED, missing field, or no frontmatter ET-013 path-sync fix: the deployer writes 14-deploy-log.md and merges the deploy artifacts into main via a SEPARATE PR, so the log lands in origin/main, not in the feature-branch worktree this gate reads via _repo_path(repo, branch). If the file is absent in the worktree we fall back to reading it from origin/main on the shared clone. Lookup order: worktree -> origin/main -> not found. """ repo_path = _repo_path(repo, branch) log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md") if os.path.isfile(log_path): try: with open(log_path, "r") as f: content = f.read() except OSError as e: return False, f"Error reading deploy log: {e}" return _parse_deploy_status(content) # Not in the feature worktree — the deployer may have merged it into main. main_content = _deploy_log_from_main(repo, work_item_id) if main_content is not None: return _parse_deploy_status(main_content) return False, "Deploy log not found (14-deploy-log.md)" # Registry for dynamic lookup by name QG_CHECKS = { "check_analysis_approved": check_analysis_approved, "check_analysis_complete": check_analysis_complete, "check_architecture_done": check_architecture_done, "check_ci_green": check_ci_green, "check_review_approved": check_review_approved, "check_tests_passed": check_tests_passed, "check_reviewer_verdict": check_reviewer_verdict, "check_tests_local": check_tests_local, "check_deploy_status": check_deploy_status, }