feat(coverage): deterministic test-coverage gate on deploy-staging->deploy edge (ORCH-027)
Some checks failed
CI / test (push) Failing after 48s
CI / test (pull_request) Failing after 42s

Introduce a deterministic (no-LLM) coverage sub-gate that blocks coverage
degradation before a task branch merges into `main`. Existing gates judge only by
the FACT of passing (check_ci_green / check_tests_passed / merge-gate re-test), not
by completeness — so a batch autonomous run (ORCH-088) silently erodes coverage.

Pattern mirrors the security-gate (ORCH-022): leaf src/coverage_gate.py (never-raise)
+ thin check_coverage_gate in QG_CHECKS + _handle_coverage_gate splice in advance_stage,
run AFTER merge-gate (measured on the caught-up HEAD that lands in main) and BEFORE
image-freshness (fail before the expensive docker rebuild).

- measure_coverage: pytest --cov=src --cov-report=json in the per-branch worktree ->
  line coverage %; None on tool error -> fail-open + WARNING by default (FR-6).
- compute_coverage_verdict (pure): absolute | baseline | both + epsilon (NFR-4 anti-flap);
  baseline None -> bootstrap (absolute-only).
- coverage_baseline DB table (additive, CREATE TABLE IF NOT EXISTS) + ratchet-up in
  _handle_merge_verify (deploy->done): atomic compare-and-set under merge-lease, never
  decreases; bootstrap on first merge.
- Artefact 18-coverage-report.md (coverage_status: frontmatter, single source of truth);
  GET /queue `coverage` block; FAIL -> Telegram; optional POST /coverage/baseline override.
- Flags ORCH_COVERAGE_* (kill-switch + self-hosting-only scope) -> enduro untouched;
  STAGE_TRANSITIONS / existing check_* / verdict keys byte-for-byte unchanged (NFR-5/AC-8).
- pytest-cov==5.0.0 added to requirements.txt.

Tests: tests/test_coverage_gate.py (TC-01..TC-15). Frozen QG-registry anti-regress
tests + deploy-staging edge tests updated for the new sub-gate. Full suite green.

Docs: README / adr-0029 / PIPELINE_DOCS / 18-coverage-report.md template (architecture
stage) + CHANGELOG / CLAUDE.md / .env.example (this PR).

Refs: ORCH-027
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 01:04:21 +03:00
parent c0dc1940a6
commit b4b993cf63
16 changed files with 1496 additions and 2 deletions

View File

@@ -259,6 +259,38 @@ class Settings(BaseSettings):
security_dep_audit_fail_closed: bool = False
security_secrets_block: bool = True
# ORCH-027: deterministic test-coverage gate on the deploy-staging -> deploy edge
# (AFTER the merge-gate, BEFORE image-freshness). Measures line coverage of src/
# under pytest-cov in the per-branch worktree, compares to an absolute floor and/or
# the ratchet baseline of `main`, and FAILs (rollback to development + developer
# retry) on degradation. Leaf src/coverage_gate.py (never-raise); machine verdict in
# 18-coverage-report.md frontmatter (coverage_status:). See ADR-001-coverage-gate.md.
# coverage_gate_enabled -> SINGLE kill-switch; False -> pipeline 1:1 as before
# ORCH-027 for everyone. Env ORCH_COVERAGE_GATE_ENABLED.
# coverage_gate_repos -> CSV of repos where the gate is REAL; empty -> only
# the self-hosting repo (orchestrator). Mirrors
# security_gate_repos / image_freshness_repos.
# coverage_min_percent -> absolute floor (% line coverage) for policy
# absolute/both. Default 0.0 -> safe rollout: the
# ratchet baseline drives no-regression, the floor
# never false-fails day one.
# coverage_policy -> absolute | baseline | both (default both): which
# condition(s) must hold (D3).
# coverage_epsilon -> small non-negative noise tolerance (%) so jitter at
# the boundary does not bounce a task (NFR-4).
# coverage_tool_fail_closed -> strict mode: a coverage-tool error -> FAIL instead
# of the default fail-open + warning (FR-6). Default
# False (anti-loop, precedent ORCH-061/022).
# coverage_run_timeout_s -> wall-clock budget for the pytest --cov run (mirrors
# merge_retest_timeout_s / security_scan_timeout_s).
coverage_gate_enabled: bool = True
coverage_gate_repos: str = ""
coverage_min_percent: float = 0.0
coverage_policy: str = "both"
coverage_epsilon: float = 0.5
coverage_tool_fail_closed: bool = False
coverage_run_timeout_s: int = 900
# ORCH-061: tolerate KNOWN sandbox-infra FAILs (C9a/C9b) in the staging suite.
# The self-hosting deploy-staging stage looped because scripts/staging_check.py
# exited non-zero on ANY failed check, so two infra-only failures (sandbox bot

616
src/coverage_gate.py Normal file
View File

@@ -0,0 +1,616 @@
"""Coverage-gate core (ORCH-027): deterministic test-coverage gate before merge.
Background
----------
The orchestrator runs autonomous development: the ``developer`` agent writes code
with no human filter, and on ``testing`` the ``tester`` agent decides for itself
whether the tests are enough. The existing test gates judge only by the FACT of
passing, never by COMPLETENESS: ``check_ci_green`` and ``check_tests_passed`` and
the merge-gate re-test all look at a pytest exit code. None of them notices "300
lines of new code, 0 tests". Across a batch autonomous run (ORCH-088) that means a
monotonic erosion of coverage — every task shaves a corner on tests and the project
silently loses testability.
This module provides the deterministic (no-LLM) primitives that the quality-gate
``check_coverage_gate`` (src/qg/checks.py) composes on the ``deploy-staging ->
deploy`` edge — run **AFTER the merge-gate** (so coverage is measured on the
caught-up HEAD that actually lands in ``main``) and **BEFORE image-freshness** (fail
before the expensive docker rebuild), mirroring the security-gate (ORCH-022):
* ``measure_coverage`` -> run ``pytest --cov=src`` in the per-branch
worktree (offline) -> line coverage ``%`` or
``None`` on tool error.
* ``compute_coverage_verdict`` -> pure: compare (measured, baseline, floor) under
a policy + epsilon -> ``(ok, reason)``.
* ``write_coverage_report`` / ``parse_coverage_status`` -> write the
``18-coverage-report.md`` artefact and read its machine verdict back (single
source of truth: the gate returns exactly the frontmatter it wrote, AC-9).
* ``ratchet_baseline_on_merge`` -> on a CONFIRMED merge (``_handle_merge_verify``,
``deploy -> done`` edge) raise the per-repo baseline UP from the merged branch's
measured coverage (atomic compare-and-set, never decreases — FR-4 / D5).
* ``check_coverage_gate`` -> the orchestrating entry the QG wrapper delegates
to.
Invariants (ADR-001 §7, never broken):
* **Tool error -> fail-open + WARNING by default** (FR-6/AC-6): a coverage-tool
failure / unparseable metric degrades fail-open (anti-loop, precedent
ORCH-061/022 dep-audit); ``coverage_tool_fail_closed`` flips it to strict.
* **never-raise** (AC-7): any internal error is swallowed; an exception never
escapes into ``advance_stage``.
* **Baseline never decreases** (FR-4): the ratchet is an atomic SQL compare-and-set
under the held merge-lease (ORCH-043), so two parallel merges can never lower or
lose the value.
* **Self-hosting safety** (AC-7): the gate only measures / reads / writes the
artefact / decides. It never calls the deploy hook, never restarts the prod
container, never pushes / force-pushes ``main``.
This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and lazily
``qg.checks.is_self_hosting_repo`` / ``db`` / ``notifications``; it never imports
``stage_engine``.
"""
import json
import logging
import os
import subprocess
from .config import settings
from .git_worktree import ensure_worktree, get_worktree_path
logger = logging.getLogger("orchestrator.coverage_gate")
# ---------------------------------------------------------------------------
# Conditionality (mirrors security_gate_applies / _merge_gate_applies)
# ---------------------------------------------------------------------------
def coverage_gate_applies(repo: str) -> bool:
"""Whether the coverage-gate is REAL for this repo (conditional rollout).
Mirrors the ORCH-22 / ORCH-43 / ORCH-58 pattern:
* ``coverage_gate_enabled=False`` -> always False (kill-switch; pipeline is
1:1 as before ORCH-027 for everyone).
* ``coverage_gate_repos`` (CSV) non-empty -> real only for the listed repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``).
Never raises (AC-7): any error -> False (the safe no-op default).
"""
try:
if not settings.coverage_gate_enabled:
return False
raw = (settings.coverage_gate_repos or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
# Lazy import keeps this module a leaf (no qg import at module load).
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("coverage_gate_applies error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# Measurement (pytest --cov=src in the per-branch worktree) — FR-1 / D2
# ---------------------------------------------------------------------------
def parse_coverage_percent(data) -> float | None:
"""Pure: extract ``totals.percent_covered`` (line coverage ``%``) from a
coverage.py JSON dict. Returns ``None`` if the shape is missing / unparseable.
Never raises.
"""
try:
if not isinstance(data, dict):
return None
totals = data.get("totals")
if not isinstance(totals, dict):
return None
pct = totals.get("percent_covered")
if pct is None:
return None
return float(pct)
except (TypeError, ValueError):
return None
def measure_coverage(repo: str, branch: str) -> float | None:
"""Run ``pytest --cov=src`` in the per-branch worktree -> line coverage ``%``.
Scope is ``src/`` only (the tests themselves are out of scope, BRD §«Вне
объёма»). Offline — coverage needs no network. The measurer is intentionally
encapsulated here so the pure decision logic and the baseline storage are
stack-agnostic (a future jest/jacoco measurer is a new ``measure_*`` branch,
BR-6).
The coverage metric is read from the ``--cov-report=json`` file regardless of
the pytest exit code: a non-zero exit because of *failing tests* is already
caught upstream (``check_ci_green`` / merge-gate re-test), and a partial run
still produces a meaningful coverage JSON. A genuine tool error (missing
plugin / timeout / no JSON / unparseable) -> ``None`` (the caller degrades
fail-open by default, FR-6). Never raises (AC-7).
"""
try:
wt = ensure_worktree(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("measure_coverage: worktree error for %s/%s: %s", repo, branch, e)
return None
cov_json = os.path.join(wt, ".coverage-report.json")
# Remove a stale report so we never read a previous pass's metric.
try:
if os.path.isfile(cov_json):
os.remove(cov_json)
except OSError:
pass
cmd = [
"python", "-m", "pytest", "tests/",
"--cov=src",
f"--cov-report=json:{cov_json}",
"--cov-report=", # suppress the terminal cov report (json only)
"-q",
]
timeout = settings.coverage_run_timeout_s
try:
subprocess.run(cmd, cwd=wt, capture_output=True, text=True, timeout=timeout)
except subprocess.TimeoutExpired:
logger.warning(
"measure_coverage: pytest --cov timed out after %ss for %s/%s",
timeout, repo, branch,
)
return None
except FileNotFoundError:
logger.warning(
"measure_coverage: pytest / pytest-cov not available for %s/%s", repo, branch
)
return None
except (subprocess.SubprocessError, OSError) as e:
logger.warning("measure_coverage: pytest --cov error for %s/%s: %s", repo, branch, e)
return None
data = None
try:
if not os.path.isfile(cov_json):
logger.warning(
"measure_coverage: no coverage json produced for %s/%s", repo, branch
)
return None
with open(cov_json, "r", encoding="utf-8") as f:
data = json.load(f)
except (OSError, ValueError) as e:
logger.warning(
"measure_coverage: cannot parse coverage json for %s/%s: %s", repo, branch, e
)
return None
finally:
try:
if os.path.isfile(cov_json):
os.remove(cov_json)
except OSError:
pass
return parse_coverage_percent(data)
# ---------------------------------------------------------------------------
# Pure decision (FR-2 / D3) — the core of the unit tests
# ---------------------------------------------------------------------------
def compute_coverage_verdict(measured, baseline, floor, policy, epsilon) -> tuple[bool, str]:
"""Pure: decide PASS/FAIL from (measured, baseline, floor, policy, epsilon).
Deterministic, no LLM, no I/O. Returns ``(ok: bool, reason: str)``.
* ``policy = "absolute"`` -> PASS ⇔ ``measured >= floor - epsilon``.
* ``policy = "baseline"`` -> PASS ⇔ ``measured >= baseline - epsilon``.
* ``policy = "both"`` (default) -> PASS ⇔ BOTH conditions hold.
* ``baseline is None`` (no stored baseline / bootstrap) -> the baseline
condition does NOT apply (cannot regress against nothing); only the
absolute part decides. For ``policy = "baseline"`` with no baseline this is
a bootstrap PASS (the measured value seeds the baseline at merge, D5).
* ``epsilon`` — a small non-negative tolerance so jitter at the boundary does
not bounce a task (NFR-4).
Never raises: bad inputs -> ``(False, reason)`` (a verdict cannot be computed ->
conservative FAIL for the pure function; the orchestrating entry maps a *tool*
error to fail-open separately).
"""
try:
pol = (policy or "both").strip().lower()
eps = max(0.0, float(epsilon if epsilon is not None else 0.0))
m = float(measured)
except (TypeError, ValueError) as e:
return False, f"coverage verdict: bad inputs ({e})"
abs_applicable = pol in ("absolute", "both")
base_applicable = pol in ("baseline", "both") and baseline is not None
checks: list[str] = []
ok = True
if abs_applicable:
try:
f = float(floor if floor is not None else 0.0)
except (TypeError, ValueError):
f = 0.0
abs_ok = m >= f - eps
checks.append(
f"absolute {m:.2f}% >= floor {f:.2f}%-eps{eps:.2f} -> "
f"{'PASS' if abs_ok else 'FAIL'}"
)
ok = ok and abs_ok
if base_applicable:
b = float(baseline)
base_ok = m >= b - eps
checks.append(
f"baseline {m:.2f}% >= base {b:.2f}%-eps{eps:.2f} -> "
f"{'PASS' if base_ok else 'FAIL'}"
)
ok = ok and base_ok
elif pol in ("baseline", "both") and baseline is None:
checks.append("baseline N/A (bootstrap — no stored baseline)")
body = "; ".join(checks) if checks else "no applicable condition (bootstrap) -> PASS"
reason = f"measured={m:.2f}% policy={pol} eps={eps:.2f}: {body}"
return ok, reason
def compute_delta(measured, baseline, floor) -> float:
"""Pure: signed ``measured - max(applicable references)`` (%, 2 decimals).
References are the present ones among ``baseline`` / ``floor``. With neither ->
``0.0``. Never raises.
"""
try:
m = float(measured)
refs = []
if baseline is not None:
refs.append(float(baseline))
if floor is not None:
refs.append(float(floor))
if not refs:
return 0.0
return round(m - max(refs), 2)
except (TypeError, ValueError):
return 0.0
# ---------------------------------------------------------------------------
# Artefact: write the report, read the machine verdict back (FR-7 / D7 / AC-9)
# ---------------------------------------------------------------------------
def _report_rel(work_item_id: str) -> str:
return f"docs/work-items/{work_item_id}/18-coverage-report.md"
def _report_path(repo: str, work_item_id: str, branch: str) -> str:
"""Absolute path of 18-coverage-report.md inside the task worktree."""
try:
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
wt = ensure_worktree(repo, branch)
except Exception: # noqa: BLE001 - never-raise; fall back to shared clone
wt = os.path.join(settings.repos_dir, repo)
return os.path.join(wt, _report_rel(work_item_id))
def _num(v) -> str:
"""Render a numeric field with 2 decimals, or empty for None/unparseable."""
if v is None:
return ""
try:
return f"{float(v):.2f}"
except (TypeError, ValueError):
return ""
def render_coverage_report(work_item_id: str, fields: dict) -> str:
"""Pure: render the 18-coverage-report.md content (frontmatter + body).
The machine verdict lives ONLY in the YAML frontmatter ``coverage_status:``
(canon, regiser-sensitive); ``measured_coverage`` is the single source of truth
for the ratchet (D5). Never raises.
"""
baseline = fields.get("baseline")
baseline_str = "" if baseline is None else _num(baseline)
return (
"---\n"
f"coverage_status: {fields.get('coverage_status', 'FAIL')}\n"
f"work_item: {work_item_id}\n"
f"measured_coverage: {_num(fields.get('measured_coverage'))}\n"
f"baseline: {baseline_str}\n"
f"floor: {_num(fields.get('floor'))}\n"
f"policy: {fields.get('policy', 'both')}\n"
f"epsilon: {_num(fields.get('epsilon'))}\n"
f"delta: {_num(fields.get('delta'))}\n"
"---\n"
f"# Coverage Report — {work_item_id}\n\n"
"Детерминированный гейт покрытия (ORCH-027) — под-гейт ребра "
"`deploy-staging→deploy` (ПОСЛЕ merge-gate, ДО image-freshness). Машинный "
"вердикт читается ТОЛЬКО из `coverage_status:` frontmatter выше.\n\n"
"## Verdict\n"
f"{fields.get('reason', '')}\n\n"
"## Measurement\n"
f"{fields.get('measurement', '')}\n\n"
"## Policy\n"
f"{fields.get('policy_detail', '')}\n"
)
def write_coverage_report(repo: str, work_item_id: str, branch: str, fields: dict) -> str:
"""Write 18-coverage-report.md into the task worktree; return its path.
Best-effort / never-raise: a write error is logged and the path is still
returned (the caller's read-back then fails closed)."""
path = _report_path(repo, work_item_id, branch)
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(render_coverage_report(work_item_id, fields))
except OSError as e:
logger.error("write_coverage_report error for %s/%s: %s", repo, work_item_id, e)
return path
def parse_coverage_status(content: str) -> tuple[bool, str]:
"""Map a 18-coverage-report.md body to a quality-gate verdict by reading ONLY
the machine-readable ``coverage_status:`` YAML frontmatter — never the prose.
Mirrors ``parse_security_status`` (canon: machine verdict only from frontmatter,
AC-9). The negative token (FAIL) is authoritative (checked first). Returns:
* ``coverage_status: PASS`` -> ``(True, "Coverage status: PASS")``
* ``coverage_status: FAIL`` -> ``(False, "Coverage status: FAIL")``
* missing field / no frontmatter / bad YAML -> ``(False, <reason>)``.
Parse delegated to the unified ``frontmatter.parse_frontmatter`` primitive
(ORCH-052c single source of YAML-frontmatter logic).
"""
from .frontmatter import parse_frontmatter
parse = parse_frontmatter(content)
if parse.yaml_error is not None:
return False, f"Invalid YAML frontmatter in coverage report: {parse.yaml_error}"
status = None
if parse.has_block and not parse.malformed:
status = str(parse.data.get("coverage_status", "")).upper().strip()
if status == "FAIL":
return False, "Coverage status: FAIL"
if status == "PASS":
return True, "Coverage status: PASS"
return False, f"No machine-readable coverage_status in frontmatter (got: {status!r})"
def read_measured_coverage(content: str) -> float | None:
"""Read ``measured_coverage`` (%, float) from a 18-coverage-report.md body via
the unified frontmatter parser. ``None`` when absent / unparseable (ratchet then
no-ops). Never raises.
"""
try:
from .frontmatter import parse_frontmatter
parse = parse_frontmatter(content)
if not parse.has_block or parse.malformed:
return None
raw = parse.data.get("measured_coverage")
if raw is None or (isinstance(raw, str) and not raw.strip()):
return None
return float(raw)
except (TypeError, ValueError):
return None
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("read_measured_coverage error: %s", e)
return None
def _error_fields(work_item_id, floor, policy, epsilon, baseline, *, fail_closed: bool) -> dict:
"""Build the report fields for a tool-error pass (FR-6)."""
status = "FAIL" if fail_closed else "PASS"
mode = "fail-closed (FAIL)" if fail_closed else "fail-open (WARNING)"
return {
"coverage_status": status,
"measured_coverage": None,
"baseline": baseline,
"floor": floor,
"policy": policy,
"epsilon": epsilon,
"delta": None,
"reason": f"coverage measurement failed -> {mode}",
"measurement": (
"coverage tool error / unparseable metric "
f"(coverage_tool_fail_closed={fail_closed})"
),
"policy_detail": f"policy={policy}, floor={floor}, baseline={baseline}, epsilon={epsilon}",
}
# ---------------------------------------------------------------------------
# Ratchet baseline UP on a confirmed merge (FR-4 / D5)
# ---------------------------------------------------------------------------
def ratchet_baseline_on_merge(repo: str, work_item_id: str, branch: str, sha: str | None = None) -> bool:
"""Raise the per-repo coverage baseline UP from the merged branch's measured
coverage. Called from ``_handle_merge_verify`` (deploy -> done edge) AFTER the
merge is confirmed and BEFORE the task advances to ``done`` (D5).
Reads the measured value from ``18-coverage-report.md`` (single source of truth
— the exact metric the gate wrote on the deploy-staging->deploy edge) and applies
an atomic compare-and-set (``db.ratchet_coverage_baseline``) that never lowers
the baseline. Bootstrap: the first applicable merge seeds the baseline.
Returns True iff the baseline was inserted/raised. never-raise (AC-7): any error
-> False (observability best-effort; a ratchet failure must never break the
deploy->done path).
"""
try:
if not coverage_gate_applies(repo):
return False
path = _report_path(repo, work_item_id, branch)
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
except OSError as e:
logger.warning(
"ratchet: cannot read coverage report for %s/%s: %s", repo, work_item_id, e
)
return False
measured = read_measured_coverage(content)
if measured is None:
logger.warning(
"ratchet: no measured_coverage in report for %s/%s", repo, work_item_id
)
return False
from . import db
updated = db.ratchet_coverage_baseline(repo, measured, sha)
if updated:
logger.info(
"coverage baseline ratcheted for %s -> %.2f%% (sha=%s)", repo, measured, sha
)
else:
logger.info(
"coverage baseline unchanged for %s (measured %.2f%% not above current)",
repo, measured,
)
return updated
except Exception as e: # noqa: BLE001 - never-raise contract
logger.error("ratchet_baseline_on_merge error for %s/%s: %s", repo, work_item_id, e)
return False
# ---------------------------------------------------------------------------
# Orchestrating entry — delegated to by qg.checks.check_coverage_gate
# ---------------------------------------------------------------------------
def check_coverage_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
"""ORCH-027 coverage-gate on the deploy-staging -> deploy edge (after merge-gate).
Deterministic, no LLM. Algorithm (ADR-001 D1..D7):
1. Conditionality: ``coverage_gate_enabled=False`` -> ``(True, "...disabled")``;
a repo the gate is not real for -> ``(True, "coverage-gate N/A for <repo>")``.
2. ``measure_coverage`` (pytest --cov=src in the worktree). ``None`` (tool
error) -> fail-open + WARNING by default (``coverage_tool_fail_closed``
flips to FAIL), FR-6.
3. ``compute_coverage_verdict`` -> write ``18-coverage-report.md`` -> read the
verdict BACK via ``parse_coverage_status`` (single source of truth: the
returned verdict == the artefact frontmatter, AC-9).
4. FAIL -> ``(False, reason)`` (engine rolls back to ``development`` + releases
the merge lease); PASS -> ``(True, reason)`` (engine proceeds to
image-freshness).
Never-raise (AC-7): any internal error -> a (bool, reason) pair following the
fail-open default (so an unexpected fault never wedges the autonomous pipeline),
unless ``coverage_tool_fail_closed`` is set.
"""
floor = getattr(settings, "coverage_min_percent", 0.0)
policy = getattr(settings, "coverage_policy", "both")
epsilon = getattr(settings, "coverage_epsilon", 0.5)
try:
if not settings.coverage_gate_enabled:
return True, "coverage-gate disabled"
if not coverage_gate_applies(repo):
return True, f"coverage-gate N/A for {repo}"
from . import db
try:
baseline = db.get_coverage_baseline(repo)
except Exception as e: # noqa: BLE001 - baseline read best-effort
logger.warning("coverage-gate: baseline read error for %s: %s", repo, e)
baseline = None
measured = measure_coverage(repo, branch)
if measured is None:
fail_closed = bool(settings.coverage_tool_fail_closed)
fields = _error_fields(
work_item_id, floor, policy, epsilon, baseline, fail_closed=fail_closed
)
write_coverage_report(repo, work_item_id, branch, fields)
if fail_closed:
logger.warning(
"coverage-gate %s/%s: measurement failed -> fail-CLOSED (FAIL)",
repo, work_item_id,
)
return False, "coverage-gate fail-closed: measurement failed (tool error)"
logger.warning(
"coverage-gate %s/%s: measurement failed -> fail-OPEN + WARNING",
repo, work_item_id,
)
return True, "coverage-gate fail-open (WARNING): measurement failed (tool error)"
ok, reason = compute_coverage_verdict(measured, baseline, floor, policy, epsilon)
delta = compute_delta(measured, baseline, floor)
fields = {
"coverage_status": "PASS" if ok else "FAIL",
"measured_coverage": measured,
"baseline": baseline,
"floor": floor,
"policy": policy,
"epsilon": epsilon,
"delta": delta,
"reason": reason,
"measurement": f"pytest --cov=src: line coverage src/ = {measured:.2f}%",
"policy_detail": (
f"policy={policy}, floor={floor}%, "
f"baseline={'bootstrap' if baseline is None else f'{baseline:.2f}%'}, "
f"epsilon={epsilon}%"
),
}
path = write_coverage_report(repo, work_item_id, branch, fields)
# Read the machine verdict back from the artefact we just wrote — so the
# returned (bool, reason) is guaranteed == the YAML frontmatter (AC-9).
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
except OSError as e:
return False, f"cannot read coverage report (fail-closed): {e}"
verdict_ok, _v = parse_coverage_status(content)
if verdict_ok:
logger.info("coverage-gate passed for %s/%s: %s", repo, work_item_id, reason)
return True, f"coverage OK ({reason})"
# FAIL -> surface loudly (Telegram with the clickable issue number, FR-7).
try:
from .notifications import send_telegram, link_for
base_str = "n/a" if baseline is None else f"{baseline:.2f}%"
send_telegram(
f"\U0001f4c9 {link_for(work_item_id)}: coverage-гейт FAIL — измерено "
f"{measured:.2f}% (floor {floor}%, baseline {base_str}, "
f"delta {delta:+.2f}%). Откат на development для доработки тестов."
)
except Exception as e: # noqa: BLE001 - telegram best-effort
logger.warning("coverage-gate FAIL telegram failed: %s", e)
return False, reason
except Exception as e: # noqa: BLE001 - never-raise contract (AC-7)
logger.error("check_coverage_gate error for %s/%s: %s", repo, branch, e)
# An unexpected internal error follows the fail-open default (anti-loop): a
# coverage-tool/logic fault must not wedge the autonomous pipeline. The
# operator can flip coverage_tool_fail_closed to make it strict.
try:
if settings.coverage_tool_fail_closed:
return False, f"coverage-gate error (fail-closed): {e}"
except Exception: # noqa: BLE001
pass
return True, f"coverage-gate error (fail-open): {e}"
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (FR-7 / AC-9)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
"""Read-only coverage-gate summary for GET /queue (FR-7 / AC-9).
Additive block; existing /queue keys are untouched. never-raise: any error ->
a minimal dict with the flags.
"""
try:
enabled = bool(settings.coverage_gate_enabled)
except Exception: # noqa: BLE001
enabled = False
out = {
"enabled": enabled,
"repos": getattr(settings, "coverage_gate_repos", "") or "",
"policy": getattr(settings, "coverage_policy", "both"),
"floor": getattr(settings, "coverage_min_percent", 0.0),
"epsilon": getattr(settings, "coverage_epsilon", 0.5),
"fail_closed": bool(getattr(settings, "coverage_tool_fail_closed", False)),
"baselines": {},
}
try:
from . import db
out["baselines"] = db.all_coverage_baselines()
except Exception as e: # noqa: BLE001 - never-raise -> empty baselines
logger.warning("coverage snapshot baselines error: %s", e)
return out

128
src/db.py
View File

@@ -199,10 +199,138 @@ def init_db():
CREATE INDEX IF NOT EXISTS idx_repo_freeze_active
ON repo_freeze (repo, cleared_at);
""")
# ORCH-027 (FR-4, ADR-001 D4): additive per-repo coverage baseline for the
# coverage-gate ratchet. One row per repo; the baseline is monotonically
# non-decreasing via ratchet_coverage_baseline (atomic compare-and-set). Purely
# ADDITIVE (CREATE TABLE IF NOT EXISTS, pattern repo_freeze/job_deps) ->
# idempotent, restart-safe on the shared prod DB; existing tables untouched
# (NFR-5). See docs/work-items/ORCH-027/08-data-requirements.md.
conn.executescript("""
CREATE TABLE IF NOT EXISTS coverage_baseline (
repo TEXT PRIMARY KEY,
coverage REAL NOT NULL,
source_sha TEXT,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""")
conn.commit()
conn.close()
def get_coverage_baseline(repo: str) -> float | None:
"""ORCH-027: read the per-repo coverage baseline (%, line coverage).
Returns ``None`` when no baseline is stored yet (bootstrap mode — the gate then
decides on the absolute floor only, D3). Raises only on a real DB error (the
coverage_gate leaf caller wraps this in its never-raise contract).
"""
if not repo:
return None
conn = get_db()
try:
row = conn.execute(
"SELECT coverage FROM coverage_baseline WHERE repo = ?", (repo,)
).fetchone()
finally:
conn.close()
if row is None:
return None
try:
return float(row["coverage"])
except (TypeError, ValueError):
return None
def ratchet_coverage_baseline(repo: str, coverage: float, sha: str | None = None) -> bool:
"""ORCH-027 (FR-4, D5): raise the per-repo coverage baseline UP, never down.
Atomic compare-and-set: ``UPDATE ... WHERE coverage <= ?`` (the baseline never
decreases — an equal value is an idempotent no-harm re-stamp), or ``INSERT`` when
no row exists yet (bootstrap). Under the held merge-lease (ORCH-043) plus this
single-statement guard, two parallel merges can never lower or lose the value.
Returns True iff a row was inserted or raised.
"""
if not repo:
return False
try:
cov = float(coverage)
except (TypeError, ValueError):
return False
conn = get_db()
try:
cur = conn.execute(
"UPDATE coverage_baseline "
"SET coverage = ?, source_sha = ?, updated_at = datetime('now') "
"WHERE repo = ? AND coverage <= ?",
(cov, sha, repo, cov),
)
changed = cur.rowcount or 0
if changed == 0:
# No row updated: either the row is absent (bootstrap INSERT) or the
# existing baseline is already higher (skip — never lower it).
exists = conn.execute(
"SELECT 1 FROM coverage_baseline WHERE repo = ?", (repo,)
).fetchone()
if exists is None:
conn.execute(
"INSERT INTO coverage_baseline (repo, coverage, source_sha, updated_at) "
"VALUES (?, ?, ?, datetime('now'))",
(repo, cov, sha),
)
changed = 1
conn.commit()
return bool(changed)
finally:
conn.close()
def set_coverage_baseline(repo: str, coverage: float, sha: str | None = None) -> bool:
"""ORCH-027 (D8): UNCONDITIONALLY set the per-repo coverage baseline.
For a legitimate one-off coverage drop (e.g. removing a large tested module) via
the manual ``POST /coverage/baseline`` override. Unlike ``ratchet_coverage_baseline``
this CAN lower the baseline. Returns True on success.
"""
if not repo:
return False
try:
cov = float(coverage)
except (TypeError, ValueError):
return False
conn = get_db()
try:
conn.execute(
"INSERT INTO coverage_baseline (repo, coverage, source_sha, updated_at) "
"VALUES (?, ?, ?, datetime('now')) "
"ON CONFLICT(repo) DO UPDATE SET coverage = excluded.coverage, "
"source_sha = excluded.source_sha, updated_at = excluded.updated_at",
(repo, cov, sha),
)
conn.commit()
return True
finally:
conn.close()
def all_coverage_baselines() -> dict:
"""ORCH-027: all per-repo coverage baselines for the GET /queue snapshot."""
conn = get_db()
try:
rows = conn.execute(
"SELECT repo, coverage, source_sha, updated_at FROM coverage_baseline"
).fetchall()
finally:
conn.close()
return {
r["repo"]: {
"coverage": r["coverage"],
"source_sha": r["source_sha"],
"updated_at": r["updated_at"],
}
for r in rows
}
def _ensure_column(conn, table: str, column: str, decl: str):
"""Add a column to `table` if it does not already exist (idempotent migration)."""
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]

View File

@@ -170,6 +170,7 @@ async def queue():
from . import merge_gate
from . import task_deps
from . import serial_gate
from . import coverage_gate
from . import labels
from . import cancel
from .disk_watchdog import disk_watchdog
@@ -189,6 +190,9 @@ async def queue():
# ORCH-088 (D9 / AC-10): per-repo serial-gate observability (read-only) —
# active task, queued/waiting analyst-jobs, freeze state. Additive block.
"serial_gate": serial_gate.snapshot(),
# ORCH-027 (FR-7 / AC-9): coverage-gate observability (read-only) —
# kill-switch, scope, policy/floor/epsilon, per-repo baselines. Additive block.
"coverage": coverage_gate.snapshot(),
# ORCH-089 (D7): auto-mode-by-label observability (read-only) — kill-switch,
# label names, scope. Additive block.
"auto_labels": labels.snapshot(),
@@ -236,3 +240,23 @@ async def serial_gate_unfreeze(repo: str = ""):
except Exception:
pass
return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen}
@app.post("/coverage/baseline")
async def coverage_set_baseline(repo: str = "", value: float | None = None):
"""ORCH-027 (D8): manually set/override the per-repo coverage baseline.
For a legitimate one-off coverage drop (e.g. removing a large tested module) the
operator sets the baseline directly here (by образцу ``POST /serial-gate/unfreeze``)
instead of waiting for the upward-only ratchet. Unlike the ratchet this CAN lower
the baseline. Alternative without this endpoint: temporarily flip
``ORCH_COVERAGE_POLICY=absolute``.
"""
from . import db
if not repo or not repo.strip():
return {"ok": False, "error": "missing 'repo'", "repo": repo}
if value is None:
return {"ok": False, "error": "missing 'value'", "repo": repo}
repo = repo.strip()
ok = db.set_coverage_baseline(repo, value, sha="manual-override")
return {"ok": ok, "repo": repo, "baseline": db.get_coverage_baseline(repo)}

View File

@@ -755,6 +755,23 @@ def check_security_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool
return _impl(repo, work_item_id, branch)
def check_coverage_gate(repo: str, work_item_id: str, branch: str) -> tuple[bool, str]:
"""ORCH-027 coverage sub-gate (pytest --cov=src) on the deploy-staging -> deploy
edge, run AFTER the merge-gate (caught-up HEAD) and BEFORE image-freshness.
Thin registry wrapper that delegates to ``coverage_gate.check_coverage_gate``
(measure line coverage of src/, compare to floor/baseline under a policy, write/
read-back ``18-coverage-report.md``). The real logic lives in
``src/coverage_gate.py`` (leaf module, never-raise, fail-open on a tool error by
default); importing it lazily here avoids an import cycle (coverage_gate imports
is_self_hosting_repo from this module). For non-self repos with an empty scope it
returns ``(True, "coverage-gate N/A for <repo>")`` so the deploy edge is unchanged
for them (AC-5).
"""
from ..coverage_gate import check_coverage_gate as _impl
return _impl(repo, work_item_id, branch)
# Registry for dynamic lookup by name
QG_CHECKS = {
"check_analysis_approved": check_analysis_approved,
@@ -770,4 +787,5 @@ QG_CHECKS = {
"check_branch_mergeable": check_branch_mergeable,
"check_staging_image_fresh": _check_staging_image_fresh,
"check_security_gate": check_security_gate,
"check_coverage_gate": check_coverage_gate,
}

View File

@@ -322,6 +322,19 @@ def advance_stage(
):
return result
# --- ORCH-027 coverage sub-gate (deploy-staging -> deploy edge) ----
# AFTER the merge-gate (coverage measured on the caught-up HEAD that
# lands in `main`, so the metric matches landed code) and BEFORE the
# image-freshness rebuild (fail before the expensive docker rebuild).
# Deterministic (no LLM): pytest --cov=src -> line coverage % vs floor /
# ratchet baseline. FAIL -> rollback to development + release the merge
# lease (held by the merge-gate's PASS). It owns the outcome on
# intervention (mirrors the merge-gate / image-freshness).
if _handle_coverage_gate(
task_id, current_stage, repo, work_item_id, branch, agent, result
):
return result
# --- ORCH-058 freshness sub-gate (deploy-staging -> deploy edge) ---
# AFTER the merge-gate finalised the validated HEAD and BEFORE Phase A.
# Rebuilds the staging image from that validated commit + recreates 8501
@@ -1124,6 +1137,90 @@ def _handle_security_gate(
return True
# ---------------------------------------------------------------------------
# ORCH-027: coverage sub-gate on the deploy-staging -> deploy edge
# ---------------------------------------------------------------------------
def _handle_coverage_gate(
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
) -> bool:
"""Run check_coverage_gate on the deploy-staging -> deploy edge (ORCH-027).
Runs AFTER the merge-gate (so coverage is measured on the rebased/caught-up HEAD
that actually lands in `main`) and BEFORE the image-freshness rebuild (fail before
the expensive docker rebuild). Deterministic (no LLM): pytest --cov=src in the
per-branch worktree -> line coverage % -> compute_coverage_verdict vs the absolute
floor and/or the ratchet baseline. The machine verdict lives in
18-coverage-report.md frontmatter. A coverage-tool error degrades fail-open +
WARNING by default (FR-6), so an infra hiccup never wedges the autonomous pipeline.
Returns True if the gate INTERVENED (the caller must return without advancing):
* FAIL (coverage below policy) -> ROLLBACK to development (+ developer retry,
capped by MAX_DEVELOPER_RETRIES) and RELEASE the merge lease (the merge-gate
held it on its PASS; coverage failed before the merge — mirrors the
image-freshness rollback, ADR-001 D1/TR-2).
Returns False when the gate PASSED (clean / fail-open / N/A) so advance_stage
proceeds to the image-freshness sub-gate. On a PASS the merge lease stays HELD
until the actual merge (released on done / rollback).
"""
passed, reason = _run_qg("check_coverage_gate", repo, work_item_id, branch)
if passed:
logger.info(f"Task {task_id}: coverage-gate passed ({reason})")
return False
result.qg_name = "check_coverage_gate"
result.qg_passed = False
result.qg_reason = reason
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_in_progress(work_item_id)
# The merge-gate held the lease on its PASS; coverage failed before the merge, so
# release it (holder-aware no-op if a different task already owns it). Mirrors the
# image-freshness rollback (ADR-001 D1/TR-2).
try:
merge_gate.release_merge_lease(repo, branch)
except Exception as e: # noqa: BLE001 - defensive
logger.warning(f"Task {task_id}: merge-lease release on coverage fail failed: {e}")
notify_qg_failure(task_id, current_stage, "check_coverage_gate", reason)
plane_add_comment(
work_item_id,
f"❌ Coverage-гейт провален ({reason}). Откат на development. "
f"Developer нужен для добавления тестов (покрытие src/ просело).",
author="deployer",
)
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
report_ref = f"docs/work-items/{work_item_id}/18-coverage-report.md"
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Coverage-гейт провален "
f"(attempt {retry_count + 1}/{MAX_DEVELOPER_RETRIES}). "
f"Причина: {reason}. Добавь тесты, чтобы покрытие src/ не падало ниже "
f"политики. Полный отчёт: {report_ref}"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: coverage-gate FAILED, enqueued developer (job_id={new_job})"
)
else:
set_issue_blocked(work_item_id)
send_telegram(
f"\U0001f6a8 {link_for(work_item_id)}: Coverage-гейт still failing after "
f"{MAX_DEVELOPER_RETRIES} developer retries ({reason}). "
f"Manual intervention needed."
)
result.alerted = True
logger.error(
f"Task {task_id}: coverage-gate FAILED, rolled back deploy-staging -> "
f"development ({reason})"
)
return True
# ---------------------------------------------------------------------------
# ORCH-058: staging-image freshness sub-gate on the deploy-staging -> deploy edge
# ---------------------------------------------------------------------------
@@ -1546,6 +1643,17 @@ def _handle_merge_verify(task_id, repo, work_item_id, branch, result: AdvanceRes
task_id, repo, work_item_id, branch, guard_msg, result
)
# ORCH-027 (D5): ratchet the per-repo coverage baseline UP from this
# merged branch's measured coverage (single source of truth:
# 18-coverage-report.md). Atomic compare-and-set under the still-held
# merge-lease -> the baseline never decreases. never-raise (observability
# best-effort): a ratchet failure must never break the deploy->done path.
try:
from . import coverage_gate
coverage_gate.ratchet_baseline_on_merge(repo, work_item_id, branch, sha)
except Exception as e: # noqa: BLE001 - observability best-effort
logger.warning(f"Task {task_id}: coverage baseline ratchet failed: {e}")
merge_gate.note_merge_verified()
try:
self_deploy.record_merged_to_main(repo, work_item_id, branch, True)