src/frontmatter.py grows from a single-key reader into the full machine
contract: reader (read_frontmatter_value, unchanged), one parse primitive
(parse_frontmatter), writer (render/write_frontmatter), schema validator
(validate_schema/REQUIRED_FIELDS, warning-only by default) and a shared
strip_frontmatter helper. The five verdict gates (check_reviewer_verdict,
_parse_tests_verdict, _parse_deploy_status, _parse_staging_status,
parse_security_status) now read through the single parse_frontmatter point
instead of duplicated ad-hoc YAML logic; review_parse._strip_frontmatter and
security_gate.extract_security_findings reuse the shared helper.
Strictly backward compatible + never-raise: STAGE_TRANSITIONS, the QG_CHECKS
composition, verdict semantics (incl. ORCH-047 three-field tester + negative
token priority), reason-strings and worktree->origin/main fallback are 1:1.
The schema validator never influences a gate verdict by default; hard-fail is
reserved behind the frontmatter_validation_strict kill-switch (default False).
New formal handoff spec docs/_standards/HANDOFF_PROTOCOL.md ("stage -> required
output" + required frontmatter schema), aligned 1:1 with PIPELINE_DOCS.md.
Tests: test_frontmatter.py (TC-01..07), test_qg_verdicts.py (TC-08..15),
test_security_gate.py (TC-12), test_stages_invariants.py (TC-16). Full
tests/ green (1212).
Refs: ORCH-076
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
351 lines
15 KiB
Python
351 lines
15 KiB
Python
"""ORCH-022 / TC-01..TC-12: the security-gate leaf module (src/security_gate.py).
|
||
|
||
These exercise the DETERMINISTIC core: the pure classifier / verdict / frontmatter
|
||
helpers (no binaries needed) plus scan_secrets / audit_dependencies with the
|
||
external scanners (gitleaks / pip-audit) mocked at subprocess.run. The integration
|
||
of the gate into advance_stage is covered in tests/test_stage_engine_security_gate.py;
|
||
the QG registry wiring in tests/test_qg_security.py.
|
||
|
||
Contract under test (ADR-001 §7):
|
||
* secrets are UNCONDITIONAL + offline -> a found secret blocks; a tool error is
|
||
fail-closed (FAIL);
|
||
* dependency audit is best-effort -> blocking only at/over the severity threshold;
|
||
UNKNOWN / below-threshold -> warning; an unreachable feed degrades fail-open +
|
||
warning by default, fail-closed only when configured;
|
||
* the machine verdict lives ONLY in the YAML frontmatter (read-back == written);
|
||
* never-raise: any internal error -> (False, reason), no exception escapes.
|
||
"""
|
||
|
||
import os
|
||
import subprocess
|
||
|
||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||
|
||
import pytest # noqa: E402
|
||
|
||
from src import security_gate as sg # noqa: E402
|
||
|
||
_REPO = "orchestrator"
|
||
_BRANCH = "feature/ORCH-022-x"
|
||
_WI = "ORCH-022"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Builders for the result containers (no binaries needed).
|
||
# ---------------------------------------------------------------------------
|
||
def _clean_secret():
|
||
return sg.SecretScanResult(status="clean", detail="no secrets found")
|
||
|
||
|
||
def _found_secret(n=1):
|
||
findings = [
|
||
{"file": "src/config.py", "rule": "generic-api-key", "line": 12 + i, "match": "abcd…yz"}
|
||
for i in range(n)
|
||
]
|
||
return sg.SecretScanResult(status="found", findings=findings, detail=f"{n} secret(s)")
|
||
|
||
|
||
def _ok_deps(findings=None):
|
||
return sg.DepAuditResult(status="ok", findings=findings or [], detail="ok")
|
||
|
||
|
||
def _degraded_deps():
|
||
return sg.DepAuditResult(status="degraded", detail="pip-audit feed unavailable")
|
||
|
||
|
||
def _verdict(secret, dep, *, secrets_block=True, dep_block_severity="HIGH", dep_fail_closed=False):
|
||
return sg.compute_verdict(
|
||
secret, dep,
|
||
secrets_block=secrets_block,
|
||
dep_block_severity=dep_block_severity,
|
||
dep_fail_closed=dep_fail_closed,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# TC-01 / TC-02 / TC-03 — secret-scanning (FR-1 / AC-1..AC-3)
|
||
# ---------------------------------------------------------------------------
|
||
def test_tc01_secret_in_diff_fails():
|
||
"""TC-01: a planted secret -> FAIL, secrets_found>=1, reason names the finding."""
|
||
fields = _verdict(_found_secret(1), _ok_deps())
|
||
assert fields["security_status"] == "FAIL"
|
||
assert fields["secrets_found"] >= 1
|
||
# The reason must name the finding substance (rule + file), not just "FAIL".
|
||
assert "generic-api-key" in fields["reason"]
|
||
assert "src/config.py" in fields["reason"]
|
||
|
||
|
||
def test_tc02_clean_branch_passes():
|
||
"""TC-02: a clean branch -> PASS, secrets_found=0."""
|
||
fields = _verdict(_clean_secret(), _ok_deps())
|
||
assert fields["security_status"] == "PASS"
|
||
assert fields["secrets_found"] == 0
|
||
assert fields["deps_blocking"] == 0
|
||
|
||
|
||
def test_tc03_allowlisted_match_does_not_fail(monkeypatch, tmp_path):
|
||
"""TC-03: an allowlisted match (placeholder / fixture) is filtered by gitleaks
|
||
(rc=0) -> scan_secrets reports clean -> PASS. The allowlist lives in the
|
||
versioned .gitleaks.toml; here we assert the gate honours gitleaks' rc=0."""
|
||
wt = tmp_path / "wt"
|
||
wt.mkdir()
|
||
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
|
||
|
||
def _fake_run(cmd, **kwargs):
|
||
# `git fetch` and `gitleaks detect` both routed here; both "succeed clean".
|
||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||
|
||
monkeypatch.setattr(sg.subprocess, "run", _fake_run)
|
||
res = sg.scan_secrets(_REPO, _BRANCH)
|
||
assert res.status == "clean"
|
||
fields = _verdict(res, _ok_deps())
|
||
assert fields["security_status"] == "PASS"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# TC-04..TC-07 — dependency audit + thresholds (FR-2 / AC-4..AC-7)
|
||
# ---------------------------------------------------------------------------
|
||
def test_tc04_high_cve_at_high_threshold_blocks():
|
||
"""TC-04: a HIGH/CRITICAL CVE at threshold HIGH -> FAIL, deps_blocking>=1."""
|
||
deps = _ok_deps([
|
||
{"package": "requests", "version": "2.0.0", "id": "CVE-1", "severity": "HIGH", "fix": "2.1"},
|
||
{"package": "urllib3", "version": "1.0.0", "id": "CVE-2", "severity": "CRITICAL", "fix": "1.1"},
|
||
])
|
||
fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
|
||
assert fields["security_status"] == "FAIL"
|
||
assert fields["deps_blocking"] >= 1
|
||
assert "CVE-1" in fields["reason"] or "CVE-2" in fields["reason"]
|
||
|
||
|
||
def test_tc05_only_medium_low_warns_passes():
|
||
"""TC-05: only MEDIUM/LOW vulns -> PASS, deps_warning>=1, findings in the body."""
|
||
deps = _ok_deps([
|
||
{"package": "jinja2", "version": "2.0", "id": "CVE-M", "severity": "MEDIUM", "fix": "2.1"},
|
||
{"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
|
||
])
|
||
fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
|
||
assert fields["security_status"] == "PASS"
|
||
assert fields["deps_warning"] >= 1
|
||
assert fields["deps_blocking"] == 0
|
||
body = sg.render_security_report(_WI, fields)
|
||
assert "CVE-M" in body and "CVE-L" in body
|
||
|
||
|
||
def test_tc06_threshold_config_changes_classification():
|
||
"""TC-06: severity=CRITICAL makes a HIGH CVE a warning; severity=HIGH blocks it."""
|
||
assert sg.classify_severity("HIGH", "CRITICAL") == "warning"
|
||
assert sg.classify_severity("HIGH", "HIGH") == "block"
|
||
assert sg.classify_severity("CRITICAL", "CRITICAL") == "block"
|
||
# UNKNOWN is ALWAYS a warning, never an auto-block (anti-loop, Р-4).
|
||
assert sg.classify_severity("UNKNOWN", "LOW") == "warning"
|
||
assert sg.classify_severity("", "HIGH") == "warning"
|
||
|
||
deps = _ok_deps([
|
||
{"package": "x", "version": "1", "id": "CVE-H", "severity": "HIGH", "fix": ""},
|
||
])
|
||
at_critical = _verdict(_clean_secret(), deps, dep_block_severity="CRITICAL")
|
||
at_high = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
|
||
assert at_critical["security_status"] == "PASS"
|
||
assert at_critical["deps_warning"] == 1
|
||
assert at_high["security_status"] == "FAIL"
|
||
assert at_high["deps_blocking"] == 1
|
||
|
||
|
||
def test_tc07_degraded_feed_failopen_default_failclosed_strict():
|
||
"""TC-07: an unreachable CVE feed degrades fail-open + warning by default (no
|
||
exception, no false FAIL); fail-closed -> FAIL only when configured."""
|
||
default = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=False)
|
||
assert default["security_status"] == "PASS"
|
||
assert default["deps_audit_degraded"] is True
|
||
|
||
strict = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=True)
|
||
assert strict["security_status"] == "FAIL"
|
||
assert strict["deps_audit_degraded"] is True
|
||
assert "fail-closed" in strict["reason"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# TC-08..TC-10 — verdict / frontmatter parser + artefact (FR-3 / AC-8..AC-10)
|
||
# ---------------------------------------------------------------------------
|
||
def test_tc08_verdict_only_from_frontmatter():
|
||
"""TC-08: the verdict is read ONLY from the YAML frontmatter; prose in the body
|
||
does not influence it; the negative (FAIL) token is authoritative."""
|
||
# Frontmatter PASS but body screams FAIL -> still PASS (prose ignored).
|
||
pass_fm = (
|
||
"---\nsecurity_status: PASS\nsecrets_found: 0\n---\n"
|
||
"# Report\nThis build totally FAILED everything, FAIL FAIL.\n"
|
||
)
|
||
ok, reason = sg.parse_security_status(pass_fm)
|
||
assert ok is True
|
||
assert "PASS" in reason
|
||
|
||
# Frontmatter FAIL but body says PASS -> FAIL (negative token authoritative).
|
||
fail_fm = "---\nsecurity_status: FAIL\n---\nEverything PASS, looks great!\n"
|
||
ok, reason = sg.parse_security_status(fail_fm)
|
||
assert ok is False
|
||
assert "FAIL" in reason
|
||
|
||
|
||
def test_tc09_missing_or_broken_frontmatter_failclosed():
|
||
"""TC-09: no frontmatter / broken YAML / missing field -> (False, reason)."""
|
||
# No frontmatter at all.
|
||
ok, reason = sg.parse_security_status("# Just a body, no frontmatter\nPASS\n")
|
||
assert ok is False and reason
|
||
|
||
# Frontmatter present but no security_status field.
|
||
ok, reason = sg.parse_security_status("---\nother: 1\n---\nbody\n")
|
||
assert ok is False
|
||
|
||
# Broken YAML in the frontmatter.
|
||
ok, reason = sg.parse_security_status("---\nsecurity_status: : : [bad\n---\nbody\n")
|
||
assert ok is False
|
||
|
||
|
||
def test_orch076_parse_security_status_via_unified_api():
|
||
"""ORCH-076 TC-12: parse_security_status now reads through the unified
|
||
frontmatter primitive; the PASS/FAIL semantics are 1:1 with before, and an
|
||
old report WITHOUT the new schema fields still reads exactly the same."""
|
||
from src import frontmatter as fm
|
||
|
||
# Delegates to the single parse primitive (no private duplicated parse).
|
||
assert "parse_frontmatter" in sg.parse_security_status.__doc__
|
||
|
||
# PASS / FAIL semantics preserved.
|
||
assert sg.parse_security_status("---\nsecurity_status: PASS\n---\n")[0] is True
|
||
assert sg.parse_security_status("---\nsecurity_status: FAIL\n---\n")[0] is False
|
||
|
||
# An additive full schema does not change the verdict (FR-5 / AC-4).
|
||
schema = (
|
||
"work_item: ORCH-076\nstage: deploy-staging\nauthor_agent: deployer\n"
|
||
"status: PASS\ncreated_at: 2026-06-09\nmodel_used: claude-opus-4-8\n"
|
||
)
|
||
with_schema = fm.render_frontmatter(
|
||
{**{k.split(":")[0]: v for k, v in
|
||
(line.split(": ", 1) for line in schema.strip().splitlines())},
|
||
"security_status": "PASS"}
|
||
)
|
||
assert sg.parse_security_status(with_schema)[0] is True
|
||
|
||
|
||
def test_tc10_artifact_has_valid_frontmatter_and_body(tmp_path, monkeypatch):
|
||
"""TC-10: 17-security-report.md is written with valid frontmatter (all machine
|
||
fields) and a body listing the findings; read-back == the written verdict."""
|
||
wt = tmp_path / "wt"
|
||
wt.mkdir()
|
||
monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt))
|
||
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
|
||
|
||
deps = _ok_deps([
|
||
{"package": "requests", "version": "2.0", "id": "CVE-X", "severity": "HIGH", "fix": "2.1"},
|
||
{"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
|
||
])
|
||
fields = _verdict(_found_secret(1), deps, dep_block_severity="HIGH")
|
||
path = sg.write_security_report(_REPO, _WI, _BRANCH, fields)
|
||
assert os.path.isfile(path)
|
||
with open(path, encoding="utf-8") as f:
|
||
content = f.read()
|
||
|
||
# Frontmatter carries every machine field.
|
||
for key in ("security_status", "secrets_found", "deps_blocking", "deps_warning",
|
||
"deps_audit_degraded"):
|
||
assert f"{key}:" in content
|
||
# Body lists findings.
|
||
assert "CVE-X" in content and "CVE-L" in content
|
||
# Read-back agrees with the computed status (single source of truth, AC-8).
|
||
ok, _ = sg.parse_security_status(content)
|
||
assert ok is (fields["security_status"] == "PASS")
|
||
assert ok is False # this fixture is a FAIL (secret + HIGH CVE)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# TC-11 / TC-12 — never-raise / timeout (FR-5/FR-6 / AC-14..AC-17)
|
||
# ---------------------------------------------------------------------------
|
||
def test_tc11_missing_binary_failclosed_never_raises(monkeypatch, tmp_path):
|
||
"""TC-11: a missing scanner binary / internal exception -> error -> FAIL
|
||
(fail-closed for secrets), and the exception never propagates."""
|
||
wt = tmp_path / "wt"
|
||
wt.mkdir()
|
||
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
|
||
|
||
def _raise_fnf(cmd, **kwargs):
|
||
# git fetch ok, gitleaks missing.
|
||
if cmd[:1] == ["git"]:
|
||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||
raise FileNotFoundError("gitleaks")
|
||
|
||
monkeypatch.setattr(sg.subprocess, "run", _raise_fnf)
|
||
res = sg.scan_secrets(_REPO, _BRANCH)
|
||
assert res.status == "error"
|
||
fields = _verdict(res, _ok_deps())
|
||
assert fields["security_status"] == "FAIL" # fail-closed, BR-2
|
||
assert "fail-closed" in fields["reason"]
|
||
|
||
# check_security_gate as a whole never raises even if everything explodes.
|
||
monkeypatch.setattr(sg, "security_gate_applies", lambda r: True)
|
||
|
||
def _boom(*a, **k):
|
||
raise RuntimeError("kaboom")
|
||
|
||
monkeypatch.setattr(sg, "scan_secrets", _boom)
|
||
ok, reason = sg.check_security_gate(_REPO, _WI, _BRANCH)
|
||
assert ok is False
|
||
assert "error" in reason.lower()
|
||
|
||
|
||
def test_tc12_timeout_is_deterministic_failclosed(monkeypatch, tmp_path):
|
||
"""TC-12: exceeding the scan timeout -> a deterministic error verdict, no hang."""
|
||
wt = tmp_path / "wt"
|
||
wt.mkdir()
|
||
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
|
||
|
||
def _timeout(cmd, **kwargs):
|
||
if cmd[:1] == ["git"]:
|
||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||
raise subprocess.TimeoutExpired(cmd, kwargs.get("timeout", 1))
|
||
|
||
monkeypatch.setattr(sg.subprocess, "run", _timeout)
|
||
res = sg.scan_secrets(_REPO, _BRANCH)
|
||
assert res.status == "error"
|
||
assert "timeout" in res.detail.lower()
|
||
fields = _verdict(res, _ok_deps())
|
||
assert fields["security_status"] == "FAIL"
|
||
|
||
# pip-audit timeout -> degrade (best-effort), not a hard error.
|
||
monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt))
|
||
(wt / "requirements.txt").write_text("requests==2.0\n")
|
||
dep = sg.audit_dependencies(_REPO, _BRANCH)
|
||
assert dep.status == "degraded"
|
||
assert "timeout" in dep.detail.lower()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Parser robustness (supports the above; pure, never raises)
|
||
# ---------------------------------------------------------------------------
|
||
def test_parse_gitleaks_report_tolerant():
|
||
assert sg.parse_gitleaks_report("") == []
|
||
assert sg.parse_gitleaks_report("not json") == []
|
||
assert sg.parse_gitleaks_report("{}") == []
|
||
parsed = sg.parse_gitleaks_report(
|
||
'[{"File":"a.py","RuleID":"key","StartLine":3,"Secret":"supersecretvalue"}]'
|
||
)
|
||
assert parsed[0]["file"] == "a.py"
|
||
assert parsed[0]["rule"] == "key"
|
||
# The secret value is masked, never re-leaked verbatim.
|
||
assert "supersecretvalue" not in parsed[0]["match"]
|
||
|
||
|
||
def test_parse_pip_audit_report_tolerant():
|
||
assert sg.parse_pip_audit_report("") == []
|
||
assert sg.parse_pip_audit_report("garbage") == []
|
||
doc = (
|
||
'{"dependencies":[{"name":"requests","version":"2.0",'
|
||
'"vulns":[{"id":"CVE-1","severity":"HIGH","fix_versions":["2.1"]}]}]}'
|
||
)
|
||
parsed = sg.parse_pip_audit_report(doc)
|
||
assert parsed[0]["package"] == "requests"
|
||
assert parsed[0]["severity"] == "HIGH"
|
||
# Missing severity -> UNKNOWN.
|
||
doc2 = '{"dependencies":[{"name":"x","version":"1","vulns":[{"id":"CVE-2"}]}]}'
|
||
assert sg.parse_pip_audit_report(doc2)[0]["severity"] == "UNKNOWN"
|