src/frontmatter.py grows from a single-key reader into the full machine
contract: reader (read_frontmatter_value, unchanged), one parse primitive
(parse_frontmatter), writer (render/write_frontmatter), schema validator
(validate_schema/REQUIRED_FIELDS, warning-only by default) and a shared
strip_frontmatter helper. The five verdict gates (check_reviewer_verdict,
_parse_tests_verdict, _parse_deploy_status, _parse_staging_status,
parse_security_status) now read through the single parse_frontmatter point
instead of duplicated ad-hoc YAML logic; review_parse._strip_frontmatter and
security_gate.extract_security_findings reuse the shared helper.
Strictly backward compatible + never-raise: STAGE_TRANSITIONS, the QG_CHECKS
composition, verdict semantics (incl. ORCH-047 three-field tester + negative
token priority), reason-strings and worktree->origin/main fallback are 1:1.
The schema validator never influences a gate verdict by default; hard-fail is
reserved behind the frontmatter_validation_strict kill-switch (default False).
New formal handoff spec docs/_standards/HANDOFF_PROTOCOL.md ("stage -> required
output" + required frontmatter schema), aligned 1:1 with PIPELINE_DOCS.md.
Tests: test_frontmatter.py (TC-01..07), test_qg_verdicts.py (TC-08..15),
test_security_gate.py (TC-12), test_stages_invariants.py (TC-16). Full
tests/ green (1212).
Refs: ORCH-076
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
209 lines
7.9 KiB
Python
209 lines
7.9 KiB
Python
"""Defensive extractors for reviewer / tester artifact bodies (ORCH-046).
|
||
|
||
When a task is rolled back to ``development`` the stage engine builds the
``task_desc`` that ends up in the developer agent's ``.task-dev.md``. Historically
that text only carried a *link* to the artifact file (12-review.md /
13-test-report.md); the developer agent had to go read the file, and the key
must-fix points (reviewer P0/P1 findings, tester failure reason) were lost in
transit — a game of "broken telephone" («испорченный телефон») that burns the
retry budget.

This module extracts the **verbatim** must-fix text so the stage engine can embed
it directly in ``task_desc`` (ADR docs/work-items/ORCH-046/06-adr/ADR-001-*).

Contract — **never raises** (mirrors ``src/frontmatter.py`` and
``src/qg/checks.py::_parse_tests_verdict``): any error — missing file, IOError,
malformed markdown/YAML, missing section — yields ``""``. The caller then falls
back to the previous link-only ``task_desc``. No network calls; disk reads only.
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
|
||
# Module-level logger; every helper below reports swallowed errors through it
# at DEBUG level.
logger = logging.getLogger("orchestrator.review_parse")

# Truncation limits, in characters, for the excerpts returned by the two public
# extractors (module-level per the technical spec, "ТЗ" §2.3). The full context
# always stays in the artifact file; the embedded text is a focused excerpt.
MAX_FINDINGS_CHARS = 2000  # cap applied by extract_review_findings
MAX_FAILURES_CHARS = 2000  # cap applied by extract_test_failures

# Suffix appended by ``_truncate`` whenever it had to cut the text.
_TRUNCATED_MARKER = "\n…(truncated)"

# Recognize a `### P0`/`### P1` subsection header by the presence of the P0/P1
# token, tolerant to case and to the dash/em-dash that follows it. The token
# must not be immediately preceded by an ASCII letter or digit, nor immediately
# followed by a digit, so e.g. "XP1" and "P10" do not match.
_P01_HEADER_RE = re.compile(r"(?<![A-Za-z0-9])p[01](?![0-9])", re.IGNORECASE)
|
||
|
||
|
||
def _read(path: str) -> str | None:
|
||
"""Read a file as UTF-8. Never raises; returns None on any OS error."""
|
||
try:
|
||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||
return f.read()
|
||
except OSError as e:
|
||
logger.debug(f"review_parse: cannot open {path}: {e}")
|
||
return None
|
||
|
||
|
||
def _strip_frontmatter(content: str) -> str:
    """Return ``content`` without its leading ``--- … ---`` YAML frontmatter.

    ORCH-52c: this is a thin wrapper; the stripping itself is done by the
    shared ``frontmatter.strip_frontmatter`` helper so that frontmatter logic
    lives in a single place. Per that change the behaviour is meant to be
    unchanged: only a well-formed (>= 3-segment) leading block is removed, and
    on any problem the helper is expected to hand the input back rather than
    raise.
    """
    # Imported at call time, exactly as before, rather than at module import.
    from .frontmatter import strip_frontmatter as _shared_strip

    return _shared_strip(content)
|
||
|
||
|
||
def _truncate(text: str, limit: int) -> str:
    """Cap ``text`` at ``limit`` characters, flagging any cut with a marker.

    Text that already fits is returned untouched. Otherwise the first ``limit``
    characters are kept, trailing whitespace is stripped from that prefix, and
    ``_TRUNCATED_MARKER`` is appended — so a cut result can be longer than
    ``limit`` by up to the marker's length.
    """
    fits = len(text) <= limit
    if fits:
        return text
    head = text[:limit].rstrip()
    return head + _TRUNCATED_MARKER
|
||
|
||
|
||
def _section_body(md: str, heading_token: str) -> str:
    """Extract the text under the first level-2 heading mentioning a token.

    A heading qualifies when its line starts with ``"## "`` and contains
    ``heading_token`` as a case-insensitive substring (callers pass tokens such
    as ``"Вывод pytest"`` or ``"Findings"``). Everything after that heading is
    collected up to, but not including, the next line starting with ``"## "``.
    Deeper headings (``### …``) do not begin with ``"## "`` and therefore stay
    inside the section.

    Returns the collected lines joined with ``"\\n"``, or ``""`` when no
    heading matches.
    """
    needle = heading_token.lower()
    remaining = iter(md.splitlines())

    # Phase 1: skip ahead to the first matching level-2 heading.
    for candidate in remaining:
        if candidate.startswith("## ") and needle in candidate.lower():
            break
    else:
        return ""

    # Phase 2: gather lines until the next level-2 heading closes the section.
    collected: list[str] = []
    for row in remaining:
        if row.startswith("## "):
            break
        collected.append(row)
    return "\n".join(collected)
|
||
|
||
|
||
def _is_placeholder_item(text: str) -> bool:
|
||
"""True for empty or template-placeholder list items (non-substantive).
|
||
|
||
The canonical reviewer template seeds each severity with
|
||
``- [ ] <описание> (если есть)``. Such lines must be ignored so an empty P0/P1
|
||
subsection does not leak the placeholder into ``task_desc``.
|
||
"""
|
||
t = text.strip()
|
||
if not t:
|
||
return True
|
||
if "(если есть)" in t:
|
||
return True
|
||
# An item whose entire payload is an angle-bracket placeholder, e.g. "<описание>".
|
||
if t.startswith("<") and t.endswith(">"):
|
||
return True
|
||
return False
|
||
|
||
|
||
def _item_payload(line: str) -> str | None:
|
||
"""If ``line`` is a markdown list item, return its payload text; else None.
|
||
|
||
Handles ``- foo``, ``* foo`` and checkbox forms ``- [ ] foo`` / ``- [x] foo``.
|
||
"""
|
||
m = re.match(r"\s*[-*]\s+(?:\[[ xX]?\]\s*)?(.*)$", line)
|
||
if not m:
|
||
return None
|
||
return m.group(1)
|
||
|
||
|
||
def _findings_subsections(findings_body: str):
    """Lazily split a Findings body into its ``### `` subsections.

    Yields one ``(header_line, body_lines)`` pair per line that starts with
    ``"### "``; ``body_lines`` holds the lines that follow it up to the next
    such header. Lines appearing before the first header are discarded.
    """
    # The subsection currently being filled, or None before the first header.
    open_section: tuple[str, list[str]] | None = None
    for row in findings_body.splitlines():
        if row.startswith("### "):
            if open_section is not None:
                yield open_section
            open_section = (row, [])
        elif open_section is not None:
            open_section[1].append(row)
    if open_section is not None:
        yield open_section
|
||
|
||
|
||
def extract_review_findings(path: str) -> str:
    """Return the verbatim P0/P1 findings from a reviewer report (12-review.md).

    Reads the ``## Findings`` section of the report and returns the list items
    of its P0 (Blocker) and P1 (Must fix) subsections, each group preceded by
    its ``### `` header, suitable for embedding in a rollback ``task_desc``.
    Other subsections (P2/P3) are ignored, as are non-list lines and
    empty/placeholder items; a subsection with nothing left is omitted.

    Args:
        path: Path to the reviewer report.

    Returns:
        The findings text truncated to ``MAX_FINDINGS_CHARS``, or ``""`` when
        the file cannot be read, has no Findings section, or contains no
        substantive P0/P1 item.

    Never raises: every failure, including one while opening ``path``, is
    logged at DEBUG level and reported as ``""``.
    """
    try:
        # The read sits inside the guard as well: opening a path can fail with
        # more than OSError (e.g. TypeError for ``None``), and the never-raise
        # contract must hold regardless of what the read helper swallows.
        content = _read(path)
        if content is None:
            return ""

        body = _strip_frontmatter(content)
        findings_body = _section_body(body, "Findings")
        if not findings_body.strip():
            return ""

        blocks: list[str] = []
        for header, sub_body in _findings_subsections(findings_body):
            if not _P01_HEADER_RE.search(header):
                continue  # only P0/P1 subsections are must-fix
            kept: list[str] = []
            for line in sub_body:
                payload = _item_payload(line)
                if payload is None or _is_placeholder_item(payload):
                    continue  # not a list item, or an unfilled template stub
                kept.append(line.rstrip())
            if kept:
                blocks.append("\n".join([header.rstrip(), *kept]))

        if not blocks:
            return ""
        return _truncate("\n\n".join(blocks), MAX_FINDINGS_CHARS)
    except Exception as e:  # defensive: never raise out of the extractor
        logger.debug(
            "review_parse: extract_review_findings failed for %s: %s", path, e
        )
        return ""
|
||
|
||
|
||
def extract_test_failures(path: str) -> str:
    """Return the part of a tester report (13-test-report.md) explaining a FAIL.

    The first non-empty source wins, in priority order:
      1. ``## Вывод pytest`` — the pytest run output (shows failing tests);
      2. rows of the ``## Результаты`` table that contain ``FAIL``
         (matched case-insensitively);
      3. ``## Итог`` — the verdict summary.

    The gate ``reason`` is added by the caller; this returns the report-body
    excerpt that goes on top of it.

    Args:
        path: Path to the tester report.

    Returns:
        The excerpt truncated to ``MAX_FAILURES_CHARS``, or ``""`` when the
        file cannot be read or none of the three sources has content.

    Never raises: every failure, including one while opening ``path``, is
    logged at DEBUG level and reported as ``""``.
    """
    try:
        # The read sits inside the guard as well: opening a path can fail with
        # more than OSError (e.g. TypeError for ``None``), and the never-raise
        # contract must hold regardless of what the read helper swallows.
        content = _read(path)
        if content is None:
            return ""

        # 1. pytest output.
        pytest_out = _section_body(content, "Вывод pytest").strip()
        if pytest_out:
            return _truncate(pytest_out, MAX_FAILURES_CHARS)

        # 2. FAIL rows from the results table.
        results = _section_body(content, "Результаты")
        fail_rows = [ln.rstrip() for ln in results.splitlines() if "FAIL" in ln.upper()]
        if fail_rows:
            return _truncate("\n".join(fail_rows).strip(), MAX_FAILURES_CHARS)

        # 3. Verdict summary.
        itog = _section_body(content, "Итог").strip()
        if itog:
            return _truncate(itog, MAX_FAILURES_CHARS)

        return ""
    except Exception as e:  # defensive: never raise out of the extractor
        logger.debug(
            "review_parse: extract_test_failures failed for %s: %s", path, e
        )
        return ""
|