"""ORCH-120 (adr-0053): analyst open-questions -> Needs Input — engine flow. Drives ``_handle_analysis_approved_flow`` through the real ``advance_stage(..., finished_agent='analyst')`` launcher path (pattern of ``tests/test_auto_approve_brd.py``): mocks the Plane/Telegram setters and uses a temporary worktree + a patched ``check_analysis_complete``. Covers (04-test-plan.yaml): TC-01 REGRESS (mandatory): 4 files + ACTIVE 01-questions.md simultaneously -> Needs Input wins over "files ready" (AC-1). RED before the fix. TC-02 01-questions.md present, 4 files missing -> Needs Input, question text in the Plane comment + Telegram (AC-2). TC-03 Happy-path: no 01-questions.md, 4 files present -> In Review (AC-3). TC-06 Hygiene: full FRESH package supersedes a stale 01-questions.md -> In Review, NOT a repeat Needs Input (AC-6). TC-09 never-raise: a failure in the new logic degrades safely + does not crash advance_stage (AC-10). TC-10 Reversibility: kill-switch off OR enduro repo -> ORIGINAL byte-for-byte order (files_ok first) (AC-9). """ import os import tempfile import pytest _test_db = os.path.join(tempfile.gettempdir(), "test_orch120_needs_input.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") from unittest.mock import MagicMock # noqa: E402 import src.db as _db # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import stage_engine # noqa: E402 from src import analyst_questions # noqa: E402 from src import labels # noqa: E402 from src.config import settings # noqa: E402 from src.stage_engine import advance_stage # noqa: E402 _DELIVERABLES = ("01-brd.md", "02-trz.md", "03-acceptance-criteria.md", "04-test-plan.yaml") @pytest.fixture(autouse=True) def fresh(monkeypatch, tmp_path): monkeypatch.setattr(_db.settings, "db_path", _test_db) if os.path.exists(_test_db): os.unlink(_test_db) init_db() # Silence Plane/Telegram side effects; capture the channels we assert on. for name in ("notify_stage_change", "notify_qg_failure", "plane_notify_stage", "plane_notify_qg", "set_issue_in_review", "set_issue_needs_input", "set_issue_approved", "notify_approve_requested"): monkeypatch.setattr(stage_engine, name, MagicMock(), raising=False) monkeypatch.setattr(stage_engine, "_build_analyst_ready_comment", lambda *a, **k: "ready", raising=False) monkeypatch.setattr(stage_engine, "plane_add_comment", MagicMock()) monkeypatch.setattr(stage_engine, "send_telegram", MagicMock()) # autoApprove off by default (TC-03 wants In Review, not auto-advance). monkeypatch.setattr(labels, "auto_approve_applies", lambda repo: False) monkeypatch.setattr(labels, "has_label", lambda w, name, p=None: False) # Questions-gate on for orchestrator by default (mirror prod defaults). monkeypatch.setattr(settings, "analyst_questions_gate_enabled", True, raising=False) monkeypatch.setattr(settings, "analyst_questions_gate_repos", "", raising=False) monkeypatch.setattr(settings, "analyst_needs_input_autopause_enabled", True, raising=False) yield def _make_task(stage="analysis", repo="orchestrator", branch="feature/ORCH-120-x", wi="ORCH-120"): conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES (?, ?, ?, ?, ?)", (f"plane-{wi}", wi, repo, branch, stage), ) tid = cur.lastrowid conn.commit() conn.close() return tid def _wi_dir(worktree, wi="ORCH-120"): d = os.path.join(worktree, "docs", "work-items", wi) os.makedirs(d, exist_ok=True) return d def _write(path, mtime=None, body="x"): with open(path, "w") as f: f.write(body) if mtime is not None: os.utime(path, (mtime, mtime)) def _patch_worktree(monkeypatch, worktree): monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: worktree) def _patch_complete_gate(monkeypatch, ok=True): def gate(*a, **k): return (ok, "ok" if ok else "missing artifacts") monkeypatch.setattr( stage_engine, "QG_CHECKS", {**stage_engine.QG_CHECKS, "check_analysis_complete": gate}, ) def _stage_of(task_id): conn = get_db() row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone() conn.close() return row[0] def _paused_at(task_id): conn = get_db() row = conn.execute("SELECT paused_at FROM tasks WHERE id=?", (task_id,)).fetchone() conn.close() return row[0] # --- TC-01: REGRESS — questions priority over files_ok ----------------------- def test_tc01_questions_priority_over_files_ready(monkeypatch, tmp_path): """4 deliverables + an ACTIVE (newest) 01-questions.md -> Needs Input wins.""" worktree = str(tmp_path) d = _wi_dir(worktree) base = 1_000_000 for i, name in enumerate(_DELIVERABLES): _write(os.path.join(d, name), mtime=base + i) # 01-questions.md is the NEWEST -> NOT superseded -> active. _write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 нужно уточнить охват") _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=True) tid = _make_task() res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-needs-input" assert stage_engine.set_issue_needs_input.called assert not stage_engine.set_issue_in_review.called assert _stage_of(tid) == "analysis" # NOT advanced to architecture # --- TC-02: questions only, no deliverables ---------------------------------- def test_tc02_questions_only_no_deliverables(monkeypatch, tmp_path): worktree = str(tmp_path) d = _wi_dir(worktree) _write(os.path.join(d, "01-questions.md"), body="Q-1 какой формат вывода?") _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=False) tid = _make_task() res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-needs-input" assert stage_engine.set_issue_needs_input.called # Question text reached the Plane comment + Telegram. comment_arg = stage_engine.plane_add_comment.call_args.args[1] assert "Q-1 какой формат вывода?" in comment_arg assert stage_engine.send_telegram.called # --- TC-03: happy-path, no questions ----------------------------------------- def test_tc03_happy_path_no_questions(monkeypatch, tmp_path): worktree = str(tmp_path) d = _wi_dir(worktree) for name in _DELIVERABLES: _write(os.path.join(d, name)) # No 01-questions.md. _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=True) tid = _make_task() res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-in-review" assert stage_engine.set_issue_in_review.called assert not stage_engine.set_issue_needs_input.called assert stage_engine.notify_approve_requested.called # --- TC-06: hygiene — fresh package supersedes a stale questions file -------- def test_tc06_stale_questions_superseded(monkeypatch, tmp_path): worktree = str(tmp_path) d = _wi_dir(worktree) base = 2_000_000 # 01-questions.md is OLDER than every deliverable -> superseded -> In Review. _write(os.path.join(d, "01-questions.md"), mtime=base, body="stale Q from last run") for i, name in enumerate(_DELIVERABLES): _write(os.path.join(d, name), mtime=base + 100 + i) _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=True) tid = _make_task() res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-in-review" assert stage_engine.set_issue_in_review.called assert not stage_engine.set_issue_needs_input.called # --- TC-09: never-raise ------------------------------------------------------- def test_tc09_predicate_error_degrades_to_prior_order(monkeypatch, tmp_path): """questions_active raising -> degrade to original order (files_ok -> In Review).""" worktree = str(tmp_path) _wi_dir(worktree) _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=True) def boom(*a, **k): raise RuntimeError("synthetic predicate failure") monkeypatch.setattr(analyst_questions, "questions_active", boom) tid = _make_task() # Must NOT raise. res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-in-review" assert stage_engine.set_issue_in_review.called def test_tc09_park_error_does_not_crash(monkeypatch, tmp_path): """A failing set_task_paused must not undo Needs Input nor crash advance_stage.""" worktree = str(tmp_path) d = _wi_dir(worktree) _write(os.path.join(d, "01-questions.md"), body="Q-1 ?") _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=False) def boom(task_id): raise RuntimeError("synthetic park failure") monkeypatch.setattr(stage_engine, "set_task_paused", boom) tid = _make_task() res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-needs-input" assert stage_engine.set_issue_needs_input.called # --- TC-10: reversibility — kill-switch off / enduro -> original order -------- def test_tc10_kill_switch_off_original_order(monkeypatch, tmp_path): """Gate off: 4 files + active questions -> In Review (original order), no park.""" worktree = str(tmp_path) d = _wi_dir(worktree) base = 3_000_000 for i, name in enumerate(_DELIVERABLES): _write(os.path.join(d, name), mtime=base + i) _write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 ?") _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=True) monkeypatch.setattr(settings, "analyst_questions_gate_enabled", False, raising=False) tid = _make_task() res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert res.note == "analysis-in-review" assert stage_engine.set_issue_in_review.called assert not stage_engine.set_issue_needs_input.called assert _paused_at(tid) is None # no auto-park when the gate is off def test_tc10_enduro_out_of_scope_original_order(monkeypatch, tmp_path): """enduro repo (empty CSV -> self-hosting only) -> gate inert -> original order.""" worktree = str(tmp_path) d = _wi_dir(worktree, wi="ET-9") base = 4_000_000 for i, name in enumerate(_DELIVERABLES): _write(os.path.join(d, name), mtime=base + i) _write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 ?") _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=True) tid = _make_task(repo="enduro-trails", branch="feature/ET-9-x", wi="ET-9") res = advance_stage(tid, "analysis", "enduro-trails", "ET-9", "feature/ET-9-x", finished_agent="analyst") assert res.note == "analysis-in-review" assert stage_engine.set_issue_in_review.called assert not stage_engine.set_issue_needs_input.called # --- Auto-park bonus: orchestrator Needs Input parks the task ---------------- def test_autopark_on_needs_input(monkeypatch, tmp_path): """ORCH-120 D4: Needs Input on the self-hosting repo auto-parks the task.""" worktree = str(tmp_path) d = _wi_dir(worktree) _write(os.path.join(d, "01-questions.md"), body="Q-1 ?") _patch_worktree(monkeypatch, worktree) _patch_complete_gate(monkeypatch, ok=False) tid = _make_task() advance_stage(tid, "analysis", "orchestrator", "ORCH-120", "feature/ORCH-120-x", finished_agent="analyst") assert _paused_at(tid) is not None # task parked -> serial-gate FIFO freed