Files
orchestrator/tests/test_security_gate.py
claude-bot 92961d1d32 refactor(frontmatter): unified frontmatter contract + handoff spec (ORCH-52c)
src/frontmatter.py grows from a single-key reader into the full machine
contract: reader (read_frontmatter_value, unchanged), one parse primitive
(parse_frontmatter), writer (render/write_frontmatter), schema validator
(validate_schema/REQUIRED_FIELDS, warning-only by default) and a shared
strip_frontmatter helper. The five verdict gates (check_reviewer_verdict,
_parse_tests_verdict, _parse_deploy_status, _parse_staging_status,
parse_security_status) now read through the single parse_frontmatter point
instead of duplicated ad-hoc YAML logic; review_parse._strip_frontmatter and
security_gate.extract_security_findings reuse the shared helper.

Strictly backward compatible + never-raise: STAGE_TRANSITIONS, the QG_CHECKS
composition, verdict semantics (incl. ORCH-047 three-field tester + negative
token priority), reason-strings and worktree->origin/main fallback are 1:1.
The schema validator never influences a gate verdict by default; hard-fail is
reserved behind the frontmatter_validation_strict kill-switch (default False).

New formal handoff spec docs/_standards/HANDOFF_PROTOCOL.md ("stage -> required
output" + required frontmatter schema), aligned 1:1 with PIPELINE_DOCS.md.

Tests: test_frontmatter.py (TC-01..07), test_qg_verdicts.py (TC-08..15),
test_security_gate.py (TC-12), test_stages_invariants.py (TC-16). Full
tests/ green (1212).

Refs: ORCH-076

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 14:14:30 +03:00

351 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ORCH-022 / TC-01..TC-12: the security-gate leaf module (src/security_gate.py).
These exercise the DETERMINISTIC core: the pure classifier / verdict / frontmatter
helpers (no binaries needed) plus scan_secrets / audit_dependencies with the
external scanners (gitleaks / pip-audit) mocked at subprocess.run. The integration
of the gate into advance_stage is covered in tests/test_stage_engine_security_gate.py;
the QG registry wiring in tests/test_qg_security.py.
Contract under test (ADR-001 §7):
* secrets are UNCONDITIONAL + offline -> a found secret blocks; a tool error is
fail-closed (FAIL);
* dependency audit is best-effort -> blocking only at/over the severity threshold;
UNKNOWN / below-threshold -> warning; an unreachable feed degrades fail-open +
warning by default, fail-closed only when configured;
* the machine verdict lives ONLY in the YAML frontmatter (read-back == written);
* never-raise: any internal error -> (False, reason), no exception escapes.
"""
import os
import subprocess
# Seed placeholder credentials before `src` is imported (which is why the
# imports that follow carry E402 suppressions). setdefault leaves any value
# already present in the environment untouched.
# NOTE(review): presumably src reads these tokens at import time — confirm.
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from src import security_gate as sg # noqa: E402
# Fixed repo / branch / work-item identifiers handed to the gate functions
# throughout this module.
_REPO = "orchestrator"
_BRANCH = "feature/ORCH-022-x"
_WI = "ORCH-022"
# ---------------------------------------------------------------------------
# Builders for the result containers (no binaries needed).
# ---------------------------------------------------------------------------
def _clean_secret():
    """Build a SecretScanResult describing a scan that found no secrets."""
    result = sg.SecretScanResult(status="clean", detail="no secrets found")
    return result
def _found_secret(n=1):
    """Build a "found" SecretScanResult carrying ``n`` synthetic findings.

    Every finding points at src/config.py with the generic-api-key rule; line
    numbers start at 12 and grow by one per finding.
    """
    hits = []
    for offset in range(n):
        hits.append({
            "file": "src/config.py",
            "rule": "generic-api-key",
            "line": 12 + offset,
            "match": "abcd…yz",
        })
    return sg.SecretScanResult(status="found", findings=hits, detail=f"{n} secret(s)")
def _ok_deps(findings=None):
    """Build an "ok" DepAuditResult; a falsy ``findings`` becomes a fresh empty list."""
    if findings:
        vulns = findings
    else:
        vulns = []
    return sg.DepAuditResult(status="ok", findings=vulns, detail="ok")
def _degraded_deps():
    """Build a DepAuditResult for an audit whose vulnerability feed was unavailable."""
    result = sg.DepAuditResult(status="degraded", detail="pip-audit feed unavailable")
    return result
def _verdict(secret, dep, *, secrets_block=True, dep_block_severity="HIGH", dep_fail_closed=False):
    """Call sg.compute_verdict with the given scan results and policy knobs.

    The keyword-only arguments mirror compute_verdict's policy parameters and
    are forwarded unchanged.
    """
    policy = {
        "secrets_block": secrets_block,
        "dep_block_severity": dep_block_severity,
        "dep_fail_closed": dep_fail_closed,
    }
    return sg.compute_verdict(secret, dep, **policy)
# ---------------------------------------------------------------------------
# TC-01 / TC-02 / TC-03 — secret-scanning (FR-1 / AC-1..AC-3)
# ---------------------------------------------------------------------------
def test_tc01_secret_in_diff_fails():
    """TC-01: one planted secret yields FAIL with a reason naming the finding."""
    verdict = _verdict(_found_secret(1), _ok_deps())

    assert verdict["security_status"] == "FAIL"
    assert verdict["secrets_found"] >= 1

    # A bare "FAIL" is not enough: the reason has to carry the rule and the file.
    reason = verdict["reason"]
    for needle in ("generic-api-key", "src/config.py"):
        assert needle in reason
def test_tc02_clean_branch_passes():
    """TC-02: no secrets and no dependency findings yields PASS with zero counters."""
    verdict = _verdict(_clean_secret(), _ok_deps())

    assert verdict["security_status"] == "PASS"
    assert verdict["secrets_found"] == 0
    assert verdict["deps_blocking"] == 0
def test_tc03_allowlisted_match_does_not_fail(monkeypatch, tmp_path):
    """TC-03: when gitleaks exits 0 (allowlisted matches are filtered by the
    versioned .gitleaks.toml), scan_secrets reports clean and the verdict is PASS.
    Only the gate's handling of a zero return code is asserted here."""
    worktree = tmp_path / "wt"
    worktree.mkdir()
    monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(worktree))

    def _always_clean(cmd, **kwargs):
        # Both `git fetch` and `gitleaks detect` land here and exit 0 silently.
        return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")

    monkeypatch.setattr(sg.subprocess, "run", _always_clean)

    scan = sg.scan_secrets(_REPO, _BRANCH)
    assert scan.status == "clean"

    verdict = _verdict(scan, _ok_deps())
    assert verdict["security_status"] == "PASS"
# ---------------------------------------------------------------------------
# TC-04..TC-07 — dependency audit + thresholds (FR-2 / AC-4..AC-7)
# ---------------------------------------------------------------------------
def test_tc04_high_cve_at_high_threshold_blocks():
    """TC-04: HIGH and CRITICAL CVEs at threshold HIGH yield FAIL with deps_blocking>=1."""
    high_cve = {
        "package": "requests", "version": "2.0.0", "id": "CVE-1",
        "severity": "HIGH", "fix": "2.1",
    }
    critical_cve = {
        "package": "urllib3", "version": "1.0.0", "id": "CVE-2",
        "severity": "CRITICAL", "fix": "1.1",
    }
    audit = _ok_deps([high_cve, critical_cve])

    verdict = _verdict(_clean_secret(), audit, dep_block_severity="HIGH")

    assert verdict["security_status"] == "FAIL"
    assert verdict["deps_blocking"] >= 1
    # At least one of the blocking CVE ids must be surfaced in the reason.
    reason = verdict["reason"]
    assert "CVE-1" in reason or "CVE-2" in reason
def test_tc05_only_medium_low_warns_passes():
    """TC-05: MEDIUM/LOW-only vulnerabilities yield PASS with warnings, and the
    rendered report body still lists them."""
    medium_cve = {
        "package": "jinja2", "version": "2.0", "id": "CVE-M",
        "severity": "MEDIUM", "fix": "2.1",
    }
    low_cve = {
        "package": "click", "version": "7.0", "id": "CVE-L",
        "severity": "LOW", "fix": "",
    }
    audit = _ok_deps([medium_cve, low_cve])

    verdict = _verdict(_clean_secret(), audit, dep_block_severity="HIGH")

    assert verdict["security_status"] == "PASS"
    assert verdict["deps_warning"] >= 1
    assert verdict["deps_blocking"] == 0

    report = sg.render_security_report(_WI, verdict)
    assert "CVE-M" in report
    assert "CVE-L" in report
def test_tc06_threshold_config_changes_classification():
    """TC-06: a HIGH CVE is a warning at threshold CRITICAL and a block at threshold HIGH."""
    # (severity, threshold, expected classification)
    expectations = [
        ("HIGH", "CRITICAL", "warning"),
        ("HIGH", "HIGH", "block"),
        ("CRITICAL", "CRITICAL", "block"),
        # UNKNOWN / empty severity is ALWAYS a warning, never an auto-block
        # (anti-loop, Р-4).
        ("UNKNOWN", "LOW", "warning"),
        ("", "HIGH", "warning"),
    ]
    for severity, threshold, expected in expectations:
        assert sg.classify_severity(severity, threshold) == expected

    audit = _ok_deps([
        {"package": "x", "version": "1", "id": "CVE-H", "severity": "HIGH", "fix": ""},
    ])
    lenient = _verdict(_clean_secret(), audit, dep_block_severity="CRITICAL")
    strict = _verdict(_clean_secret(), audit, dep_block_severity="HIGH")

    assert lenient["security_status"] == "PASS"
    assert lenient["deps_warning"] == 1
    assert strict["security_status"] == "FAIL"
    assert strict["deps_blocking"] == 1
def test_tc07_degraded_feed_failopen_default_failclosed_strict():
    """TC-07: a degraded dependency audit passes (flagged as degraded) by default
    and fails only when dep_fail_closed is switched on."""
    lenient = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=False)
    assert lenient["security_status"] == "PASS"
    assert lenient["deps_audit_degraded"] is True

    enforced = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=True)
    assert enforced["security_status"] == "FAIL"
    assert enforced["deps_audit_degraded"] is True
    assert "fail-closed" in enforced["reason"]
# ---------------------------------------------------------------------------
# TC-08..TC-10 — verdict / frontmatter parser + artefact (FR-3 / AC-8..AC-10)
# ---------------------------------------------------------------------------
def test_tc08_verdict_only_from_frontmatter():
    """TC-08: parse_security_status takes the verdict from the YAML frontmatter
    and is not swayed by contradictory prose in the body."""
    # PASS in the frontmatter, FAIL shouted in the body -> still PASS.
    passing_doc = "".join([
        "---\nsecurity_status: PASS\nsecrets_found: 0\n---\n",
        "# Report\nThis build totally FAILED everything, FAIL FAIL.\n",
    ])
    verdict_ok, why = sg.parse_security_status(passing_doc)
    assert verdict_ok is True
    assert "PASS" in why

    # FAIL in the frontmatter, PASS claimed in the body -> FAIL wins.
    failing_doc = "---\nsecurity_status: FAIL\n---\nEverything PASS, looks great!\n"
    verdict_ok, why = sg.parse_security_status(failing_doc)
    assert verdict_ok is False
    assert "FAIL" in why
def test_tc09_missing_or_broken_frontmatter_failclosed():
    """TC-09: absent frontmatter, a missing security_status field, or broken
    YAML all read back as a negative verdict."""
    # No frontmatter at all: negative verdict with a non-empty reason.
    verdict_ok, why = sg.parse_security_status("# Just a body, no frontmatter\nPASS\n")
    assert verdict_ok is False
    assert why

    undecidable_docs = (
        # Frontmatter present but without the security_status field.
        "---\nother: 1\n---\nbody\n",
        # Broken YAML inside the frontmatter.
        "---\nsecurity_status: : : [bad\n---\nbody\n",
    )
    for document in undecidable_docs:
        verdict_ok, why = sg.parse_security_status(document)
        assert verdict_ok is False
def test_orch076_parse_security_status_via_unified_api():
    """ORCH-076 TC-12: parse_security_status keeps its PASS/FAIL semantics after
    the move to the unified frontmatter API, and extra schema fields in the
    frontmatter do not change the verdict.

    Checks, in order:
      * the function's docstring names ``parse_frontmatter``;
      * a minimal PASS / FAIL frontmatter reads back as True / False;
      * frontmatter rendered by ``fm.render_frontmatter`` from the additive
        schema fields plus ``security_status: PASS`` still reads as True.
    """
    from src import frontmatter as fm

    # __doc__ is None when docstrings are stripped (python -OO); fall back to ""
    # so a missing docstring is an assertion failure rather than a TypeError.
    docstring = sg.parse_security_status.__doc__ or ""
    assert "parse_frontmatter" in docstring

    # PASS / FAIL semantics preserved.
    assert sg.parse_security_status("---\nsecurity_status: PASS\n---\n")[0] is True
    assert sg.parse_security_status("---\nsecurity_status: FAIL\n---\n")[0] is False

    # An additive full schema does not change the verdict (FR-5 / AC-4).
    # Spelled as a literal: same keys, string values and insertion order as the
    # previous parse-a-string construction, without the indirection.
    schema_fields = {
        "work_item": "ORCH-076",
        "stage": "deploy-staging",
        "author_agent": "deployer",
        "status": "PASS",
        "created_at": "2026-06-09",
        "model_used": "claude-opus-4-8",
    }
    with_schema = fm.render_frontmatter({**schema_fields, "security_status": "PASS"})
    assert sg.parse_security_status(with_schema)[0] is True
def test_tc10_artifact_has_valid_frontmatter_and_body(tmp_path, monkeypatch):
    """TC-10: write_security_report produces a file containing every machine
    field and the findings, and parsing it back agrees with the computed verdict."""
    worktree = tmp_path / "wt"
    worktree.mkdir()
    for hook in ("get_worktree_path", "ensure_worktree"):
        monkeypatch.setattr(sg, hook, lambda r, b: str(worktree))

    audit = _ok_deps([
        {"package": "requests", "version": "2.0", "id": "CVE-X", "severity": "HIGH", "fix": "2.1"},
        {"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
    ])
    verdict = _verdict(_found_secret(1), audit, dep_block_severity="HIGH")

    report_path = sg.write_security_report(_REPO, _WI, _BRANCH, verdict)
    assert os.path.isfile(report_path)
    with open(report_path, encoding="utf-8") as handle:
        written = handle.read()

    # Every machine field must appear as a "key:" entry.
    machine_fields = (
        "security_status",
        "secrets_found",
        "deps_blocking",
        "deps_warning",
        "deps_audit_degraded",
    )
    for field in machine_fields:
        assert f"{field}:" in written

    # Both findings are listed in the written report.
    assert "CVE-X" in written
    assert "CVE-L" in written

    # Read-back agrees with the computed status (single source of truth, AC-8).
    parsed_ok, _ = sg.parse_security_status(written)
    assert parsed_ok is (verdict["security_status"] == "PASS")
    assert parsed_ok is False  # this fixture is a FAIL (secret + HIGH CVE)
# ---------------------------------------------------------------------------
# TC-11 / TC-12 — never-raise / timeout (FR-5/FR-6 / AC-14..AC-17)
# ---------------------------------------------------------------------------
def test_tc11_missing_binary_failclosed_never_raises(monkeypatch, tmp_path):
    """TC-11: a missing scanner binary turns into an "error" scan and a
    fail-closed FAIL verdict; check_security_gate returns (False, reason) even
    when scan_secrets itself raises."""
    worktree = tmp_path / "wt"
    worktree.mkdir()
    monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(worktree))

    def _gitleaks_missing(cmd, **kwargs):
        # Anything other than git (i.e. gitleaks) behaves like an absent binary.
        if cmd[:1] != ["git"]:
            raise FileNotFoundError("gitleaks")
        return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")

    monkeypatch.setattr(sg.subprocess, "run", _gitleaks_missing)

    scan = sg.scan_secrets(_REPO, _BRANCH)
    assert scan.status == "error"

    verdict = _verdict(scan, _ok_deps())
    assert verdict["security_status"] == "FAIL"  # fail-closed, BR-2
    assert "fail-closed" in verdict["reason"]

    # The gate as a whole must swallow an exploding scanner as well.
    def _explode(*args, **kwargs):
        raise RuntimeError("kaboom")

    monkeypatch.setattr(sg, "security_gate_applies", lambda r: True)
    monkeypatch.setattr(sg, "scan_secrets", _explode)

    gate_ok, why = sg.check_security_gate(_REPO, _WI, _BRANCH)
    assert gate_ok is False
    assert "error" in why.lower()
def test_tc12_timeout_is_deterministic_failclosed(monkeypatch, tmp_path):
    """TC-12: a scanner timeout gives an "error" secret scan (FAIL verdict) and a
    "degraded" dependency audit, each mentioning the timeout in its detail."""
    worktree = tmp_path / "wt"
    worktree.mkdir()
    monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(worktree))

    def _scanner_times_out(cmd, **kwargs):
        # git calls succeed; every other command times out.
        if cmd[:1] != ["git"]:
            raise subprocess.TimeoutExpired(cmd, kwargs.get("timeout", 1))
        return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")

    monkeypatch.setattr(sg.subprocess, "run", _scanner_times_out)

    scan = sg.scan_secrets(_REPO, _BRANCH)
    assert scan.status == "error"
    assert "timeout" in scan.detail.lower()

    verdict = _verdict(scan, _ok_deps())
    assert verdict["security_status"] == "FAIL"

    # pip-audit timeout -> degrade (best-effort), not a hard error.
    monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(worktree))
    requirements = worktree / "requirements.txt"
    requirements.write_text("requests==2.0\n")

    audit = sg.audit_dependencies(_REPO, _BRANCH)
    assert audit.status == "degraded"
    assert "timeout" in audit.detail.lower()
# ---------------------------------------------------------------------------
# Parser robustness (supports the above; pure, never raises)
# ---------------------------------------------------------------------------
def test_parse_gitleaks_report_tolerant():
    """parse_gitleaks_report returns [] for unusable input and masks the secret
    value in a well-formed finding."""
    for unusable in ("", "not json", "{}"):
        assert sg.parse_gitleaks_report(unusable) == []

    raw_report = '[{"File":"a.py","RuleID":"key","StartLine":3,"Secret":"supersecretvalue"}]'
    first = sg.parse_gitleaks_report(raw_report)[0]

    assert first["file"] == "a.py"
    assert first["rule"] == "key"
    # The secret value is masked, never re-leaked verbatim.
    assert "supersecretvalue" not in first["match"]
def test_parse_pip_audit_report_tolerant():
    """parse_pip_audit_report returns [] for unusable input, extracts package and
    severity from a well-formed report, and reports a missing severity as UNKNOWN."""
    for unusable in ("", "garbage"):
        assert sg.parse_pip_audit_report(unusable) == []

    with_severity = "".join([
        '{"dependencies":[{"name":"requests","version":"2.0",',
        '"vulns":[{"id":"CVE-1","severity":"HIGH","fix_versions":["2.1"]}]}]}',
    ])
    first = sg.parse_pip_audit_report(with_severity)[0]
    assert first["package"] == "requests"
    assert first["severity"] == "HIGH"

    # Missing severity -> UNKNOWN.
    without_severity = '{"dependencies":[{"name":"x","version":"1","vulns":[{"id":"CVE-2"}]}]}'
    assert sg.parse_pip_audit_report(without_severity)[0]["severity"] == "UNKNOWN"