Files
orchestrator/tests/test_orch120_analyst_needs_input.py
claude-bot d6b495f156
All checks were successful
CI / test (push) Successful in 1m14s
CI / test (pull_request) Successful in 1m11s
fix(analysis): activate analyst open-questions -> Needs Input flow (ORCH-120)
Activates and completes the previously dead "analyst asks BLOCKING questions ->
01-questions.md -> Needs Input" path. Four coordinated changes, additive, under
kill-switch, self-hosting scope, never-raise; STAGE_TRANSITIONS / QG_CHECKS /
check_* / machine-verdict keys / DB schema are byte-for-byte UNCHANGED (the flow
is a pre-gate engine branch, NOT a Quality Gate; 01-questions.md is a SIGNAL
artifact, NOT a machine-verdict).

- D1 contract + canon: analyst.md documents the 01-questions.md channel (blocking
  questions -> Needs Input, do NOT fabricate deliverables) + resume behaviour; new
  skeleton docs/_templates/01-questions.md; PIPELINE_DOCS.md manifest row + 01-
  prefix note.
- D2 freshness-supersede (DQ-2): pure offline mtime predicate questions_active in
  the new leaf src/analyst_questions.py (a full FRESH package supersedes a stale
  untouched 01-questions.md -> no Needs-Input loop, AC-6).
- D3 priority: questions take priority over "files ready" in
  _handle_analysis_approved_flow (_decide_analysis_outcome + _emit_analysis_*);
  off/out-of-scope runs the ORIGINAL byte-for-byte order (AC-9).
- D4 auto-park: set_task_paused on Needs Input via the ORCH-124 pause axis so the
  repo serial-gate FIFO is not wedged while waiting for a human (AC-4); D5 resume +
  unpark (clear_task_paused) in handle_status_start (analysis branch).

Flags (config.py, safe defaults): analyst_questions_gate_enabled /
analyst_questions_gate_repos (empty -> self-hosting only) /
analyst_needs_input_autopause_enabled.

Tests: test_orch120_analyst_needs_input.py (TC-01 regress + TC-02/03/06/09/10),
test_orch120_serial_gate_needs_input.py (TC-04), test_orch120_resume_unpark.py
(TC-05), test_orch120_questions_artifact_canon.py (TC-08), assert in
test_agent_prompts_canon.py (TC-07). Full suite green (2205 passed).

Refs: ORCH-120

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 13:15:27 +03:00

306 lines
12 KiB
Python

"""ORCH-120 (adr-0053): analyst open-questions -> Needs Input — engine flow.
Drives ``_handle_analysis_approved_flow`` through the real ``advance_stage(...,
finished_agent='analyst')`` launcher path (pattern of
``tests/test_auto_approve_brd.py``): mocks the Plane/Telegram setters and uses a
temporary worktree + a patched ``check_analysis_complete``.
Covers (04-test-plan.yaml):
TC-01 REGRESS (mandatory): 4 files + ACTIVE 01-questions.md simultaneously ->
Needs Input wins over "files ready" (AC-1). RED before the fix.
TC-02 01-questions.md present, 4 files missing -> Needs Input, question text in
the Plane comment + Telegram (AC-2).
TC-03 Happy-path: no 01-questions.md, 4 files present -> In Review (AC-3).
TC-06 Hygiene: full FRESH package supersedes a stale 01-questions.md -> In
Review, NOT a repeat Needs Input (AC-6).
TC-09 never-raise: a failure in the new logic degrades safely + does not crash
advance_stage (AC-10).
TC-10 Reversibility: kill-switch off OR enduro repo -> ORIGINAL byte-for-byte
order (files_ok first) (AC-9).
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch120_needs_input.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import analyst_questions # noqa: E402
from src import labels # noqa: E402
from src.config import settings # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
_DELIVERABLES = ("01-brd.md", "02-trz.md", "03-acceptance-criteria.md", "04-test-plan.yaml")
@pytest.fixture(autouse=True)
def fresh(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Silence Plane/Telegram side effects; capture the channels we assert on.
for name in ("notify_stage_change", "notify_qg_failure", "plane_notify_stage",
"plane_notify_qg", "set_issue_in_review", "set_issue_needs_input",
"set_issue_approved", "notify_approve_requested"):
monkeypatch.setattr(stage_engine, name, MagicMock(), raising=False)
monkeypatch.setattr(stage_engine, "_build_analyst_ready_comment",
lambda *a, **k: "ready", raising=False)
monkeypatch.setattr(stage_engine, "plane_add_comment", MagicMock())
monkeypatch.setattr(stage_engine, "send_telegram", MagicMock())
# autoApprove off by default (TC-03 wants In Review, not auto-advance).
monkeypatch.setattr(labels, "auto_approve_applies", lambda repo: False)
monkeypatch.setattr(labels, "has_label", lambda w, name, p=None: False)
# Questions-gate on for orchestrator by default (mirror prod defaults).
monkeypatch.setattr(settings, "analyst_questions_gate_enabled", True, raising=False)
monkeypatch.setattr(settings, "analyst_questions_gate_repos", "", raising=False)
monkeypatch.setattr(settings, "analyst_needs_input_autopause_enabled", True,
raising=False)
yield
def _make_task(stage="analysis", repo="orchestrator", branch="feature/ORCH-120-x",
wi="ORCH-120"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _wi_dir(worktree, wi="ORCH-120"):
d = os.path.join(worktree, "docs", "work-items", wi)
os.makedirs(d, exist_ok=True)
return d
def _write(path, mtime=None, body="x"):
with open(path, "w") as f:
f.write(body)
if mtime is not None:
os.utime(path, (mtime, mtime))
def _patch_worktree(monkeypatch, worktree):
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: worktree)
def _patch_complete_gate(monkeypatch, ok=True):
def gate(*a, **k):
return (ok, "ok" if ok else "missing artifacts")
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_analysis_complete": gate},
)
def _stage_of(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _paused_at(task_id):
conn = get_db()
row = conn.execute("SELECT paused_at FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
# --- TC-01: REGRESS — questions priority over files_ok -----------------------
def test_tc01_questions_priority_over_files_ready(monkeypatch, tmp_path):
"""4 deliverables + an ACTIVE (newest) 01-questions.md -> Needs Input wins."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
base = 1_000_000
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + i)
# 01-questions.md is the NEWEST -> NOT superseded -> active.
_write(os.path.join(d, "01-questions.md"), mtime=base + 100,
body="Q-1 нужно уточнить охват")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert stage_engine.set_issue_needs_input.called
assert not stage_engine.set_issue_in_review.called
assert _stage_of(tid) == "analysis" # NOT advanced to architecture
# --- TC-02: questions only, no deliverables ----------------------------------
def test_tc02_questions_only_no_deliverables(monkeypatch, tmp_path):
worktree = str(tmp_path)
d = _wi_dir(worktree)
_write(os.path.join(d, "01-questions.md"), body="Q-1 какой формат вывода?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=False)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert stage_engine.set_issue_needs_input.called
# Question text reached the Plane comment + Telegram.
comment_arg = stage_engine.plane_add_comment.call_args.args[1]
assert "Q-1 какой формат вывода?" in comment_arg
assert stage_engine.send_telegram.called
# --- TC-03: happy-path, no questions -----------------------------------------
def test_tc03_happy_path_no_questions(monkeypatch, tmp_path):
worktree = str(tmp_path)
d = _wi_dir(worktree)
for name in _DELIVERABLES:
_write(os.path.join(d, name))
# No 01-questions.md.
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
assert stage_engine.notify_approve_requested.called
# --- TC-06: hygiene — fresh package supersedes a stale questions file --------
def test_tc06_stale_questions_superseded(monkeypatch, tmp_path):
worktree = str(tmp_path)
d = _wi_dir(worktree)
base = 2_000_000
# 01-questions.md is OLDER than every deliverable -> superseded -> In Review.
_write(os.path.join(d, "01-questions.md"), mtime=base, body="stale Q from last run")
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + 100 + i)
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
# --- TC-09: never-raise -------------------------------------------------------
def test_tc09_predicate_error_degrades_to_prior_order(monkeypatch, tmp_path):
"""questions_active raising -> degrade to original order (files_ok -> In Review)."""
worktree = str(tmp_path)
_wi_dir(worktree)
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
def boom(*a, **k):
raise RuntimeError("synthetic predicate failure")
monkeypatch.setattr(analyst_questions, "questions_active", boom)
tid = _make_task()
# Must NOT raise.
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
def test_tc09_park_error_does_not_crash(monkeypatch, tmp_path):
"""A failing set_task_paused must not undo Needs Input nor crash advance_stage."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
_write(os.path.join(d, "01-questions.md"), body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=False)
def boom(task_id):
raise RuntimeError("synthetic park failure")
monkeypatch.setattr(stage_engine, "set_task_paused", boom)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert stage_engine.set_issue_needs_input.called
# --- TC-10: reversibility — kill-switch off / enduro -> original order --------
def test_tc10_kill_switch_off_original_order(monkeypatch, tmp_path):
"""Gate off: 4 files + active questions -> In Review (original order), no park."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
base = 3_000_000
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + i)
_write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
monkeypatch.setattr(settings, "analyst_questions_gate_enabled", False, raising=False)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
assert _paused_at(tid) is None # no auto-park when the gate is off
def test_tc10_enduro_out_of_scope_original_order(monkeypatch, tmp_path):
"""enduro repo (empty CSV -> self-hosting only) -> gate inert -> original order."""
worktree = str(tmp_path)
d = _wi_dir(worktree, wi="ET-9")
base = 4_000_000
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + i)
_write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task(repo="enduro-trails", branch="feature/ET-9-x", wi="ET-9")
res = advance_stage(tid, "analysis", "enduro-trails", "ET-9",
"feature/ET-9-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
# --- Auto-park bonus: orchestrator Needs Input parks the task ----------------
def test_autopark_on_needs_input(monkeypatch, tmp_path):
"""ORCH-120 D4: Needs Input on the self-hosting repo auto-parks the task."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
_write(os.path.join(d, "01-questions.md"), body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=False)
tid = _make_task()
advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert _paused_at(tid) is not None # task parked -> serial-gate FIFO freed