"""Security-gate core (ORCH-022): secret-scanning + dependency audit before merge. Background ---------- The orchestrator is autonomous: the ``developer`` agent writes code with no human filter. Before a task branch merges into ``main`` there was no automatic check for a leaked secret (key / token / password / private key) or a vulnerable dependency (known CVE). For the self-hosting ``orchestrator`` repo this is acute: one shared prod instance serves every project from a shared DB, so a secret or CVE that slips through one task lands in the prod of all projects (CLAUDE.md §self-hosting, §8). This module provides the deterministic (no-LLM) primitives that the quality-gate ``check_security_gate`` (src/qg/checks.py) composes on the ``deploy-staging -> deploy`` edge, **FIRST** among the edge sub-gates (BEFORE the merge-gate and image-freshness), immediately before the deployer merges the PR (ADR-001 Р-1): * ``scan_secrets`` -> run ``gitleaks`` over ``origin/main..HEAD`` (offline). * ``audit_dependencies`` -> run ``pip-audit`` over ``requirements.txt`` (OSV/PyPI). * ``classify_severity`` -> pure: map a CVE severity to block / warning. * ``compute_verdict`` -> pure: combine findings + thresholds -> the artefact frontmatter fields + a human-readable reason. * ``write_security_report`` / ``parse_security_status`` -> write the ``17-security-report.md`` artefact and read its machine verdict back (single source of truth: the gate returns exactly the frontmatter it wrote, AC-8). * ``check_security_gate`` -> the orchestrating entry the QG wrapper delegates to. Invariants (ADR-001 §7, never broken): * **Secrets are unconditional** (BR-2): gitleaks is fully offline, so the "a secret always blocks" guarantee does not depend on the network. A secret-scan TOOL error is **fail-closed** (we cannot prove "no secret" -> FAIL). * **Dependency audit is best-effort** (Р-3): an unreachable CVE feed degrades **fail-open + a loud warning** by default (anti-loop, precedent ORCH-061); ``security_dep_audit_fail_closed`` flips it to strict. * **never-raise**: any internal error -> ``(False, "")``; an exception never escapes into ``advance_stage`` (AC-16). * **Self-hosting safety** (AC-19): the gate only reads / scans / writes the artefact. It never calls the deploy hook and never restarts the prod container. This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and lazily ``qg.checks.is_self_hosting_repo`` / ``notifications``; it never imports ``stage_engine``. """ import json import logging import os import subprocess from dataclasses import dataclass, field from .config import settings from .git_worktree import ensure_worktree, get_worktree_path logger = logging.getLogger("orchestrator.security_gate") # Bounded git timeout so a hung fetch never wedges the monitor-thread running the # gate (the scan timeout itself comes from settings.security_scan_timeout_s). _GIT_TIMEOUT = 60 # Severity ranking for the dependency block threshold. UNKNOWN / unrecognised is # intentionally absent -> classified as "warning" (anti-loop, ADR-001 Р-4). _SEVERITY_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4} # --------------------------------------------------------------------------- # Result containers (plain dataclasses, easy to build in tests) # --------------------------------------------------------------------------- @dataclass class SecretScanResult: """Outcome of :func:`scan_secrets`. status: * ``"clean"`` -> no secret found. * ``"found"`` -> ``findings`` lists the confirmed (non-allowlisted) secrets. * ``"error"`` -> the scanner could not run (missing binary / timeout / rc>=2); treated as **fail-closed** by :func:`compute_verdict` (BR-2). """ status: str = "clean" findings: list = field(default_factory=list) detail: str = "" @dataclass class DepAuditResult: """Outcome of :func:`audit_dependencies`. status: * ``"ok"`` -> the audit ran; ``findings`` may be empty or non-empty. * ``"degraded"`` -> the CVE feed was unreachable / the tool failed; **fail-open** by default (ADR-001 Р-3), surfaced as ``deps_audit_degraded: true``. """ status: str = "ok" findings: list = field(default_factory=list) detail: str = "" # --------------------------------------------------------------------------- # Conditionality (mirrors _merge_gate_applies / image_freshness_applies) # --------------------------------------------------------------------------- def security_gate_applies(repo: str) -> bool: """Whether the security-gate is REAL for this repo (conditional rollout). Mirrors the ORCH-35 / ORCH-43 / ORCH-58 pattern: * ``security_gate_enabled=False`` -> always False (kill-switch; pipeline is 1:1 as before ORCH-022 for everyone). * ``security_gate_repos`` (CSV) non-empty -> real only for the listed repos. * empty CSV -> real ONLY for the self-hosting repo (``orchestrator``). Never raises (AC-16): any error -> False (the safe no-op default). """ try: if not settings.security_gate_enabled: return False raw = (settings.security_gate_repos or "").strip() if raw: allowed = {r.strip().lower() for r in raw.split(",") if r.strip()} return (repo or "").strip().lower() in allowed # Lazy import keeps this module a leaf (no qg import at module load). from .qg.checks import is_self_hosting_repo return is_self_hosting_repo(repo) except Exception as e: # noqa: BLE001 - never-raise contract logger.warning("security_gate_applies error for %s: %s", repo, e) return False # --------------------------------------------------------------------------- # Secret-scanning (gitleaks, offline) — FR-1 / AC-1..AC-3 # --------------------------------------------------------------------------- def _gitleaks_config_path(worktree: str) -> str | None: """Versioned ``.gitleaks.toml`` at the repo root (BR-13), or None if absent.""" cfg = os.path.join(worktree, ".gitleaks.toml") return cfg if os.path.isfile(cfg) else None def _mask(secret: str) -> str: """Mask a matched secret so the artefact never re-leaks it verbatim.""" s = (secret or "").strip() if len(s) <= 8: return "****" return f"{s[:4]}…{s[-2:]}" def parse_gitleaks_report(text: str) -> list: """Pure parser for the gitleaks JSON report -> a list of finding dicts. Each finding: ``{"file", "rule", "line", "match"}`` (the match is MASKED). Tolerates an empty / non-JSON / non-list body (returns ``[]``); never raises. """ try: data = json.loads(text or "[]") except (ValueError, TypeError): return [] if not isinstance(data, list): return [] out = [] for item in data: if not isinstance(item, dict): continue out.append( { "file": item.get("File") or item.get("file") or "?", "rule": item.get("RuleID") or item.get("Description") or "secret", "line": item.get("StartLine") or item.get("startLine") or 0, "match": _mask(item.get("Secret") or item.get("Match") or ""), } ) return out def scan_secrets(repo: str, branch: str) -> SecretScanResult: """Scan ``origin/main..HEAD`` of the task branch for secrets with ``gitleaks``. Offline (BR-2): gitleaks rules are local, so the "a secret always blocks" guarantee never depends on the network. Scanning the ``origin/main..HEAD`` range covers exactly the commits this task adds (and that will land in ``main``), and — because it runs BEFORE the merge-gate rebase — does not blame the task for a secret introduced by a parallel update of ``main`` (ADR-001 Р-1). Exit-code contract (07-infra-requirements.md I-1): 0 = clean, 1 = secrets found, >=2 = tool error. A tool error / missing binary / timeout -> ``"error"`` (fail-closed downstream). Never raises (AC-16). """ try: wt = ensure_worktree(repo, branch) except Exception as e: # noqa: BLE001 - never-raise contract return SecretScanResult(status="error", detail=f"worktree error: {e}") # Refresh origin/main so the origin/main..HEAD range is meaningful. Best-effort: # a fetch failure does not abort the scan (gitleaks still scans whatever range # it can resolve); the scan itself is the security-critical step. try: subprocess.run( ["git", "-C", wt, "fetch", "origin", "main"], capture_output=True, timeout=_GIT_TIMEOUT, ) except (subprocess.SubprocessError, OSError) as e: logger.warning("scan_secrets: fetch origin/main failed for %s/%s: %s", repo, branch, e) report_path = os.path.join(wt, ".gitleaks-report.json") cmd = [ "gitleaks", "detect", "--source", wt, "--log-opts", "origin/main..HEAD", "--report-format", "json", "--report-path", report_path, "--exit-code", "1", "--no-banner", ] cfg = _gitleaks_config_path(wt) if cfg: cmd += ["--config", cfg] timeout = settings.security_scan_timeout_s try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) except subprocess.TimeoutExpired: return SecretScanResult(status="error", detail=f"gitleaks timeout after {timeout}s") except FileNotFoundError: # Missing binary -> fail-closed (we cannot prove the branch is secret-free). return SecretScanResult(status="error", detail="gitleaks binary not found") except (subprocess.SubprocessError, OSError) as e: return SecretScanResult(status="error", detail=f"gitleaks error: {e}") finally: # The report file is transient scratch inside the worktree; remove it after # reading so it is never committed/scanned on a later pass. report_text = "" try: if os.path.isfile(report_path): with open(report_path, "r", encoding="utf-8") as f: report_text = f.read() os.remove(report_path) except OSError: report_text = "" if r.returncode == 0: return SecretScanResult(status="clean", detail="no secrets found") if r.returncode == 1: findings = parse_gitleaks_report(report_text) or parse_gitleaks_report(r.stdout) if not findings: # rc=1 with no parseable findings -> still treat as found (fail-closed). findings = [{"file": "?", "rule": "secret", "line": 0, "match": "****"}] return SecretScanResult( status="found", findings=findings, detail=f"{len(findings)} secret(s) found" ) # rc >= 2 (or any other) -> tool error -> fail-closed. tail = ((r.stderr or "") + (r.stdout or "")).strip()[-200:] return SecretScanResult(status="error", detail=f"gitleaks rc={r.returncode}: {tail}") # --------------------------------------------------------------------------- # Dependency audit (pip-audit, OSV/PyPI) — FR-2 / AC-4..AC-7 # --------------------------------------------------------------------------- def parse_pip_audit_report(text: str) -> list: """Pure parser for the ``pip-audit -f json`` report -> a list of finding dicts. Each finding: ``{"package", "version", "id", "severity", "fix"}``. pip-audit's default JSON rarely carries a CVSS severity (OSV advisories often omit it), so a missing severity is reported as ``"UNKNOWN"`` (classified as a warning, never an auto-block — ADR-001 Р-4 anti-loop). Tolerates both the modern ``{"dependencies": [...]}`` shape and a bare list; never raises. """ try: data = json.loads(text or "{}") except (ValueError, TypeError): return [] if isinstance(data, dict): deps = data.get("dependencies", data.get("vulnerabilities", [])) elif isinstance(data, list): deps = data else: return [] out = [] for dep in deps or []: if not isinstance(dep, dict): continue name = dep.get("name") or dep.get("package") or "?" version = dep.get("version") or "?" for v in dep.get("vulns", dep.get("vulnerabilities", [])) or []: if not isinstance(v, dict): continue sev = _extract_severity(v) fix = v.get("fix_versions") or v.get("fixed_in") or [] aliases = v.get("aliases") or [] vuln_id = v.get("id") or (aliases[0] if aliases else "?") out.append( { "package": name, "version": version, "id": vuln_id, "severity": sev, "fix": ", ".join(fix) if isinstance(fix, list) else str(fix), } ) return out def _extract_severity(vuln: dict) -> str: """Best-effort severity extraction from a pip-audit vuln record -> UPPER token. pip-audit JSON may carry severity in different shapes depending on the advisory source; when none is present we return ``"UNKNOWN"`` (warning, never a block). """ raw = vuln.get("severity") if isinstance(raw, str) and raw.strip(): return raw.strip().upper() if isinstance(raw, list) and raw: first = raw[0] if isinstance(first, dict): val = first.get("severity") or first.get("score") or first.get("type") if val: return str(val).strip().upper() elif first: return str(first).strip().upper() return "UNKNOWN" def audit_dependencies(repo: str, branch: str) -> DepAuditResult: """Audit the branch's ``requirements.txt`` for known CVEs with ``pip-audit``. The advisory source is OSV/PyPI -> it needs the network. Per ADR-001 Р-3 an unreachable feed / tool failure degrades **fail-open** by default (status ``"degraded"``), so a transient network problem on the prod instance never produces a false rollback loop (precedent ORCH-061). The ``"degraded"`` state is surfaced loudly (``deps_audit_degraded: true`` + warning log + Telegram). Returns a :class:`DepAuditResult`. Never raises (AC-16). """ try: wt = get_worktree_path(repo, branch) if not os.path.isdir(wt): wt = ensure_worktree(repo, branch) except Exception as e: # noqa: BLE001 - never-raise contract return DepAuditResult(status="degraded", detail=f"worktree error: {e}") req = os.path.join(wt, "requirements.txt") if not os.path.isfile(req): # Python-only v1 (A3): no manifest -> nothing to audit (not a degrade). return DepAuditResult(status="ok", detail="no requirements.txt to audit") cmd = ["pip-audit", "-r", req, "-f", "json", "--progress-spinner", "off"] timeout = settings.security_scan_timeout_s try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) except subprocess.TimeoutExpired: return DepAuditResult(status="degraded", detail=f"pip-audit timeout after {timeout}s") except FileNotFoundError: # Missing binary -> degrade (dep-audit is best-effort, not unconditional). return DepAuditResult(status="degraded", detail="pip-audit binary not found") except (subprocess.SubprocessError, OSError) as e: return DepAuditResult(status="degraded", detail=f"pip-audit error: {e}") # pip-audit exits 0 (no vulns) or 1 (vulns found) with valid JSON on stdout. A # network/feed error produces non-JSON output (and often a non-zero rc) -> if # we cannot parse the JSON we degrade fail-open rather than block falsely. out = (r.stdout or "").strip() if not out: if r.returncode == 0: return DepAuditResult(status="ok", detail="no vulnerabilities") tail = (r.stderr or "").strip()[-200:] return DepAuditResult(status="degraded", detail=f"pip-audit no output (rc={r.returncode}): {tail}") try: json.loads(out) except ValueError: tail = (r.stderr or "").strip()[-200:] return DepAuditResult(status="degraded", detail=f"pip-audit feed unavailable: {tail}") findings = parse_pip_audit_report(out) return DepAuditResult(status="ok", findings=findings, detail=f"{len(findings)} vuln(s)") # --------------------------------------------------------------------------- # Pure classification + verdict (FR-2/FR-3/Р-4) — the core of the unit tests # --------------------------------------------------------------------------- def classify_severity(severity: str, block_threshold: str) -> str: """Pure: classify a CVE severity against the block threshold -> token. Returns ``"block"`` when ``severity >= block_threshold`` in CRITICAL > HIGH > MEDIUM > LOW order, else ``"warning"``. An UNKNOWN / unrecognised severity is ALWAYS ``"warning"`` (never an auto-block — anti-loop, ADR-001 Р-4). Never raises. """ sev = (severity or "").upper().strip() thr = (block_threshold or "HIGH").upper().strip() sev_rank = _SEVERITY_ORDER.get(sev) thr_rank = _SEVERITY_ORDER.get(thr, _SEVERITY_ORDER["HIGH"]) if sev_rank is None: return "warning" return "block" if sev_rank >= thr_rank else "warning" def compute_verdict( secret_result: SecretScanResult, dep_result: DepAuditResult, *, secrets_block: bool, dep_block_severity: str, dep_fail_closed: bool, ) -> dict: """Pure: combine scan results + thresholds into the artefact's machine fields. Returns a dict with the frontmatter fields (``security_status``, ``secrets_found``, ``deps_blocking``, ``deps_warning``, ``deps_audit_degraded``), a one-line ``reason`` summary, and the categorised finding lists for the body. Decision (ADR-001 Р-4): * secret-scan ERROR -> FAIL (fail-closed; BR-2 secrets guarantee is unconditional). * any secret found AND ``secrets_block`` -> FAIL. * any dependency at/over ``dep_block_severity`` -> FAIL (``deps_blocking``). * MEDIUM/LOW/UNKNOWN deps -> warning only (``deps_warning``), never block. * feed degraded -> warning by default; FAIL only when ``dep_fail_closed``. Never raises. """ secret_scan_error = secret_result.status == "error" secret_findings = list(secret_result.findings) if secret_result.status == "found" else [] secrets_found = len(secret_findings) deps_audit_degraded = dep_result.status == "degraded" blocking_findings = [] warning_findings = [] for f in dep_result.findings or []: if classify_severity(f.get("severity", "UNKNOWN"), dep_block_severity) == "block": blocking_findings.append(f) else: warning_findings.append(f) reasons = [] fail = False if secret_scan_error: fail = True reasons.append(f"secret scan error (fail-closed): {secret_result.detail}") if secrets_block and secrets_found > 0: fail = True names = ", ".join( f"{x.get('rule')} in {x.get('file')}:{x.get('line')}" for x in secret_findings ) reasons.append(f"{secrets_found} secret(s): {names}") if blocking_findings: fail = True names = ", ".join( f"{x.get('package')} {x.get('version')} {x.get('id')} ({x.get('severity')})" for x in blocking_findings ) reasons.append(f"{len(blocking_findings)} blocking CVE(s): {names}") if deps_audit_degraded and dep_fail_closed: fail = True reasons.append(f"dep-audit feed unavailable (fail-closed): {dep_result.detail}") status = "FAIL" if fail else "PASS" if reasons: reason = "; ".join(reasons) else: extra = " (dep-audit degraded — warning only)" if deps_audit_degraded else "" reason = f"clean: {secrets_found} secrets, {len(blocking_findings)} blocking CVE(s){extra}" return { "security_status": status, "secrets_found": secrets_found, "secret_scan_error": secret_scan_error, "deps_blocking": len(blocking_findings), "deps_warning": len(warning_findings), "deps_audit_degraded": deps_audit_degraded, "reason": reason, "secret_findings": secret_findings, "blocking_findings": blocking_findings, "warning_findings": warning_findings, } # --------------------------------------------------------------------------- # Artefact: write the report, read the machine verdict back (FR-3 / AC-8..AC-10) # --------------------------------------------------------------------------- def _report_rel(work_item_id: str) -> str: return f"docs/work-items/{work_item_id}/17-security-report.md" def _report_path(repo: str, work_item_id: str, branch: str) -> str: """Absolute path of 17-security-report.md inside the task worktree.""" try: wt = get_worktree_path(repo, branch) if not os.path.isdir(wt): wt = ensure_worktree(repo, branch) except Exception: # noqa: BLE001 - never-raise; fall back to shared clone wt = os.path.join(settings.repos_dir, repo) return os.path.join(wt, _report_rel(work_item_id)) def _bool_yaml(v: bool) -> str: return "true" if v else "false" def render_security_report(work_item_id: str, fields: dict) -> str: """Pure: render the 17-security-report.md content (frontmatter + body) from the fields produced by :func:`compute_verdict`. Never raises.""" def _secret_lines(): items = fields.get("secret_findings") or [] if not items: return "- None" return "\n".join( f"- `{x.get('file')}:{x.get('line')}` — {x.get('rule')} (match `{x.get('match')}`)" for x in items ) def _dep_lines(key): items = fields.get(key) or [] if not items: return "- None" return "\n".join( f"- `{x.get('package')}=={x.get('version')}` — {x.get('id')} " f"severity={x.get('severity')} fix={x.get('fix') or 'n/a'}" for x in items ) return ( "---\n" f"security_status: {fields.get('security_status', 'FAIL')}\n" f"secrets_found: {int(fields.get('secrets_found', 0))}\n" f"deps_blocking: {int(fields.get('deps_blocking', 0))}\n" f"deps_warning: {int(fields.get('deps_warning', 0))}\n" f"deps_audit_degraded: {_bool_yaml(bool(fields.get('deps_audit_degraded', False)))}\n" "---\n" f"# Security Report — {work_item_id}\n\n" "Детерминированный security-гейт (ORCH-022): secret-scanning (gitleaks, offline) + " "dependency audit (pip-audit). Машинный вердикт читается ТОЛЬКО из frontmatter выше.\n\n" "## Verdict\n" f"{fields.get('reason', '')}\n\n" "## Secrets\n" f"{_secret_lines()}\n\n" "## Dependencies (blocking)\n" f"{_dep_lines('blocking_findings')}\n\n" "## Dependencies (warning)\n" f"{_dep_lines('warning_findings')}\n" ) def write_security_report(repo: str, work_item_id: str, branch: str, fields: dict) -> str: """Write 17-security-report.md into the task worktree; return its path. Best-effort/never-raise: a write error is logged and the path is still returned (the caller's read-back then fails closed). The artefact body is human-readable; the machine verdict lives ONLY in the YAML frontmatter (canon).""" path = _report_path(repo, work_item_id, branch) try: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write(render_security_report(work_item_id, fields)) except OSError as e: logger.error("write_security_report error for %s/%s: %s", repo, work_item_id, e) return path def parse_security_status(content: str) -> tuple[bool, str]: """Map a 17-security-report.md body to a quality-gate verdict by reading ONLY the machine-readable ``security_status:`` YAML frontmatter — never the prose. Mirrors ``_parse_deploy_status`` / ``_parse_staging_status`` (canon: machine verdict only from frontmatter, AC-8). The negative token (FAIL) is authoritative (checked first). Returns: * ``security_status: PASS`` -> ``(True, "Security status: PASS")`` * ``security_status: FAIL`` -> ``(False, "Security status: FAIL")`` * missing field / no frontmatter / bad YAML -> ``(False, )`` (fail-closed on the verdict read, AC-9). """ import yaml status = None if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: try: fm = yaml.safe_load(parts[1]) or {} except yaml.YAMLError as e: return False, f"Invalid YAML frontmatter in security report: {e}" if isinstance(fm, dict): status = str(fm.get("security_status", "")).upper().strip() if status == "FAIL": return False, "Security status: FAIL" if status == "PASS": return True, "Security status: PASS" return False, f"No machine-readable security_status in frontmatter (got: {status!r})" def extract_security_findings(report_path: str) -> str: """ORCH-046: best-effort verbatim excerpt of the report's finding sections for embedding into the developer's ``task_desc`` on a rollback. Pulls the ``## Verdict`` + ``## Secrets`` + ``## Dependencies (blocking)`` sections so the developer sees the must-fix substance directly (not just a link). Contract «never raise»: any error / missing file -> ``""`` (the caller then falls back to the reason + link). Mirrors ``review_parse`` defensiveness. """ try: if not os.path.isfile(report_path): return "" with open(report_path, "r", encoding="utf-8") as f: content = f.read() # Drop the frontmatter; keep the human body. if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: content = parts[2] wanted = ("## Verdict", "## Secrets", "## Dependencies (blocking)") lines = content.splitlines() out = [] keep = False for ln in lines: if ln.startswith("## "): keep = any(ln.startswith(w) for w in wanted) if keep: out.append(ln) excerpt = "\n".join(out).strip() return excerpt[:1500] except Exception as e: # noqa: BLE001 - never-raise (ORCH-046 defensive) logger.warning("extract_security_findings error for %s: %s", report_path, e) return "" # --------------------------------------------------------------------------- # Orchestrating entry — delegated to by qg.checks.check_security_gate # --------------------------------------------------------------------------- def check_security_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]: """ORCH-022 security-gate on the deploy-staging -> deploy edge, run FIRST. Deterministic, no LLM. Algorithm (ADR-001 Р-1/Р-5): 1. Conditionality: ``security_gate_enabled=False`` -> ``(True, "...disabled")``; a repo the gate is not real for -> ``(True, "security-gate N/A for ")``. 2. ``scan_secrets`` (offline) + ``audit_dependencies`` (best-effort). 3. ``compute_verdict`` -> write ``17-security-report.md`` -> read the verdict BACK via ``parse_security_status`` (single source of truth: the returned verdict == the artefact frontmatter, AC-8). 4. FAIL -> ``(False, reason)`` (engine rolls back to ``development``); PASS -> ``(True, reason)`` (engine proceeds to the merge-gate). A degraded dep-audit on a PASS is surfaced loudly (Telegram + log) without failing the gate (ADR-001 Р-3). Never-raise (AC-16): any internal error -> ``(False, "")``; an exception never escapes into ``advance_stage``. """ try: if not settings.security_gate_enabled: return True, "security-gate disabled" if not security_gate_applies(repo): return True, f"security-gate N/A for {repo}" secret_result = scan_secrets(repo, branch) dep_result = audit_dependencies(repo, branch) fields = compute_verdict( secret_result, dep_result, secrets_block=settings.security_secrets_block, dep_block_severity=settings.security_dep_block_severity, dep_fail_closed=settings.security_dep_audit_fail_closed, ) path = write_security_report(repo, work_item_id, branch, fields) # Read the machine verdict back from the artefact we just wrote — so the # returned (bool, reason) is guaranteed == the YAML frontmatter (AC-8). try: with open(path, "r", encoding="utf-8") as f: content = f.read() except OSError as e: return False, f"cannot read security report (fail-closed): {e}" ok, _verdict = parse_security_status(content) # Surface a degraded dep-audit loudly even when the gate passes (Р-3 / BR-11). if fields.get("deps_audit_degraded"): logger.warning( "security-gate %s/%s: dep-audit DEGRADED (fail-%s): %s", repo, work_item_id, "closed" if settings.security_dep_audit_fail_closed else "open", dep_result.detail, ) try: from .notifications import send_telegram, link_for send_telegram( f"⚠️ {link_for(work_item_id)}: dep-audit недоступен фид CVE " f"({dep_result.detail}). " + ("Гейт fail-closed → FAIL." if settings.security_dep_audit_fail_closed else "Гейт fail-open → warning (секреты проверены оффлайн).") ) except Exception as e: # noqa: BLE001 - telegram best-effort logger.warning("security-gate degraded telegram failed: %s", e) if ok: logger.info("security-gate passed for %s/%s: %s", repo, work_item_id, fields["reason"]) return True, f"security clean ({fields['reason']})" return False, fields["reason"] except Exception as e: # noqa: BLE001 - never-raise contract (AC-16) logger.error("check_security_gate error for %s/%s: %s", repo, branch, e) return False, f"security-gate error: {e}"