feat(security): security-gate (gitleaks secret-scan + pip-audit) before merge

Add a deterministic (no-LLM) security sub-gate on the deploy-staging -> deploy
edge, run FIRST (before merge-gate ORCH-043 and image-freshness ORCH-058) so it
fails cheaply before any expensive rebase/rebuild, and scans origin/main..HEAD
before rebase so a task is never blamed for a CVE introduced by an updated main.

Why: the autonomous pipeline merged branches into main with no check for a leaked
secret or a vulnerable dependency. For the self-hosting orchestrator (one shared
prod instance serving every project from a shared DB) a single leak/CVE landed in
the prod of all projects (CLAUDE.md self-hosting, section 8).

- New leaf src/security_gate.py (never-raise): gitleaks (offline, fail-closed on
  tool error => secrets guarantee is unconditional) + pip-audit (best-effort;
  unreachable CVE feed degrades fail-open + loud warning by default, strict via
  security_dep_audit_fail_closed). Verdict lives ONLY in 17-security-report.md
  YAML frontmatter (write -> read-back single source of truth); FAIL is
  authoritative; missing/broken frontmatter => fail-closed.
- check_security_gate thin wrapper registered in QG_CHECKS (lazy import, no cycle).
- _handle_security_gate wired FIRST in advance_stage deploy-staging block: FAIL ->
  rollback to development + developer-retry (cap MAX_DEVELOPER_RETRIES); task_desc
  carries verbatim findings (ORCH-046 pattern). No merge-lease release (runs before
  lease acquire). Self-hosting safe: only reads/scans/writes, never deploys.
- Conditional rollout (security_gate_enabled + security_gate_repos; empty scope ->
  self-hosting only). 6 new ORCH_SECURITY_* settings.
- Infra: pinned gitleaks Go binary in Dockerfile (+curl/ca-certificates), pip-audit
  in requirements.txt, versioned .gitleaks.toml at repo root.
- STAGE_TRANSITIONS and DB schema unchanged.

Docs: docs/architecture/README.md (marked realized), CLAUDE.md (artifact 17),
CHANGELOG.md. Tests: test_security_gate.py, test_qg_security.py,
test_stage_engine_security_gate.py + updated registry/edge snapshots.

Refs: ORCH-022

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-07 17:23:13 +00:00
committed by deployer
parent 44db94e462
commit 30b6187c73
18 changed files with 1643 additions and 6 deletions

324
tests/test_security_gate.py Normal file
View File

@@ -0,0 +1,324 @@
"""ORCH-022 / TC-01..TC-12: the security-gate leaf module (src/security_gate.py).
These exercise the DETERMINISTIC core: the pure classifier / verdict / frontmatter
helpers (no binaries needed) plus scan_secrets / audit_dependencies with the
external scanners (gitleaks / pip-audit) mocked at subprocess.run. The integration
of the gate into advance_stage is covered in tests/test_stage_engine_security_gate.py;
the QG registry wiring in tests/test_qg_security.py.
Contract under test (ADR-001 §7):
* secrets are UNCONDITIONAL + offline -> a found secret blocks; a tool error is
fail-closed (FAIL);
* dependency audit is best-effort -> blocking only at/over the severity threshold;
UNKNOWN / below-threshold -> warning; an unreachable feed degrades fail-open +
warning by default, fail-closed only when configured;
* the machine verdict lives ONLY in the YAML frontmatter (read-back == written);
* never-raise: any internal error -> (False, reason), no exception escapes.
"""
import os
import subprocess
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from src import security_gate as sg # noqa: E402
_REPO = "orchestrator"
_BRANCH = "feature/ORCH-022-x"
_WI = "ORCH-022"
# ---------------------------------------------------------------------------
# Builders for the result containers (no binaries needed).
# ---------------------------------------------------------------------------
def _clean_secret():
return sg.SecretScanResult(status="clean", detail="no secrets found")
def _found_secret(n=1):
findings = [
{"file": "src/config.py", "rule": "generic-api-key", "line": 12 + i, "match": "abcd…yz"}
for i in range(n)
]
return sg.SecretScanResult(status="found", findings=findings, detail=f"{n} secret(s)")
def _ok_deps(findings=None):
return sg.DepAuditResult(status="ok", findings=findings or [], detail="ok")
def _degraded_deps():
return sg.DepAuditResult(status="degraded", detail="pip-audit feed unavailable")
def _verdict(secret, dep, *, secrets_block=True, dep_block_severity="HIGH", dep_fail_closed=False):
return sg.compute_verdict(
secret, dep,
secrets_block=secrets_block,
dep_block_severity=dep_block_severity,
dep_fail_closed=dep_fail_closed,
)
# ---------------------------------------------------------------------------
# TC-01 / TC-02 / TC-03 — secret-scanning (FR-1 / AC-1..AC-3)
# ---------------------------------------------------------------------------
def test_tc01_secret_in_diff_fails():
"""TC-01: a planted secret -> FAIL, secrets_found>=1, reason names the finding."""
fields = _verdict(_found_secret(1), _ok_deps())
assert fields["security_status"] == "FAIL"
assert fields["secrets_found"] >= 1
# The reason must name the finding substance (rule + file), not just "FAIL".
assert "generic-api-key" in fields["reason"]
assert "src/config.py" in fields["reason"]
def test_tc02_clean_branch_passes():
"""TC-02: a clean branch -> PASS, secrets_found=0."""
fields = _verdict(_clean_secret(), _ok_deps())
assert fields["security_status"] == "PASS"
assert fields["secrets_found"] == 0
assert fields["deps_blocking"] == 0
def test_tc03_allowlisted_match_does_not_fail(monkeypatch, tmp_path):
"""TC-03: an allowlisted match (placeholder / fixture) is filtered by gitleaks
(rc=0) -> scan_secrets reports clean -> PASS. The allowlist lives in the
versioned .gitleaks.toml; here we assert the gate honours gitleaks' rc=0."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
def _fake_run(cmd, **kwargs):
# `git fetch` and `gitleaks detect` both routed here; both "succeed clean".
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
monkeypatch.setattr(sg.subprocess, "run", _fake_run)
res = sg.scan_secrets(_REPO, _BRANCH)
assert res.status == "clean"
fields = _verdict(res, _ok_deps())
assert fields["security_status"] == "PASS"
# ---------------------------------------------------------------------------
# TC-04..TC-07 — dependency audit + thresholds (FR-2 / AC-4..AC-7)
# ---------------------------------------------------------------------------
def test_tc04_high_cve_at_high_threshold_blocks():
"""TC-04: a HIGH/CRITICAL CVE at threshold HIGH -> FAIL, deps_blocking>=1."""
deps = _ok_deps([
{"package": "requests", "version": "2.0.0", "id": "CVE-1", "severity": "HIGH", "fix": "2.1"},
{"package": "urllib3", "version": "1.0.0", "id": "CVE-2", "severity": "CRITICAL", "fix": "1.1"},
])
fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
assert fields["security_status"] == "FAIL"
assert fields["deps_blocking"] >= 1
assert "CVE-1" in fields["reason"] or "CVE-2" in fields["reason"]
def test_tc05_only_medium_low_warns_passes():
"""TC-05: only MEDIUM/LOW vulns -> PASS, deps_warning>=1, findings in the body."""
deps = _ok_deps([
{"package": "jinja2", "version": "2.0", "id": "CVE-M", "severity": "MEDIUM", "fix": "2.1"},
{"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
])
fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
assert fields["security_status"] == "PASS"
assert fields["deps_warning"] >= 1
assert fields["deps_blocking"] == 0
body = sg.render_security_report(_WI, fields)
assert "CVE-M" in body and "CVE-L" in body
def test_tc06_threshold_config_changes_classification():
"""TC-06: severity=CRITICAL makes a HIGH CVE a warning; severity=HIGH blocks it."""
assert sg.classify_severity("HIGH", "CRITICAL") == "warning"
assert sg.classify_severity("HIGH", "HIGH") == "block"
assert sg.classify_severity("CRITICAL", "CRITICAL") == "block"
# UNKNOWN is ALWAYS a warning, never an auto-block (anti-loop, Р-4).
assert sg.classify_severity("UNKNOWN", "LOW") == "warning"
assert sg.classify_severity("", "HIGH") == "warning"
deps = _ok_deps([
{"package": "x", "version": "1", "id": "CVE-H", "severity": "HIGH", "fix": ""},
])
at_critical = _verdict(_clean_secret(), deps, dep_block_severity="CRITICAL")
at_high = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
assert at_critical["security_status"] == "PASS"
assert at_critical["deps_warning"] == 1
assert at_high["security_status"] == "FAIL"
assert at_high["deps_blocking"] == 1
def test_tc07_degraded_feed_failopen_default_failclosed_strict():
"""TC-07: an unreachable CVE feed degrades fail-open + warning by default (no
exception, no false FAIL); fail-closed -> FAIL only when configured."""
default = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=False)
assert default["security_status"] == "PASS"
assert default["deps_audit_degraded"] is True
strict = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=True)
assert strict["security_status"] == "FAIL"
assert strict["deps_audit_degraded"] is True
assert "fail-closed" in strict["reason"]
# ---------------------------------------------------------------------------
# TC-08..TC-10 — verdict / frontmatter parser + artefact (FR-3 / AC-8..AC-10)
# ---------------------------------------------------------------------------
def test_tc08_verdict_only_from_frontmatter():
"""TC-08: the verdict is read ONLY from the YAML frontmatter; prose in the body
does not influence it; the negative (FAIL) token is authoritative."""
# Frontmatter PASS but body screams FAIL -> still PASS (prose ignored).
pass_fm = (
"---\nsecurity_status: PASS\nsecrets_found: 0\n---\n"
"# Report\nThis build totally FAILED everything, FAIL FAIL.\n"
)
ok, reason = sg.parse_security_status(pass_fm)
assert ok is True
assert "PASS" in reason
# Frontmatter FAIL but body says PASS -> FAIL (negative token authoritative).
fail_fm = "---\nsecurity_status: FAIL\n---\nEverything PASS, looks great!\n"
ok, reason = sg.parse_security_status(fail_fm)
assert ok is False
assert "FAIL" in reason
def test_tc09_missing_or_broken_frontmatter_failclosed():
"""TC-09: no frontmatter / broken YAML / missing field -> (False, reason)."""
# No frontmatter at all.
ok, reason = sg.parse_security_status("# Just a body, no frontmatter\nPASS\n")
assert ok is False and reason
# Frontmatter present but no security_status field.
ok, reason = sg.parse_security_status("---\nother: 1\n---\nbody\n")
assert ok is False
# Broken YAML in the frontmatter.
ok, reason = sg.parse_security_status("---\nsecurity_status: : : [bad\n---\nbody\n")
assert ok is False
def test_tc10_artifact_has_valid_frontmatter_and_body(tmp_path, monkeypatch):
"""TC-10: 17-security-report.md is written with valid frontmatter (all machine
fields) and a body listing the findings; read-back == the written verdict."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt))
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
deps = _ok_deps([
{"package": "requests", "version": "2.0", "id": "CVE-X", "severity": "HIGH", "fix": "2.1"},
{"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
])
fields = _verdict(_found_secret(1), deps, dep_block_severity="HIGH")
path = sg.write_security_report(_REPO, _WI, _BRANCH, fields)
assert os.path.isfile(path)
with open(path, encoding="utf-8") as f:
content = f.read()
# Frontmatter carries every machine field.
for key in ("security_status", "secrets_found", "deps_blocking", "deps_warning",
"deps_audit_degraded"):
assert f"{key}:" in content
# Body lists findings.
assert "CVE-X" in content and "CVE-L" in content
# Read-back agrees with the computed status (single source of truth, AC-8).
ok, _ = sg.parse_security_status(content)
assert ok is (fields["security_status"] == "PASS")
assert ok is False # this fixture is a FAIL (secret + HIGH CVE)
# ---------------------------------------------------------------------------
# TC-11 / TC-12 — never-raise / timeout (FR-5/FR-6 / AC-14..AC-17)
# ---------------------------------------------------------------------------
def test_tc11_missing_binary_failclosed_never_raises(monkeypatch, tmp_path):
"""TC-11: a missing scanner binary / internal exception -> error -> FAIL
(fail-closed for secrets), and the exception never propagates."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
def _raise_fnf(cmd, **kwargs):
# git fetch ok, gitleaks missing.
if cmd[:1] == ["git"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise FileNotFoundError("gitleaks")
monkeypatch.setattr(sg.subprocess, "run", _raise_fnf)
res = sg.scan_secrets(_REPO, _BRANCH)
assert res.status == "error"
fields = _verdict(res, _ok_deps())
assert fields["security_status"] == "FAIL" # fail-closed, BR-2
assert "fail-closed" in fields["reason"]
# check_security_gate as a whole never raises even if everything explodes.
monkeypatch.setattr(sg, "security_gate_applies", lambda r: True)
def _boom(*a, **k):
raise RuntimeError("kaboom")
monkeypatch.setattr(sg, "scan_secrets", _boom)
ok, reason = sg.check_security_gate(_REPO, _WI, _BRANCH)
assert ok is False
assert "error" in reason.lower()
def test_tc12_timeout_is_deterministic_failclosed(monkeypatch, tmp_path):
"""TC-12: exceeding the scan timeout -> a deterministic error verdict, no hang."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
def _timeout(cmd, **kwargs):
if cmd[:1] == ["git"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise subprocess.TimeoutExpired(cmd, kwargs.get("timeout", 1))
monkeypatch.setattr(sg.subprocess, "run", _timeout)
res = sg.scan_secrets(_REPO, _BRANCH)
assert res.status == "error"
assert "timeout" in res.detail.lower()
fields = _verdict(res, _ok_deps())
assert fields["security_status"] == "FAIL"
# pip-audit timeout -> degrade (best-effort), not a hard error.
monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt))
(wt / "requirements.txt").write_text("requests==2.0\n")
dep = sg.audit_dependencies(_REPO, _BRANCH)
assert dep.status == "degraded"
assert "timeout" in dep.detail.lower()
# ---------------------------------------------------------------------------
# Parser robustness (supports the above; pure, never raises)
# ---------------------------------------------------------------------------
def test_parse_gitleaks_report_tolerant():
assert sg.parse_gitleaks_report("") == []
assert sg.parse_gitleaks_report("not json") == []
assert sg.parse_gitleaks_report("{}") == []
parsed = sg.parse_gitleaks_report(
'[{"File":"a.py","RuleID":"key","StartLine":3,"Secret":"supersecretvalue"}]'
)
assert parsed[0]["file"] == "a.py"
assert parsed[0]["rule"] == "key"
# The secret value is masked, never re-leaked verbatim.
assert "supersecretvalue" not in parsed[0]["match"]
def test_parse_pip_audit_report_tolerant():
assert sg.parse_pip_audit_report("") == []
assert sg.parse_pip_audit_report("garbage") == []
doc = (
'{"dependencies":[{"name":"requests","version":"2.0",'
'"vulns":[{"id":"CVE-1","severity":"HIGH","fix_versions":["2.1"]}]}]}'
)
parsed = sg.parse_pip_audit_report(doc)
assert parsed[0]["package"] == "requests"
assert parsed[0]["severity"] == "HIGH"
# Missing severity -> UNKNOWN.
doc2 = '{"dependencies":[{"name":"x","version":"1","vulns":[{"id":"CVE-2"}]}]}'
assert sg.parse_pip_audit_report(doc2)[0]["severity"] == "UNKNOWN"