feat(coverage): deterministic test-coverage gate on deploy-staging->deploy edge (ORCH-027)
Introduce a deterministic (no-LLM) coverage sub-gate that blocks coverage degradation before a task branch merges into `main`. Existing gates judge only by the FACT of passing (check_ci_green / check_tests_passed / merge-gate re-test), not by completeness — so a batch autonomous run (ORCH-088) silently erodes coverage. Pattern mirrors the security-gate (ORCH-022): leaf src/coverage_gate.py (never-raise) + thin check_coverage_gate in QG_CHECKS + _handle_coverage_gate splice in advance_stage, run AFTER merge-gate (measured on the caught-up HEAD that lands in main) and BEFORE image-freshness (fail before the expensive docker rebuild). - measure_coverage: pytest --cov=src --cov-report=json in the per-branch worktree -> line coverage %; None on tool error -> fail-open + WARNING by default (FR-6). - compute_coverage_verdict (pure): absolute | baseline | both + epsilon (NFR-4 anti-flap); baseline None -> bootstrap (absolute-only). - coverage_baseline DB table (additive, CREATE TABLE IF NOT EXISTS) + ratchet-up in _handle_merge_verify (deploy->done): atomic compare-and-set under merge-lease, never decreases; bootstrap on first merge. - Artefact 18-coverage-report.md (coverage_status: frontmatter, single source of truth); GET /queue `coverage` block; FAIL -> Telegram; optional POST /coverage/baseline override. - Flags ORCH_COVERAGE_* (kill-switch + self-hosting-only scope) -> enduro untouched; STAGE_TRANSITIONS / existing check_* / verdict keys byte-for-byte unchanged (NFR-5/AC-8). - pytest-cov==5.0.0 added to requirements.txt. Tests: tests/test_coverage_gate.py (TC-01..TC-15). Frozen QG-registry anti-regress tests + deploy-staging edge tests updated for the new sub-gate. Full suite green. Docs: README / adr-0029 / PIPELINE_DOCS / 18-coverage-report.md template (architecture stage) + CHANGELOG / CLAUDE.md / .env.example (this PR). Refs: ORCH-027 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
616
src/coverage_gate.py
Normal file
616
src/coverage_gate.py
Normal file
@@ -0,0 +1,616 @@
|
||||
"""Coverage-gate core (ORCH-027): deterministic test-coverage gate before merge.
|
||||
|
||||
Background
|
||||
----------
|
||||
The orchestrator runs autonomous development: the ``developer`` agent writes code
|
||||
with no human filter, and on ``testing`` the ``tester`` agent decides for itself
|
||||
whether the tests are enough. The existing test gates judge only by the FACT of
|
||||
passing, never by COMPLETENESS: ``check_ci_green`` and ``check_tests_passed`` and
|
||||
the merge-gate re-test all look at a pytest exit code. None of them notices "300
|
||||
lines of new code, 0 tests". Across a batch autonomous run (ORCH-088) that means a
|
||||
monotonic erosion of coverage — every task shaves a corner on tests and the project
|
||||
silently loses testability.
|
||||
|
||||
This module provides the deterministic (no-LLM) primitives that the quality-gate
|
||||
``check_coverage_gate`` (src/qg/checks.py) composes on the ``deploy-staging ->
|
||||
deploy`` edge — run **AFTER the merge-gate** (so coverage is measured on the
|
||||
caught-up HEAD that actually lands in ``main``) and **BEFORE image-freshness** (fail
|
||||
before the expensive docker rebuild), mirroring the security-gate (ORCH-022):
|
||||
|
||||
* ``measure_coverage`` -> run ``pytest --cov=src`` in the per-branch
|
||||
worktree (offline) -> line coverage ``%`` or
|
||||
``None`` on tool error.
|
||||
* ``compute_coverage_verdict`` -> pure: compare (measured, baseline, floor) under
|
||||
a policy + epsilon -> ``(ok, reason)``.
|
||||
* ``write_coverage_report`` / ``parse_coverage_status`` -> write the
|
||||
``18-coverage-report.md`` artefact and read its machine verdict back (single
|
||||
source of truth: the gate returns exactly the frontmatter it wrote, AC-9).
|
||||
* ``ratchet_baseline_on_merge`` -> on a CONFIRMED merge (``_handle_merge_verify``,
|
||||
``deploy -> done`` edge) raise the per-repo baseline UP from the merged branch's
|
||||
measured coverage (atomic compare-and-set, never decreases — FR-4 / D5).
|
||||
* ``check_coverage_gate`` -> the orchestrating entry the QG wrapper delegates
|
||||
to.
|
||||
|
||||
Invariants (ADR-001 §7, never broken):
|
||||
* **Tool error -> fail-open + WARNING by default** (FR-6/AC-6): a coverage-tool
|
||||
failure / unparseable metric degrades fail-open (anti-loop, precedent
|
||||
ORCH-061/022 dep-audit); ``coverage_tool_fail_closed`` flips it to strict.
|
||||
* **never-raise** (AC-7): any internal error is swallowed; an exception never
|
||||
escapes into ``advance_stage``.
|
||||
* **Baseline never decreases** (FR-4): the ratchet is an atomic SQL compare-and-set
|
||||
under the held merge-lease (ORCH-043), so two parallel merges can never lower or
|
||||
lose the value.
|
||||
* **Self-hosting safety** (AC-7): the gate only measures / reads / writes the
|
||||
artefact / decides. It never calls the deploy hook, never restarts the prod
|
||||
container, never pushes / force-pushes ``main``.
|
||||
|
||||
This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and lazily
|
||||
``qg.checks.is_self_hosting_repo`` / ``db`` / ``notifications``; it never imports
|
||||
``stage_engine``.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from .config import settings
|
||||
from .git_worktree import ensure_worktree, get_worktree_path
|
||||
|
||||
logger = logging.getLogger("orchestrator.coverage_gate")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Conditionality (mirrors security_gate_applies / _merge_gate_applies)
|
||||
# ---------------------------------------------------------------------------
|
||||
def coverage_gate_applies(repo: str) -> bool:
|
||||
"""Whether the coverage-gate is REAL for this repo (conditional rollout).
|
||||
|
||||
Mirrors the ORCH-22 / ORCH-43 / ORCH-58 pattern:
|
||||
* ``coverage_gate_enabled=False`` -> always False (kill-switch; pipeline is
|
||||
1:1 as before ORCH-027 for everyone).
|
||||
* ``coverage_gate_repos`` (CSV) non-empty -> real only for the listed repos.
|
||||
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``).
|
||||
Never raises (AC-7): any error -> False (the safe no-op default).
|
||||
"""
|
||||
try:
|
||||
if not settings.coverage_gate_enabled:
|
||||
return False
|
||||
raw = (settings.coverage_gate_repos or "").strip()
|
||||
if raw:
|
||||
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
|
||||
return (repo or "").strip().lower() in allowed
|
||||
# Lazy import keeps this module a leaf (no qg import at module load).
|
||||
from .qg.checks import is_self_hosting_repo
|
||||
return is_self_hosting_repo(repo)
|
||||
except Exception as e: # noqa: BLE001 - never-raise contract
|
||||
logger.warning("coverage_gate_applies error for %s: %s", repo, e)
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Measurement (pytest --cov=src in the per-branch worktree) — FR-1 / D2
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_coverage_percent(data) -> float | None:
|
||||
"""Pure: extract ``totals.percent_covered`` (line coverage ``%``) from a
|
||||
coverage.py JSON dict. Returns ``None`` if the shape is missing / unparseable.
|
||||
Never raises.
|
||||
"""
|
||||
try:
|
||||
if not isinstance(data, dict):
|
||||
return None
|
||||
totals = data.get("totals")
|
||||
if not isinstance(totals, dict):
|
||||
return None
|
||||
pct = totals.get("percent_covered")
|
||||
if pct is None:
|
||||
return None
|
||||
return float(pct)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def measure_coverage(repo: str, branch: str) -> float | None:
|
||||
"""Run ``pytest --cov=src`` in the per-branch worktree -> line coverage ``%``.
|
||||
|
||||
Scope is ``src/`` only (the tests themselves are out of scope, BRD §«Вне
|
||||
объёма»). Offline — coverage needs no network. The measurer is intentionally
|
||||
encapsulated here so the pure decision logic and the baseline storage are
|
||||
stack-agnostic (a future jest/jacoco measurer is a new ``measure_*`` branch,
|
||||
BR-6).
|
||||
|
||||
The coverage metric is read from the ``--cov-report=json`` file regardless of
|
||||
the pytest exit code: a non-zero exit because of *failing tests* is already
|
||||
caught upstream (``check_ci_green`` / merge-gate re-test), and a partial run
|
||||
still produces a meaningful coverage JSON. A genuine tool error (missing
|
||||
plugin / timeout / no JSON / unparseable) -> ``None`` (the caller degrades
|
||||
fail-open by default, FR-6). Never raises (AC-7).
|
||||
"""
|
||||
try:
|
||||
wt = ensure_worktree(repo, branch)
|
||||
except Exception as e: # noqa: BLE001 - never-raise contract
|
||||
logger.warning("measure_coverage: worktree error for %s/%s: %s", repo, branch, e)
|
||||
return None
|
||||
|
||||
cov_json = os.path.join(wt, ".coverage-report.json")
|
||||
# Remove a stale report so we never read a previous pass's metric.
|
||||
try:
|
||||
if os.path.isfile(cov_json):
|
||||
os.remove(cov_json)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
cmd = [
|
||||
"python", "-m", "pytest", "tests/",
|
||||
"--cov=src",
|
||||
f"--cov-report=json:{cov_json}",
|
||||
"--cov-report=", # suppress the terminal cov report (json only)
|
||||
"-q",
|
||||
]
|
||||
timeout = settings.coverage_run_timeout_s
|
||||
try:
|
||||
subprocess.run(cmd, cwd=wt, capture_output=True, text=True, timeout=timeout)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(
|
||||
"measure_coverage: pytest --cov timed out after %ss for %s/%s",
|
||||
timeout, repo, branch,
|
||||
)
|
||||
return None
|
||||
except FileNotFoundError:
|
||||
logger.warning(
|
||||
"measure_coverage: pytest / pytest-cov not available for %s/%s", repo, branch
|
||||
)
|
||||
return None
|
||||
except (subprocess.SubprocessError, OSError) as e:
|
||||
logger.warning("measure_coverage: pytest --cov error for %s/%s: %s", repo, branch, e)
|
||||
return None
|
||||
|
||||
data = None
|
||||
try:
|
||||
if not os.path.isfile(cov_json):
|
||||
logger.warning(
|
||||
"measure_coverage: no coverage json produced for %s/%s", repo, branch
|
||||
)
|
||||
return None
|
||||
with open(cov_json, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
except (OSError, ValueError) as e:
|
||||
logger.warning(
|
||||
"measure_coverage: cannot parse coverage json for %s/%s: %s", repo, branch, e
|
||||
)
|
||||
return None
|
||||
finally:
|
||||
try:
|
||||
if os.path.isfile(cov_json):
|
||||
os.remove(cov_json)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return parse_coverage_percent(data)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pure decision (FR-2 / D3) — the core of the unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
def compute_coverage_verdict(measured, baseline, floor, policy, epsilon) -> tuple[bool, str]:
|
||||
"""Pure: decide PASS/FAIL from (measured, baseline, floor, policy, epsilon).
|
||||
|
||||
Deterministic, no LLM, no I/O. Returns ``(ok: bool, reason: str)``.
|
||||
|
||||
* ``policy = "absolute"`` -> PASS ⇔ ``measured >= floor - epsilon``.
|
||||
* ``policy = "baseline"`` -> PASS ⇔ ``measured >= baseline - epsilon``.
|
||||
* ``policy = "both"`` (default) -> PASS ⇔ BOTH conditions hold.
|
||||
* ``baseline is None`` (no stored baseline / bootstrap) -> the baseline
|
||||
condition does NOT apply (cannot regress against nothing); only the
|
||||
absolute part decides. For ``policy = "baseline"`` with no baseline this is
|
||||
a bootstrap PASS (the measured value seeds the baseline at merge, D5).
|
||||
* ``epsilon`` — a small non-negative tolerance so jitter at the boundary does
|
||||
not bounce a task (NFR-4).
|
||||
|
||||
Never raises: bad inputs -> ``(False, reason)`` (a verdict cannot be computed ->
|
||||
conservative FAIL for the pure function; the orchestrating entry maps a *tool*
|
||||
error to fail-open separately).
|
||||
"""
|
||||
try:
|
||||
pol = (policy or "both").strip().lower()
|
||||
eps = max(0.0, float(epsilon if epsilon is not None else 0.0))
|
||||
m = float(measured)
|
||||
except (TypeError, ValueError) as e:
|
||||
return False, f"coverage verdict: bad inputs ({e})"
|
||||
|
||||
abs_applicable = pol in ("absolute", "both")
|
||||
base_applicable = pol in ("baseline", "both") and baseline is not None
|
||||
|
||||
checks: list[str] = []
|
||||
ok = True
|
||||
|
||||
if abs_applicable:
|
||||
try:
|
||||
f = float(floor if floor is not None else 0.0)
|
||||
except (TypeError, ValueError):
|
||||
f = 0.0
|
||||
abs_ok = m >= f - eps
|
||||
checks.append(
|
||||
f"absolute {m:.2f}% >= floor {f:.2f}%-eps{eps:.2f} -> "
|
||||
f"{'PASS' if abs_ok else 'FAIL'}"
|
||||
)
|
||||
ok = ok and abs_ok
|
||||
|
||||
if base_applicable:
|
||||
b = float(baseline)
|
||||
base_ok = m >= b - eps
|
||||
checks.append(
|
||||
f"baseline {m:.2f}% >= base {b:.2f}%-eps{eps:.2f} -> "
|
||||
f"{'PASS' if base_ok else 'FAIL'}"
|
||||
)
|
||||
ok = ok and base_ok
|
||||
elif pol in ("baseline", "both") and baseline is None:
|
||||
checks.append("baseline N/A (bootstrap — no stored baseline)")
|
||||
|
||||
body = "; ".join(checks) if checks else "no applicable condition (bootstrap) -> PASS"
|
||||
reason = f"measured={m:.2f}% policy={pol} eps={eps:.2f}: {body}"
|
||||
return ok, reason
|
||||
|
||||
|
||||
def compute_delta(measured, baseline, floor) -> float:
|
||||
"""Pure: signed ``measured - max(applicable references)`` (%, 2 decimals).
|
||||
|
||||
References are the present ones among ``baseline`` / ``floor``. With neither ->
|
||||
``0.0``. Never raises.
|
||||
"""
|
||||
try:
|
||||
m = float(measured)
|
||||
refs = []
|
||||
if baseline is not None:
|
||||
refs.append(float(baseline))
|
||||
if floor is not None:
|
||||
refs.append(float(floor))
|
||||
if not refs:
|
||||
return 0.0
|
||||
return round(m - max(refs), 2)
|
||||
except (TypeError, ValueError):
|
||||
return 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Artefact: write the report, read the machine verdict back (FR-7 / D7 / AC-9)
|
||||
# ---------------------------------------------------------------------------
|
||||
def _report_rel(work_item_id: str) -> str:
|
||||
return f"docs/work-items/{work_item_id}/18-coverage-report.md"
|
||||
|
||||
|
||||
def _report_path(repo: str, work_item_id: str, branch: str) -> str:
|
||||
"""Absolute path of 18-coverage-report.md inside the task worktree."""
|
||||
try:
|
||||
wt = get_worktree_path(repo, branch)
|
||||
if not os.path.isdir(wt):
|
||||
wt = ensure_worktree(repo, branch)
|
||||
except Exception: # noqa: BLE001 - never-raise; fall back to shared clone
|
||||
wt = os.path.join(settings.repos_dir, repo)
|
||||
return os.path.join(wt, _report_rel(work_item_id))
|
||||
|
||||
|
||||
def _num(v) -> str:
|
||||
"""Render a numeric field with 2 decimals, or empty for None/unparseable."""
|
||||
if v is None:
|
||||
return ""
|
||||
try:
|
||||
return f"{float(v):.2f}"
|
||||
except (TypeError, ValueError):
|
||||
return ""
|
||||
|
||||
|
||||
def render_coverage_report(work_item_id: str, fields: dict) -> str:
|
||||
"""Pure: render the 18-coverage-report.md content (frontmatter + body).
|
||||
|
||||
The machine verdict lives ONLY in the YAML frontmatter ``coverage_status:``
|
||||
(canon, regiser-sensitive); ``measured_coverage`` is the single source of truth
|
||||
for the ratchet (D5). Never raises.
|
||||
"""
|
||||
baseline = fields.get("baseline")
|
||||
baseline_str = "" if baseline is None else _num(baseline)
|
||||
return (
|
||||
"---\n"
|
||||
f"coverage_status: {fields.get('coverage_status', 'FAIL')}\n"
|
||||
f"work_item: {work_item_id}\n"
|
||||
f"measured_coverage: {_num(fields.get('measured_coverage'))}\n"
|
||||
f"baseline: {baseline_str}\n"
|
||||
f"floor: {_num(fields.get('floor'))}\n"
|
||||
f"policy: {fields.get('policy', 'both')}\n"
|
||||
f"epsilon: {_num(fields.get('epsilon'))}\n"
|
||||
f"delta: {_num(fields.get('delta'))}\n"
|
||||
"---\n"
|
||||
f"# Coverage Report — {work_item_id}\n\n"
|
||||
"Детерминированный гейт покрытия (ORCH-027) — под-гейт ребра "
|
||||
"`deploy-staging→deploy` (ПОСЛЕ merge-gate, ДО image-freshness). Машинный "
|
||||
"вердикт читается ТОЛЬКО из `coverage_status:` frontmatter выше.\n\n"
|
||||
"## Verdict\n"
|
||||
f"{fields.get('reason', '')}\n\n"
|
||||
"## Measurement\n"
|
||||
f"{fields.get('measurement', '')}\n\n"
|
||||
"## Policy\n"
|
||||
f"{fields.get('policy_detail', '')}\n"
|
||||
)
|
||||
|
||||
|
||||
def write_coverage_report(repo: str, work_item_id: str, branch: str, fields: dict) -> str:
|
||||
"""Write 18-coverage-report.md into the task worktree; return its path.
|
||||
|
||||
Best-effort / never-raise: a write error is logged and the path is still
|
||||
returned (the caller's read-back then fails closed)."""
|
||||
path = _report_path(repo, work_item_id, branch)
|
||||
try:
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(render_coverage_report(work_item_id, fields))
|
||||
except OSError as e:
|
||||
logger.error("write_coverage_report error for %s/%s: %s", repo, work_item_id, e)
|
||||
return path
|
||||
|
||||
|
||||
def parse_coverage_status(content: str) -> tuple[bool, str]:
|
||||
"""Map a 18-coverage-report.md body to a quality-gate verdict by reading ONLY
|
||||
the machine-readable ``coverage_status:`` YAML frontmatter — never the prose.
|
||||
|
||||
Mirrors ``parse_security_status`` (canon: machine verdict only from frontmatter,
|
||||
AC-9). The negative token (FAIL) is authoritative (checked first). Returns:
|
||||
* ``coverage_status: PASS`` -> ``(True, "Coverage status: PASS")``
|
||||
* ``coverage_status: FAIL`` -> ``(False, "Coverage status: FAIL")``
|
||||
* missing field / no frontmatter / bad YAML -> ``(False, <reason>)``.
|
||||
|
||||
Parse delegated to the unified ``frontmatter.parse_frontmatter`` primitive
|
||||
(ORCH-052c single source of YAML-frontmatter logic).
|
||||
"""
|
||||
from .frontmatter import parse_frontmatter
|
||||
|
||||
parse = parse_frontmatter(content)
|
||||
if parse.yaml_error is not None:
|
||||
return False, f"Invalid YAML frontmatter in coverage report: {parse.yaml_error}"
|
||||
status = None
|
||||
if parse.has_block and not parse.malformed:
|
||||
status = str(parse.data.get("coverage_status", "")).upper().strip()
|
||||
if status == "FAIL":
|
||||
return False, "Coverage status: FAIL"
|
||||
if status == "PASS":
|
||||
return True, "Coverage status: PASS"
|
||||
return False, f"No machine-readable coverage_status in frontmatter (got: {status!r})"
|
||||
|
||||
|
||||
def read_measured_coverage(content: str) -> float | None:
|
||||
"""Read ``measured_coverage`` (%, float) from a 18-coverage-report.md body via
|
||||
the unified frontmatter parser. ``None`` when absent / unparseable (ratchet then
|
||||
no-ops). Never raises.
|
||||
"""
|
||||
try:
|
||||
from .frontmatter import parse_frontmatter
|
||||
parse = parse_frontmatter(content)
|
||||
if not parse.has_block or parse.malformed:
|
||||
return None
|
||||
raw = parse.data.get("measured_coverage")
|
||||
if raw is None or (isinstance(raw, str) and not raw.strip()):
|
||||
return None
|
||||
return float(raw)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("read_measured_coverage error: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def _error_fields(work_item_id, floor, policy, epsilon, baseline, *, fail_closed: bool) -> dict:
|
||||
"""Build the report fields for a tool-error pass (FR-6)."""
|
||||
status = "FAIL" if fail_closed else "PASS"
|
||||
mode = "fail-closed (FAIL)" if fail_closed else "fail-open (WARNING)"
|
||||
return {
|
||||
"coverage_status": status,
|
||||
"measured_coverage": None,
|
||||
"baseline": baseline,
|
||||
"floor": floor,
|
||||
"policy": policy,
|
||||
"epsilon": epsilon,
|
||||
"delta": None,
|
||||
"reason": f"coverage measurement failed -> {mode}",
|
||||
"measurement": (
|
||||
"coverage tool error / unparseable metric "
|
||||
f"(coverage_tool_fail_closed={fail_closed})"
|
||||
),
|
||||
"policy_detail": f"policy={policy}, floor={floor}, baseline={baseline}, epsilon={epsilon}",
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Ratchet baseline UP on a confirmed merge (FR-4 / D5)
|
||||
# ---------------------------------------------------------------------------
|
||||
def ratchet_baseline_on_merge(repo: str, work_item_id: str, branch: str, sha: str | None = None) -> bool:
|
||||
"""Raise the per-repo coverage baseline UP from the merged branch's measured
|
||||
coverage. Called from ``_handle_merge_verify`` (deploy -> done edge) AFTER the
|
||||
merge is confirmed and BEFORE the task advances to ``done`` (D5).
|
||||
|
||||
Reads the measured value from ``18-coverage-report.md`` (single source of truth
|
||||
— the exact metric the gate wrote on the deploy-staging->deploy edge) and applies
|
||||
an atomic compare-and-set (``db.ratchet_coverage_baseline``) that never lowers
|
||||
the baseline. Bootstrap: the first applicable merge seeds the baseline.
|
||||
|
||||
Returns True iff the baseline was inserted/raised. never-raise (AC-7): any error
|
||||
-> False (observability best-effort; a ratchet failure must never break the
|
||||
deploy->done path).
|
||||
"""
|
||||
try:
|
||||
if not coverage_gate_applies(repo):
|
||||
return False
|
||||
path = _report_path(repo, work_item_id, branch)
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
"ratchet: cannot read coverage report for %s/%s: %s", repo, work_item_id, e
|
||||
)
|
||||
return False
|
||||
measured = read_measured_coverage(content)
|
||||
if measured is None:
|
||||
logger.warning(
|
||||
"ratchet: no measured_coverage in report for %s/%s", repo, work_item_id
|
||||
)
|
||||
return False
|
||||
from . import db
|
||||
updated = db.ratchet_coverage_baseline(repo, measured, sha)
|
||||
if updated:
|
||||
logger.info(
|
||||
"coverage baseline ratcheted for %s -> %.2f%% (sha=%s)", repo, measured, sha
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"coverage baseline unchanged for %s (measured %.2f%% not above current)",
|
||||
repo, measured,
|
||||
)
|
||||
return updated
|
||||
except Exception as e: # noqa: BLE001 - never-raise contract
|
||||
logger.error("ratchet_baseline_on_merge error for %s/%s: %s", repo, work_item_id, e)
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Orchestrating entry — delegated to by qg.checks.check_coverage_gate
|
||||
# ---------------------------------------------------------------------------
|
||||
def check_coverage_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
|
||||
"""ORCH-027 coverage-gate on the deploy-staging -> deploy edge (after merge-gate).
|
||||
|
||||
Deterministic, no LLM. Algorithm (ADR-001 D1..D7):
|
||||
1. Conditionality: ``coverage_gate_enabled=False`` -> ``(True, "...disabled")``;
|
||||
a repo the gate is not real for -> ``(True, "coverage-gate N/A for <repo>")``.
|
||||
2. ``measure_coverage`` (pytest --cov=src in the worktree). ``None`` (tool
|
||||
error) -> fail-open + WARNING by default (``coverage_tool_fail_closed``
|
||||
flips to FAIL), FR-6.
|
||||
3. ``compute_coverage_verdict`` -> write ``18-coverage-report.md`` -> read the
|
||||
verdict BACK via ``parse_coverage_status`` (single source of truth: the
|
||||
returned verdict == the artefact frontmatter, AC-9).
|
||||
4. FAIL -> ``(False, reason)`` (engine rolls back to ``development`` + releases
|
||||
the merge lease); PASS -> ``(True, reason)`` (engine proceeds to
|
||||
image-freshness).
|
||||
|
||||
Never-raise (AC-7): any internal error -> a (bool, reason) pair following the
|
||||
fail-open default (so an unexpected fault never wedges the autonomous pipeline),
|
||||
unless ``coverage_tool_fail_closed`` is set.
|
||||
"""
|
||||
floor = getattr(settings, "coverage_min_percent", 0.0)
|
||||
policy = getattr(settings, "coverage_policy", "both")
|
||||
epsilon = getattr(settings, "coverage_epsilon", 0.5)
|
||||
try:
|
||||
if not settings.coverage_gate_enabled:
|
||||
return True, "coverage-gate disabled"
|
||||
if not coverage_gate_applies(repo):
|
||||
return True, f"coverage-gate N/A for {repo}"
|
||||
|
||||
from . import db
|
||||
try:
|
||||
baseline = db.get_coverage_baseline(repo)
|
||||
except Exception as e: # noqa: BLE001 - baseline read best-effort
|
||||
logger.warning("coverage-gate: baseline read error for %s: %s", repo, e)
|
||||
baseline = None
|
||||
|
||||
measured = measure_coverage(repo, branch)
|
||||
if measured is None:
|
||||
fail_closed = bool(settings.coverage_tool_fail_closed)
|
||||
fields = _error_fields(
|
||||
work_item_id, floor, policy, epsilon, baseline, fail_closed=fail_closed
|
||||
)
|
||||
write_coverage_report(repo, work_item_id, branch, fields)
|
||||
if fail_closed:
|
||||
logger.warning(
|
||||
"coverage-gate %s/%s: measurement failed -> fail-CLOSED (FAIL)",
|
||||
repo, work_item_id,
|
||||
)
|
||||
return False, "coverage-gate fail-closed: measurement failed (tool error)"
|
||||
logger.warning(
|
||||
"coverage-gate %s/%s: measurement failed -> fail-OPEN + WARNING",
|
||||
repo, work_item_id,
|
||||
)
|
||||
return True, "coverage-gate fail-open (WARNING): measurement failed (tool error)"
|
||||
|
||||
ok, reason = compute_coverage_verdict(measured, baseline, floor, policy, epsilon)
|
||||
delta = compute_delta(measured, baseline, floor)
|
||||
fields = {
|
||||
"coverage_status": "PASS" if ok else "FAIL",
|
||||
"measured_coverage": measured,
|
||||
"baseline": baseline,
|
||||
"floor": floor,
|
||||
"policy": policy,
|
||||
"epsilon": epsilon,
|
||||
"delta": delta,
|
||||
"reason": reason,
|
||||
"measurement": f"pytest --cov=src: line coverage src/ = {measured:.2f}%",
|
||||
"policy_detail": (
|
||||
f"policy={policy}, floor={floor}%, "
|
||||
f"baseline={'bootstrap' if baseline is None else f'{baseline:.2f}%'}, "
|
||||
f"epsilon={epsilon}%"
|
||||
),
|
||||
}
|
||||
path = write_coverage_report(repo, work_item_id, branch, fields)
|
||||
|
||||
# Read the machine verdict back from the artefact we just wrote — so the
|
||||
# returned (bool, reason) is guaranteed == the YAML frontmatter (AC-9).
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except OSError as e:
|
||||
return False, f"cannot read coverage report (fail-closed): {e}"
|
||||
verdict_ok, _v = parse_coverage_status(content)
|
||||
|
||||
if verdict_ok:
|
||||
logger.info("coverage-gate passed for %s/%s: %s", repo, work_item_id, reason)
|
||||
return True, f"coverage OK ({reason})"
|
||||
|
||||
# FAIL -> surface loudly (Telegram with the clickable issue number, FR-7).
|
||||
try:
|
||||
from .notifications import send_telegram, link_for
|
||||
base_str = "n/a" if baseline is None else f"{baseline:.2f}%"
|
||||
send_telegram(
|
||||
f"\U0001f4c9 {link_for(work_item_id)}: coverage-гейт FAIL — измерено "
|
||||
f"{measured:.2f}% (floor {floor}%, baseline {base_str}, "
|
||||
f"delta {delta:+.2f}%). Откат на development для доработки тестов."
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - telegram best-effort
|
||||
logger.warning("coverage-gate FAIL telegram failed: %s", e)
|
||||
return False, reason
|
||||
except Exception as e: # noqa: BLE001 - never-raise contract (AC-7)
|
||||
logger.error("check_coverage_gate error for %s/%s: %s", repo, branch, e)
|
||||
# An unexpected internal error follows the fail-open default (anti-loop): a
|
||||
# coverage-tool/logic fault must not wedge the autonomous pipeline. The
|
||||
# operator can flip coverage_tool_fail_closed to make it strict.
|
||||
try:
|
||||
if settings.coverage_tool_fail_closed:
|
||||
return False, f"coverage-gate error (fail-closed): {e}"
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
return True, f"coverage-gate error (fail-open): {e}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Observability snapshot for GET /queue (FR-7 / AC-9)
|
||||
# ---------------------------------------------------------------------------
|
||||
def snapshot() -> dict:
|
||||
"""Read-only coverage-gate summary for GET /queue (FR-7 / AC-9).
|
||||
|
||||
Additive block; existing /queue keys are untouched. never-raise: any error ->
|
||||
a minimal dict with the flags.
|
||||
"""
|
||||
try:
|
||||
enabled = bool(settings.coverage_gate_enabled)
|
||||
except Exception: # noqa: BLE001
|
||||
enabled = False
|
||||
out = {
|
||||
"enabled": enabled,
|
||||
"repos": getattr(settings, "coverage_gate_repos", "") or "",
|
||||
"policy": getattr(settings, "coverage_policy", "both"),
|
||||
"floor": getattr(settings, "coverage_min_percent", 0.0),
|
||||
"epsilon": getattr(settings, "coverage_epsilon", 0.5),
|
||||
"fail_closed": bool(getattr(settings, "coverage_tool_fail_closed", False)),
|
||||
"baselines": {},
|
||||
}
|
||||
try:
|
||||
from . import db
|
||||
out["baselines"] = db.all_coverage_baselines()
|
||||
except Exception as e: # noqa: BLE001 - never-raise -> empty baselines
|
||||
logger.warning("coverage snapshot baselines error: %s", e)
|
||||
return out
|
||||
Reference in New Issue
Block a user