"""ORCH-022 / TC-01..TC-12: the security-gate leaf module (src/security_gate.py). These exercise the DETERMINISTIC core: the pure classifier / verdict / frontmatter helpers (no binaries needed) plus scan_secrets / audit_dependencies with the external scanners (gitleaks / pip-audit) mocked at subprocess.run. The integration of the gate into advance_stage is covered in tests/test_stage_engine_security_gate.py; the QG registry wiring in tests/test_qg_security.py. Contract under test (ADR-001 §7): * secrets are UNCONDITIONAL + offline -> a found secret blocks; a tool error is fail-closed (FAIL); * dependency audit is best-effort -> blocking only at/over the severity threshold; UNKNOWN / below-threshold -> warning; an unreachable feed degrades fail-open + warning by default, fail-closed only when configured; * the machine verdict lives ONLY in the YAML frontmatter (read-back == written); * never-raise: any internal error -> (False, reason), no exception escapes. """ import os import subprocess os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import pytest # noqa: E402 from src import security_gate as sg # noqa: E402 _REPO = "orchestrator" _BRANCH = "feature/ORCH-022-x" _WI = "ORCH-022" # --------------------------------------------------------------------------- # Builders for the result containers (no binaries needed). # --------------------------------------------------------------------------- def _clean_secret(): return sg.SecretScanResult(status="clean", detail="no secrets found") def _found_secret(n=1): findings = [ {"file": "src/config.py", "rule": "generic-api-key", "line": 12 + i, "match": "abcd…yz"} for i in range(n) ] return sg.SecretScanResult(status="found", findings=findings, detail=f"{n} secret(s)") def _ok_deps(findings=None): return sg.DepAuditResult(status="ok", findings=findings or [], detail="ok") def _degraded_deps(): return sg.DepAuditResult(status="degraded", detail="pip-audit feed unavailable") def _verdict(secret, dep, *, secrets_block=True, dep_block_severity="HIGH", dep_fail_closed=False): return sg.compute_verdict( secret, dep, secrets_block=secrets_block, dep_block_severity=dep_block_severity, dep_fail_closed=dep_fail_closed, ) # --------------------------------------------------------------------------- # TC-01 / TC-02 / TC-03 — secret-scanning (FR-1 / AC-1..AC-3) # --------------------------------------------------------------------------- def test_tc01_secret_in_diff_fails(): """TC-01: a planted secret -> FAIL, secrets_found>=1, reason names the finding.""" fields = _verdict(_found_secret(1), _ok_deps()) assert fields["security_status"] == "FAIL" assert fields["secrets_found"] >= 1 # The reason must name the finding substance (rule + file), not just "FAIL". assert "generic-api-key" in fields["reason"] assert "src/config.py" in fields["reason"] def test_tc02_clean_branch_passes(): """TC-02: a clean branch -> PASS, secrets_found=0.""" fields = _verdict(_clean_secret(), _ok_deps()) assert fields["security_status"] == "PASS" assert fields["secrets_found"] == 0 assert fields["deps_blocking"] == 0 def test_tc03_allowlisted_match_does_not_fail(monkeypatch, tmp_path): """TC-03: an allowlisted match (placeholder / fixture) is filtered by gitleaks (rc=0) -> scan_secrets reports clean -> PASS. The allowlist lives in the versioned .gitleaks.toml; here we assert the gate honours gitleaks' rc=0.""" wt = tmp_path / "wt" wt.mkdir() monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt)) def _fake_run(cmd, **kwargs): # `git fetch` and `gitleaks detect` both routed here; both "succeed clean". return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") monkeypatch.setattr(sg.subprocess, "run", _fake_run) res = sg.scan_secrets(_REPO, _BRANCH) assert res.status == "clean" fields = _verdict(res, _ok_deps()) assert fields["security_status"] == "PASS" # --------------------------------------------------------------------------- # TC-04..TC-07 — dependency audit + thresholds (FR-2 / AC-4..AC-7) # --------------------------------------------------------------------------- def test_tc04_high_cve_at_high_threshold_blocks(): """TC-04: a HIGH/CRITICAL CVE at threshold HIGH -> FAIL, deps_blocking>=1.""" deps = _ok_deps([ {"package": "requests", "version": "2.0.0", "id": "CVE-1", "severity": "HIGH", "fix": "2.1"}, {"package": "urllib3", "version": "1.0.0", "id": "CVE-2", "severity": "CRITICAL", "fix": "1.1"}, ]) fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH") assert fields["security_status"] == "FAIL" assert fields["deps_blocking"] >= 1 assert "CVE-1" in fields["reason"] or "CVE-2" in fields["reason"] def test_tc05_only_medium_low_warns_passes(): """TC-05: only MEDIUM/LOW vulns -> PASS, deps_warning>=1, findings in the body.""" deps = _ok_deps([ {"package": "jinja2", "version": "2.0", "id": "CVE-M", "severity": "MEDIUM", "fix": "2.1"}, {"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""}, ]) fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH") assert fields["security_status"] == "PASS" assert fields["deps_warning"] >= 1 assert fields["deps_blocking"] == 0 body = sg.render_security_report(_WI, fields) assert "CVE-M" in body and "CVE-L" in body def test_tc06_threshold_config_changes_classification(): """TC-06: severity=CRITICAL makes a HIGH CVE a warning; severity=HIGH blocks it.""" assert sg.classify_severity("HIGH", "CRITICAL") == "warning" assert sg.classify_severity("HIGH", "HIGH") == "block" assert sg.classify_severity("CRITICAL", "CRITICAL") == "block" # UNKNOWN is ALWAYS a warning, never an auto-block (anti-loop, Р-4). assert sg.classify_severity("UNKNOWN", "LOW") == "warning" assert sg.classify_severity("", "HIGH") == "warning" deps = _ok_deps([ {"package": "x", "version": "1", "id": "CVE-H", "severity": "HIGH", "fix": ""}, ]) at_critical = _verdict(_clean_secret(), deps, dep_block_severity="CRITICAL") at_high = _verdict(_clean_secret(), deps, dep_block_severity="HIGH") assert at_critical["security_status"] == "PASS" assert at_critical["deps_warning"] == 1 assert at_high["security_status"] == "FAIL" assert at_high["deps_blocking"] == 1 def test_tc07_degraded_feed_failopen_default_failclosed_strict(): """TC-07: an unreachable CVE feed degrades fail-open + warning by default (no exception, no false FAIL); fail-closed -> FAIL only when configured.""" default = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=False) assert default["security_status"] == "PASS" assert default["deps_audit_degraded"] is True strict = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=True) assert strict["security_status"] == "FAIL" assert strict["deps_audit_degraded"] is True assert "fail-closed" in strict["reason"] # --------------------------------------------------------------------------- # TC-08..TC-10 — verdict / frontmatter parser + artefact (FR-3 / AC-8..AC-10) # --------------------------------------------------------------------------- def test_tc08_verdict_only_from_frontmatter(): """TC-08: the verdict is read ONLY from the YAML frontmatter; prose in the body does not influence it; the negative (FAIL) token is authoritative.""" # Frontmatter PASS but body screams FAIL -> still PASS (prose ignored). pass_fm = ( "---\nsecurity_status: PASS\nsecrets_found: 0\n---\n" "# Report\nThis build totally FAILED everything, FAIL FAIL.\n" ) ok, reason = sg.parse_security_status(pass_fm) assert ok is True assert "PASS" in reason # Frontmatter FAIL but body says PASS -> FAIL (negative token authoritative). fail_fm = "---\nsecurity_status: FAIL\n---\nEverything PASS, looks great!\n" ok, reason = sg.parse_security_status(fail_fm) assert ok is False assert "FAIL" in reason def test_tc09_missing_or_broken_frontmatter_failclosed(): """TC-09: no frontmatter / broken YAML / missing field -> (False, reason).""" # No frontmatter at all. ok, reason = sg.parse_security_status("# Just a body, no frontmatter\nPASS\n") assert ok is False and reason # Frontmatter present but no security_status field. ok, reason = sg.parse_security_status("---\nother: 1\n---\nbody\n") assert ok is False # Broken YAML in the frontmatter. ok, reason = sg.parse_security_status("---\nsecurity_status: : : [bad\n---\nbody\n") assert ok is False def test_orch076_parse_security_status_via_unified_api(): """ORCH-076 TC-12: parse_security_status now reads through the unified frontmatter primitive; the PASS/FAIL semantics are 1:1 with before, and an old report WITHOUT the new schema fields still reads exactly the same.""" from src import frontmatter as fm # Delegates to the single parse primitive (no private duplicated parse). assert "parse_frontmatter" in sg.parse_security_status.__doc__ # PASS / FAIL semantics preserved. assert sg.parse_security_status("---\nsecurity_status: PASS\n---\n")[0] is True assert sg.parse_security_status("---\nsecurity_status: FAIL\n---\n")[0] is False # An additive full schema does not change the verdict (FR-5 / AC-4). schema = ( "work_item: ORCH-076\nstage: deploy-staging\nauthor_agent: deployer\n" "status: PASS\ncreated_at: 2026-06-09\nmodel_used: claude-opus-4-8\n" ) with_schema = fm.render_frontmatter( {**{k.split(":")[0]: v for k, v in (line.split(": ", 1) for line in schema.strip().splitlines())}, "security_status": "PASS"} ) assert sg.parse_security_status(with_schema)[0] is True def test_tc10_artifact_has_valid_frontmatter_and_body(tmp_path, monkeypatch): """TC-10: 17-security-report.md is written with valid frontmatter (all machine fields) and a body listing the findings; read-back == the written verdict.""" wt = tmp_path / "wt" wt.mkdir() monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt)) monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt)) deps = _ok_deps([ {"package": "requests", "version": "2.0", "id": "CVE-X", "severity": "HIGH", "fix": "2.1"}, {"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""}, ]) fields = _verdict(_found_secret(1), deps, dep_block_severity="HIGH") path = sg.write_security_report(_REPO, _WI, _BRANCH, fields) assert os.path.isfile(path) with open(path, encoding="utf-8") as f: content = f.read() # Frontmatter carries every machine field. for key in ("security_status", "secrets_found", "deps_blocking", "deps_warning", "deps_audit_degraded"): assert f"{key}:" in content # Body lists findings. assert "CVE-X" in content and "CVE-L" in content # Read-back agrees with the computed status (single source of truth, AC-8). ok, _ = sg.parse_security_status(content) assert ok is (fields["security_status"] == "PASS") assert ok is False # this fixture is a FAIL (secret + HIGH CVE) # --------------------------------------------------------------------------- # TC-11 / TC-12 — never-raise / timeout (FR-5/FR-6 / AC-14..AC-17) # --------------------------------------------------------------------------- def test_tc11_missing_binary_failclosed_never_raises(monkeypatch, tmp_path): """TC-11: a missing scanner binary / internal exception -> error -> FAIL (fail-closed for secrets), and the exception never propagates.""" wt = tmp_path / "wt" wt.mkdir() monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt)) def _raise_fnf(cmd, **kwargs): # git fetch ok, gitleaks missing. if cmd[:1] == ["git"]: return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") raise FileNotFoundError("gitleaks") monkeypatch.setattr(sg.subprocess, "run", _raise_fnf) res = sg.scan_secrets(_REPO, _BRANCH) assert res.status == "error" fields = _verdict(res, _ok_deps()) assert fields["security_status"] == "FAIL" # fail-closed, BR-2 assert "fail-closed" in fields["reason"] # check_security_gate as a whole never raises even if everything explodes. monkeypatch.setattr(sg, "security_gate_applies", lambda r: True) def _boom(*a, **k): raise RuntimeError("kaboom") monkeypatch.setattr(sg, "scan_secrets", _boom) ok, reason = sg.check_security_gate(_REPO, _WI, _BRANCH) assert ok is False assert "error" in reason.lower() def test_tc12_timeout_is_deterministic_failclosed(monkeypatch, tmp_path): """TC-12: exceeding the scan timeout -> a deterministic error verdict, no hang.""" wt = tmp_path / "wt" wt.mkdir() monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt)) def _timeout(cmd, **kwargs): if cmd[:1] == ["git"]: return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") raise subprocess.TimeoutExpired(cmd, kwargs.get("timeout", 1)) monkeypatch.setattr(sg.subprocess, "run", _timeout) res = sg.scan_secrets(_REPO, _BRANCH) assert res.status == "error" assert "timeout" in res.detail.lower() fields = _verdict(res, _ok_deps()) assert fields["security_status"] == "FAIL" # pip-audit timeout -> degrade (best-effort), not a hard error. monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt)) (wt / "requirements.txt").write_text("requests==2.0\n") dep = sg.audit_dependencies(_REPO, _BRANCH) assert dep.status == "degraded" assert "timeout" in dep.detail.lower() # --------------------------------------------------------------------------- # Parser robustness (supports the above; pure, never raises) # --------------------------------------------------------------------------- def test_parse_gitleaks_report_tolerant(): assert sg.parse_gitleaks_report("") == [] assert sg.parse_gitleaks_report("not json") == [] assert sg.parse_gitleaks_report("{}") == [] parsed = sg.parse_gitleaks_report( '[{"File":"a.py","RuleID":"key","StartLine":3,"Secret":"supersecretvalue"}]' ) assert parsed[0]["file"] == "a.py" assert parsed[0]["rule"] == "key" # The secret value is masked, never re-leaked verbatim. assert "supersecretvalue" not in parsed[0]["match"] def test_parse_pip_audit_report_tolerant(): assert sg.parse_pip_audit_report("") == [] assert sg.parse_pip_audit_report("garbage") == [] doc = ( '{"dependencies":[{"name":"requests","version":"2.0",' '"vulns":[{"id":"CVE-1","severity":"HIGH","fix_versions":["2.1"]}]}]}' ) parsed = sg.parse_pip_audit_report(doc) assert parsed[0]["package"] == "requests" assert parsed[0]["severity"] == "HIGH" # Missing severity -> UNKNOWN. doc2 = '{"dependencies":[{"name":"x","version":"1","vulns":[{"id":"CVE-2"}]}]}' assert sg.parse_pip_audit_report(doc2)[0]["severity"] == "UNKNOWN"