Files
orchestrator/src/coverage_gate.py
claude-bot 9647fe1ffb fix(coverage): use sys.executable for the pytest --cov subprocess (ORCH-027)
measure_coverage hardcoded "python" for the coverage subprocess; the prod container
and the CI runner expose "python3" (a bare "python" may be absent), and pytest-cov
lives in exactly the running interpreter's environment. Use sys.executable so the
measurement always runs under the same interpreter as the orchestrator.

Refs: ORCH-027
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 01:26:24 +03:00

621 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Coverage-gate core (ORCH-027): deterministic test-coverage gate before merge.
Background
----------
The orchestrator runs autonomous development: the ``developer`` agent writes code
with no human filter, and on ``testing`` the ``tester`` agent decides for itself
whether the tests are enough. The existing test gates judge only by the FACT of
passing, never by COMPLETENESS: ``check_ci_green`` and ``check_tests_passed`` and
the merge-gate re-test all look at a pytest exit code. None of them notices "300
lines of new code, 0 tests". Across a batch autonomous run (ORCH-088) that means a
monotonic erosion of coverage — every task shaves a corner on tests and the project
silently loses testability.
This module provides the deterministic (no-LLM) primitives that the quality-gate
``check_coverage_gate`` (src/qg/checks.py) composes on the ``deploy-staging ->
deploy`` edge — run **AFTER the merge-gate** (so coverage is measured on the
caught-up HEAD that actually lands in ``main``) and **BEFORE image-freshness** (fail
before the expensive docker rebuild), mirroring the security-gate (ORCH-022):
* ``measure_coverage`` -> run ``pytest --cov=src`` in the per-branch
worktree (offline) -> line coverage ``%`` or
``None`` on tool error.
* ``compute_coverage_verdict`` -> pure: compare (measured, baseline, floor) under
a policy + epsilon -> ``(ok, reason)``.
* ``write_coverage_report`` / ``parse_coverage_status`` -> write the
``18-coverage-report.md`` artefact and read its machine verdict back (single
source of truth: the gate returns exactly the frontmatter it wrote, AC-9).
* ``ratchet_baseline_on_merge`` -> on a CONFIRMED merge (``_handle_merge_verify``,
``deploy -> done`` edge) raise the per-repo baseline UP from the merged branch's
measured coverage (atomic compare-and-set, never decreases — FR-4 / D5).
* ``check_coverage_gate`` -> the orchestrating entry the QG wrapper delegates
to.
Invariants (ADR-001 §7, never broken):
* **Tool error -> fail-open + WARNING by default** (FR-6/AC-6): a coverage-tool
failure / unparseable metric degrades fail-open (anti-loop, precedent
ORCH-061/022 dep-audit); ``coverage_tool_fail_closed`` flips it to strict.
* **never-raise** (AC-7): any internal error is swallowed; an exception never
escapes into ``advance_stage``.
* **Baseline never decreases** (FR-4): the ratchet is an atomic SQL compare-and-set
under the held merge-lease (ORCH-043), so two parallel merges can never lower or
lose the value.
* **Self-hosting safety** (AC-7): the gate only measures / reads / writes the
artefact / decides. It never calls the deploy hook, never restarts the prod
container, never pushes / force-pushes ``main``.
This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and lazily
``qg.checks.is_self_hosting_repo`` / ``db`` / ``notifications``; it never imports
``stage_engine``.
"""
import json
import logging
import os
import subprocess
import sys
from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path
logger = logging.getLogger("orchestrator.coverage_gate")
# ---------------------------------------------------------------------------
# Conditionality (mirrors security_gate_applies / _merge_gate_applies)
# ---------------------------------------------------------------------------
def coverage_gate_applies(repo: str) -> bool:
    """Tell whether the coverage-gate is actually enforced for ``repo``.

    Resolution order (conditional rollout):

    1. ``settings.coverage_gate_enabled`` is falsy -> ``False`` (kill-switch).
    2. ``settings.coverage_gate_repos`` is a non-empty CSV -> ``True`` only when
       ``repo`` (trimmed, compared case-insensitively) is one of the listed names.
    3. Empty CSV -> whatever ``is_self_hosting_repo(repo)`` answers.

    Never raises (AC-7): any exception is logged as a warning and reported as
    ``False``, the safe no-op default.
    """
    try:
        if not settings.coverage_gate_enabled:
            return False
        csv = (settings.coverage_gate_repos or "").strip()
        if not csv:
            # Imported lazily so this module does not import ``qg`` at load time.
            from .qg.checks import is_self_hosting_repo

            return is_self_hosting_repo(repo)
        listed = set()
        for item in csv.split(","):
            name = item.strip()
            if name:
                listed.add(name.lower())
        return (repo or "").strip().lower() in listed
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("coverage_gate_applies error for %s: %s", repo, e)
        return False
# ---------------------------------------------------------------------------
# Measurement (pytest --cov=src in the per-branch worktree) — FR-1 / D2
# ---------------------------------------------------------------------------
def parse_coverage_percent(data) -> float | None:
"""Pure: extract ``totals.percent_covered`` (line coverage ``%``) from a
coverage.py JSON dict. Returns ``None`` if the shape is missing / unparseable.
Never raises.
"""
try:
if not isinstance(data, dict):
return None
totals = data.get("totals")
if not isinstance(totals, dict):
return None
pct = totals.get("percent_covered")
if pct is None:
return None
return float(pct)
except (TypeError, ValueError):
return None
def measure_coverage(repo: str, branch: str) -> float | None:
    """Run ``pytest --cov=src`` in the per-branch worktree -> line coverage ``%``.

    Scope is ``src/`` only (the tests themselves are out of scope per the BRD).
    The measurer is intentionally encapsulated here so the pure decision logic and
    the baseline storage stay stack-agnostic (a future jest/jacoco measurer would
    be a new ``measure_*`` function, BR-6).

    The coverage metric is read from the ``--cov-report=json`` file regardless of
    the pytest exit code: a non-zero exit because of *failing tests* is already
    caught upstream (``check_ci_green`` / merge-gate re-test), and a partial run
    still produces a meaningful coverage JSON.

    Args:
        repo: repository name, passed to ``ensure_worktree``.
        branch: branch whose worktree is measured.

    Returns:
        Line coverage in percent, or ``None`` on a genuine tool error (worktree
        failure, timeout, interpreter not launchable, no JSON produced,
        unparseable JSON). The caller degrades fail-open by default (FR-6).

    Never raises (AC-7).
    """
    try:
        wt = ensure_worktree(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("measure_coverage: worktree error for %s/%s: %s", repo, branch, e)
        return None

    cov_json = os.path.join(wt, ".coverage-report.json")
    # Remove a stale report so we never read a previous pass's metric.
    try:
        if os.path.isfile(cov_json):
            os.remove(cov_json)
    except OSError:
        pass

    # Use the SAME interpreter that runs the orchestrator (sys.executable), not a
    # bare "python" — the prod container / CI runner expose "python3", and the
    # pytest-cov plugin lives in exactly this interpreter's environment.
    cmd = [
        sys.executable, "-m", "pytest", "tests/",
        "--cov=src",
        f"--cov-report=json:{cov_json}",
        "--cov-report=",  # suppress the terminal cov report (json only)
        "-q",
    ]
    timeout = settings.coverage_run_timeout_s
    try:
        # errors="replace": with text=True an undecodable byte in pytest's output
        # would raise UnicodeDecodeError (a ValueError, caught by nothing below)
        # and break the never-raise contract. The output is diagnostics only.
        proc = subprocess.run(
            cmd,
            cwd=wt,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.warning(
            "measure_coverage: pytest --cov timed out after %ss for %s/%s",
            timeout, repo, branch,
        )
        return None
    except FileNotFoundError:
        logger.warning(
            "measure_coverage: pytest / pytest-cov not available for %s/%s", repo, branch
        )
        return None
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("measure_coverage: pytest --cov error for %s/%s: %s", repo, branch, e)
        return None

    data = None
    try:
        if not os.path.isfile(cov_json):
            # Surface WHY no report exists (e.g. pytest rejecting ``--cov`` because
            # the plugin is missing) instead of discarding the captured output.
            # getattr keeps this tolerant of a stubbed-out subprocess result.
            exit_code = getattr(proc, "returncode", None)
            stderr_tail = str(getattr(proc, "stderr", "") or "").strip()[-500:]
            logger.warning(
                "measure_coverage: no coverage json produced for %s/%s "
                "(pytest exit code %s; stderr tail: %r)",
                repo, branch, exit_code, stderr_tail,
            )
            return None
        with open(cov_json, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(
            "measure_coverage: cannot parse coverage json for %s/%s: %s", repo, branch, e
        )
        return None
    finally:
        # Always clean the scratch report out of the worktree, success or not.
        try:
            if os.path.isfile(cov_json):
                os.remove(cov_json)
        except OSError:
            pass
    return parse_coverage_percent(data)
# ---------------------------------------------------------------------------
# Pure decision (FR-2 / D3) — the core of the unit tests
# ---------------------------------------------------------------------------
def compute_coverage_verdict(measured, baseline, floor, policy, epsilon) -> tuple[bool, str]:
    """Pure: decide PASS/FAIL from (measured, baseline, floor, policy, epsilon).

    Deterministic, no LLM, no I/O. Returns ``(ok: bool, reason: str)``.

    * ``policy = "absolute"`` -> PASS iff ``measured >= floor - epsilon``.
    * ``policy = "baseline"`` -> PASS iff ``measured >= baseline - epsilon``.
    * ``policy = "both"`` (default when ``policy`` is falsy) -> PASS iff BOTH hold.
    * ``baseline is None`` (no stored baseline / bootstrap) -> the baseline
      condition does NOT apply (cannot regress against nothing); only the absolute
      part decides. For ``policy = "baseline"`` with no baseline this is a
      bootstrap PASS.
    * ``epsilon`` — a small tolerance so jitter at the boundary does not bounce a
      task (NFR-4); negative values are clamped to ``0``, ``None`` means ``0``.
    * ``floor`` — ``None`` or an unparseable value is treated as ``0.0``.

    Never raises: an unusable ``measured`` / ``epsilon`` / ``policy``, or an
    applicable ``baseline`` that cannot be converted to a number, yields
    ``(False, "coverage verdict: bad inputs (...)")`` — a conservative FAIL.
    """
    bad_number = (TypeError, ValueError, OverflowError)
    try:
        pol = (policy or "both").strip().lower()
        eps = max(0.0, float(epsilon if epsilon is not None else 0.0))
        m = float(measured)
    except (AttributeError, *bad_number) as e:
        # AttributeError: a truthy non-string policy has no ``.strip()``.
        return False, f"coverage verdict: bad inputs ({e})"

    abs_applicable = pol in ("absolute", "both")
    base_applicable = pol in ("baseline", "both") and baseline is not None

    # Convert the baseline up-front, guarded: an unparseable stored baseline must
    # produce a FAIL verdict, not an exception out of a never-raise function.
    b = None
    if base_applicable:
        try:
            b = float(baseline)
        except bad_number as e:
            return False, f"coverage verdict: bad inputs (baseline: {e})"

    checks: list[str] = []
    ok = True
    if abs_applicable:
        try:
            f = float(floor if floor is not None else 0.0)
        except bad_number:
            f = 0.0
        abs_ok = m >= f - eps
        checks.append(
            f"absolute {m:.2f}% >= floor {f:.2f}%-eps{eps:.2f} -> "
            f"{'PASS' if abs_ok else 'FAIL'}"
        )
        ok = ok and abs_ok
    if base_applicable:
        base_ok = m >= b - eps
        checks.append(
            f"baseline {m:.2f}% >= base {b:.2f}%-eps{eps:.2f} -> "
            f"{'PASS' if base_ok else 'FAIL'}"
        )
        ok = ok and base_ok
    elif pol in ("baseline", "both") and baseline is None:
        checks.append("baseline N/A (bootstrap — no stored baseline)")
    body = "; ".join(checks) if checks else "no applicable condition (bootstrap) -> PASS"
    reason = f"measured={m:.2f}% policy={pol} eps={eps:.2f}: {body}"
    return ok, reason
def compute_delta(measured, baseline, floor) -> float:
    """Pure: signed distance of ``measured`` from the strictest reference.

    The references are whichever of ``baseline`` / ``floor`` are not ``None``; the
    result is ``measured - max(references)`` rounded to 2 decimals. When neither
    reference is present, or any value cannot be converted to ``float``, the
    result is ``0.0``. Never raises.
    """
    try:
        value = float(measured)
        references = [float(ref) for ref in (baseline, floor) if ref is not None]
    except (TypeError, ValueError):
        return 0.0
    if references:
        return round(value - max(references), 2)
    return 0.0
# ---------------------------------------------------------------------------
# Artefact: write the report, read the machine verdict back (FR-7 / D7 / AC-9)
# ---------------------------------------------------------------------------
def _report_rel(work_item_id: str) -> str:
return f"docs/work-items/{work_item_id}/18-coverage-report.md"
def _report_path(repo: str, work_item_id: str, branch: str) -> str:
    """Return the path of ``18-coverage-report.md`` for a task.

    The report lives under the task worktree: the path from
    ``get_worktree_path`` when that directory exists, otherwise the one
    ``ensure_worktree`` returns. If either helper raises, the shared clone at
    ``<settings.repos_dir>/<repo>`` is used as the root instead.
    """
    try:
        candidate = get_worktree_path(repo, branch)
        root = candidate if os.path.isdir(candidate) else ensure_worktree(repo, branch)
    except Exception:  # noqa: BLE001 - never-raise; fall back to shared clone
        root = os.path.join(settings.repos_dir, repo)
    return os.path.join(root, _report_rel(work_item_id))
def _num(v) -> str:
"""Render a numeric field with 2 decimals, or empty for None/unparseable."""
if v is None:
return ""
try:
return f"{float(v):.2f}"
except (TypeError, ValueError):
return ""
def render_coverage_report(work_item_id: str, fields: dict) -> str:
    """Pure: build the text of ``18-coverage-report.md`` (frontmatter + body).

    The machine verdict lives ONLY in the YAML frontmatter key
    ``coverage_status:`` (case-sensitive); ``measured_coverage`` in the same
    frontmatter is the value the ratchet reads back (D5). Numeric fields are
    rendered through ``_num`` (two decimals, empty when absent); missing
    ``coverage_status`` defaults to ``FAIL`` and missing ``policy`` to ``both``.
    """
    lookup = fields.get
    header_rows = (
        ("coverage_status", lookup("coverage_status", "FAIL")),
        ("work_item", work_item_id),
        ("measured_coverage", _num(lookup("measured_coverage"))),
        # _num(None) is "", so an absent baseline renders as an empty value.
        ("baseline", _num(lookup("baseline"))),
        ("floor", _num(lookup("floor"))),
        ("policy", lookup("policy", "both")),
        ("epsilon", _num(lookup("epsilon"))),
        ("delta", _num(lookup("delta"))),
    )
    frontmatter = "".join(f"{key}: {value}\n" for key, value in header_rows)
    # Human-readable preamble (emitted verbatim into the report).
    preamble = (
        "Детерминированный гейт покрытия (ORCH-027) — под-гейт ребра "
        "`deploy-staging→deploy` (ПОСЛЕ merge-gate, ДО image-freshness). Машинный "
        "вердикт читается ТОЛЬКО из `coverage_status:` frontmatter выше.\n\n"
    )
    sections = (
        f"## Verdict\n{lookup('reason', '')}\n\n"
        f"## Measurement\n{lookup('measurement', '')}\n\n"
        f"## Policy\n{lookup('policy_detail', '')}\n"
    )
    title = f"# Coverage Report — {work_item_id}\n\n"
    return "---\n" + frontmatter + "---\n" + title + preamble + sections
def write_coverage_report(repo: str, work_item_id: str, branch: str, fields: dict) -> str:
    """Write ``18-coverage-report.md`` for the task and return its path.

    Best-effort: the parent directory is created as needed, and an ``OSError``
    during directory creation or writing is logged at ERROR level instead of
    being raised. The path is returned either way so the caller can attempt its
    read-back.

    NOTE(review): if opening the file for writing fails while a report from an
    earlier pass still exists at that path, the caller would read the older
    content — confirm whether that case needs handling.
    """
    target = _report_path(repo, work_item_id, branch)
    try:
        parent = os.path.dirname(target)
        os.makedirs(parent, exist_ok=True)
        with open(target, "w", encoding="utf-8") as out:
            out.write(render_coverage_report(work_item_id, fields))
    except OSError as e:
        logger.error("write_coverage_report error for %s/%s: %s", repo, work_item_id, e)
    return target
def parse_coverage_status(content: str) -> tuple[bool, str]:
    """Turn the text of ``18-coverage-report.md`` into a quality-gate verdict.

    Only the machine-readable ``coverage_status:`` entry of the YAML frontmatter
    is consulted — never the prose (AC-9). The value is upper-cased and trimmed
    before comparison. Outcomes:

    * ``coverage_status: PASS`` -> ``(True, "Coverage status: PASS")``
    * ``coverage_status: FAIL`` -> ``(False, "Coverage status: FAIL")``
    * YAML error reported by the parser -> ``(False, <reason with the error>)``
    * no frontmatter block / malformed block / any other value ->
      ``(False, <reason quoting what was found>)``

    Parsing is delegated to ``frontmatter.parse_frontmatter``.
    """
    from .frontmatter import parse_frontmatter

    parsed = parse_frontmatter(content)
    if parsed.yaml_error is not None:
        return False, f"Invalid YAML frontmatter in coverage report: {parsed.yaml_error}"

    if parsed.has_block and not parsed.malformed:
        status = str(parsed.data.get("coverage_status", "")).upper().strip()
    else:
        status = None

    verdicts = {
        "FAIL": (False, "Coverage status: FAIL"),
        "PASS": (True, "Coverage status: PASS"),
    }
    if status in verdicts:
        return verdicts[status]
    return False, f"No machine-readable coverage_status in frontmatter (got: {status!r})"
def read_measured_coverage(content: str) -> float | None:
    """Extract ``measured_coverage`` (percent) from the report's frontmatter.

    Uses ``frontmatter.parse_frontmatter``. Returns ``None`` when there is no
    frontmatter block, the block is malformed, the field is absent or blank, or
    its value cannot be converted to ``float``. Never raises: any other exception
    is logged as a warning and also yields ``None``.
    """
    try:
        from .frontmatter import parse_frontmatter

        parsed = parse_frontmatter(content)
        if not parsed.has_block or parsed.malformed:
            return None
        value = parsed.data.get("measured_coverage")
        is_blank = value is None or (isinstance(value, str) and not value.strip())
        return None if is_blank else float(value)
    except (TypeError, ValueError):
        return None
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("read_measured_coverage error: %s", e)
        return None
def _error_fields(work_item_id, floor, policy, epsilon, baseline, *, fail_closed: bool) -> dict:
"""Build the report fields for a tool-error pass (FR-6)."""
status = "FAIL" if fail_closed else "PASS"
mode = "fail-closed (FAIL)" if fail_closed else "fail-open (WARNING)"
return {
"coverage_status": status,
"measured_coverage": None,
"baseline": baseline,
"floor": floor,
"policy": policy,
"epsilon": epsilon,
"delta": None,
"reason": f"coverage measurement failed -> {mode}",
"measurement": (
"coverage tool error / unparseable metric "
f"(coverage_tool_fail_closed={fail_closed})"
),
"policy_detail": f"policy={policy}, floor={floor}, baseline={baseline}, epsilon={epsilon}",
}
# ---------------------------------------------------------------------------
# Ratchet baseline UP on a confirmed merge (FR-4 / D5)
# ---------------------------------------------------------------------------
def ratchet_baseline_on_merge(repo: str, work_item_id: str, branch: str, sha: str | None = None) -> bool:
    """Raise the per-repo coverage baseline from the merged branch's measurement.

    Steps:

    1. Skip (``False``) when ``coverage_gate_applies(repo)`` is falsy.
    2. Read ``18-coverage-report.md`` for the task and take its
       ``measured_coverage`` via ``read_measured_coverage``; an unreadable report
       or a missing value is logged as a warning and yields ``False``.
    3. Hand the value to ``db.ratchet_coverage_baseline(repo, measured, sha)`` and
       return its result, logging whether the baseline moved.

    Per the module's design notes the DB call is an atomic compare-and-set that
    never lowers the baseline (FR-4 / D5); that guarantee lives in ``db``, not
    here. Never raises (AC-7): any unexpected exception is logged at ERROR level
    and reported as ``False``.
    """
    try:
        if not coverage_gate_applies(repo):
            return False

        report = _report_path(repo, work_item_id, branch)
        try:
            with open(report, "r", encoding="utf-8") as fh:
                text = fh.read()
        except OSError as e:
            logger.warning(
                "ratchet: cannot read coverage report for %s/%s: %s", repo, work_item_id, e
            )
            return False

        measured = read_measured_coverage(text)
        if measured is None:
            logger.warning(
                "ratchet: no measured_coverage in report for %s/%s", repo, work_item_id
            )
            return False

        from . import db

        updated = db.ratchet_coverage_baseline(repo, measured, sha)
        if not updated:
            logger.info(
                "coverage baseline unchanged for %s (measured %.2f%% not above current)",
                repo, measured,
            )
        else:
            logger.info(
                "coverage baseline ratcheted for %s -> %.2f%% (sha=%s)", repo, measured, sha
            )
        return updated
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.error("ratchet_baseline_on_merge error for %s/%s: %s", repo, work_item_id, e)
        return False
# ---------------------------------------------------------------------------
# Orchestrating entry — delegated to by qg.checks.check_coverage_gate
# ---------------------------------------------------------------------------
def check_coverage_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
    """ORCH-027 coverage-gate entry point; returns ``(ok, reason)``.

    Deterministic, no LLM. Flow:

    1. Gate switched off (``coverage_gate_enabled`` falsy) or not applicable to
       ``repo`` -> ``(True, ...)`` without measuring anything.
    2. Read the stored baseline (a read error is logged and treated as "no
       baseline"), then call ``measure_coverage``. A ``None`` measurement is a
       tool error: an error report is written and the result is fail-open
       ``(True, ...)`` unless ``coverage_tool_fail_closed`` is set, in which case
       it is ``(False, ...)``.
    3. Otherwise compute the verdict and delta, write ``18-coverage-report.md``,
       and read the verdict BACK from that file via ``parse_coverage_status`` so
       the returned boolean matches the artefact's frontmatter (AC-9). An
       unreadable report yields ``(False, ...)``.
    4. A FAIL verdict additionally triggers a best-effort Telegram notification
       before ``(False, reason)`` is returned.

    Never raises (AC-7): any unexpected exception is logged and mapped to
    fail-open ``(True, ...)``, or to ``(False, ...)`` when
    ``coverage_tool_fail_closed`` is set.
    """
    floor = getattr(settings, "coverage_min_percent", 0.0)
    policy = getattr(settings, "coverage_policy", "both")
    epsilon = getattr(settings, "coverage_epsilon", 0.5)
    try:
        if not settings.coverage_gate_enabled:
            return True, "coverage-gate disabled"
        if not coverage_gate_applies(repo):
            return True, f"coverage-gate N/A for {repo}"

        from . import db

        try:
            baseline = db.get_coverage_baseline(repo)
        except Exception as e:  # noqa: BLE001 - baseline read best-effort
            logger.warning("coverage-gate: baseline read error for %s: %s", repo, e)
            baseline = None

        measured = measure_coverage(repo, branch)
        if measured is None:
            # Tool error: record it in the artefact, then follow the configured mode.
            strict = bool(settings.coverage_tool_fail_closed)
            write_coverage_report(
                repo,
                work_item_id,
                branch,
                _error_fields(work_item_id, floor, policy, epsilon, baseline, fail_closed=strict),
            )
            if not strict:
                logger.warning(
                    "coverage-gate %s/%s: measurement failed -> fail-OPEN + WARNING",
                    repo, work_item_id,
                )
                return True, "coverage-gate fail-open (WARNING): measurement failed (tool error)"
            logger.warning(
                "coverage-gate %s/%s: measurement failed -> fail-CLOSED (FAIL)",
                repo, work_item_id,
            )
            return False, "coverage-gate fail-closed: measurement failed (tool error)"

        ok, reason = compute_coverage_verdict(measured, baseline, floor, policy, epsilon)
        delta = compute_delta(measured, baseline, floor)
        report_fields = {
            "coverage_status": "PASS" if ok else "FAIL",
            "measured_coverage": measured,
            "baseline": baseline,
            "floor": floor,
            "policy": policy,
            "epsilon": epsilon,
            "delta": delta,
            "reason": reason,
            "measurement": f"pytest --cov=src: line coverage src/ = {measured:.2f}%",
        }
        baseline_label = "bootstrap" if baseline is None else f"{baseline:.2f}%"
        report_fields["policy_detail"] = (
            f"policy={policy}, floor={floor}%, baseline={baseline_label}, epsilon={epsilon}%"
        )
        report_path = write_coverage_report(repo, work_item_id, branch, report_fields)

        # The verdict handed back to the engine is the one stored in the artefact
        # just written, not the in-memory ``ok`` (AC-9).
        try:
            with open(report_path, "r", encoding="utf-8") as fh:
                written = fh.read()
        except OSError as e:
            return False, f"cannot read coverage report (fail-closed): {e}"

        verdict_ok, _ = parse_coverage_status(written)
        if not verdict_ok:
            # FAIL -> surface loudly (Telegram with the issue link, FR-7).
            try:
                from .notifications import send_telegram, link_for

                base_str = "n/a" if baseline is None else f"{baseline:.2f}%"
                send_telegram(
                    f"\U0001f4c9 {link_for(work_item_id)}: coverage-гейт FAIL — измерено "
                    f"{measured:.2f}% (floor {floor}%, baseline {base_str}, "
                    f"delta {delta:+.2f}%). Откат на development для доработки тестов."
                )
            except Exception as e:  # noqa: BLE001 - telegram best-effort
                logger.warning("coverage-gate FAIL telegram failed: %s", e)
            return False, reason

        logger.info("coverage-gate passed for %s/%s: %s", repo, work_item_id, reason)
        return True, f"coverage OK ({reason})"
    except Exception as e:  # noqa: BLE001 - never-raise contract (AC-7)
        logger.error("check_coverage_gate error for %s/%s: %s", repo, branch, e)
        # An unexpected internal error follows the fail-open default so that a
        # coverage-tool/logic fault cannot wedge the pipeline; the operator can
        # set coverage_tool_fail_closed to make it strict.
        try:
            if settings.coverage_tool_fail_closed:
                return False, f"coverage-gate error (fail-closed): {e}"
        except Exception:  # noqa: BLE001
            pass
        return True, f"coverage-gate error (fail-open): {e}"
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (FR-7 / AC-9)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
    """Return a read-only summary of the coverage-gate configuration and baselines.

    Keys: ``enabled``, ``repos``, ``policy``, ``floor``, ``epsilon``,
    ``fail_closed`` (all read from ``settings``, with defaults when an attribute
    is absent) and ``baselines`` (from ``db.all_coverage_baselines()``).

    Failures are tolerated: if reading ``coverage_gate_enabled`` raises,
    ``enabled`` is ``False``; if the baselines cannot be loaded, a warning is
    logged and ``baselines`` stays an empty dict.
    """
    try:
        enabled = bool(settings.coverage_gate_enabled)
    except Exception:  # noqa: BLE001
        enabled = False

    summary = {
        "enabled": enabled,
        "repos": getattr(settings, "coverage_gate_repos", "") or "",
        "policy": getattr(settings, "coverage_policy", "both"),
        "floor": getattr(settings, "coverage_min_percent", 0.0),
        "epsilon": getattr(settings, "coverage_epsilon", 0.5),
        "fail_closed": bool(getattr(settings, "coverage_tool_fail_closed", False)),
        "baselines": {},
    }

    try:
        from . import db

        summary["baselines"] = db.all_coverage_baselines()
    except Exception as e:  # noqa: BLE001 - never-raise -> empty baselines
        logger.warning("coverage snapshot baselines error: %s", e)
    return summary