src/frontmatter.py grows from a single-key reader into the full machine
contract: reader (read_frontmatter_value, unchanged), one parse primitive
(parse_frontmatter), writer (render/write_frontmatter), schema validator
(validate_schema/REQUIRED_FIELDS, warning-only by default) and a shared
strip_frontmatter helper. The five verdict gates (check_reviewer_verdict,
_parse_tests_verdict, _parse_deploy_status, _parse_staging_status,
parse_security_status) now read through the single parse_frontmatter point
instead of duplicated ad-hoc YAML logic; review_parse._strip_frontmatter and
security_gate.extract_security_findings reuse the shared helper.
Strictly backward compatible + never-raise: STAGE_TRANSITIONS, the QG_CHECKS
composition, verdict semantics (incl. ORCH-047 three-field tester + negative
token priority), reason-strings and worktree->origin/main fallback are 1:1.
The schema validator never influences a gate verdict by default; hard-fail is
reserved behind the frontmatter_validation_strict kill-switch (default False).
New formal handoff spec docs/_standards/HANDOFF_PROTOCOL.md ("stage -> required
output" + required frontmatter schema), aligned 1:1 with PIPELINE_DOCS.md.
Tests: test_frontmatter.py (TC-01..07), test_qg_verdicts.py (TC-08..15),
test_security_gate.py (TC-12), test_stages_invariants.py (TC-16). Full
tests/ green (1212).
Refs: ORCH-076
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
317 lines
15 KiB
Python
317 lines
15 KiB
Python
"""Unified YAML-frontmatter contract — reader + writer + schema validator (ORCH-52c).
|
|
|
|
History
|
|
-------
|
|
ORCH-016 introduced this module as a *single-key reader* (``read_frontmatter_value``)
|
|
for the status-comment hot path, intentionally duplicating ~10 lines of
|
|
YAML-frontmatter logic already present in ``src/qg/checks.py`` and
|
|
``src/security_gate.py`` to keep that PR's blast radius small. Its docstring noted
|
|
*"merging into a single parser is a follow-up task"* — this module (ORCH-52c /
|
|
ORCH-076) is that follow-up.
|
|
|
|
What this module now provides (ADR-001 / adr-0020)
|
|
--------------------------------------------------
|
|
A **single point of YAML-frontmatter parsing** that every verdict gate delegates to,
|
|
plus a writer and a (warning-only by default) schema validator:
|
|
|
|
* ``read_frontmatter_value(path, key)`` — UNCHANGED single-key reader (INV-3): the
|
|
external callers (``usage.py``, ``notifications.build_status_comment``) keep the
|
|
exact same contract (``str | None``, never-raise, strip, case preserved).
|
|
* ``parse_frontmatter(content)`` — the ONE YAML parse primitive; returns a
|
|
structured :class:`FrontmatterParse` so each gate can reproduce its current
|
|
reason-strings 1:1 (no-block / malformed / yaml-error / data).
|
|
* ``parse_frontmatter_dict`` / ``read_frontmatter`` — convenience shortcuts to the
|
|
parsed mapping (in-memory / from a file).
|
|
* ``render_frontmatter`` / ``write_frontmatter`` — canonical writer; the output is
|
|
byte-compatible with the existing ``split("---", 2)`` + ``yaml.safe_load`` parsers.
|
|
* ``validate_schema`` + ``REQUIRED_FIELDS`` + ``maybe_warn_schema`` — the machine
|
|
schema (``work_item / stage / author_agent / status / created_at / model_used``).
|
|
By default it is **warning-only** and never influences any gate's boolean verdict
|
|
(NFR-3 / INV-4); the strict mode is reserved for ORCH-52d and gated behind the
|
|
``frontmatter_validation_strict`` kill-switch (default ``False``).
|
|
* ``strip_frontmatter(content)`` — shared body-only helper (replaces the duplicated
|
|
``_strip_frontmatter`` in ``review_parse``).
|
|
|
|
Contract — the WHOLE module is **never-raise** (NFR-2), exactly like the original
|
|
reader: any error (I/O, YAML, serialization) is logged to ``logger.debug/warning``
|
|
and degrades to a safe value (``{}`` / ``False`` / the input text); an exception
|
|
never escapes into the pipeline. This is a hard self-hosting requirement: these
|
|
functions read verdicts ON THE GATES of the instance that serves prod for every
|
|
project from one shared DB/queue, so a regression here would stall every project.
|
|
|
|
This module is a **leaf**: it imports only ``logging`` (and lazily ``yaml``); it does
|
|
not import anything project-specific, so it stays cycle-free for ``qg/checks.py``,
|
|
``security_gate.py``, ``post_deploy.py`` and ``review_parse.py``.
|
|
"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import Mapping
|
|
|
|
logger = logging.getLogger("orchestrator.frontmatter")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema constants (the machine-checkable required frontmatter — FR-2 / D3)
|
|
# ---------------------------------------------------------------------------
|
|
#: The required frontmatter fields a stage handoff document is expected to carry.
|
|
#: Source of truth for HANDOFF_PROTOCOL.md §2. The validator is warning-only by
|
|
#: default (D3) — its presence does NOT gate the pipeline unless the
|
|
#: ``frontmatter_validation_strict`` kill-switch is flipped on (reserved, ORCH-52d).
|
|
REQUIRED_FIELDS = ("work_item", "stage", "author_agent", "status", "created_at", "model_used")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parse primitive — the SINGLE point of YAML-frontmatter logic (D1 / D2)
|
|
# ---------------------------------------------------------------------------
|
|
@dataclass(frozen=True)
|
|
class FrontmatterParse:
|
|
"""Structured outcome of parsing a document's leading YAML frontmatter.
|
|
|
|
The structure (not a bare dict) lets each gate reproduce its EXISTING
|
|
reason-strings 1:1 (ADR-001 D2): a gate can tell "no block" from "malformed"
|
|
from "yaml error" from "valid data" without re-implementing the parse.
|
|
|
|
Attributes:
|
|
data: the parsed mapping; ``{}`` when absent / malformed / not a mapping.
|
|
has_block: a leading ``---`` … ``---`` block was present.
|
|
malformed: the content started with ``---`` but had < 3 ``---``-split segments
|
|
(an unterminated frontmatter block).
|
|
yaml_error: the ``yaml.safe_load`` error text, else ``None``.
|
|
"""
|
|
|
|
data: dict = field(default_factory=dict)
|
|
has_block: bool = False
|
|
malformed: bool = False
|
|
yaml_error: str | None = None
|
|
|
|
|
|
def parse_frontmatter(content: str) -> FrontmatterParse:
|
|
"""Parse the leading YAML frontmatter of ``content`` into a :class:`FrontmatterParse`.
|
|
|
|
The single canonical implementation of the block that used to be duplicated in
|
|
every verdict gate (``content.startswith("---")`` -> ``split("---", 2)`` ->
|
|
``yaml.safe_load`` -> ``isinstance(dict)``). Never raises:
|
|
|
|
* not a string / no leading ``---`` -> ``has_block=False``, ``data={}``.
|
|
* ``---`` but < 3 segments (unterminated) -> ``malformed=True``, ``data={}``.
|
|
* ``yaml.safe_load`` error -> ``yaml_error=<text>``, ``data={}``.
|
|
* parsed value is not a mapping -> ``data={}`` (``has_block=True``).
|
|
* valid mapping -> ``data=<dict>``.
|
|
"""
|
|
try:
|
|
if not isinstance(content, str) or not content.startswith("---"):
|
|
return FrontmatterParse()
|
|
|
|
parts = content.split("---", 2)
|
|
if len(parts) < 3:
|
|
# Unterminated frontmatter block.
|
|
return FrontmatterParse(has_block=True, malformed=True)
|
|
|
|
try:
|
|
import yaml
|
|
loaded = yaml.safe_load(parts[1])
|
|
except Exception as e: # yaml.YAMLError + anything pyyaml may surface
|
|
logger.debug(f"parse_frontmatter: yaml parse failed: {e}")
|
|
return FrontmatterParse(has_block=True, yaml_error=str(e))
|
|
|
|
if not isinstance(loaded, dict):
|
|
return FrontmatterParse(has_block=True)
|
|
return FrontmatterParse(data=loaded, has_block=True)
|
|
except Exception as e: # noqa: BLE001 - never-raise contract
|
|
logger.debug(f"parse_frontmatter: unexpected error: {e}")
|
|
return FrontmatterParse()
|
|
|
|
|
|
def parse_frontmatter_dict(content: str) -> dict:
|
|
"""Shortcut: the parsed mapping of ``content``'s frontmatter. Never raises -> ``{}``."""
|
|
return parse_frontmatter(content).data
|
|
|
|
|
|
def read_frontmatter(path: str) -> dict:
|
|
"""Read ``path`` and return its parsed frontmatter mapping. Never raises -> ``{}``."""
|
|
try:
|
|
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
|
content = f.read()
|
|
except OSError as e:
|
|
logger.debug(f"read_frontmatter: cannot open {path}: {e}")
|
|
return {}
|
|
return parse_frontmatter(content).data
|
|
|
|
|
|
def read_frontmatter_value(path: str, key: str) -> str | None:
|
|
"""Return the value of `key` from the leading YAML frontmatter of `path`.
|
|
|
|
Format expected (canonical, matching qg/checks.py):
|
|
---
|
|
key: value
|
|
other: ...
|
|
---
|
|
<body>
|
|
|
|
Never raises. Returns None for any of:
|
|
- missing/unreadable file,
|
|
- no leading `---` frontmatter,
|
|
- malformed/unterminated frontmatter,
|
|
- YAML parse error,
|
|
- frontmatter is not a mapping,
|
|
- key absent (or its value is None/empty).
|
|
|
|
The returned value is stringified and stripped (whitespace removed); casing
|
|
is preserved so the caller decides whether to upper/lower for matching.
|
|
|
|
ORCH-52c: reimplemented on top of the unified ``read_frontmatter`` primitive.
|
|
The external contract (``str | None``, never-raise, strip, case preserved) is
|
|
UNCHANGED — external callers (``usage.py``, ``notifications``) are unaffected
|
|
(INV-3 / FR-3).
|
|
"""
|
|
fm = read_frontmatter(path)
|
|
raw = fm.get(key)
|
|
if raw is None:
|
|
return None
|
|
value = str(raw).strip()
|
|
return value or None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Body helper — replaces the duplicated review_parse._strip_frontmatter (D2)
|
|
# ---------------------------------------------------------------------------
|
|
def strip_frontmatter(content: str) -> str:
|
|
"""Return ``content`` with a leading ``--- … ---`` YAML block removed, if present.
|
|
|
|
Mirrors the previous ``review_parse._strip_frontmatter`` exactly: only a
|
|
well-formed (>= 3 ``---``-split segments) leading block is stripped; otherwise
|
|
the input is returned unchanged. Never raises -> the input text.
|
|
"""
|
|
try:
|
|
if isinstance(content, str) and content.startswith("---"):
|
|
parts = content.split("---", 2)
|
|
if len(parts) >= 3:
|
|
return parts[2]
|
|
except Exception as e: # noqa: BLE001 - never-raise contract
|
|
logger.debug(f"strip_frontmatter: unexpected error: {e}")
|
|
return content
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Writer — canonical render/persist (FR-1 / D1)
|
|
# ---------------------------------------------------------------------------
|
|
def render_frontmatter(data: Mapping[str, object], body: str = "") -> str:
|
|
"""Render ``data`` as a canonical leading YAML-frontmatter block + ``body``.
|
|
|
|
Output shape: ``"---\\n<yaml>\\n---\\n<body>"``. The YAML is emitted with
|
|
``yaml.safe_dump`` (block style, keys unsorted) so it is byte-compatible with
|
|
the existing readers (``split("---", 2)`` + ``yaml.safe_load``): a round-trip
|
|
``render_frontmatter`` -> ``parse_frontmatter`` returns the same mapping.
|
|
|
|
never-raise (NFR-2): a serialization error is logged and the function degrades
|
|
to returning ``body`` unchanged (a document with no frontmatter is read by the
|
|
gates exactly as "no machine verdict", never an exception).
|
|
"""
|
|
try:
|
|
import yaml
|
|
dumped = yaml.safe_dump(
|
|
dict(data or {}), default_flow_style=False, sort_keys=False, allow_unicode=True
|
|
).strip("\n")
|
|
return f"---\n{dumped}\n---\n{body}"
|
|
except Exception as e: # noqa: BLE001 - never-raise contract
|
|
logger.warning(f"render_frontmatter: serialization failed: {e}")
|
|
return body
|
|
|
|
|
|
def write_frontmatter(path: str, data: Mapping[str, object], body: str = "") -> bool:
|
|
"""Persist ``render_frontmatter(data, body)`` to ``path``. Returns True on success.
|
|
|
|
never-raise (NFR-2): any I/O / serialization error is logged and returns
|
|
``False`` (the caller decides how to degrade); an exception never escapes.
|
|
"""
|
|
try:
|
|
content = render_frontmatter(data, body)
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
return True
|
|
except Exception as e: # noqa: BLE001 - never-raise contract
|
|
logger.warning(f"write_frontmatter: cannot write {path}: {e}")
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema validator — warning-only by default; strict reserved (FR-2 / D3)
|
|
# ---------------------------------------------------------------------------
|
|
@dataclass(frozen=True)
|
|
class SchemaValidation:
|
|
"""Outcome of :func:`validate_schema`.
|
|
|
|
valid: all required fields are present and non-empty.
|
|
missing: the required fields that are absent / None / blank (order = REQUIRED_FIELDS).
|
|
"""
|
|
|
|
valid: bool
|
|
missing: list
|
|
|
|
|
|
def validate_schema(data: Mapping, *, required=REQUIRED_FIELDS) -> SchemaValidation:
|
|
"""Validate that ``data`` carries every required schema field, non-empty.
|
|
|
|
Pure library function (INV-4). A field counts as MISSING when it is absent, or
|
|
its value is ``None`` or — after ``str(...).strip()`` — empty. Returns a
|
|
:class:`SchemaValidation`; never raises (a non-mapping input -> all fields
|
|
missing -> ``valid=False``). This function NEVER influences a gate verdict by
|
|
itself — see :func:`maybe_warn_schema` and the ``frontmatter_validation_strict``
|
|
flag for how strict enforcement is (and is not) wired.
|
|
"""
|
|
missing: list = []
|
|
try:
|
|
mapping = data if isinstance(data, Mapping) else {}
|
|
for fld in required:
|
|
raw = mapping.get(fld)
|
|
if raw is None or str(raw).strip() == "":
|
|
missing.append(fld)
|
|
except Exception as e: # noqa: BLE001 - never-raise contract
|
|
logger.warning(f"validate_schema: unexpected error: {e}")
|
|
# Conservatively report everything missing rather than raise.
|
|
missing = list(required)
|
|
return SchemaValidation(valid=not missing, missing=missing)
|
|
|
|
|
|
def maybe_warn_schema(content: str, doc_label: str = "document") -> SchemaValidation:
|
|
"""Best-effort schema check used at verdict-read sites — warning-only by default.
|
|
|
|
Parses ``content``'s frontmatter and validates it against :data:`REQUIRED_FIELDS`.
|
|
Behaviour is governed by the ``frontmatter_validation_strict`` kill-switch
|
|
(default ``False``):
|
|
|
|
* **default (False)** — when fields are missing, emit a single
|
|
``logger.warning("frontmatter schema incomplete: missing …")`` and return.
|
|
The result is **inert**: callers that pass it through a gate must NOT change
|
|
their ``tuple[bool, str]`` verdict (FR-2 "warning/лог, не blocker"). This
|
|
keeps a machine-verdict doc that lacks the (forward-looking, additive) schema
|
|
readable exactly as before (FR-5 / AC-4) — critical so ORCH-52c does not
|
|
self-block its own deploy (its docs predate the schema).
|
|
* **strict (True)** — RESERVED for a future tightening (ORCH-52d+). The
|
|
validation result is returned the same way; the flag merely documents intent
|
|
and lets a future caller veto. It stays ``False`` in prod and ``.env.staging``.
|
|
|
|
Never raises (NFR-2): a config-read or parse error degrades to ``valid=True``
|
|
(no false warning, no influence on the verdict).
|
|
"""
|
|
try:
|
|
data = parse_frontmatter(content).data
|
|
result = validate_schema(data)
|
|
if not result.valid:
|
|
try:
|
|
from .config import settings
|
|
strict = bool(getattr(settings, "frontmatter_validation_strict", False))
|
|
except Exception: # noqa: BLE001 - config read must never raise here
|
|
strict = False
|
|
logger.warning(
|
|
"frontmatter schema incomplete in %s: missing %s%s",
|
|
doc_label,
|
|
", ".join(result.missing),
|
|
" [strict]" if strict else "",
|
|
)
|
|
return result
|
|
except Exception as e: # noqa: BLE001 - never-raise; inert on error
|
|
logger.debug(f"maybe_warn_schema: unexpected error for {doc_label}: {e}")
|
|
return SchemaValidation(valid=True, missing=[])
|