Files
orchestrator/src/security_gate.py
claude-bot 92961d1d32 refactor(frontmatter): unified frontmatter contract + handoff spec (ORCH-52c)
src/frontmatter.py grows from a single-key reader into the full machine
contract: reader (read_frontmatter_value, unchanged), one parse primitive
(parse_frontmatter), writer (render/write_frontmatter), schema validator
(validate_schema/REQUIRED_FIELDS, warning-only by default) and a shared
strip_frontmatter helper. The five verdict gates (check_reviewer_verdict,
_parse_tests_verdict, _parse_deploy_status, _parse_staging_status,
parse_security_status) now read through the single parse_frontmatter point
instead of duplicated ad-hoc YAML logic; review_parse._strip_frontmatter and
security_gate.extract_security_findings reuse the shared helper.

Strictly backward compatible + never-raise: STAGE_TRANSITIONS, the QG_CHECKS
composition, verdict semantics (incl. ORCH-047 three-field tester + negative
token priority), reason-strings and worktree->origin/main fallback are 1:1.
The schema validator never influences a gate verdict by default; hard-fail is
reserved behind the frontmatter_validation_strict kill-switch (default False).

New formal handoff spec docs/_standards/HANDOFF_PROTOCOL.md ("stage -> required
output" + required frontmatter schema), aligned 1:1 with PIPELINE_DOCS.md.

Tests: test_frontmatter.py (TC-01..07), test_qg_verdicts.py (TC-08..15),
test_security_gate.py (TC-12), test_stages_invariants.py (TC-16). Full
tests/ green (1212).

Refs: ORCH-076

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 14:14:30 +03:00

688 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Security-gate core (ORCH-022): secret-scanning + dependency audit before merge.
Background
----------
The orchestrator is autonomous: the ``developer`` agent writes code with no human
filter. Before a task branch merges into ``main`` there was no automatic check for a
leaked secret (key / token / password / private key) or a vulnerable dependency
(known CVE). For the self-hosting ``orchestrator`` repo this is acute: one shared
prod instance serves every project from a shared DB, so a secret or CVE that slips
through one task lands in the prod of all projects (CLAUDE.md §self-hosting, §8).
This module provides the deterministic (no-LLM) primitives that the quality-gate
``check_security_gate`` (src/qg/checks.py) composes on the ``deploy-staging ->
deploy`` edge, **FIRST** among the edge sub-gates (BEFORE the merge-gate and
image-freshness), immediately before the deployer merges the PR (ADR-001 Р-1):
* ``scan_secrets`` -> run ``gitleaks`` over ``origin/main..HEAD`` (offline).
* ``audit_dependencies`` -> run ``pip-audit`` over ``requirements.txt`` (OSV/PyPI).
* ``classify_severity`` -> pure: map a CVE severity to block / warning.
* ``compute_verdict`` -> pure: combine findings + thresholds -> the artefact
frontmatter fields + a human-readable reason.
* ``write_security_report`` / ``parse_security_status`` -> write the
``17-security-report.md`` artefact and read its machine verdict back (single
source of truth: the gate returns exactly the frontmatter it wrote, AC-8).
* ``check_security_gate`` -> the orchestrating entry the QG wrapper delegates to.
Invariants (ADR-001 §7, never broken):
* **Secrets are unconditional** (BR-2): gitleaks is fully offline, so the "a
secret always blocks" guarantee does not depend on the network. A secret-scan
TOOL error is **fail-closed** (we cannot prove "no secret" -> FAIL).
* **Dependency audit is best-effort** (Р-3): an unreachable CVE feed degrades
**fail-open + a loud warning** by default (anti-loop, precedent ORCH-061);
``security_dep_audit_fail_closed`` flips it to strict.
* **never-raise**: any internal error -> ``(False, "<reason>")``; an exception
never escapes into ``advance_stage`` (AC-16).
* **Self-hosting safety** (AC-19): the gate only reads / scans / writes the
artefact. It never calls the deploy hook and never restarts the prod container.
This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and lazily
``qg.checks.is_self_hosting_repo`` / ``notifications``; it never imports
``stage_engine``.
"""
import json
import logging
import os
import subprocess
from dataclasses import dataclass, field
from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path
logger = logging.getLogger("orchestrator.security_gate")
# Bounded git timeout so a hung fetch never wedges the monitor-thread running the
# gate (the scan timeout itself comes from settings.security_scan_timeout_s).
_GIT_TIMEOUT = 60
# Severity ranking for the dependency block threshold. UNKNOWN / unrecognised is
# intentionally absent -> classified as "warning" (anti-loop, ADR-001 Р-4).
_SEVERITY_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
# ---------------------------------------------------------------------------
# Result containers (plain dataclasses, easy to build in tests)
# ---------------------------------------------------------------------------
@dataclass
class SecretScanResult:
"""Outcome of :func:`scan_secrets`.
status:
* ``"clean"`` -> no secret found.
* ``"found"`` -> ``findings`` lists the confirmed (non-allowlisted) secrets.
* ``"error"`` -> the scanner could not run (missing binary / timeout / rc>=2);
treated as **fail-closed** by :func:`compute_verdict` (BR-2).
"""
status: str = "clean"
findings: list = field(default_factory=list)
detail: str = ""
@dataclass
class DepAuditResult:
"""Outcome of :func:`audit_dependencies`.
status:
* ``"ok"`` -> the audit ran; ``findings`` may be empty or non-empty.
* ``"degraded"`` -> the CVE feed was unreachable / the tool failed; **fail-open**
by default (ADR-001 Р-3), surfaced as ``deps_audit_degraded: true``.
"""
status: str = "ok"
findings: list = field(default_factory=list)
detail: str = ""
# ---------------------------------------------------------------------------
# Conditionality (mirrors _merge_gate_applies / image_freshness_applies)
# ---------------------------------------------------------------------------
def security_gate_applies(repo: str) -> bool:
"""Whether the security-gate is REAL for this repo (conditional rollout).
Mirrors the ORCH-35 / ORCH-43 / ORCH-58 pattern:
* ``security_gate_enabled=False`` -> always False (kill-switch; pipeline is
1:1 as before ORCH-022 for everyone).
* ``security_gate_repos`` (CSV) non-empty -> real only for the listed repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``).
Never raises (AC-16): any error -> False (the safe no-op default).
"""
try:
if not settings.security_gate_enabled:
return False
raw = (settings.security_gate_repos or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
# Lazy import keeps this module a leaf (no qg import at module load).
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("security_gate_applies error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# Secret-scanning (gitleaks, offline) — FR-1 / AC-1..AC-3
# ---------------------------------------------------------------------------
def _gitleaks_config_path(worktree: str) -> str | None:
"""Versioned ``.gitleaks.toml`` at the repo root (BR-13), or None if absent."""
cfg = os.path.join(worktree, ".gitleaks.toml")
return cfg if os.path.isfile(cfg) else None
def _mask(secret: str) -> str:
"""Mask a matched secret so the artefact never re-leaks it verbatim."""
s = (secret or "").strip()
if len(s) <= 8:
return "****"
return f"{s[:4]}{s[-2:]}"
def parse_gitleaks_report(text: str) -> list:
"""Pure parser for the gitleaks JSON report -> a list of finding dicts.
Each finding: ``{"file", "rule", "line", "match"}`` (the match is MASKED).
Tolerates an empty / non-JSON / non-list body (returns ``[]``); never raises.
"""
try:
data = json.loads(text or "[]")
except (ValueError, TypeError):
return []
if not isinstance(data, list):
return []
out = []
for item in data:
if not isinstance(item, dict):
continue
out.append(
{
"file": item.get("File") or item.get("file") or "?",
"rule": item.get("RuleID") or item.get("Description") or "secret",
"line": item.get("StartLine") or item.get("startLine") or 0,
"match": _mask(item.get("Secret") or item.get("Match") or ""),
}
)
return out
def scan_secrets(repo: str, branch: str) -> SecretScanResult:
"""Scan ``origin/main..HEAD`` of the task branch for secrets with ``gitleaks``.
Offline (BR-2): gitleaks rules are local, so the "a secret always blocks"
guarantee never depends on the network. Scanning the ``origin/main..HEAD``
range covers exactly the commits this task adds (and that will land in
``main``), and — because it runs BEFORE the merge-gate rebase — does not blame
the task for a secret introduced by a parallel update of ``main`` (ADR-001 Р-1).
Exit-code contract (07-infra-requirements.md I-1): 0 = clean, 1 = secrets
found, >=2 = tool error. A tool error / missing binary / timeout -> ``"error"``
(fail-closed downstream). Never raises (AC-16).
"""
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
return SecretScanResult(status="error", detail=f"worktree error: {e}")
# Refresh origin/main so the origin/main..HEAD range is meaningful. Best-effort:
# a fetch failure does not abort the scan (gitleaks still scans whatever range
# it can resolve); the scan itself is the security-critical step.
try:
subprocess.run(
["git", "-C", wt, "fetch", "origin", "main"],
capture_output=True, timeout=_GIT_TIMEOUT,
)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("scan_secrets: fetch origin/main failed for %s/%s: %s", repo, branch, e)
report_path = os.path.join(wt, ".gitleaks-report.json")
cmd = [
"gitleaks", "detect",
"--source", wt,
"--log-opts", "origin/main..HEAD",
"--report-format", "json",
"--report-path", report_path,
"--exit-code", "1",
"--no-banner",
]
cfg = _gitleaks_config_path(wt)
if cfg:
cmd += ["--config", cfg]
timeout = settings.security_scan_timeout_s
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
except subprocess.TimeoutExpired:
return SecretScanResult(status="error", detail=f"gitleaks timeout after {timeout}s")
except FileNotFoundError:
# Missing binary -> fail-closed (we cannot prove the branch is secret-free).
return SecretScanResult(status="error", detail="gitleaks binary not found")
except (subprocess.SubprocessError, OSError) as e:
return SecretScanResult(status="error", detail=f"gitleaks error: {e}")
finally:
# The report file is transient scratch inside the worktree; remove it after
# reading so it is never committed/scanned on a later pass.
report_text = ""
try:
if os.path.isfile(report_path):
with open(report_path, "r", encoding="utf-8") as f:
report_text = f.read()
os.remove(report_path)
except OSError:
report_text = ""
if r.returncode == 0:
return SecretScanResult(status="clean", detail="no secrets found")
if r.returncode == 1:
findings = parse_gitleaks_report(report_text) or parse_gitleaks_report(r.stdout)
if not findings:
# rc=1 with no parseable findings -> still treat as found (fail-closed).
findings = [{"file": "?", "rule": "secret", "line": 0, "match": "****"}]
return SecretScanResult(
status="found", findings=findings, detail=f"{len(findings)} secret(s) found"
)
# rc >= 2 (or any other) -> tool error -> fail-closed.
tail = ((r.stderr or "") + (r.stdout or "")).strip()[-200:]
return SecretScanResult(status="error", detail=f"gitleaks rc={r.returncode}: {tail}")
# ---------------------------------------------------------------------------
# Dependency audit (pip-audit, OSV/PyPI) — FR-2 / AC-4..AC-7
# ---------------------------------------------------------------------------
def parse_pip_audit_report(text: str) -> list:
"""Pure parser for the ``pip-audit -f json`` report -> a list of finding dicts.
Each finding: ``{"package", "version", "id", "severity", "fix"}``. pip-audit's
default JSON rarely carries a CVSS severity (OSV advisories often omit it), so a
missing severity is reported as ``"UNKNOWN"`` (classified as a warning, never an
auto-block — ADR-001 Р-4 anti-loop). Tolerates both the modern
``{"dependencies": [...]}`` shape and a bare list; never raises.
"""
try:
data = json.loads(text or "{}")
except (ValueError, TypeError):
return []
if isinstance(data, dict):
deps = data.get("dependencies", data.get("vulnerabilities", []))
elif isinstance(data, list):
deps = data
else:
return []
out = []
for dep in deps or []:
if not isinstance(dep, dict):
continue
name = dep.get("name") or dep.get("package") or "?"
version = dep.get("version") or "?"
for v in dep.get("vulns", dep.get("vulnerabilities", [])) or []:
if not isinstance(v, dict):
continue
sev = _extract_severity(v)
fix = v.get("fix_versions") or v.get("fixed_in") or []
aliases = v.get("aliases") or []
vuln_id = v.get("id") or (aliases[0] if aliases else "?")
out.append(
{
"package": name,
"version": version,
"id": vuln_id,
"severity": sev,
"fix": ", ".join(fix) if isinstance(fix, list) else str(fix),
}
)
return out
def _extract_severity(vuln: dict) -> str:
"""Best-effort severity extraction from a pip-audit vuln record -> UPPER token.
pip-audit JSON may carry severity in different shapes depending on the advisory
source; when none is present we return ``"UNKNOWN"`` (warning, never a block).
"""
raw = vuln.get("severity")
if isinstance(raw, str) and raw.strip():
return raw.strip().upper()
if isinstance(raw, list) and raw:
first = raw[0]
if isinstance(first, dict):
val = first.get("severity") or first.get("score") or first.get("type")
if val:
return str(val).strip().upper()
elif first:
return str(first).strip().upper()
return "UNKNOWN"
def audit_dependencies(repo: str, branch: str) -> DepAuditResult:
"""Audit the branch's ``requirements.txt`` for known CVEs with ``pip-audit``.
The advisory source is OSV/PyPI -> it needs the network. Per ADR-001 Р-3 an
unreachable feed / tool failure degrades **fail-open** by default (status
``"degraded"``), so a transient network problem on the prod instance never
produces a false rollback loop (precedent ORCH-061). The ``"degraded"`` state
is surfaced loudly (``deps_audit_degraded: true`` + warning log + Telegram).
Returns a :class:`DepAuditResult`. Never raises (AC-16).
"""
try:
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
return DepAuditResult(status="degraded", detail=f"worktree error: {e}")
req = os.path.join(wt, "requirements.txt")
if not os.path.isfile(req):
# Python-only v1 (A3): no manifest -> nothing to audit (not a degrade).
return DepAuditResult(status="ok", detail="no requirements.txt to audit")
cmd = ["pip-audit", "-r", req, "-f", "json", "--progress-spinner", "off"]
timeout = settings.security_scan_timeout_s
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
except subprocess.TimeoutExpired:
return DepAuditResult(status="degraded", detail=f"pip-audit timeout after {timeout}s")
except FileNotFoundError:
# Missing binary -> degrade (dep-audit is best-effort, not unconditional).
return DepAuditResult(status="degraded", detail="pip-audit binary not found")
except (subprocess.SubprocessError, OSError) as e:
return DepAuditResult(status="degraded", detail=f"pip-audit error: {e}")
# pip-audit exits 0 (no vulns) or 1 (vulns found) with valid JSON on stdout. A
# network/feed error produces non-JSON output (and often a non-zero rc) -> if
# we cannot parse the JSON we degrade fail-open rather than block falsely.
out = (r.stdout or "").strip()
if not out:
if r.returncode == 0:
return DepAuditResult(status="ok", detail="no vulnerabilities")
tail = (r.stderr or "").strip()[-200:]
return DepAuditResult(status="degraded", detail=f"pip-audit no output (rc={r.returncode}): {tail}")
try:
json.loads(out)
except ValueError:
tail = (r.stderr or "").strip()[-200:]
return DepAuditResult(status="degraded", detail=f"pip-audit feed unavailable: {tail}")
findings = parse_pip_audit_report(out)
return DepAuditResult(status="ok", findings=findings, detail=f"{len(findings)} vuln(s)")
# ---------------------------------------------------------------------------
# Pure classification + verdict (FR-2/FR-3/Р-4) — the core of the unit tests
# ---------------------------------------------------------------------------
def classify_severity(severity: str, block_threshold: str) -> str:
"""Pure: classify a CVE severity against the block threshold -> token.
Returns ``"block"`` when ``severity >= block_threshold`` in CRITICAL > HIGH >
MEDIUM > LOW order, else ``"warning"``. An UNKNOWN / unrecognised severity is
ALWAYS ``"warning"`` (never an auto-block — anti-loop, ADR-001 Р-4). Never
raises.
"""
sev = (severity or "").upper().strip()
thr = (block_threshold or "HIGH").upper().strip()
sev_rank = _SEVERITY_ORDER.get(sev)
thr_rank = _SEVERITY_ORDER.get(thr, _SEVERITY_ORDER["HIGH"])
if sev_rank is None:
return "warning"
return "block" if sev_rank >= thr_rank else "warning"
def compute_verdict(
secret_result: SecretScanResult,
dep_result: DepAuditResult,
*,
secrets_block: bool,
dep_block_severity: str,
dep_fail_closed: bool,
) -> dict:
"""Pure: combine scan results + thresholds into the artefact's machine fields.
Returns a dict with the frontmatter fields (``security_status``,
``secrets_found``, ``deps_blocking``, ``deps_warning``, ``deps_audit_degraded``),
a one-line ``reason`` summary, and the categorised finding lists for the body.
Decision (ADR-001 Р-4):
* secret-scan ERROR -> FAIL (fail-closed; BR-2 secrets guarantee is unconditional).
* any secret found AND ``secrets_block`` -> FAIL.
* any dependency at/over ``dep_block_severity`` -> FAIL (``deps_blocking``).
* MEDIUM/LOW/UNKNOWN deps -> warning only (``deps_warning``), never block.
* feed degraded -> warning by default; FAIL only when ``dep_fail_closed``.
Never raises.
"""
secret_scan_error = secret_result.status == "error"
secret_findings = list(secret_result.findings) if secret_result.status == "found" else []
secrets_found = len(secret_findings)
deps_audit_degraded = dep_result.status == "degraded"
blocking_findings = []
warning_findings = []
for f in dep_result.findings or []:
if classify_severity(f.get("severity", "UNKNOWN"), dep_block_severity) == "block":
blocking_findings.append(f)
else:
warning_findings.append(f)
reasons = []
fail = False
if secret_scan_error:
fail = True
reasons.append(f"secret scan error (fail-closed): {secret_result.detail}")
if secrets_block and secrets_found > 0:
fail = True
names = ", ".join(
f"{x.get('rule')} in {x.get('file')}:{x.get('line')}" for x in secret_findings
)
reasons.append(f"{secrets_found} secret(s): {names}")
if blocking_findings:
fail = True
names = ", ".join(
f"{x.get('package')} {x.get('version')} {x.get('id')} ({x.get('severity')})"
for x in blocking_findings
)
reasons.append(f"{len(blocking_findings)} blocking CVE(s): {names}")
if deps_audit_degraded and dep_fail_closed:
fail = True
reasons.append(f"dep-audit feed unavailable (fail-closed): {dep_result.detail}")
status = "FAIL" if fail else "PASS"
if reasons:
reason = "; ".join(reasons)
else:
extra = " (dep-audit degraded — warning only)" if deps_audit_degraded else ""
reason = f"clean: {secrets_found} secrets, {len(blocking_findings)} blocking CVE(s){extra}"
return {
"security_status": status,
"secrets_found": secrets_found,
"secret_scan_error": secret_scan_error,
"deps_blocking": len(blocking_findings),
"deps_warning": len(warning_findings),
"deps_audit_degraded": deps_audit_degraded,
"reason": reason,
"secret_findings": secret_findings,
"blocking_findings": blocking_findings,
"warning_findings": warning_findings,
}
# ---------------------------------------------------------------------------
# Artefact: write the report, read the machine verdict back (FR-3 / AC-8..AC-10)
# ---------------------------------------------------------------------------
def _report_rel(work_item_id: str) -> str:
return f"docs/work-items/{work_item_id}/17-security-report.md"
def _report_path(repo: str, work_item_id: str, branch: str) -> str:
"""Absolute path of 17-security-report.md inside the task worktree."""
try:
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
wt = ensure_worktree(repo, branch)
except Exception: # noqa: BLE001 - never-raise; fall back to shared clone
wt = os.path.join(settings.repos_dir, repo)
return os.path.join(wt, _report_rel(work_item_id))
def _bool_yaml(v: bool) -> str:
return "true" if v else "false"
def render_security_report(work_item_id: str, fields: dict) -> str:
"""Pure: render the 17-security-report.md content (frontmatter + body) from the
fields produced by :func:`compute_verdict`. Never raises."""
def _secret_lines():
items = fields.get("secret_findings") or []
if not items:
return "- None"
return "\n".join(
f"- `{x.get('file')}:{x.get('line')}` — {x.get('rule')} (match `{x.get('match')}`)"
for x in items
)
def _dep_lines(key):
items = fields.get(key) or []
if not items:
return "- None"
return "\n".join(
f"- `{x.get('package')}=={x.get('version')}` — {x.get('id')} "
f"severity={x.get('severity')} fix={x.get('fix') or 'n/a'}"
for x in items
)
return (
"---\n"
f"security_status: {fields.get('security_status', 'FAIL')}\n"
f"secrets_found: {int(fields.get('secrets_found', 0))}\n"
f"deps_blocking: {int(fields.get('deps_blocking', 0))}\n"
f"deps_warning: {int(fields.get('deps_warning', 0))}\n"
f"deps_audit_degraded: {_bool_yaml(bool(fields.get('deps_audit_degraded', False)))}\n"
"---\n"
f"# Security Report — {work_item_id}\n\n"
"Детерминированный security-гейт (ORCH-022): secret-scanning (gitleaks, offline) + "
"dependency audit (pip-audit). Машинный вердикт читается ТОЛЬКО из frontmatter выше.\n\n"
"## Verdict\n"
f"{fields.get('reason', '')}\n\n"
"## Secrets\n"
f"{_secret_lines()}\n\n"
"## Dependencies (blocking)\n"
f"{_dep_lines('blocking_findings')}\n\n"
"## Dependencies (warning)\n"
f"{_dep_lines('warning_findings')}\n"
)
def write_security_report(repo: str, work_item_id: str, branch: str, fields: dict) -> str:
"""Write 17-security-report.md into the task worktree; return its path.
Best-effort/never-raise: a write error is logged and the path is still returned
(the caller's read-back then fails closed). The artefact body is human-readable;
the machine verdict lives ONLY in the YAML frontmatter (canon)."""
path = _report_path(repo, work_item_id, branch)
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(render_security_report(work_item_id, fields))
except OSError as e:
logger.error("write_security_report error for %s/%s: %s", repo, work_item_id, e)
return path
def parse_security_status(content: str) -> tuple[bool, str]:
"""Map a 17-security-report.md body to a quality-gate verdict by reading ONLY
the machine-readable ``security_status:`` YAML frontmatter — never the prose.
Mirrors ``_parse_deploy_status`` / ``_parse_staging_status`` (canon: machine
verdict only from frontmatter, AC-8). The negative token (FAIL) is authoritative
(checked first). Returns:
* ``security_status: PASS`` -> ``(True, "Security status: PASS")``
* ``security_status: FAIL`` -> ``(False, "Security status: FAIL")``
* missing field / no frontmatter / bad YAML -> ``(False, <reason>)`` (fail-closed
on the verdict read, AC-9).
ORCH-52c: parse delegated to the unified ``frontmatter.parse_frontmatter``
primitive (single source of YAML-frontmatter logic); the security_status
semantics (FAIL authoritative) are UNCHANGED (1:1).
"""
from .frontmatter import parse_frontmatter
parse = parse_frontmatter(content)
if parse.yaml_error is not None:
return False, f"Invalid YAML frontmatter in security report: {parse.yaml_error}"
status = None
if parse.has_block and not parse.malformed:
status = str(parse.data.get("security_status", "")).upper().strip()
if status == "FAIL":
return False, "Security status: FAIL"
if status == "PASS":
return True, "Security status: PASS"
return False, f"No machine-readable security_status in frontmatter (got: {status!r})"
def extract_security_findings(report_path: str) -> str:
"""ORCH-046: best-effort verbatim excerpt of the report's finding sections for
embedding into the developer's ``task_desc`` on a rollback.
Pulls the ``## Verdict`` + ``## Secrets`` + ``## Dependencies (blocking)``
sections so the developer sees the must-fix substance directly (not just a
link). Contract «never raise»: any error / missing file -> ``""`` (the caller
then falls back to the reason + link). Mirrors ``review_parse`` defensiveness.
"""
try:
if not os.path.isfile(report_path):
return ""
with open(report_path, "r", encoding="utf-8") as f:
content = f.read()
# Drop the frontmatter; keep the human body (ORCH-52c: shared helper).
from .frontmatter import strip_frontmatter
content = strip_frontmatter(content)
wanted = ("## Verdict", "## Secrets", "## Dependencies (blocking)")
lines = content.splitlines()
out = []
keep = False
for ln in lines:
if ln.startswith("## "):
keep = any(ln.startswith(w) for w in wanted)
if keep:
out.append(ln)
excerpt = "\n".join(out).strip()
return excerpt[:1500]
except Exception as e: # noqa: BLE001 - never-raise (ORCH-046 defensive)
logger.warning("extract_security_findings error for %s: %s", report_path, e)
return ""
# ---------------------------------------------------------------------------
# Orchestrating entry — delegated to by qg.checks.check_security_gate
# ---------------------------------------------------------------------------
def check_security_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
"""ORCH-022 security-gate on the deploy-staging -> deploy edge, run FIRST.
Deterministic, no LLM. Algorithm (ADR-001 Р-1/Р-5):
1. Conditionality: ``security_gate_enabled=False`` -> ``(True, "...disabled")``;
a repo the gate is not real for -> ``(True, "security-gate N/A for <repo>")``.
2. ``scan_secrets`` (offline) + ``audit_dependencies`` (best-effort).
3. ``compute_verdict`` -> write ``17-security-report.md`` -> read the verdict
BACK via ``parse_security_status`` (single source of truth: the returned
verdict == the artefact frontmatter, AC-8).
4. FAIL -> ``(False, reason)`` (engine rolls back to ``development``); PASS ->
``(True, reason)`` (engine proceeds to the merge-gate).
A degraded dep-audit on a PASS is surfaced loudly (Telegram + log) without
failing the gate (ADR-001 Р-3). Never-raise (AC-16): any internal error ->
``(False, "<reason>")``; an exception never escapes into ``advance_stage``.
"""
try:
if not settings.security_gate_enabled:
return True, "security-gate disabled"
if not security_gate_applies(repo):
return True, f"security-gate N/A for {repo}"
secret_result = scan_secrets(repo, branch)
dep_result = audit_dependencies(repo, branch)
fields = compute_verdict(
secret_result,
dep_result,
secrets_block=settings.security_secrets_block,
dep_block_severity=settings.security_dep_block_severity,
dep_fail_closed=settings.security_dep_audit_fail_closed,
)
path = write_security_report(repo, work_item_id, branch, fields)
# Read the machine verdict back from the artefact we just wrote — so the
# returned (bool, reason) is guaranteed == the YAML frontmatter (AC-8).
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
except OSError as e:
return False, f"cannot read security report (fail-closed): {e}"
ok, _verdict = parse_security_status(content)
# Surface a degraded dep-audit loudly even when the gate passes (Р-3 / BR-11).
if fields.get("deps_audit_degraded"):
logger.warning(
"security-gate %s/%s: dep-audit DEGRADED (fail-%s): %s",
repo, work_item_id,
"closed" if settings.security_dep_audit_fail_closed else "open",
dep_result.detail,
)
try:
from .notifications import send_telegram, link_for
send_telegram(
f"⚠️ {link_for(work_item_id)}: dep-audit недоступен фид CVE "
f"({dep_result.detail}). "
+ ("Гейт fail-closed → FAIL." if settings.security_dep_audit_fail_closed
else "Гейт fail-open → warning (секреты проверены оффлайн).")
)
except Exception as e: # noqa: BLE001 - telegram best-effort
logger.warning("security-gate degraded telegram failed: %s", e)
if ok:
logger.info("security-gate passed for %s/%s: %s", repo, work_item_id, fields["reason"])
return True, f"security clean ({fields['reason']})"
return False, fields["reason"]
except Exception as e: # noqa: BLE001 - never-raise contract (AC-16)
logger.error("check_security_gate error for %s/%s: %s", repo, branch, e)
return False, f"security-gate error: {e}"