fix(analysis): activate analyst open-questions -> Needs Input flow (ORCH-120)
All checks were successful
CI / test (push) Successful in 1m14s
CI / test (pull_request) Successful in 1m11s

Activates and completes the previously dead "analyst asks BLOCKING questions ->
01-questions.md -> Needs Input" path. Four coordinated changes, additive, under
kill-switch, self-hosting scope, never-raise; STAGE_TRANSITIONS / QG_CHECKS /
check_* / machine-verdict keys / DB schema are byte-for-byte UNCHANGED (the flow
is a pre-gate engine branch, NOT a Quality Gate; 01-questions.md is a SIGNAL
artifact, NOT a machine-verdict).

- D1 contract + canon: analyst.md documents the 01-questions.md channel (blocking
  questions -> Needs Input, do NOT fabricate deliverables) + resume behaviour; new
  skeleton docs/_templates/01-questions.md; PIPELINE_DOCS.md manifest row + 01-
  prefix note.
- D2 freshness-supersede (DQ-2): pure offline mtime predicate questions_active in
  the new leaf src/analyst_questions.py (a full FRESH package supersedes a stale
  untouched 01-questions.md -> no Needs-Input loop, AC-6).
- D3 priority: questions take priority over "files ready" in
  _handle_analysis_approved_flow (_decide_analysis_outcome + _emit_analysis_*);
  off/out-of-scope runs the ORIGINAL byte-for-byte order (AC-9).
- D4 auto-park: set_task_paused on Needs Input via the ORCH-124 pause axis so the
  repo serial-gate FIFO is not wedged while waiting for a human (AC-4); D5 resume +
  unpark (clear_task_paused) in handle_status_start (analysis branch).

Flags (config.py, safe defaults): analyst_questions_gate_enabled /
analyst_questions_gate_repos (empty -> self-hosting only) /
analyst_needs_input_autopause_enabled.

Tests: test_orch120_analyst_needs_input.py (TC-01 regress + TC-02/03/06/09/10),
test_orch120_serial_gate_needs_input.py (TC-04), test_orch120_resume_unpark.py
(TC-05), test_orch120_questions_artifact_canon.py (TC-08), assert in
test_agent_prompts_canon.py (TC-07). Full suite green (2205 passed).

Refs: ORCH-120

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 13:15:27 +03:00
parent 1fcbe06df5
commit d6b495f156
15 changed files with 1082 additions and 70 deletions

View File

@@ -239,6 +239,19 @@ ORCH_SERIAL_GATE_ENABLED=true
ORCH_SERIAL_GATE_REPOS=
ORCH_SERIAL_GATE_FREEZE_ENABLED=true
ORCH_SERIAL_GATE_PAUSE_ENABLED=true
# ORCH-120 (adr-0053): analyst open-questions -> Needs Input. Activates the dead
# "analyst asks BLOCKING questions -> 01-questions.md -> Needs Input" path in
# _handle_analysis_approved_flow. Additive, never-raise, self-hosting scope;
# STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict / DB schema UNCHANGED.
# ANALYST_QUESTIONS_GATE_ENABLED=false -> _handle_analysis_approved_flow runs its
# ORIGINAL pre-ORCH-120 order (files_ok first, then flat isfile check) byte-for-byte.
# ANALYST_QUESTIONS_GATE_REPOS (CSV) -> scope; EMPTY = self-hosting only (orchestrator).
# ANALYST_NEEDS_INPUT_AUTOPAUSE_ENABLED=true (default) -> auto-park a Needs-Input task
# (db.set_task_paused) so the repo serial-gate FIFO does not wedge while we wait for a
# human; unpark on resume. false -> operator-park only (POST /serial-gate/pause).
ORCH_ANALYST_QUESTIONS_GATE_ENABLED=true
ORCH_ANALYST_QUESTIONS_GATE_REPOS=
ORCH_ANALYST_NEEDS_INPUT_AUTOPAUSE_ENABLED=true
# ORCH-090: STOP-status task cancellation (stop active agent + full progress reset)
# and the relaunch-hole close. A dedicated Plane "STOP" status (logical key `stop`,
# fail-closed: absent from _DEFAULT_STATES, so a board without the status -> no-op)

View File

@@ -40,6 +40,21 @@ bug-report (симптом / шаги воспроизведения / лока
**сложным/архитектурным/визуальным** (нужен ADR или макет) — выпусти **полный** analysis-пакет и
помечай в bug-report `escalate: full-cycle` (эскалация в полный цикл, ADR-001 D5 ORCH-019); оператор
снимает багфикс-трек эндпоинтом `POST /bug-fast-track/escalate`.
**Блокирующие вопросы → Needs Input (ORCH-120, adr-0053).** Если бизнес-запрос **блокирующе**
неоднозначен и выпустить корректные 4 deliverables нельзя без ответа заказчика — **НЕ фабрикуй**
требования ради сдачи файлов. Вместо этого через **Write tool** запиши
`docs/work-items/<plane-id>/01-questions.md` (скелет — `docs/_templates/01-questions.md`) со списком
**конкретных** блокирующих вопросов (с вариантами и тем, что разблокирует анализ). Наличие активных
вопросов уводит задачу в **Needs Input** (движок `_handle_analysis_approved_flow` ставит статус +
комментирует вопросы в Plane) — **приоритетно** над «файлы готовы». Это сигнальный артефакт (гейтом
не парсится), пиши его ТОЛЬКО при реальных блокерах.
**Поведение на перезапуске (resume).** После ответа заказчика в Plane тебя перезапускают: прочитай
**свежие комментарии-ответы**, затем (а) если все блокеры сняты — выпусти **полный** валидный пакет
(4 файла); свежий пакет автоматически **supersedeит** старый `01-questions.md` по mtime (повторного
Needs Input не будет); (б) если часть вопросов осталась — **перепиши** `01-questions.md`, оставив
только актуальные блокеры (снова Needs Input). Не оставляй устаревшие вопросы вперемешку с новыми.
</task>
<deliverables>
@@ -52,6 +67,10 @@ bug-report (симптом / шаги воспроизведения / лока
| `03-acceptance-criteria.md` | Критерии приёмки (чёткие условия PASS/FAIL) |
| `04-test-plan.yaml` | План тестов (unit, integration; pytest) |
**When-applicable (сигнальный, ORCH-120):** `01-questions.md` — пишется **только** при блокирующих
открытых вопросах (см. `<task>`) **вместо** сфабрикованных 4 файлов; скелет —
`docs/_templates/01-questions.md`. Не machine-verdict, гейтом не парсится.
**Скелеты:** бери из `docs/_templates/` (одноимённые файлы) — не угадывай структуру.
**Эталон качества/полноты:** заполненные work item **ORCH-088** и **ORCH-073**
ориентируйся на их детальность и формат.

View File

@@ -3,6 +3,7 @@
Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу.
## [Unreleased]
- **Открытые вопросы аналитика → Needs Input (приоритет, неблокирование serial-gate, resume)** (ORCH-120, `fix`, трек Bug→escalate full-cycle): активирован и достроен ранее **мёртвый** путь «аналитик задаёт блокирующие вопросы → `01-questions.md` → Needs Input». Четыре согласованных изменения, аддитивно, под kill-switch, скоуп self-hosting, never-raise; `STAGE_TRANSITIONS` / реестр `QG_CHECKS` / семантика и имена `check_*` / machine-verdict-ключи / схема БД — **байт-в-байт не тронуты** (поток — pre-gate-ветка движка, **не** Quality Gate; `01-questions.md`**сигнальный** артефакт, **не** machine-verdict). (1) **Контракт + канон.** `.openclaw/agents/analyst.md` документирует канал «блокирующие вопросы → `01-questions.md`, НЕ фабриковать deliverables» + поведение на resume; новый скелет `docs/_templates/01-questions.md`; строка манифеста + примечание о префиксе `01-` в `docs/_standards/PIPELINE_DOCS.md`. (2) **Приоритет «вопросы активны» > «файлы готовы»** в `_handle_analysis_approved_flow` (DQ-3): чистая логика решения вынесена в leaf `src/analyst_questions.py` (`questions_gate_applies`/`autopause_applies`/`questions_active`), side-effects — в `stage_engine` (`_decide_analysis_outcome`/`_emit_analysis_needs_input`/`_emit_analysis_in_review`/`_emit_analysis_empty`); блокирующие вопросы достигают Needs Input даже при сфабрикованном полном пакете. (3) **Авто-park (DQ-1)** при Needs Input через ось «пауза» ORCH-124 (`db.set_task_paused`) → задача исключается из «активного» предиката serial-gate (ORCH-088), FIFO репо не клинит, пока ждём человека; **resume + unpark** в `handle_status_start` (analysis-ветка, `db.clear_task_paused`). (4) **Гигиена устаревания (DQ-2)** — детерминированный offline freshness-supersede по `mtime` (вопросы активны, пока пакет неполон ИЛИ `01-questions.md` не старше всех 4 deliverables) → полный свежий пакет supersedeит старый файл без зависимости от LLM (нет бесконечной петли Needs Input). Флаги (`config.py`, безопасные дефолты): `analyst_questions_gate_enabled` (kill-switch) / `analyst_questions_gate_repos` (CSV; **пусто → self-hosting only**) / `analyst_needs_input_autopause_enabled` (независимый тумблер авто-park/unpark; `False` → operator-park `POST /serial-gate/pause`). off/out-of-scope → байт-в-байт как до ORCH-120 (enduro не затронут); ORCH-066 (Needs Input только у аналитика) не расширяется. Покрытие — `tests/test_orch120_analyst_needs_input.py` (TC-01 обязательный регресс: красный до фикса, зелёный после), `tests/test_orch120_serial_gate_needs_input.py`, `tests/test_orch120_resume_unpark.py`, `tests/test_orch120_questions_artifact_canon.py` + assert в `tests/test_agent_prompts_canon.py`. ADR: `docs/work-items/ORCH-120/06-adr/ADR-001-analyst-open-questions-needs-input.md`, сквозной `docs/architecture/adr/adr-0053-analyst-open-questions-needs-input-flow.md`.
- **Гигиена run-ownership строки `jobs` — инвариант «queued ⇒ run_id/pid/started_at IS NULL»** (ORCH-126, `fix`, трек Bug): багфикс контрол-плейна (инцидент ORCH-124/125) — при `ORCH_SERIAL_GATE_ENABLED=false` queued analyst-job'ы зависали навсегда (job 2286: `status=queued + run_id=759/760 + pid=35/42 + started_at=NULL` — физически невозможное состояние). **Причина:** ни один путь возврата job в `queued` (restart `requeue_running_jobs` / retry `mark_job('queued')` / transient `mark_job_transient` / reap `reap_running_job('queued')`) **не сбрасывал run-ownership** (`run_id`/`pid`); после рестарта контейнера pid мог быть **переиспользован** ОС`pid_alive(stale)=True` → job-reaper (ORCH-065) Tier-1 «видел живой» фантомный `running` и при `max_concurrency=1` клинил клейм **всей** общей очереди всех проектов. **Инвариант (adr-0052):** `status='queued' ⇒ run_id IS NULL AND pid IS NULL AND started_at IS NULL` — queued-job никогда не несёт run-ownership (история run'а — в `agent_runs`, не в `jobs.run_id`). Фикс на **существующих колонках**: `STAGE_TRANSITIONS` / реестр `QG_CHECKS` / `check_*` / machine-verdict-ключи / **схема БД** — байт-в-байт не тронуты; для здоровых job'ов и enduro поведение байт-в-байт; миграция не требуется. ADR: `docs/work-items/ORCH-126/06-adr/ADR-001-queued-job-run-ownership-hygiene.md`, сквозной `docs/architecture/adr/adr-0052-queued-job-run-ownership-invariant.md`.
- **D1 — Forward-cleanup на всех путях возврата в `queued` (FR-1/AC-1):** `requeue_running_jobs` / `mark_job('queued')` / `mark_job_transient` / `reap_running_job('queued')` выставляют `run_id=NULL, pid=NULL` той же UPDATE-транзакцией, что чистит `started_at`/`finished_at`. Атомарные `status`-guard'ы (`reap_running_job … WHERE status='running'`, rowcount) — **сохранены байт-в-байт** (restart-safe, гонка worker↔reaper↔monitor — TR-4). Каллер-переданный `run_id` для `queued` **игнорируется** (инвариант важнее: `launcher._finalize_permanent`/reaper по-прежнему передают старый `run_id`, но для `queued` он сбрасывается). Безусловно — исправление инварианта данных, без флага (D6).
- **D2 — Чистый claim (FR-2/AC-3):** `claim_next_job` при флипе `queued→running` сбрасывает `pid=NULL, run_id=NULL` тем же существующим UPDATE (defense-in-depth поверх D1) → между claim и стампом `pid` в `_spawn` строка несёт `pid IS NULL`, не чужой pid. SELECT-гейт (`status='queued' AND available_at<=now` + dep/serial-gate) — **не тронут** (offline hot-path, NFR-2; без нового SELECT/сети).

View File

@@ -47,6 +47,7 @@ check_tests_passed → check_staging_status → check_deploy_status`.
|----------|----------------|-----------|------------------|--------------------------|-------------------------|
| `00-business-request.md` | система (Plane webhook `_create_initial_docs`) / заказчик | required | `created` (инициализация) | не гейтится (вход) | — |
| `01-brd.md` | analyst | required | `analysis` | exit-гейт `analysis→architecture` = `check_analysis_approved` (Approved + полнота файлов); helper `check_analysis_complete` (наличие `01/02/03/04`) | — |
| `01-questions.md` | analyst | when-applicable | `analysis` | **сигнальный** (гейтом НЕ парсится); механизм — ветка Needs Input в `_handle_analysis_approved_flow` (ORCH-120, adr-0053): активные блокирующие вопросы → `set_issue_needs_input` (приоритет над «файлы готовы») | — (не machine-verdict) |
| `02-trz.md` | analyst | required | `analysis` | то же | — |
| `03-acceptance-criteria.md` | analyst | required | `analysis` | то же | — |
| `04-test-plan.yaml` | analyst | required | `analysis` | то же | — |
@@ -72,6 +73,10 @@ check_tests_passed → check_staging_status → check_deploy_status`.
- **Категория `when-applicable`** = документ пишется при наличии соответствующего предмета
(инфра / данные / security / post-deploy). Его отсутствие — не нарушение приёмки.
- **`05-…` / `09-…` / `11-…`** — зарезервированные/legacy номера, в текущем каноне не используются.
- **Префикс `01-` (DQ-4 ORCH-120)** — общий для артефактов стадии `analysis` владельца `analyst`:
`01-brd.md` — обязательный deliverable (гейтится `check_analysis_complete`), `01-questions.md`
**сигнальный** when-applicable артефакт того же владельца/стадии. Коллизии нет: файлы разноимённые,
`check_analysis_complete` проверяет ровно `01-brd.md`/`02`/`03`/`04` (`01-questions.md` им не парсится).
---

43
docs/_templates/01-questions.md vendored Normal file
View File

@@ -0,0 +1,43 @@
---
work_item: ORCH-NNN
stage: analysis
author_agent: analyst
status: needs-input
created_at: <YYYY-MM-DD>
model_used: <resolve ORCH-41>
---
# 01 — Открытые вопросы (Open Questions): ORCH-NNN — <название>
Work Item: **ORCH-NNN** · Repo: **<repo>** · Стадия: analysis
> **Сигнальный** when-applicable артефакт (ORCH-120, adr-0053). Пишется аналитиком через **Write
> tool** ТОЛЬКО при **блокирующей** неоднозначности бизнес-запроса, когда выпустить корректные 4
> deliverables нельзя без ответа заказчика. Наличие этого файла с **активными** вопросами уводит
> задачу в **Needs Input** (приоритет над «файлы готовы»). **Не** machine-verdict: гейтом
> (`check_analysis_complete`/`check_analysis_approved`) НЕ парсится — это сигнал движку
> (`_handle_analysis_approved_flow`).
>
> ⚠️ Если блокирующих вопросов НЕТ — **не создавай** этот файл; выпускай полный пакет (`01-brd.md`/
> `02-trz.md`/`03-acceptance-criteria.md`/`04-test-plan.yaml`). Не фабрикуй требования ради сдачи 4
> файлов.
## 1. Контекст
<Что именно в бизнес-запросе (`00-business-request.md`) блокирует выпуск корректного пакета. Какие
факты установлены, а какие — нет. На какой код `src/` это влияет.>
## 2. Блокирующие вопросы
> Каждый вопрос — конкретный, отвечаемый, с вариантами (где уместно) и указанием, почему ответ
> блокирует анализ. Нумеруй (Q-1, Q-2, …).
- **Q-1** — <вопрос>
- Вариант A: <…> (последствие)
- Вариант B: <…> (последствие)
- Почему блокирует: <без ответа нельзя выпустить BR/TRZ, т.к. …>
- **Q-2** — …
## 3. Что разблокирует анализ
<Какие ответы переводят задачу из Needs Input обратно в работу: после ответов заказчика в Plane
аналитик перезапускается (resume), читает свежие комментарии и выпускает полный пакет. Если часть
вопросов снята, а часть осталась — **перепиши** этот файл (оставь только актуальные блокеры), иначе
выпусти 4 deliverables (свежий пакет supersedeит этот файл по mtime, DQ-2).>

View File

@@ -650,7 +650,7 @@ ORCH-027 вводит детерминированный (без LLM) **гейт
`docs/work-items/ORCH-088/08-data-requirements.md`,
`docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md`.
### Открытые вопросы аналитика → Needs Input (ORCH-120 — design, [adr-0053](adr/adr-0053-analyst-open-questions-needs-input-flow.md))
### Открытые вопросы аналитика → Needs Input (ORCH-120 — реализовано, [adr-0053](adr/adr-0053-analyst-open-questions-needs-input-flow.md))
При неоднозначном бизнес-запросе у аналитика не было рабочего канала уточнения — он **фабриковал**
требования, чтобы сдать обязательные 4 файла. Механизм «вопросы → Needs Input» в
`_handle_analysis_approved_flow` (`src/stage_engine.py`) существовал, но был **мёртв** (контракт не в
@@ -663,7 +663,10 @@ self-hosting, never-raise; `STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*`/machine-ver
`01-questions.md`, НЕ фабриковать deliverables»; `01-questions.md` стандартизирован как
`when-applicable` сигнальный артефакт (скелет `docs/_templates/` + строка `PIPELINE_DOCS.md`).
- **Приоритет «вопросы активны» > «файлы готовы»** в `_handle_analysis_approved_flow` (DQ-3) →
блокирующие вопросы достигают Needs Input даже при частичных/сфабрикованных deliverables.
блокирующие вопросы достигают Needs Input даже при частичных/сфабрикованных deliverables. Чистая
логика решения — leaf `src/analyst_questions.py` (`questions_gate_applies`/`autopause_applies`/
`questions_active`, never-raise); side-effects (`set_issue_needs_input`/коммент/Telegram/park) —
в `stage_engine` (`_decide_analysis_outcome`/`_emit_analysis_*`).
- **Авто-park (DQ-1)** через ось «пауза» ORCH-124 (`db.set_task_paused` при Needs Input) → задача
исключается из «активного» предиката serial-gate, FIFO репо не клинит, пока ждём человека;
**resume + unpark** в `handle_status_start` (analysis-ветка, `clear_task_paused`).

169
src/analyst_questions.py Normal file
View File

@@ -0,0 +1,169 @@
"""ORCH-120 (adr-0053): analyst open-questions -> Needs Input — pure leaf helpers.
Activates and completes the dead "analyst asks BLOCKING questions ->
``01-questions.md`` -> Needs Input" path in
``stage_engine._handle_analysis_approved_flow``. This module holds ONLY the pure,
unit-testable decision logic; the side effects (set_issue_needs_input / Plane
comment / Telegram / auto-park) stay in ``stage_engine``.
Leaf pattern (mirror of ``coverage_gate`` / ``serial_gate`` / ``labels``): imports
only ``os`` / ``logging`` / ``config`` and lazily ``qg.checks.is_self_hosting_repo``;
NEVER imports ``stage_engine`` / ``launcher`` / ``db``.
What it decides (ADR-001 D2/D3):
* ``questions_gate_applies(repo)`` — whether the ORCH-120 priority+supersede
behaviour is REAL for this repo (kill-switch + scope, mirror of
``coverage_gate_applies``). OFF / out-of-scope -> ``stage_engine`` runs its
ORIGINAL byte-for-byte order (AC-9).
* ``autopause_applies(repo)`` — whether the engine auto-parks a task on Needs
Input (and unparks on resume). Independent sub-tumbler AND the questions gate
(a task is only ever auto-parked from within the questions-active branch).
* ``questions_active(worktree, work_item_id, files_ok)`` — the pure freshness-gated
supersede predicate (DQ-2): are there ACTIVE blocking questions that must win
over "files ready"?
never-raise contract (self-hosting safety): every public function degrades
conservatively and NEVER propagates into the stage engine / launcher / webhook.
"""
from __future__ import annotations
import logging
import os
from .config import settings
logger = logging.getLogger("orchestrator.analyst_questions")
# The analyst's signal artifact (DQ-4: path kept as-is; the engine already reads
# exactly this file — see stage_engine._handle_analysis_approved_flow).
QUESTIONS_FILENAME = "01-questions.md"
# The 4 mandatory analysis deliverables that ``check_analysis_complete`` gates on.
# Used by the mtime freshness-supersede check (DQ-2): a full FRESH package
# supersedes a stale, untouched 01-questions.md left over from a prior run.
DELIVERABLES = (
"01-brd.md",
"02-trz.md",
"03-acceptance-criteria.md",
"04-test-plan.yaml",
)
# ---------------------------------------------------------------------------
# Conditionality (mirrors coverage_gate_applies / serial_gate_applies)
# ---------------------------------------------------------------------------
def questions_gate_applies(repo: str) -> bool:
"""Whether the ORCH-120 questions priority+supersede is REAL for this repo.
Mirrors the ORCH-22 / ORCH-27 / ORCH-43 pattern:
* ``analyst_questions_gate_enabled=False`` -> always False (kill-switch; the
engine runs its ORIGINAL pre-ORCH-120 branch order — zero regression, AC-9).
* ``analyst_questions_gate_repos`` (CSV) non-empty -> real only for the listed
repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``).
Never raises (AC-10): any error -> False (the safe no-op default that matches
the kill-switch-off behaviour).
"""
try:
if not getattr(settings, "analyst_questions_gate_enabled", False):
return False
raw = (getattr(settings, "analyst_questions_gate_repos", "") or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
# Lazy import keeps this module a leaf (no qg import at module load).
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("questions_gate_applies error for %s: %s", repo, e)
return False
def autopause_applies(repo: str) -> bool:
"""Whether the engine auto-parks on Needs Input / unparks on resume (D4/D5).
Two independent conditions, BOTH required:
* ``analyst_needs_input_autopause_enabled`` (independent sub-tumbler; False ->
operator-park only, via ``POST /serial-gate/pause``), AND
* ``questions_gate_applies(repo)`` — a task is only ever auto-parked from
within the questions-active branch, so the auto-park scope can never exceed
the questions gate (keeps the off/out-of-scope path byte-for-byte, AC-9).
Never raises (AC-10): any error -> False (degrade to operator-park).
"""
try:
if not getattr(settings, "analyst_needs_input_autopause_enabled", False):
return False
return questions_gate_applies(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("autopause_applies error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# Pure freshness-gated supersede predicate (DQ-2)
# ---------------------------------------------------------------------------
def _work_item_dir(worktree_path: str, work_item_id: str) -> str:
return os.path.join(worktree_path, "docs", "work-items", work_item_id)
def questions_active(worktree_path: str, work_item_id: str, files_ok: bool) -> bool:
"""Are there ACTIVE blocking questions that must win over "files ready" (DQ-2)?
Deterministic and OFFLINE (filesystem only — no network, no git):
* ``01-questions.md`` absent -> NOT active (``False``).
* package incomplete (``files_ok is False``) and the file is present -> active
(``True``): questions exist, deliverables do not -> questions win (AC-2).
* package complete (``files_ok is True``) and the file is present -> freshness
check: **superseded iff ALL 4 deliverables are strictly newer** than
``01-questions.md`` (by ``os.path.getmtime``). Superseded -> NOT active
(``False`` -> In Review, AC-6); otherwise -> active (``True`` -> Needs Input,
AC-1). A full FRESH analyst run always writes the 4 deliverables with a newer
mtime, so a stale untouched 01-questions.md is deterministically superseded
without depending on any LLM action.
Fail directions (never-raise, AC-10 / DQ-2):
* a ``getmtime``/comparison error while the file PROVABLY exists -> treat
questions as **active** (``True``, Needs Input) — safe for "don't build on
guesses".
* a catastrophic error (cannot even determine file presence) -> ``False`` so
``stage_engine`` degrades to its prior ``files_ok`` order + WARNING.
"""
try:
questions_path = os.path.join(
_work_item_dir(worktree_path, work_item_id), QUESTIONS_FILENAME
)
present = os.path.isfile(questions_path)
except Exception as e: # noqa: BLE001 - catastrophic: cannot determine presence
logger.warning(
"questions_active: cannot determine 01-questions.md presence for %s: %s",
work_item_id, e,
)
return False
if not present:
return False
if not files_ok:
# Questions present, deliverables incomplete -> questions take priority.
return True
# Package complete: superseded iff every deliverable is strictly newer than
# the questions file. Any mtime error on a proven-existing file -> active.
try:
q_mtime = os.path.getmtime(questions_path)
base = _work_item_dir(worktree_path, work_item_id)
for name in DELIVERABLES:
dp = os.path.join(base, name)
if not os.path.isfile(dp) or not (os.path.getmtime(dp) > q_mtime):
# A deliverable is missing or not strictly newer -> NOT superseded
# -> questions still active (Needs Input). (files_ok True means the
# gate saw all 4; a missing file here is defensive only.)
return True
# All 4 deliverables strictly newer -> superseded -> In Review.
return False
except Exception as e: # noqa: BLE001 - mtime error on existing file -> active
logger.warning(
"questions_active: freshness check failed for %s -> active (Needs Input): %s",
work_item_id, e,
)
return True

View File

@@ -1029,6 +1029,40 @@ class Settings(BaseSettings):
serial_gate_freeze_enabled: bool = True
serial_gate_pause_enabled: bool = True
# ORCH-120 (adr-0053): analyst open-questions -> Needs Input. Activates and
# completes the dead "analyst asks BLOCKING questions -> 01-questions.md ->
# Needs Input" path in _handle_analysis_approved_flow. Additive, never-raise,
# self-hosting scope; STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict
# keys / DB schema are byte-for-byte UNCHANGED (the flow is a pre-gate engine
# branch, NOT a Quality Gate; 01-questions.md is a SIGNAL artifact, NOT a
# machine-verdict). See docs/work-items/ORCH-120/06-adr/ADR-001-analyst-open-
# questions-needs-input.md.
# analyst_questions_gate_enabled -> kill-switch (env
# ORCH_ANALYST_QUESTIONS_GATE_ENABLED) of the
# priority+supersede behaviour (D3). False ->
# _handle_analysis_approved_flow runs its ORIGINAL
# pre-ORCH-120 order (files_ok first, then a flat
# isfile(01-questions.md) check) byte-for-byte (AC-9).
# analyst_questions_gate_repos -> CSV scope (env
# ORCH_ANALYST_QUESTIONS_GATE_REPOS). Empty -> real
# ONLY for the self-hosting repo (orchestrator) via
# is_self_hosting_repo; non-empty -> membership.
# Mirrors coverage_gate_repos -> enduro untouched.
# analyst_needs_input_autopause_enabled -> independent sub-tumbler (env
# ORCH_ANALYST_NEEDS_INPUT_AUTOPAUSE_ENABLED) for
# auto-park on Needs Input / unpark on resume (D4/D5)
# via the ORCH-124 pause axis (db.set_task_paused /
# clear_task_paused). True -> a Needs-Input task is
# excluded from the serial-gate "active task"
# predicate so the repo FIFO does not wedge while we
# wait for a human. False -> operator-park only
# (POST /serial-gate/pause). Subordinate to the
# questions gate (auto-park only fires from the
# questions-active branch).
analyst_questions_gate_enabled: bool = True
analyst_questions_gate_repos: str = ""
analyst_needs_input_autopause_enabled: bool = True
# ORCH-090: STOP-status task cancellation (stop active agent + full progress
# reset) and the relaunch-hole close. A new logical Plane key `stop` (fail-closed,
# absent from _DEFAULT_STATES) routes to a cancel handler that drives the task to

View File

@@ -30,7 +30,7 @@ import os
import time
from dataclasses import dataclass, field
from .db import get_db, update_task_stage, enqueue_job, get_task_track
from .db import get_db, update_task_stage, enqueue_job, get_task_track, set_task_paused
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .review_parse import extract_review_findings, extract_test_failures
@@ -42,6 +42,7 @@ from . import post_deploy
from . import labels
from . import bug_fast_track
from . import transition_lease
from . import analyst_questions
from .notifications import (
notify_stage_change,
notify_qg_failure,
@@ -708,84 +709,195 @@ def _handle_analysis_approved_flow(
return
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review, ask for the Approved STATUS (BUG C).
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
# task_id is threaded through so build_status_comment can resolve the
# analyst duration via agent_runs (ORCH-016 AC-14 DB fallback).
_build_analyst_ready_comment(repo, work_item_id, branch, task_id=task_id),
author="analyst",
)
notify_approve_requested(task_id)
result.note = "analysis-in-review"
logger.info(
f"Task {task_id}: analyst finished, requested Approved status in Plane"
)
# --- ORCH-089 autoApprove: auto-pass the BRD human gate by label --------
# After In Review + the analyst comment + the approve-request (kept for the
# BRD-review clock, transparency and symmetry with the manual path), if the
# issue carries the autoApprove label AND the repo is in scope, auto-advance
# via the SAME path a human Approved takes — never duplicating the
# transition logic. applies() (local, network-free) is checked FIRST so a
# disabled kill-switch / out-of-scope repo costs zero network (AC-8); any
# error / no-label -> fall through to the prior behaviour (return, wait for
# a human, AC-4/AC-6).
if labels.auto_approve_applies(repo) and labels.has_label(
work_item_id, settings.auto_approve_label
):
set_issue_approved(work_item_id) # indication (AC-1), transient
logger.info(
f"Task {task_id}: label {settings.auto_approve_label} -> "
f"BRD auto-approved (analysis -> architecture)"
)
plane_add_comment(
work_item_id,
f"✅ BRD авто-подтверждён (лейбл {settings.auto_approve_label}). "
"Переход на architecture без ручного Approved.",
author="analyst",
)
send_telegram(
f"{link_for(work_item_id)}: BRD авто-подтверждён "
f"(лейбл {settings.auto_approve_label})."
)
# Same advance the human Approved webhook uses: finished_agent=None ->
# check_analysis_approved approved-via-status -> advance analysis ->
# architecture + mark_brd_review_ended (clock) + standard post-effects.
# Re-entrancy is safe: the nested call passes finished_agent=None, so it
# does NOT re-enter this analyst branch (which requires agent=='analyst').
auto = advance_stage(
task_id, current_stage, repo, work_item_id, branch, finished_agent=None
)
result.advanced = auto.advanced
result.to_stage = auto.to_stage
result.enqueued_agent = auto.enqueued_agent
result.enqueued_job_id = auto.enqueued_job_id
result.note = "auto-approved-via-label"
# ORCH-120 (adr-0053 D3): decide the analyst outcome — 'needs-input' |
# 'in-review' | 'empty'. When the questions-gate applies, ACTIVE blocking
# questions (01-questions.md, freshness-supersede aware, D2) take PRIORITY over
# "files ready" (a fabricated full package must not bury the analyst's blocking
# questions, AC-1). Under the kill-switch off / out-of-scope repo the ORIGINAL
# pre-ORCH-120 order runs byte-for-byte (files_ok first, then a flat
# isfile(01-questions.md) check) — zero regression (AC-9). never-raise (AC-10):
# on any error the decision degrades to that original order.
outcome = _decide_analysis_outcome(repo, work_item_id, branch, files_ok)
if outcome == "needs-input":
_emit_analysis_needs_input(task_id, repo, work_item_id, branch, result)
return
if outcome == "in-review":
_emit_analysis_in_review(
task_id, current_stage, repo, work_item_id, branch, result
)
return
_emit_analysis_empty(work_item_id, result)
questions_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md",
def _decide_analysis_outcome(repo, work_item_id, branch, files_ok) -> str:
"""ORCH-120 (adr-0053 D3): 3-way decision for the analyst outcome.
Returns ``'needs-input'`` | ``'in-review'`` | ``'empty'``. never-raise (AC-10):
on any error degrade to the ORIGINAL pre-ORCH-120 order so behaviour is
byte-for-byte the same when the gate is off / out of scope.
"""
try:
worktree = get_worktree_path(repo, branch)
except Exception: # noqa: BLE001 - never-raise; treat as no worktree
worktree = ""
# Questions-gate ON: blocking questions take priority over files_ok (D3).
try:
gate_on = analyst_questions.questions_gate_applies(repo)
except Exception: # noqa: BLE001
gate_on = False
if gate_on:
active = None # None == predicate could not decide -> degrade below
try:
active = analyst_questions.questions_active(
worktree, work_item_id, files_ok
)
except Exception as e: # noqa: BLE001 - never-raise; degrade to original order
logger.warning(
f"questions_active failed for {work_item_id} -> "
f"degrade to original order: {e}"
)
active = None
if active is True:
return "needs-input"
if active is False:
return "in-review" if files_ok else "empty"
# active is None: predicate degraded -> fall through to the ORIGINAL order.
# Kill-switch off / out-of-scope / degraded: ORIGINAL byte-for-byte order
# (files_ok first, then a flat isfile(01-questions.md) check).
if files_ok:
return "in-review"
try:
questions_path = os.path.join(
worktree, f"docs/work-items/{work_item_id}/01-questions.md"
)
if worktree and os.path.isfile(questions_path):
return "needs-input"
except Exception: # noqa: BLE001 - never-raise
pass
return "empty"
def _emit_analysis_in_review(
task_id, current_stage, repo, work_item_id, branch, result: AdvanceResult
):
"""Full artifacts ready -> In Review + request the Approved STATUS (BUG C).
Carries the ORCH-089 autoApprove insertion verbatim. Extracted from
``_handle_analysis_approved_flow`` by ORCH-120 (D3) so the questions-priority
decision can dispatch here without changing this branch's behaviour.
"""
# Full artifacts ready -> In Review, ask for the Approved STATUS (BUG C).
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
# task_id is threaded through so build_status_comment can resolve the
# analyst duration via agent_runs (ORCH-016 AC-14 DB fallback).
_build_analyst_ready_comment(repo, work_item_id, branch, task_id=task_id),
author="analyst",
)
if os.path.isfile(questions_path):
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
notify_approve_requested(task_id)
result.note = "analysis-in-review"
logger.info(
f"Task {task_id}: analyst finished, requested Approved status in Plane"
)
# --- ORCH-089 autoApprove: auto-pass the BRD human gate by label --------
# After In Review + the analyst comment + the approve-request (kept for the
# BRD-review clock, transparency and symmetry with the manual path), if the
# issue carries the autoApprove label AND the repo is in scope, auto-advance
# via the SAME path a human Approved takes — never duplicating the
# transition logic. applies() (local, network-free) is checked FIRST so a
# disabled kill-switch / out-of-scope repo costs zero network (AC-8); any
# error / no-label -> fall through to the prior behaviour (return, wait for
# a human, AC-4/AC-6).
if labels.auto_approve_applies(repo) and labels.has_label(
work_item_id, settings.auto_approve_label
):
set_issue_approved(work_item_id) # indication (AC-1), transient
logger.info(
f"Task {task_id}: label {settings.auto_approve_label} -> "
f"BRD auto-approved (analysis -> architecture)"
)
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
f"✅ BRD авто-подтверждён (лейбл {settings.auto_approve_label}). "
"Переход на architecture без ручного Approved.",
author="analyst",
)
send_telegram(
f"\u2753 {link_for(work_item_id)}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
f" {link_for(work_item_id)}: BRD авто-подтверждён "
f"(лейбл {settings.auto_approve_label})."
)
result.note = "analysis-needs-input"
return
# Same advance the human Approved webhook uses: finished_agent=None ->
# check_analysis_approved approved-via-status -> advance analysis ->
# architecture + mark_brd_review_ended (clock) + standard post-effects.
# Re-entrancy is safe: the nested call passes finished_agent=None, so it
# does NOT re-enter this analyst branch (which requires agent=='analyst').
auto = advance_stage(
task_id, current_stage, repo, work_item_id, branch, finished_agent=None
)
result.advanced = auto.advanced
result.to_stage = auto.to_stage
result.enqueued_agent = auto.enqueued_agent
result.enqueued_job_id = auto.enqueued_job_id
result.note = "auto-approved-via-label"
# No artifacts and no questions.
def _emit_analysis_needs_input(
task_id, repo, work_item_id, branch, result: AdvanceResult
):
"""Blocking questions -> Needs Input + Plane comment + Telegram + auto-park.
The Plane comment / Telegram are preserved verbatim from the original
``_handle_analysis_approved_flow`` questions branch. ORCH-120 (D4) adds the
auto-park: when ``autopause_applies(repo)`` the task is parked
(``db.set_task_paused``) so the repo's serial-gate FIFO is not wedged while we
wait for a human (BR-3 / AC-4). never-raise (AC-10): a file-read or park error
degrades safely and never crashes ``advance_stage``.
"""
set_issue_needs_input(work_item_id)
questions_text = ""
try:
questions_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md",
)
with open(questions_path, "r") as qf:
questions_text = qf.read()
except Exception as e: # noqa: BLE001 - never-raise; comment without body
logger.warning(
f"Task {task_id}: could not read 01-questions.md for {work_item_id}: {e}"
)
plane_add_comment(
work_item_id,
f"❓ Analyst нуждается в уточнении:\n\n{questions_text}",
author="analyst",
)
send_telegram(
f"{link_for(work_item_id)}: Analyst задаёт вопросы. Ответь в Plane."
)
# ORCH-120 (D4): auto-park via the ORCH-124 pause axis so the parked task stops
# holding the repo serial-gate FIFO. autopause_applies() is gated by the
# questions gate, so a kill-switch-off / out-of-scope repo never parks (AC-9).
# set_task_paused is already never-raise (-> False); the extra guard keeps a
# surprise from crashing advance_stage (AC-10). Park failure does NOT undo the
# Needs Input transition (degrades to operator-park, DQ-1).
try:
if analyst_questions.autopause_applies(repo) and set_task_paused(task_id):
logger.info(
f"Task {task_id}: auto-parked on Needs Input "
f"(serial-gate FIFO freed for {repo})"
)
except Exception as e: # noqa: BLE001 - never-raise
logger.warning(f"Task {task_id}: auto-park on Needs Input failed: {e}")
result.note = "analysis-needs-input"
def _emit_analysis_empty(work_item_id, result: AdvanceResult):
"""No artifacts and no questions — a warning comment (verbatim original)."""
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",

View File

@@ -355,6 +355,26 @@ async def handle_status_start(data: dict, project_id: str = ""):
logger.error(f"Failed to post relaunch-hole comment for {work_item_id}: {e}")
return
# ORCH-120 (adr-0053 D5): resume + unpark. The analyst (the sole Needs-Input
# owner, ORCH-066) was auto-parked on Needs Input (D4) so the repo serial-gate
# FIFO would not wedge while we waited for the stakeholder. Now that they
# answered and returned the issue to a working status, clear the pause so the
# re-enqueued analyst-job is claimable (jointly with the ORCH-126 stale
# run_id/pid fix). Gated on `analysis` + autopause_applies (questions gate +
# scope) so off/out-of-scope is byte-for-byte unchanged (AC-9); idempotent and
# never-raise — a resume that was never parked is a no-op.
if current_stage == "analysis":
try:
from .. import analyst_questions
from ..db import clear_task_paused
if analyst_questions.autopause_applies(repo) and clear_task_paused(task_id):
logger.info(
f"Task {task_id}: unparked on analyst resume "
f"(serial-gate FIFO re-entered for {repo})"
)
except Exception as e: # noqa: BLE001 - never-raise: resume must not crash
logger.warning(f"Task {task_id}: unpark on resume failed: {e}")
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "

View File

@@ -282,6 +282,27 @@ def test_reviewer_carries_overview_docs_axis():
)
def test_orch120_analyst_documents_questions_channel():
"""ORCH-120 (adr-0053) TC-07 (AC-7): analyst.md documents the 01-questions.md
Needs-Input channel (blocking questions -> Needs Input, do NOT fabricate
deliverables) without breaking the 52d canon (guarded by the canon tests above).
Anchored explicitly so a future prompt refactor cannot silently drop the
working contract (the same anti-drift rationale as the traceability axis)."""
text = _read("analyst")
assert "01-questions.md" in text, (
"analyst.md does not document the 01-questions.md Needs-Input channel"
)
assert "Needs Input" in text, (
"analyst.md does not tie 01-questions.md to the Needs Input outcome"
)
# Must instruct NOT to fabricate deliverables (the core BR-1 contract).
assert "фабрик" in text, (
"analyst.md does not carry the 'do not fabricate deliverables' instruction"
)
assert "ORCH-120" in text, "analyst.md does not anchor the questions channel to ORCH-120"
def test_reviewer_overview_axis_covers_system_showcase():
"""ORCH-011 (ADR-001 D7): the ORCH-079 overview-docs axis explicitly extends
to the system showcase `docs/overview/` — a PR changing functionality described

View File

@@ -0,0 +1,305 @@
"""ORCH-120 (adr-0053): analyst open-questions -> Needs Input — engine flow.
Drives ``_handle_analysis_approved_flow`` through the real ``advance_stage(...,
finished_agent='analyst')`` launcher path (pattern of
``tests/test_auto_approve_brd.py``): mocks the Plane/Telegram setters and uses a
temporary worktree + a patched ``check_analysis_complete``.
Covers (04-test-plan.yaml):
TC-01 REGRESS (mandatory): 4 files + ACTIVE 01-questions.md simultaneously ->
Needs Input wins over "files ready" (AC-1). RED before the fix.
TC-02 01-questions.md present, 4 files missing -> Needs Input, question text in
the Plane comment + Telegram (AC-2).
TC-03 Happy-path: no 01-questions.md, 4 files present -> In Review (AC-3).
TC-06 Hygiene: full FRESH package supersedes a stale 01-questions.md -> In
Review, NOT a repeat Needs Input (AC-6).
TC-09 never-raise: a failure in the new logic degrades safely + does not crash
advance_stage (AC-10).
TC-10 Reversibility: kill-switch off OR enduro repo -> ORIGINAL byte-for-byte
order (files_ok first) (AC-9).
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch120_needs_input.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import analyst_questions # noqa: E402
from src import labels # noqa: E402
from src.config import settings # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
_DELIVERABLES = ("01-brd.md", "02-trz.md", "03-acceptance-criteria.md", "04-test-plan.yaml")
@pytest.fixture(autouse=True)
def fresh(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Silence Plane/Telegram side effects; capture the channels we assert on.
for name in ("notify_stage_change", "notify_qg_failure", "plane_notify_stage",
"plane_notify_qg", "set_issue_in_review", "set_issue_needs_input",
"set_issue_approved", "notify_approve_requested"):
monkeypatch.setattr(stage_engine, name, MagicMock(), raising=False)
monkeypatch.setattr(stage_engine, "_build_analyst_ready_comment",
lambda *a, **k: "ready", raising=False)
monkeypatch.setattr(stage_engine, "plane_add_comment", MagicMock())
monkeypatch.setattr(stage_engine, "send_telegram", MagicMock())
# autoApprove off by default (TC-03 wants In Review, not auto-advance).
monkeypatch.setattr(labels, "auto_approve_applies", lambda repo: False)
monkeypatch.setattr(labels, "has_label", lambda w, name, p=None: False)
# Questions-gate on for orchestrator by default (mirror prod defaults).
monkeypatch.setattr(settings, "analyst_questions_gate_enabled", True, raising=False)
monkeypatch.setattr(settings, "analyst_questions_gate_repos", "", raising=False)
monkeypatch.setattr(settings, "analyst_needs_input_autopause_enabled", True,
raising=False)
yield
def _make_task(stage="analysis", repo="orchestrator", branch="feature/ORCH-120-x",
wi="ORCH-120"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _wi_dir(worktree, wi="ORCH-120"):
d = os.path.join(worktree, "docs", "work-items", wi)
os.makedirs(d, exist_ok=True)
return d
def _write(path, mtime=None, body="x"):
with open(path, "w") as f:
f.write(body)
if mtime is not None:
os.utime(path, (mtime, mtime))
def _patch_worktree(monkeypatch, worktree):
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: worktree)
def _patch_complete_gate(monkeypatch, ok=True):
def gate(*a, **k):
return (ok, "ok" if ok else "missing artifacts")
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_analysis_complete": gate},
)
def _stage_of(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _paused_at(task_id):
conn = get_db()
row = conn.execute("SELECT paused_at FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
# --- TC-01: REGRESS — questions priority over files_ok -----------------------
def test_tc01_questions_priority_over_files_ready(monkeypatch, tmp_path):
"""4 deliverables + an ACTIVE (newest) 01-questions.md -> Needs Input wins."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
base = 1_000_000
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + i)
# 01-questions.md is the NEWEST -> NOT superseded -> active.
_write(os.path.join(d, "01-questions.md"), mtime=base + 100,
body="Q-1 нужно уточнить охват")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert stage_engine.set_issue_needs_input.called
assert not stage_engine.set_issue_in_review.called
assert _stage_of(tid) == "analysis" # NOT advanced to architecture
# --- TC-02: questions only, no deliverables ----------------------------------
def test_tc02_questions_only_no_deliverables(monkeypatch, tmp_path):
worktree = str(tmp_path)
d = _wi_dir(worktree)
_write(os.path.join(d, "01-questions.md"), body="Q-1 какой формат вывода?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=False)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert stage_engine.set_issue_needs_input.called
# Question text reached the Plane comment + Telegram.
comment_arg = stage_engine.plane_add_comment.call_args.args[1]
assert "Q-1 какой формат вывода?" in comment_arg
assert stage_engine.send_telegram.called
# --- TC-03: happy-path, no questions -----------------------------------------
def test_tc03_happy_path_no_questions(monkeypatch, tmp_path):
worktree = str(tmp_path)
d = _wi_dir(worktree)
for name in _DELIVERABLES:
_write(os.path.join(d, name))
# No 01-questions.md.
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
assert stage_engine.notify_approve_requested.called
# --- TC-06: hygiene — fresh package supersedes a stale questions file --------
def test_tc06_stale_questions_superseded(monkeypatch, tmp_path):
worktree = str(tmp_path)
d = _wi_dir(worktree)
base = 2_000_000
# 01-questions.md is OLDER than every deliverable -> superseded -> In Review.
_write(os.path.join(d, "01-questions.md"), mtime=base, body="stale Q from last run")
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + 100 + i)
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
# --- TC-09: never-raise -------------------------------------------------------
def test_tc09_predicate_error_degrades_to_prior_order(monkeypatch, tmp_path):
"""questions_active raising -> degrade to original order (files_ok -> In Review)."""
worktree = str(tmp_path)
_wi_dir(worktree)
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
def boom(*a, **k):
raise RuntimeError("synthetic predicate failure")
monkeypatch.setattr(analyst_questions, "questions_active", boom)
tid = _make_task()
# Must NOT raise.
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
def test_tc09_park_error_does_not_crash(monkeypatch, tmp_path):
"""A failing set_task_paused must not undo Needs Input nor crash advance_stage."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
_write(os.path.join(d, "01-questions.md"), body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=False)
def boom(task_id):
raise RuntimeError("synthetic park failure")
monkeypatch.setattr(stage_engine, "set_task_paused", boom)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert stage_engine.set_issue_needs_input.called
# --- TC-10: reversibility — kill-switch off / enduro -> original order --------
def test_tc10_kill_switch_off_original_order(monkeypatch, tmp_path):
"""Gate off: 4 files + active questions -> In Review (original order), no park."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
base = 3_000_000
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + i)
_write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
monkeypatch.setattr(settings, "analyst_questions_gate_enabled", False, raising=False)
tid = _make_task()
res = advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
assert _paused_at(tid) is None # no auto-park when the gate is off
def test_tc10_enduro_out_of_scope_original_order(monkeypatch, tmp_path):
"""enduro repo (empty CSV -> self-hosting only) -> gate inert -> original order."""
worktree = str(tmp_path)
d = _wi_dir(worktree, wi="ET-9")
base = 4_000_000
for i, name in enumerate(_DELIVERABLES):
_write(os.path.join(d, name), mtime=base + i)
_write(os.path.join(d, "01-questions.md"), mtime=base + 100, body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=True)
tid = _make_task(repo="enduro-trails", branch="feature/ET-9-x", wi="ET-9")
res = advance_stage(tid, "analysis", "enduro-trails", "ET-9",
"feature/ET-9-x", finished_agent="analyst")
assert res.note == "analysis-in-review"
assert stage_engine.set_issue_in_review.called
assert not stage_engine.set_issue_needs_input.called
# --- Auto-park bonus: orchestrator Needs Input parks the task ----------------
def test_autopark_on_needs_input(monkeypatch, tmp_path):
"""ORCH-120 D4: Needs Input on the self-hosting repo auto-parks the task."""
worktree = str(tmp_path)
d = _wi_dir(worktree)
_write(os.path.join(d, "01-questions.md"), body="Q-1 ?")
_patch_worktree(monkeypatch, worktree)
_patch_complete_gate(monkeypatch, ok=False)
tid = _make_task()
advance_stage(tid, "analysis", "orchestrator", "ORCH-120",
"feature/ORCH-120-x", finished_agent="analyst")
assert _paused_at(tid) is not None # task parked -> serial-gate FIFO freed

View File

@@ -0,0 +1,56 @@
"""ORCH-120 (adr-0053) TC-08: canon of the 01-questions.md signal artifact.
Pure-text structural checks (NO src/ import): the skeleton exists and the
PIPELINE_DOCS.md manifest documents 01-questions.md as an analyst-owned,
when-applicable, NON-machine-verdict signal artifact (AC-8).
"""
import os
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_TEMPLATE = os.path.join(_REPO_ROOT, "docs", "_templates", "01-questions.md")
_PIPELINE_DOCS = os.path.join(_REPO_ROOT, "docs", "_standards", "PIPELINE_DOCS.md")
def _read(path):
with open(path, encoding="utf-8") as f:
return f.read()
def test_questions_skeleton_exists_and_nonempty():
"""TC-08: docs/_templates/01-questions.md exists and is non-empty."""
assert os.path.isfile(_TEMPLATE), "docs/_templates/01-questions.md is missing"
body = _read(_TEMPLATE)
assert body.strip(), "01-questions.md template is empty"
# Carries the 52c schema fields + signals the analyst stage/owner.
for field in ("work_item", "stage", "author_agent", "status", "created_at",
"model_used"):
assert field in body, f"01-questions.md template omits schema field {field!r}"
assert "stage: analysis" in body
assert "author_agent: analyst" in body
# Documents the three required parts (context / blocking questions / unblocks).
assert "Блокирующие вопросы" in body
assert "разблокирует" in body
def test_pipeline_docs_manifest_row_for_questions():
"""TC-08: PIPELINE_DOCS.md has a manifest row for 01-questions.md."""
text = _read(_PIPELINE_DOCS)
# A manifest table row mentioning the file, the analyst owner and when-applicable.
rows = [ln for ln in text.splitlines()
if "`01-questions.md`" in ln and "|" in ln]
assert rows, "PIPELINE_DOCS.md has no manifest row for 01-questions.md"
row = rows[0]
assert "analyst" in row, "01-questions.md manifest row does not name the analyst owner"
assert "when-applicable" in row, "01-questions.md row not marked when-applicable"
# Signal artifact -> NOT a machine-verdict.
assert ("сигнальн" in row.lower()) or ("не machine-verdict" in row.lower()), (
"01-questions.md row does not mark it as a signal / non-machine-verdict artifact"
)
def test_pipeline_docs_notes_the_01_prefix_convention():
"""TC-08 (DQ-4): PIPELINE_DOCS.md notes the shared `01-` prefix convention."""
text = _read(_PIPELINE_DOCS)
assert "Префикс `01-`" in text or "префикс `01-`" in text, (
"PIPELINE_DOCS.md does not document the 01- prefix convention (DQ-4)"
)

View File

@@ -0,0 +1,100 @@
"""ORCH-120 (adr-0053) TC-05: resume + unpark on analyst relaunch.
When the stakeholder answers and returns the issue to a working status on
``analysis``, ``handle_status_start`` clears the auto-park (``paused_at`` -> NULL)
so the re-enqueued analyst-job is claimable, and relaunches the analyst. The
ORCH-090 relaunch-guard (relaunch only for ``analysis``) is not weakened.
TC-05 paused analysis task + To Analyse, no active job -> unpark + relaunch
analyst; relaunch-guard (analysis only) respected (AC-5).
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch120_resume_unpark.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import patch, AsyncMock, MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db, set_task_paused, is_task_paused # noqa: E402
from src import config as cfg # noqa: E402
from src.webhooks.plane import handle_status_start # noqa: E402
@pytest.fixture(autouse=True)
def fresh(monkeypatch):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Questions gate + autopause ON for the self-hosting repo (prod defaults).
monkeypatch.setattr(cfg.settings, "analyst_questions_gate_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "analyst_questions_gate_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "analyst_needs_input_autopause_enabled", True,
raising=False)
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
def _make_task(plane_id, stage="analysis", repo="orchestrator", branch="feature/ORCH-120-x",
wi="ORCH-120"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(plane_id, wi, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
@pytest.mark.asyncio
async def test_tc05_resume_unparks_and_relaunches_analyst():
tid = _make_task("resume-120", stage="analysis")
assert set_task_paused(tid) is True
assert is_task_paused(tid) is True # parked while waiting for a human
data = {"id": "resume-120", "state": {"id": "ip-uuid", "name": "To Analyse"}}
with patch("src.webhooks.plane.enqueue_job", return_value=11) as mock_enqueue, \
patch("src.webhooks.plane.start_pipeline", new_callable=AsyncMock) as mock_start, \
patch("src.plane_sync.add_comment", MagicMock()), \
patch("src.plane_sync.set_issue_analysis") as mock_analysis:
await handle_status_start(data, "proj-1")
mock_start.assert_not_called() # resume, not a fresh pipeline
# Unparked: the re-enqueued analyst-job is claimable again (AC-5).
assert is_task_paused(tid) is False
# Analyst relaunched exactly once (relaunch-guard: only analysis, ORCH-090).
assert mock_enqueue.call_count == 1
assert mock_enqueue.call_args.args[0] == "analyst"
mock_analysis.assert_called_once_with("ORCH-120")
@pytest.mark.asyncio
async def test_tc05_autopause_off_does_not_unpark():
"""Reversibility: with the autopause sub-flag off the resume does not unpark
(operator-park stays the operator's to clear); relaunch still happens."""
tid = _make_task("resume-120b", stage="analysis")
set_task_paused(tid)
data = {"id": "resume-120b", "state": {"id": "ip-uuid", "name": "To Analyse"}}
with patch("src.config.settings.analyst_needs_input_autopause_enabled", False, create=True), \
patch("src.webhooks.plane.enqueue_job", return_value=12) as mock_enqueue, \
patch("src.webhooks.plane.start_pipeline", new_callable=AsyncMock), \
patch("src.plane_sync.add_comment", MagicMock()), \
patch("src.plane_sync.set_issue_analysis"):
await handle_status_start(data, "proj-1")
# Autopause off -> engine-side unpark is inert; the task stays parked (operator
# clears via POST /serial-gate/resume). The relaunch itself is unaffected.
assert is_task_paused(tid) is True
assert mock_enqueue.call_count == 1

View File

@@ -0,0 +1,111 @@
"""ORCH-120 (adr-0053) TC-04: Needs Input auto-park does not wedge serial-gate.
Integration: a self-hosting task A driven into Needs Input through the real
``advance_stage(..., finished_agent='analyst')`` path is auto-parked (D4), which
EXCLUDES it from the serial-gate "active task" predicate (ORCH-088/124) so a later
task B of the same repo can enter ``analysis`` (its analyst-job becomes claimable).
TC-04 earlier task A (analysis) blocks B's analyst-job; once A is driven into
Needs Input (auto-parked), B's analyst-job becomes claimable (AC-4).
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch120_serial_gate.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import stage_engine # noqa: E402
from src import labels # noqa: E402
from src import config as cfg # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Serial gate + pause axis ON (empty CSV -> all repos).
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
# Questions gate + autopause ON for orchestrator (prod defaults).
monkeypatch.setattr(cfg.settings, "analyst_questions_gate_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "analyst_questions_gate_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "analyst_needs_input_autopause_enabled", True,
raising=False)
# Silence Plane/Telegram side effects.
for name in ("set_issue_in_review", "set_issue_needs_input", "set_issue_approved",
"notify_approve_requested", "plane_notify_stage"):
monkeypatch.setattr(stage_engine, name, MagicMock(), raising=False)
monkeypatch.setattr(stage_engine, "plane_add_comment", MagicMock())
monkeypatch.setattr(stage_engine, "send_telegram", MagicMock())
monkeypatch.setattr(labels, "auto_approve_applies", lambda repo: False)
monkeypatch.setattr(labels, "has_label", lambda w, name, p=None: False)
yield
def _make_task(wi, stage="analysis", repo="orchestrator"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(wi, wi, repo, f"feature/{wi}", stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _paused_at(task_id):
conn = get_db()
row = conn.execute("SELECT paused_at FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def test_tc04_needs_input_autopark_unblocks_serial_gate(monkeypatch, tmp_path):
a = _make_task("ORCH-120A", stage="analysis") # earlier active task
b = _make_task("ORCH-120B", stage="analysis") # later task awaiting analysis
job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
# Before A is parked it holds the repo's FIFO gate -> B's analyst-job blocked.
assert claim_next_job() is None, "active A gates B (FIFO, ORCH-088)"
# Drive A into Needs Input via the engine (01-questions.md present, no
# deliverables) -> auto-park.
worktree = str(tmp_path)
d = os.path.join(worktree, "docs", "work-items", "ORCH-120A")
os.makedirs(d, exist_ok=True)
with open(os.path.join(d, "01-questions.md"), "w") as f:
f.write("Q-1 нужно уточнить охват")
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, br: worktree)
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_analysis_complete": lambda *a, **k: (False, "missing")},
)
res = advance_stage(a, "analysis", "orchestrator", "ORCH-120A",
"feature/ORCH-120A", finished_agent="analyst")
assert res.note == "analysis-needs-input"
assert _paused_at(a) is not None, "A must be auto-parked on Needs Input (D4)"
# Now the parked A no longer holds the FIFO gate -> B's analyst-job is claimable.
claimed = claim_next_job()
assert claimed is not None and claimed["id"] == job_b, (
"a parked Needs-Input predecessor must not wedge the repo serial-gate (AC-4)"
)