# ТЗ — ORCH-053: Sweeper потерянных webhook (реконсиляция застрявших стадий) Work Item ID: ORCH-053 Базовая ветка: `feature/ORCH-053-sweeper-webhook-stuck-task` > Это ТЗ фиксирует **конкретные изменения кода/конфига/доки**. Архитектурные развилки > (потокобезопасность, точная схема дампинга нотификаций, способ вызова async-обработчиков > из sync-потока) фиксирует архитектор в `06-adr/`. Если ТЗ окажется негодным — возврат в > Анализ (не комментировать задним числом). ## 0. Живая разведка ПЕРЕД реализацией (обязательна) Перед кодом разработчик обязан вживую проверить (как сейчас webhook продвигает стадию): - `src/webhooks/gitea.py::handle_ci_status` (success-ветка ~стр.199–217) и `handle_pr`; - `src/webhooks/plane.py::handle_issue_updated / handle_status_start / handle_verdict / start_pipeline`; - `src/stage_engine.py::advance_stage` (унифицированный путь, `finished_agent=None` = webhook-путь); - `src/queue_worker.py` (образец фонового daemon-потока + `threading.Event` + atomic claim); - `src/db.py::has_active_job_for_task / claim_next_job / update_task_stage` (`updated_at`). ## 1. Задействованные модули `src/` | Модуль | Изменение | |--------|-----------| | `src/reconciler.py` | **НОВЫЙ.** Фоновый sweeper/reconciler (класс + module-singleton, паттерн `queue_worker`). Обе ветки F-1 (gate-side) и F-2 (plane-side). | | `src/config.py` | Новые настройки `reconcile_*` (интервал, kill-switch, per-stage grace, plane-poll flag). | | `src/main.py` | Старт/стоп reconciler в `lifespan` (после `worker.start()` / перед `worker.stop()`). | | `src/stage_engine.py` | Тонкий хелпер `advance_if_gate_passed(...)` (или `reconcile_advance`) — обёртка над `advance_stage(..., finished_agent=None)`, **подавляющая повторный спам нотификаций** при провале гейта (продвижение — переиспользуется как есть). | | `src/plane_sync.py` | НОВЫЙ хелпер `list_issues_by_state(project_id, state_uuids) -> list[dict]` (GET issues с пагинацией, фильтр по state). Используется F-2. | | `src/webhooks/gitea.py` | F-3: усилить sha→branch резолв в `handle_ci_status` (fallback на БД-поиск task), логировать неразрезолв на уровне INFO (видимость). | | `src/webhooks/plane.py` | F-2 переиспользует `handle_issue_updated` / `handle_status_start` / `handle_verdict` **без дублирования** логики (возможно, лёгкий рефактор для вызова из reconciler). | | `src/main.py` (API) | F-4 (опц.): расширить `/queue` блоком reconcile-метрик или добавить `GET /reconcile`. | ## 2. F-1 — Gate-side sweeper (реконсиляция по локальной БД) ### Алгоритм одного прохода (`reconcile_gate_once()`) ``` для каждой task где stage NOT IN ('done',) : если has_active_job_for_task(task.id): continue # в работе — не трогаем если get_qg_for_stage(task.stage) is None: continue # created/done — нет гейта grace = grace_for_stage(task.stage) если age(task.updated_at) < grace: continue # ещё не «застряла» # источник истины — гейт; путь продвижения — штатный advance_if_gate_passed(task.id, task.stage, task.repo, task.work_item_id, task.branch) ``` - **Продвижение** идёт через `stage_engine.advance_stage(task_id, stage, repo, work_item_id, branch, finished_agent=None)` — это **тот же** путь, которым пользуется Plane Approved-webhook (`webhooks/plane._try_advance_stage`). Никакой параллельной логики advance. - Для `development` → `advance_stage` прогонит `check_ci_green`; passed → `review` + enqueue `reviewer`. Для `review` → `check_reviewer_verdict` (канонический гейт стадии из `STAGE_TRANSITIONS`, читает `verdict:` из `12-review.md`). Для `testing` → `check_tests_passed`. Для `deploy` → `check_deploy_status`. Для `deploy-staging` → `check_staging_status` (+ merge-gate sub-gate отрабатывает внутри `advance_stage` как обычно). - **Стадия `analysis`** (gQG `check_analysis_approved`): это **человеческий** гейт. В `advance_stage` при `finished_agent=None` он трактуется как `approved-via-status` и продвинет задачу — чего при потере именно **Approved**-webhka мы и хотим **только** если Plane реально в статусе Approved. Поэтому **F-1 НЕ реконсилирует `analysis`** (advance для analysis отдаётся F-2, которая сверяется с реальным статусом Plane). Архитектор фиксирует это решение в ADR (защита от ложного продвижения неодобренного BRD). ### Подавление спама нотификаций (`advance_if_gate_passed`) - Если гейт **passed** → `advance_stage` продвигает и шлёт штатные нотификации advance. - Если гейт **failed** → НЕ повторять `notify_qg_failure`/`plane_notify_qg` на каждом тике. Хелпер вызывает `advance_stage` так, чтобы при провале была **тишина** (лог `INFO`/`DEBUG`), либо реализует продвижение, минуя ветку нотификации провала. Точную форму (флаг в `advance_stage` vs отдельный путь оценки гейта) выбирает архитектор; контракт: **на застрявшей-но-красной задаче sweeper не спамит**. ### Защита от гонки - `has_active_job_for_task` + `update_task_stage` обновляет `updated_at` → следующий тик увидит свежий `updated_at` и не сработает повторно. - Если в момент тика прилетел живой webhook и поставил job — sweeper увидит активный job и пропустит задачу. - `max_concurrency=1`: новый enqueued job встанет в общую очередь (без двойного запуска). ## 3. F-2 — Plane-side reconciler (опрос Plane API) ### Алгоритм одного прохода (`reconcile_plane_once()`) ``` для каждого проекта p в projects.PROJECTS: states = get_project_states(p.plane_project_id) for issue in list_issues_by_state(p.plane_project_id, [states['in_progress'], states['approved'], states['rejected']]): task = get_task_by_plane_id(issue.id) new_state = issue.state # идемпотентность: пропускаем, если есть активный job (живой webhook вот-вот придёт/в работе) если task and has_active_job_for_task(task.id): continue # доигрываем потерянный переход ЧЕРЕЗ существующие обработчики plane.py if new_state == in_progress and task is None: -> handle_status_start(issue_data, p.plane_project_id) elif new_state == approved and task and stage не сдвинут: -> handle_verdict(issue_data, ..., approved=True) elif new_state == rejected and task and не откатана: -> handle_verdict(issue_data, ..., approved=False) else: continue # всё синхронно — тишина ``` - **Переиспользовать** `handle_issue_updated`/`handle_status_start`/`handle_verdict` из `webhooks/plane.py`. Они `async` → reconciler (sync-поток) вызывает их через `asyncio.run(...)` либо собственный event loop. Способ — на усмотрение архитектора; **дублировать логику запрещено**. - `issue_data` собирается в форму, ожидаемую обработчиками (`{"id", "state": {"id":...}, "project", "name", "description_stripped"}`). Недостающие поля (name/description) обработчики сами дотягивают через `fetch_issue_fields` (как сейчас для status-only вебхука). - **Grace для F-2:** не реагировать на issue, чей статус сменился совсем недавно (вебхук мог просто задержаться). Источник «давности» — поле времени из Plane (`updated_at`) и/или локальный grace по `tasks.updated_at`. Архитектор фиксирует точный критерий «потерян, а не задержан». - **Идемпотентность создания (In Progress без задачи):** `start_pipeline` уже защищён (`handle_status_start` создаёт только если `get_task_by_plane_id` пуст). Гонка sweeper↔webhook на создании: оба пройдут проверку «нет задачи» одновременно → возможен дубль. Требование: использовать тот же claim-механизм / уникальность (как `ensure_unique_work_item_id` + проверка существования под защитой). Архитектор обязан описать atomic-claim на создании в ADR. ### `list_issues_by_state` (новый в `plane_sync.py`) - `GET {PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/` с фильтром по state (через query-параметр Plane, либо постфильтрация результата по `issue.state`). - Пагинация (`results` + cursor/next) — обойти все страницы. - Never-raise: при ошибке API/сети → `[]` + лог `warning` (Plane outage деградирует мягко, не роняет тик). ## 4. F-3 — Усиление sha→branch резолва (`webhooks/gitea.py::handle_ci_status`) - Текущая цепочка: `branches[0].name` → `git branch -r --contains `. Добавить fallback **на БД**: если branch не определён, найти task по `repo` среди активных (`stage='development'`) и, при однозначности, использовать её branch; иначе — оставить неразрезолвленным. - Заменить `logger.debug("could not determine branch...")` на `logger.info(...)` (видимость потери). Sweeper (F-1) всё равно подберёт такую задачу — это defense-in-depth, не критпуть. - **Не менять** success/failure-семантику гейта. ## 5. Конфигурация (`src/config.py`, env-prefix `ORCH_`) | Поле | Дефолт | Назначение | |------|--------|-----------| | `reconcile_enabled` | `True` | глобальный kill-switch sweeper'а (self-hosting safety, поэтапный раскат). | | `reconcile_interval_s` | `120` | период фонового прохода (сек). | | `reconcile_plane_enabled` | `True` | отдельный флаг для F-2 (опрос Plane API), чтобы можно было гасить только plane-ветку. | | `reconcile_grace_default_s` | `600` | дефолтный порог «застревания» по `tasks.updated_at`. | | `reconcile_grace_overrides_json` | `""` | JSON-объект per-stage порогов, напр. `{"analysis": 1800, "development": 300, "deploy": 900}`. Невалидный JSON → дефолт (как `agent_timeout_overrides_json`). | | `reconcile_notify_unblock` | `True` | слать Telegram при разблокировке (F-4). | `grace_for_stage(stage)` = override из JSON, иначе `reconcile_grace_default_s`. ## 6. БД - **Изменения схемы НЕ требуются** (предпочтительно, по образцу merge-gate ORCH-043). Стуковость определяется по существующим `tasks.updated_at`, `tasks.stage` и таблице `jobs` (`has_active_job_for_task`). `update_task_stage` уже обновляет `updated_at`. - Если архитектор сочтёт необходимым анти-дребезг (`tasks.last_reconcile_at`) — допускается идемпотентная миграция через `_ensure_column` (как остальные ALTER в `db.py`). По умолчанию — **без новых колонок**. ## 7. API (опционально, F-4) - Расширить `GET /queue` блоком `"reconcile": {...}` (enabled, interval, last_run_ts, unblocked_total, last_unblocked) — по образцу `worker.status()`. - ИЛИ добавить `GET /reconcile` с теми же метриками. Выбор — архитектор. Не обязательно для прохождения AC, но крайне желательно для наблюдаемости. ## 8. Новые QG checks - **Нет.** Sweeper переиспускает существующие гейты из `QG_CHECKS` через `advance_stage`. Реестр `QG_CHECKS` и `STAGE_TRANSITIONS` не меняются. ## 9. Артефакты pipeline / документация (обязательно в ЭТОМ PR) - `docs/architecture/README.md` — раздел про reconciler (компонент + место в resilience-слое). - `docs/work-items/ORCH-053/06-adr/ADR-001-*.md` — архитектурное решение (потоки, гонки, async-вызов обработчиков, подавление спама, grace-критерий, atomic-claim на создании). - `CHANGELOG.md` — запись `feat: ORCH-053 stuck-task reconciler`. - При желании архитектора — global ADR в `docs/architecture/adr/` (сквозной resilience). - `docs/operations/INFRA.md` — упомянуть kill-switch `ORCH_RECONCILE_ENABLED` (self-hosting). ## 10. Нефункциональные требования - **Never-raise в тике:** исключение в обработке одной задачи/issue не должно ронять весь проход (изолировать try/except на единицу работы, как `queue_worker._drain_once`). - **Идемпотентность** — см. §2/§3. - **Restart-safe** — daemon-поток + `threading.Event`, чистый `stop()` в `lifespan.finally`. - **Тишина при синхронности** — нет действий → нет логов уровня INFO/нотификаций. - **Тесты** — см. `04-test-plan.yaml` (моки Plane/Gitea API и QG, без реальной сети).