feat(security): security-gate (gitleaks secret-scan + pip-audit) before merge
Some checks failed
CI / test (push) Failing after 19s
CI / test (pull_request) Failing after 18s

Add a deterministic (no-LLM) security sub-gate on the deploy-staging -> deploy
edge, run FIRST (before merge-gate ORCH-043 and image-freshness ORCH-058) so it
fails cheaply before any expensive rebase/rebuild, and scans origin/main..HEAD
before rebase so a task is never blamed for a CVE introduced by an updated main.

Why: the autonomous pipeline merged branches into main with no check for a leaked
secret or a vulnerable dependency. For the self-hosting orchestrator (one shared
prod instance serving every project from a shared DB) a single leak/CVE landed in
the prod of all projects (CLAUDE.md self-hosting, section 8).

- New leaf src/security_gate.py (never-raise): gitleaks (offline, fail-closed on
  tool error => secrets guarantee is unconditional) + pip-audit (best-effort;
  unreachable CVE feed degrades fail-open + loud warning by default, strict via
  security_dep_audit_fail_closed). Verdict lives ONLY in 17-security-report.md
  YAML frontmatter (write -> read-back single source of truth); FAIL is
  authoritative; missing/broken frontmatter => fail-closed.
- check_security_gate thin wrapper registered in QG_CHECKS (lazy import, no cycle).
- _handle_security_gate wired FIRST in advance_stage deploy-staging block: FAIL ->
  rollback to development + developer-retry (cap MAX_DEVELOPER_RETRIES); task_desc
  carries verbatim findings (ORCH-046 pattern). No merge-lease release (runs before
  lease acquire). Self-hosting safe: only reads/scans/writes, never deploys.
- Conditional rollout (security_gate_enabled + security_gate_repos; empty scope ->
  self-hosting only). 6 new ORCH_SECURITY_* settings.
- Infra: pinned gitleaks Go binary in Dockerfile (+curl/ca-certificates), pip-audit
  in requirements.txt, versioned .gitleaks.toml at repo root.
- STAGE_TRANSITIONS and DB schema unchanged.

Docs: docs/architecture/README.md (marked realized), CLAUDE.md (artifact 17),
CHANGELOG.md. Tests: test_security_gate.py, test_qg_security.py,
test_stage_engine_security_gate.py + updated registry/edge snapshots.

Refs: ORCH-022

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-07 17:23:13 +00:00
parent 834cbdf875
commit 10d5e9fcaa
18 changed files with 1643 additions and 6 deletions

View File

@@ -235,6 +235,7 @@ def test_tc19_qg_checks_registry_unchanged():
"check_staging_status",
"check_branch_mergeable",
"check_staging_image_fresh",
"check_security_gate",
}

View File

@@ -101,6 +101,7 @@ def test_tc05_no_approve_does_not_call_prod_hook(monkeypatch):
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _pass},
)

View File

@@ -30,6 +30,7 @@ _EXPECTED_QGS = {
"check_staging_status",
"check_branch_mergeable", # ORCH-043 merge-gate (deploy-staging -> deploy edge)
"check_staging_image_fresh", # ORCH-058 image-freshness sub-gate (same edge)
"check_security_gate", # ORCH-022 security sub-gate (same edge, run FIRST)
}

113
tests/test_qg_security.py Normal file
View File

@@ -0,0 +1,113 @@
"""ORCH-022 / TC-13..TC-15: the security-gate QG wrapper + registry wiring.
Covers the thin ``check_security_gate`` registry wrapper in src/qg/checks.py (its
conditionality fast-paths) and that the new check is registered + dispatched by
``_run_qg``. The deterministic core (scan / verdict / frontmatter) is covered in
tests/test_security_gate.py.
"""
import os
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from src import security_gate as sg # noqa: E402
from src.qg import checks as qg # noqa: E402
from src.qg.checks import QG_CHECKS, check_security_gate # noqa: E402
_WI = "ORCH-022"
_BRANCH = "feature/ORCH-022-x"
# ---------------------------------------------------------------------------
# TC-13 — non-self repo with empty scope -> N/A fast pass (no scanner run).
# ---------------------------------------------------------------------------
def test_tc13_non_self_repo_empty_scope_is_na(monkeypatch):
"""TC-13: a non-self repo with an empty scope -> (True, 'security-gate N/A
for <repo>') immediately, WITHOUT invoking the scanners."""
monkeypatch.setattr(sg.settings, "security_gate_enabled", True)
monkeypatch.setattr(sg.settings, "security_gate_repos", "")
called = {"scan": False}
def _should_not_run(*a, **k):
called["scan"] = True
raise AssertionError("scanner must not run for an N/A repo")
monkeypatch.setattr(sg, "scan_secrets", _should_not_run)
monkeypatch.setattr(sg, "audit_dependencies", _should_not_run)
ok, reason = check_security_gate("enduro-trails", _WI, _BRANCH)
assert ok is True
assert "N/A" in reason
assert "enduro-trails" in reason
assert called["scan"] is False
# ---------------------------------------------------------------------------
# TC-14 — kill-switch disabled -> no-op pass.
# ---------------------------------------------------------------------------
def test_tc14_disabled_is_noop_pass(monkeypatch):
"""TC-14: ORCH_SECURITY_GATE_ENABLED=false -> no-op pass (True), scanners untouched."""
monkeypatch.setattr(sg.settings, "security_gate_enabled", False)
def _should_not_run(*a, **k):
raise AssertionError("scanner must not run when the gate is disabled")
monkeypatch.setattr(sg, "scan_secrets", _should_not_run)
monkeypatch.setattr(sg, "audit_dependencies", _should_not_run)
ok, reason = check_security_gate("orchestrator", _WI, _BRANCH)
assert ok is True
assert "disabled" in reason.lower()
# ---------------------------------------------------------------------------
# TC-15 — registered in QG_CHECKS + dispatched by _run_qg.
# ---------------------------------------------------------------------------
def test_tc15_registered_in_qg_checks():
"""TC-15a: the new check is registered and callable."""
assert "check_security_gate" in QG_CHECKS
assert QG_CHECKS["check_security_gate"] is check_security_gate
assert callable(QG_CHECKS["check_security_gate"])
def test_tc15_dispatched_by_run_qg(monkeypatch):
"""TC-15b: _run_qg routes 'check_security_gate' with the (repo, work_item_id,
branch) signature to the registered wrapper."""
from src import stage_engine
captured = {}
def _fake(repo, work_item_id, branch):
captured["args"] = (repo, work_item_id, branch)
return True, "ok"
monkeypatch.setitem(stage_engine.QG_CHECKS, "check_security_gate", _fake)
passed, reason = stage_engine._run_qg("check_security_gate", "orchestrator", _WI, _BRANCH)
assert passed is True
assert captured["args"] == ("orchestrator", _WI, _BRANCH)
def test_security_gate_applies_scope(monkeypatch):
"""Conditionality matrix mirrors merge_gate_applies / image_freshness_applies."""
monkeypatch.setattr(sg.settings, "security_gate_enabled", True)
# Empty scope -> only the self-hosting repo.
monkeypatch.setattr(sg.settings, "security_gate_repos", "")
assert sg.security_gate_applies("orchestrator") is True
assert sg.security_gate_applies("enduro-trails") is False
# Explicit CSV scope -> only the listed repos (case-insensitive).
monkeypatch.setattr(sg.settings, "security_gate_repos", "enduro-trails, foo")
assert sg.security_gate_applies("enduro-trails") is True
assert sg.security_gate_applies("orchestrator") is False
# Kill-switch wins over everything.
monkeypatch.setattr(sg.settings, "security_gate_enabled", False)
assert sg.security_gate_applies("orchestrator") is False
def test_qg_wrapper_delegates(monkeypatch):
"""The QG wrapper delegates to security_gate.check_security_gate verbatim."""
monkeypatch.setattr(sg, "check_security_gate", lambda r, w, b: (False, "delegated FAIL"))
ok, reason = check_security_gate("orchestrator", _WI, _BRANCH)
assert ok is False
assert reason == "delegated FAIL"

324
tests/test_security_gate.py Normal file
View File

@@ -0,0 +1,324 @@
"""ORCH-022 / TC-01..TC-12: the security-gate leaf module (src/security_gate.py).
These exercise the DETERMINISTIC core: the pure classifier / verdict / frontmatter
helpers (no binaries needed) plus scan_secrets / audit_dependencies with the
external scanners (gitleaks / pip-audit) mocked at subprocess.run. The integration
of the gate into advance_stage is covered in tests/test_stage_engine_security_gate.py;
the QG registry wiring in tests/test_qg_security.py.
Contract under test (ADR-001 §7):
* secrets are UNCONDITIONAL + offline -> a found secret blocks; a tool error is
fail-closed (FAIL);
* dependency audit is best-effort -> blocking only at/over the severity threshold;
UNKNOWN / below-threshold -> warning; an unreachable feed degrades fail-open +
warning by default, fail-closed only when configured;
* the machine verdict lives ONLY in the YAML frontmatter (read-back == written);
* never-raise: any internal error -> (False, reason), no exception escapes.
"""
import os
import subprocess
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from src import security_gate as sg # noqa: E402
_REPO = "orchestrator"
_BRANCH = "feature/ORCH-022-x"
_WI = "ORCH-022"
# ---------------------------------------------------------------------------
# Builders for the result containers (no binaries needed).
# ---------------------------------------------------------------------------
def _clean_secret():
return sg.SecretScanResult(status="clean", detail="no secrets found")
def _found_secret(n=1):
findings = [
{"file": "src/config.py", "rule": "generic-api-key", "line": 12 + i, "match": "abcd…yz"}
for i in range(n)
]
return sg.SecretScanResult(status="found", findings=findings, detail=f"{n} secret(s)")
def _ok_deps(findings=None):
return sg.DepAuditResult(status="ok", findings=findings or [], detail="ok")
def _degraded_deps():
return sg.DepAuditResult(status="degraded", detail="pip-audit feed unavailable")
def _verdict(secret, dep, *, secrets_block=True, dep_block_severity="HIGH", dep_fail_closed=False):
return sg.compute_verdict(
secret, dep,
secrets_block=secrets_block,
dep_block_severity=dep_block_severity,
dep_fail_closed=dep_fail_closed,
)
# ---------------------------------------------------------------------------
# TC-01 / TC-02 / TC-03 — secret-scanning (FR-1 / AC-1..AC-3)
# ---------------------------------------------------------------------------
def test_tc01_secret_in_diff_fails():
"""TC-01: a planted secret -> FAIL, secrets_found>=1, reason names the finding."""
fields = _verdict(_found_secret(1), _ok_deps())
assert fields["security_status"] == "FAIL"
assert fields["secrets_found"] >= 1
# The reason must name the finding substance (rule + file), not just "FAIL".
assert "generic-api-key" in fields["reason"]
assert "src/config.py" in fields["reason"]
def test_tc02_clean_branch_passes():
"""TC-02: a clean branch -> PASS, secrets_found=0."""
fields = _verdict(_clean_secret(), _ok_deps())
assert fields["security_status"] == "PASS"
assert fields["secrets_found"] == 0
assert fields["deps_blocking"] == 0
def test_tc03_allowlisted_match_does_not_fail(monkeypatch, tmp_path):
"""TC-03: an allowlisted match (placeholder / fixture) is filtered by gitleaks
(rc=0) -> scan_secrets reports clean -> PASS. The allowlist lives in the
versioned .gitleaks.toml; here we assert the gate honours gitleaks' rc=0."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
def _fake_run(cmd, **kwargs):
# `git fetch` and `gitleaks detect` both routed here; both "succeed clean".
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
monkeypatch.setattr(sg.subprocess, "run", _fake_run)
res = sg.scan_secrets(_REPO, _BRANCH)
assert res.status == "clean"
fields = _verdict(res, _ok_deps())
assert fields["security_status"] == "PASS"
# ---------------------------------------------------------------------------
# TC-04..TC-07 — dependency audit + thresholds (FR-2 / AC-4..AC-7)
# ---------------------------------------------------------------------------
def test_tc04_high_cve_at_high_threshold_blocks():
"""TC-04: a HIGH/CRITICAL CVE at threshold HIGH -> FAIL, deps_blocking>=1."""
deps = _ok_deps([
{"package": "requests", "version": "2.0.0", "id": "CVE-1", "severity": "HIGH", "fix": "2.1"},
{"package": "urllib3", "version": "1.0.0", "id": "CVE-2", "severity": "CRITICAL", "fix": "1.1"},
])
fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
assert fields["security_status"] == "FAIL"
assert fields["deps_blocking"] >= 1
assert "CVE-1" in fields["reason"] or "CVE-2" in fields["reason"]
def test_tc05_only_medium_low_warns_passes():
"""TC-05: only MEDIUM/LOW vulns -> PASS, deps_warning>=1, findings in the body."""
deps = _ok_deps([
{"package": "jinja2", "version": "2.0", "id": "CVE-M", "severity": "MEDIUM", "fix": "2.1"},
{"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
])
fields = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
assert fields["security_status"] == "PASS"
assert fields["deps_warning"] >= 1
assert fields["deps_blocking"] == 0
body = sg.render_security_report(_WI, fields)
assert "CVE-M" in body and "CVE-L" in body
def test_tc06_threshold_config_changes_classification():
"""TC-06: severity=CRITICAL makes a HIGH CVE a warning; severity=HIGH blocks it."""
assert sg.classify_severity("HIGH", "CRITICAL") == "warning"
assert sg.classify_severity("HIGH", "HIGH") == "block"
assert sg.classify_severity("CRITICAL", "CRITICAL") == "block"
# UNKNOWN is ALWAYS a warning, never an auto-block (anti-loop, Р-4).
assert sg.classify_severity("UNKNOWN", "LOW") == "warning"
assert sg.classify_severity("", "HIGH") == "warning"
deps = _ok_deps([
{"package": "x", "version": "1", "id": "CVE-H", "severity": "HIGH", "fix": ""},
])
at_critical = _verdict(_clean_secret(), deps, dep_block_severity="CRITICAL")
at_high = _verdict(_clean_secret(), deps, dep_block_severity="HIGH")
assert at_critical["security_status"] == "PASS"
assert at_critical["deps_warning"] == 1
assert at_high["security_status"] == "FAIL"
assert at_high["deps_blocking"] == 1
def test_tc07_degraded_feed_failopen_default_failclosed_strict():
"""TC-07: an unreachable CVE feed degrades fail-open + warning by default (no
exception, no false FAIL); fail-closed -> FAIL only when configured."""
default = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=False)
assert default["security_status"] == "PASS"
assert default["deps_audit_degraded"] is True
strict = _verdict(_clean_secret(), _degraded_deps(), dep_fail_closed=True)
assert strict["security_status"] == "FAIL"
assert strict["deps_audit_degraded"] is True
assert "fail-closed" in strict["reason"]
# ---------------------------------------------------------------------------
# TC-08..TC-10 — verdict / frontmatter parser + artefact (FR-3 / AC-8..AC-10)
# ---------------------------------------------------------------------------
def test_tc08_verdict_only_from_frontmatter():
"""TC-08: the verdict is read ONLY from the YAML frontmatter; prose in the body
does not influence it; the negative (FAIL) token is authoritative."""
# Frontmatter PASS but body screams FAIL -> still PASS (prose ignored).
pass_fm = (
"---\nsecurity_status: PASS\nsecrets_found: 0\n---\n"
"# Report\nThis build totally FAILED everything, FAIL FAIL.\n"
)
ok, reason = sg.parse_security_status(pass_fm)
assert ok is True
assert "PASS" in reason
# Frontmatter FAIL but body says PASS -> FAIL (negative token authoritative).
fail_fm = "---\nsecurity_status: FAIL\n---\nEverything PASS, looks great!\n"
ok, reason = sg.parse_security_status(fail_fm)
assert ok is False
assert "FAIL" in reason
def test_tc09_missing_or_broken_frontmatter_failclosed():
"""TC-09: no frontmatter / broken YAML / missing field -> (False, reason)."""
# No frontmatter at all.
ok, reason = sg.parse_security_status("# Just a body, no frontmatter\nPASS\n")
assert ok is False and reason
# Frontmatter present but no security_status field.
ok, reason = sg.parse_security_status("---\nother: 1\n---\nbody\n")
assert ok is False
# Broken YAML in the frontmatter.
ok, reason = sg.parse_security_status("---\nsecurity_status: : : [bad\n---\nbody\n")
assert ok is False
def test_tc10_artifact_has_valid_frontmatter_and_body(tmp_path, monkeypatch):
"""TC-10: 17-security-report.md is written with valid frontmatter (all machine
fields) and a body listing the findings; read-back == the written verdict."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt))
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
deps = _ok_deps([
{"package": "requests", "version": "2.0", "id": "CVE-X", "severity": "HIGH", "fix": "2.1"},
{"package": "click", "version": "7.0", "id": "CVE-L", "severity": "LOW", "fix": ""},
])
fields = _verdict(_found_secret(1), deps, dep_block_severity="HIGH")
path = sg.write_security_report(_REPO, _WI, _BRANCH, fields)
assert os.path.isfile(path)
with open(path, encoding="utf-8") as f:
content = f.read()
# Frontmatter carries every machine field.
for key in ("security_status", "secrets_found", "deps_blocking", "deps_warning",
"deps_audit_degraded"):
assert f"{key}:" in content
# Body lists findings.
assert "CVE-X" in content and "CVE-L" in content
# Read-back agrees with the computed status (single source of truth, AC-8).
ok, _ = sg.parse_security_status(content)
assert ok is (fields["security_status"] == "PASS")
assert ok is False # this fixture is a FAIL (secret + HIGH CVE)
# ---------------------------------------------------------------------------
# TC-11 / TC-12 — never-raise / timeout (FR-5/FR-6 / AC-14..AC-17)
# ---------------------------------------------------------------------------
def test_tc11_missing_binary_failclosed_never_raises(monkeypatch, tmp_path):
"""TC-11: a missing scanner binary / internal exception -> error -> FAIL
(fail-closed for secrets), and the exception never propagates."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
def _raise_fnf(cmd, **kwargs):
# git fetch ok, gitleaks missing.
if cmd[:1] == ["git"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise FileNotFoundError("gitleaks")
monkeypatch.setattr(sg.subprocess, "run", _raise_fnf)
res = sg.scan_secrets(_REPO, _BRANCH)
assert res.status == "error"
fields = _verdict(res, _ok_deps())
assert fields["security_status"] == "FAIL" # fail-closed, BR-2
assert "fail-closed" in fields["reason"]
# check_security_gate as a whole never raises even if everything explodes.
monkeypatch.setattr(sg, "security_gate_applies", lambda r: True)
def _boom(*a, **k):
raise RuntimeError("kaboom")
monkeypatch.setattr(sg, "scan_secrets", _boom)
ok, reason = sg.check_security_gate(_REPO, _WI, _BRANCH)
assert ok is False
assert "error" in reason.lower()
def test_tc12_timeout_is_deterministic_failclosed(monkeypatch, tmp_path):
"""TC-12: exceeding the scan timeout -> a deterministic error verdict, no hang."""
wt = tmp_path / "wt"
wt.mkdir()
monkeypatch.setattr(sg, "ensure_worktree", lambda r, b: str(wt))
def _timeout(cmd, **kwargs):
if cmd[:1] == ["git"]:
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
raise subprocess.TimeoutExpired(cmd, kwargs.get("timeout", 1))
monkeypatch.setattr(sg.subprocess, "run", _timeout)
res = sg.scan_secrets(_REPO, _BRANCH)
assert res.status == "error"
assert "timeout" in res.detail.lower()
fields = _verdict(res, _ok_deps())
assert fields["security_status"] == "FAIL"
# pip-audit timeout -> degrade (best-effort), not a hard error.
monkeypatch.setattr(sg, "get_worktree_path", lambda r, b: str(wt))
(wt / "requirements.txt").write_text("requests==2.0\n")
dep = sg.audit_dependencies(_REPO, _BRANCH)
assert dep.status == "degraded"
assert "timeout" in dep.detail.lower()
# ---------------------------------------------------------------------------
# Parser robustness (supports the above; pure, never raises)
# ---------------------------------------------------------------------------
def test_parse_gitleaks_report_tolerant():
assert sg.parse_gitleaks_report("") == []
assert sg.parse_gitleaks_report("not json") == []
assert sg.parse_gitleaks_report("{}") == []
parsed = sg.parse_gitleaks_report(
'[{"File":"a.py","RuleID":"key","StartLine":3,"Secret":"supersecretvalue"}]'
)
assert parsed[0]["file"] == "a.py"
assert parsed[0]["rule"] == "key"
# The secret value is masked, never re-leaked verbatim.
assert "supersecretvalue" not in parsed[0]["match"]
def test_parse_pip_audit_report_tolerant():
assert sg.parse_pip_audit_report("") == []
assert sg.parse_pip_audit_report("garbage") == []
doc = (
'{"dependencies":[{"name":"requests","version":"2.0",'
'"vulns":[{"id":"CVE-1","severity":"HIGH","fix_versions":["2.1"]}]}]}'
)
parsed = sg.parse_pip_audit_report(doc)
assert parsed[0]["package"] == "requests"
assert parsed[0]["severity"] == "HIGH"
# Missing severity -> UNKNOWN.
doc2 = '{"dependencies":[{"name":"x","version":"1","vulns":[{"id":"CVE-2"}]}]}'
assert sg.parse_pip_audit_report(doc2)[0]["severity"] == "UNKNOWN"

View File

@@ -832,6 +832,7 @@ class TestMergeGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _pass},
)
@@ -856,6 +857,7 @@ class TestMergeGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _fail("merge-lock busy")},
)
monkeypatch.setattr(stage_engine.settings, "merge_defer_delay_s", 30)
@@ -883,6 +885,7 @@ class TestMergeGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _fail("merge-lock busy")},
)
monkeypatch.setattr(stage_engine.settings, "merge_defer_max_attempts", 3)
@@ -916,6 +919,7 @@ class TestMergeGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _fail("rebase conflict: src/db.py")},
)
task_id = _make_task("deploy-staging", repo="orchestrator", wi="ORCH-043",
@@ -939,6 +943,7 @@ class TestMergeGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _fail("re-test failed after rebase: 1 failed")},
)
task_id = _make_task("deploy-staging", repo="orchestrator", wi="ORCH-043",
@@ -962,6 +967,7 @@ class TestMergeGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _fail("rebase conflict: src/db.py")},
)
task_id = _make_task("deploy-staging", repo="orchestrator", wi="ORCH-043",
@@ -1014,6 +1020,7 @@ class TestImageFreshnessGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _fail(
"staging rebuild failed: health FAILED")},
@@ -1041,6 +1048,7 @@ class TestImageFreshnessGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _fail("provenance mismatch")},
)
@@ -1064,6 +1072,7 @@ class TestImageFreshnessGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _pass},
)
@@ -1089,6 +1098,7 @@ class TestImageFreshnessGate:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass},
) # check_staging_image_fresh left REAL -> N/A for enduro-trails
task_id = _make_task("deploy-staging", repo="enduro-trails", wi="ET-099",
@@ -1160,6 +1170,7 @@ class TestStagingInfraTolerance:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _pass},
)
@@ -1232,6 +1243,7 @@ class TestStagingInfraTolerance:
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_security_gate": _pass,
"check_branch_mergeable": _pass,
"check_staging_image_fresh": _pass,
"check_deploy_status": _pass},

View File

@@ -0,0 +1,264 @@
"""ORCH-022 / TC-16..TC-19, TC-21: the security sub-gate wired into advance_stage.
These are integration tests over src.stage_engine.advance_stage on the
deploy-staging -> deploy edge. The security verdict is injected by patching the
QG_CHECKS registry entry (the leaf scanner logic is unit-tested in
tests/test_security_gate.py), so we exercise the ENGINE behaviour:
* FAIL -> rollback to development + enqueue developer + Plane comment + notify;
* the rollback task_desc carries the verbatim findings (ORCH-046 pattern);
* after MAX_DEVELOPER_RETRIES -> set_issue_blocked + Telegram, no bounce;
* PASS -> the pipeline advances normally (no rollback, no noisy notify);
* self-hosting safety: a FAIL never calls the deploy hook / restarts prod.
Network/Plane/Telegram side effects are mocked at the src.stage_engine level.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_security_gate.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
_BRANCH = "feature/ORCH-022-x"
# ---------------------------------------------------------------------------
# Fixtures (mirror tests/test_stage_engine.py)
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change",
"notify_qg_failure",
"notify_approve_requested",
"send_telegram",
"plane_notify_stage",
"plane_notify_qg",
"plane_add_comment",
"set_issue_in_review",
"set_issue_needs_input",
"set_issue_in_progress",
"set_issue_blocked",
"set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo, branch=_BRANCH, wi="ORCH-022"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _job_contents():
conn = get_db()
rows = conn.execute("SELECT task_content FROM jobs ORDER BY id").fetchall()
conn.close()
return [r[0] for r in rows]
def _add_developer_runs(task_id, n):
conn = get_db()
for _ in range(n):
conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
(task_id,),
)
conn.commit()
conn.close()
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
def _qg_with_security(monkeypatch, security_result):
"""Patch QG_CHECKS so every gate passes EXCEPT the security gate, which returns
``security_result``. Keeps the deploy-staging edge reachable (check_staging_status
passes) and isolates the security verdict under test."""
patched = {k: _pass for k in stage_engine.QG_CHECKS}
patched["check_security_gate"] = security_result
monkeypatch.setattr(stage_engine, "QG_CHECKS", patched)
# ---------------------------------------------------------------------------
# TC-16 — FAIL -> rollback to development + enqueue developer + notify.
# ---------------------------------------------------------------------------
def test_tc16_fail_rolls_back_and_enqueues_developer(monkeypatch):
"""TC-16: security_status FAIL -> rollback deploy-staging -> development,
enqueue developer, Plane comment + notify_qg_failure."""
_qg_with_security(monkeypatch, _fail("2 secret(s): aws-key in src/x.py:3"))
task_id = _make_task("deploy-staging", repo="enduro-trails")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ORCH-022", _BRANCH,
finished_agent="deployer",
)
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
assert res.qg_name == "check_security_gate"
# The deployer-authored Plane comment + the QG-failure notification fired.
assert stage_engine.plane_add_comment.called
assert stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# TC-17 — the rollback task_desc carries the verbatim findings (ORCH-046).
# ---------------------------------------------------------------------------
def test_tc17_task_desc_has_verbatim_findings(monkeypatch):
"""TC-17: the re-launched developer's task_desc embeds the verbatim finding
substance (not just a link), following the ORCH-046 pattern."""
reason = "2 secret(s): aws-access-key in src/config.py:12"
_qg_with_security(monkeypatch, _fail(reason))
task_id = _make_task("deploy-staging", repo="enduro-trails")
# Seed a real 17-security-report.md in the worktree so extract_security_findings
# has a verbatim body to excerpt.
wt = stage_engine.get_worktree_path("enduro-trails", _BRANCH)
report_dir = os.path.join(wt, "docs", "work-items", "ORCH-022")
os.makedirs(report_dir, exist_ok=True)
with open(os.path.join(report_dir, "17-security-report.md"), "w", encoding="utf-8") as f:
f.write(
"---\nsecurity_status: FAIL\nsecrets_found: 1\n---\n"
"# Security Report — ORCH-022\n\n"
"## Verdict\n1 secret(s): aws-access-key in src/config.py:12\n\n"
"## Secrets\n- `src/config.py:12` — aws-access-key (match `AKIA…YZ`)\n\n"
"## Dependencies (blocking)\n- None\n"
)
advance_stage(
task_id, "deploy-staging", "enduro-trails", "ORCH-022", _BRANCH,
finished_agent="deployer",
)
contents = _job_contents()
assert len(contents) == 1
desc = contents[0]
# The verbatim reason AND the excerpted finding line are present.
assert "aws-access-key in src/config.py:12" in desc
assert "src/config.py:12" in desc
# Plus the link to the full artefact.
assert "17-security-report.md" in desc
# ---------------------------------------------------------------------------
# TC-18 — after MAX_DEVELOPER_RETRIES -> block + Telegram, no bounce.
# ---------------------------------------------------------------------------
def test_tc18_retry_cap_blocks_and_alerts(monkeypatch):
"""TC-18: after MAX_DEVELOPER_RETRIES developer attempts -> set_issue_blocked +
Telegram alert; no infinite bounce (no new developer job)."""
_qg_with_security(monkeypatch, _fail("blocking CVE"))
task_id = _make_task("deploy-staging", repo="enduro-trails")
_add_developer_runs(task_id, stage_engine.MAX_DEVELOPER_RETRIES)
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ORCH-022", _BRANCH,
finished_agent="deployer",
)
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert stage_engine.send_telegram.called
# No further developer job past the cap.
assert _jobs() == []
# ---------------------------------------------------------------------------
# TC-19 — PASS -> the pipeline advances normally.
# ---------------------------------------------------------------------------
def test_tc19_pass_advances_normally(monkeypatch):
"""TC-19: security_status PASS -> advance deploy-staging -> deploy with the
deployer launched, no rollback, no QG-failure notification."""
_qg_with_security(monkeypatch, lambda *a, **k: (True, "security clean"))
task_id = _make_task("deploy-staging", repo="enduro-trails")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ORCH-022", _BRANCH,
finished_agent="deployer",
)
assert res.advanced is True
assert res.to_stage == "deploy"
assert _stage(task_id) == "deploy"
assert res.rolled_back_to is None
# No noisy QG-failure notification on the happy path.
assert not stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# TC-21 — self-hosting safety: a FAIL never deploys / restarts prod.
# ---------------------------------------------------------------------------
def test_tc21_fail_never_triggers_deploy(monkeypatch):
"""TC-21: on a security FAIL the gate only rolls back + enqueues developer; it
never calls the deploy hook / restarts the prod container (self-hosting safety)."""
_qg_with_security(monkeypatch, _fail("secret found"))
# Spy on the self-deploy entrypoints — none must be invoked on a FAIL.
monkeypatch.setattr(stage_engine.self_deploy, "initiate_deploy", MagicMock())
monkeypatch.setattr(stage_engine.self_deploy, "self_deploy_applies", MagicMock(return_value=True))
task_id = _make_task("deploy-staging", repo="orchestrator")
res = advance_stage(
task_id, "deploy-staging", "orchestrator", "ORCH-022", _BRANCH,
finished_agent="deployer",
)
assert res.rolled_back_to == "development"
# The security FAIL returns BEFORE the self-deploy block -> no deploy initiated.
assert not stage_engine.self_deploy.initiate_deploy.called
# Only the developer is re-enqueued; no deployer job.
jobs = _jobs()
assert all(j["agent"] == "developer" for j in jobs)