"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem.""" import os import logging import httpx from ..config import settings logger = logging.getLogger("orchestrator.qg") from ..git_worktree import get_worktree_path, ensure_worktree def _repo_path(repo: str, branch: str | None = None) -> str: """Resolve the working path to read agent artifacts from. ORCH-2 / S-4: artifacts now live in the per-branch worktree. When a branch is given and its worktree exists on disk, read from there; otherwise fall back to the shared /repos/ clone (keeps backward-compat for 2-arg callers/tests). """ if branch: wt = get_worktree_path(repo, branch) if os.path.isdir(wt): return wt return os.path.join(settings.repos_dir, repo) # Shared httpx client config GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"} GITEA_BASE = f"{settings.gitea_url}/api/v1" def check_analysis_complete(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if analysis artifacts exist in the repo branch. Required files: - docs/work-items//01-brd.md - docs/work-items//02-trz.md - docs/work-items//03-acceptance-criteria.md - docs/work-items//04-test-plan.yaml """ required_files = [ f"docs/work-items/{work_item_id}/01-brd.md", f"docs/work-items/{work_item_id}/02-trz.md", f"docs/work-items/{work_item_id}/03-acceptance-criteria.md", f"docs/work-items/{work_item_id}/04-test-plan.yaml", ] repo_path = _repo_path(repo, branch) missing = [] for f in required_files: full_path = os.path.join(repo_path, f) if not os.path.isfile(full_path): missing.append(f) if missing: return False, f"Missing files: {', '.join(missing)}" return True, "All analysis artifacts present" def check_architecture_done(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if architecture artifacts exist. Required: docs/work-items//06-adr/ (at least 1 file) OR: docs/work-items//07-infra-requirements.md """ repo_path = _repo_path(repo, branch) adr_dir = os.path.join(repo_path, f"docs/work-items/{work_item_id}/06-adr") infra_file = os.path.join(repo_path, f"docs/work-items/{work_item_id}/07-infra-requirements.md") if os.path.isdir(adr_dir) and len(os.listdir(adr_dir)) > 0: return True, "ADR directory exists with files" if os.path.isfile(infra_file): return True, "Infra requirements file exists" return False, "No ADR directory or infra-requirements.md found" def check_ci_green(repo: str, branch: str) -> tuple[bool, str]: """ Check if CI status is green for branch via Gitea API. GET /repos/{owner}/{repo}/commits/{branch}/status """ owner = settings.gitea_owner url = f"{GITEA_BASE}/repos/{owner}/{repo}/commits/{branch}/status" try: resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) if resp.status_code == 404: return False, f"Branch '{branch}' not found or no status" resp.raise_for_status() data = resp.json() state = data.get("state", "unknown") if state == "success": return True, "CI green" return False, f"CI state: {state}" except httpx.HTTPError as e: logger.error(f"Gitea API error checking CI: {e}") return False, f"API error: {e}" def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]: """ Check if PR has at least one approved review and no request_changes. GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews """ owner = settings.gitea_owner url = f"{GITEA_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/reviews" try: resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) resp.raise_for_status() reviews = resp.json() approved = 0 changes_requested = 0 for review in reviews: # Skip stale reviews (dismissed by new commits) if review.get("stale", False): continue state = review.get("state", "").upper() if state == "APPROVED": approved += 1 elif state == "REQUEST_CHANGES": changes_requested += 1 if changes_requested > 0: return False, f"Changes requested ({changes_requested} reviews)" if approved > 0: return True, f"Approved ({approved} reviews)" return False, "No reviews yet" except httpx.HTTPError as e: logger.error(f"Gitea API error checking reviews: {e}") return False, f"API error: {e}" def check_tests_passed(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if test report exists and contains PASS indicator. File: docs/work-items//13-test-report.md """ repo_path = _repo_path(repo, branch) report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md") if not os.path.isfile(report_path): return False, "Test report not found" try: with open(report_path, "r") as f: content = f.read() if "PASS" in content or "All tests passed" in content: return True, "Test report indicates PASS" return False, "Test report exists but no PASS indicator found" except OSError as e: return False, f"Error reading test report: {e}" def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check if analysis is complete AND approved by stakeholder. Requirements: 1. All analysis artifacts exist (BRD, TRZ, AC, TestPlan) 2. Stakeholder has posted :approved: comment on the Plane issue This QG is designed to be triggered by :approved: comment handler, so the approval check verifies file completeness as a safety gate. """ # First check files files_ok, files_reason = check_analysis_complete(repo, work_item_id, branch) if not files_ok: return False, files_reason # Check for :approved: comment via Plane API try: from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID from ..projects import get_project_by_repo # ORCH-6: verify approval in the issue's own Plane project. _proj = get_project_by_repo(repo) _pid = _proj.plane_project_id if _proj else PROJECT_ID issue_id = find_issue_id(work_item_id, _pid) if not issue_id: return False, "Cannot find Plane issue to verify approval" url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/comments/" resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10) resp.raise_for_status() comments = resp.json() # Handle paginated response if isinstance(comments, dict): comments = comments.get("results", []) for comment in comments: body = comment.get("comment_html", "") or comment.get("comment", "") if ":approved:" in body: return True, "Analysis complete and approved by stakeholder" return False, "Analysis artifacts present but no :approved: comment found" except Exception as e: logger.warning(f"Failed to check approval for {work_item_id}: {e}") # If we can't reach Plane API but files exist, allow advance # (the :approved: handler already verified the comment exists) return True, f"Files present; Plane API check skipped ({e})" def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ Check reviewer agent verdict from 12-review.md (S-5 fix). Reads ONLY the machine-readable `verdict:` field from the YAML frontmatter, so tables / prose that merely mention APPROVED or REQUEST_CHANGES no longer cause false positives/negatives. Returns: (True, ...) -> verdict: APPROVED (False, ...) -> verdict: REQUEST_CHANGES, missing verdict, or no frontmatter """ import yaml repo_path = _repo_path(repo, branch) review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md") if not os.path.isfile(review_path): return False, "Review report not found (12-review.md)" try: with open(review_path, "r") as f: content = f.read() verdict = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in review: {e}" verdict = str(fm.get("verdict", "")).upper().strip() if verdict == "APPROVED": return True, "Reviewer verdict: APPROVED" if verdict == "REQUEST_CHANGES": return False, "Reviewer verdict: REQUEST_CHANGES" return False, f"No machine-readable verdict in frontmatter (got: {verdict!r})" except OSError as e: return False, f"Error reading review: {e}" def check_tests_local(repo: str, branch: str) -> tuple[bool, str]: """ DEPRECATED: replaced by check_ci_green on the development stage (CI is now configured). Kept for backward-compat; not wired to any stage. S-1 fix: run the project test suite locally and judge by exit code, instead of depending on Gitea CI (which is not configured -> always false). БАГ 5 fix: invoke pytest directly instead of make test. make is not installed in the orchestrator container, so the previous ["make", "test"] call raised FileNotFoundError. This reproduces the Makefile test target 1:1 (cd src/api && python -m pytest ../../tests/ -v). ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this is safe for concurrent active tasks — no shared /repos checkout race. """ import subprocess try: repo_path = ensure_worktree(repo, branch) r = subprocess.run( ["python", "-m", "pytest", "../../tests/", "-v"], cwd=os.path.join(repo_path, "src", "api"), capture_output=True, text=True, timeout=600, ) if r.returncode == 0: return True, "Local tests passed" tail = (r.stdout + r.stderr)[-500:] return False, f"Local tests failed: ...{tail}" except subprocess.TimeoutExpired: return False, "Local tests timed out (600s)" except Exception as e: return False, f"Local test run error: {e}" def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]: """ БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code (which is always 0 on a successful agent session even when the deploy failed). Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML frontmatter. Returns: (True, ...) -> deploy_status: SUCCESS (False, ...) -> deploy_status: FAILED, missing field, or no frontmatter """ import yaml repo_path = _repo_path(repo, branch) log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md") if not os.path.isfile(log_path): return False, "Deploy log not found (14-deploy-log.md)" try: with open(log_path, "r") as f: content = f.read() status = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in deploy log: {e}" status = str(fm.get("deploy_status", "")).upper().strip() if status == "SUCCESS": return True, "Deploy status: SUCCESS" if status == "FAILED": return False, "Deploy status: FAILED" return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})" except OSError as e: return False, f"Error reading deploy log: {e}" # Registry for dynamic lookup by name QG_CHECKS = { "check_analysis_approved": check_analysis_approved, "check_analysis_complete": check_analysis_complete, "check_architecture_done": check_architecture_done, "check_ci_green": check_ci_green, "check_review_approved": check_review_approved, "check_tests_passed": check_tests_passed, "check_reviewer_verdict": check_reviewer_verdict, "check_tests_local": check_tests_local, "check_deploy_status": check_deploy_status, }