Files

16 KiB
Raw Permalink Blame History

ТЗ — ORCH-053: Sweeper потерянных webhook (реконсиляция застрявших стадий)

Work Item ID: ORCH-053 Базовая ветка: feature/ORCH-053-sweeper-webhook-stuck-task

Это ТЗ фиксирует конкретные изменения кода/конфига/доки. Архитектурные развилки (потокобезопасность, точная схема дампинга нотификаций, способ вызова async-обработчиков из sync-потока) фиксирует архитектор в 06-adr/. Если ТЗ окажется негодным — возврат в Анализ (не комментировать задним числом).

0. Живая разведка ПЕРЕД реализацией (обязательна)

Перед кодом разработчик обязан вживую проверить (как сейчас webhook продвигает стадию):

  • src/webhooks/gitea.py::handle_ci_status (success-ветка ~стр.199217) и handle_pr;
  • src/webhooks/plane.py::handle_issue_updated / handle_status_start / handle_verdict / start_pipeline;
  • src/stage_engine.py::advance_stage (унифицированный путь, finished_agent=None = webhook-путь);
  • src/queue_worker.py (образец фонового daemon-потока + threading.Event + atomic claim);
  • src/db.py::has_active_job_for_task / claim_next_job / update_task_stage (updated_at).

1. Задействованные модули src/

Модуль Изменение
src/reconciler.py НОВЫЙ. Фоновый sweeper/reconciler (класс + module-singleton, паттерн queue_worker). Обе ветки F-1 (gate-side) и F-2 (plane-side).
src/config.py Новые настройки reconcile_* (интервал, kill-switch, per-stage grace, plane-poll flag).
src/main.py Старт/стоп reconciler в lifespan (после worker.start() / перед worker.stop()).
src/stage_engine.py Тонкий хелпер advance_if_gate_passed(...) (или reconcile_advance) — обёртка над advance_stage(..., finished_agent=None), подавляющая повторный спам нотификаций при провале гейта (продвижение — переиспользуется как есть).
src/plane_sync.py НОВЫЙ хелпер list_issues_by_state(project_id, state_uuids) -> list[dict] (GET issues с пагинацией, фильтр по state). Используется F-2.
src/webhooks/gitea.py F-3: усилить sha→branch резолв в handle_ci_status (fallback на БД-поиск task), логировать неразрезолв на уровне INFO (видимость).
src/webhooks/plane.py F-2 переиспользует handle_issue_updated / handle_status_start / handle_verdict без дублирования логики (возможно, лёгкий рефактор для вызова из reconciler).
src/main.py (API) F-4 (опц.): расширить /queue блоком reconcile-метрик или добавить GET /reconcile.

2. F-1 — Gate-side sweeper (реконсиляция по локальной БД)

Алгоритм одного прохода (reconcile_gate_once())

для каждой task где stage NOT IN ('done',) :
    если has_active_job_for_task(task.id):           continue   # в работе — не трогаем
    если get_qg_for_stage(task.stage) is None:       continue   # created/done — нет гейта
    grace = grace_for_stage(task.stage)
    если age(task.updated_at) < grace:               continue   # ещё не «застряла»
    # источник истины — гейт; путь продвижения — штатный
    advance_if_gate_passed(task.id, task.stage, task.repo, task.work_item_id, task.branch)
  • Продвижение идёт через stage_engine.advance_stage(task_id, stage, repo, work_item_id, branch, finished_agent=None) — это тот же путь, которым пользуется Plane Approved-webhook (webhooks/plane._try_advance_stage). Никакой параллельной логики advance.
  • Для developmentadvance_stage прогонит check_ci_green; passed → review + enqueue reviewer. Для reviewcheck_reviewer_verdict (канонический гейт стадии из STAGE_TRANSITIONS, читает verdict: из 12-review.md). Для testingcheck_tests_passed. Для deploycheck_deploy_status. Для deploy-stagingcheck_staging_status (+ merge-gate sub-gate отрабатывает внутри advance_stage как обычно).
  • Стадия analysis (gQG check_analysis_approved): это человеческий гейт. В advance_stage при finished_agent=None он трактуется как approved-via-status и продвинет задачу — чего при потере именно Approved-webhka мы и хотим только если Plane реально в статусе Approved. Поэтому F-1 НЕ реконсилирует analysis (advance для analysis отдаётся F-2, которая сверяется с реальным статусом Plane). Архитектор фиксирует это решение в ADR (защита от ложного продвижения неодобренного BRD).

Подавление спама нотификаций (advance_if_gate_passed)

  • Если гейт passedadvance_stage продвигает и шлёт штатные нотификации advance.
  • Если гейт failedНЕ повторять notify_qg_failure/plane_notify_qg на каждом тике. Хелпер вызывает advance_stage так, чтобы при провале была тишина (лог INFO/DEBUG), либо реализует продвижение, минуя ветку нотификации провала. Точную форму (флаг в advance_stage vs отдельный путь оценки гейта) выбирает архитектор; контракт: на застрявшей-но-красной задаче sweeper не спамит.

Защита от гонки

  • has_active_job_for_task + update_task_stage обновляет updated_at → следующий тик увидит свежий updated_at и не сработает повторно.
  • Если в момент тика прилетел живой webhook и поставил job — sweeper увидит активный job и пропустит задачу.
  • max_concurrency=1: новый enqueued job встанет в общую очередь (без двойного запуска).

3. F-2 — Plane-side reconciler (опрос Plane API)

Алгоритм одного прохода (reconcile_plane_once())

для каждого проекта p в projects.PROJECTS:
    states = get_project_states(p.plane_project_id)
    for issue in list_issues_by_state(p.plane_project_id,
                                      [states['in_progress'], states['approved'], states['rejected']]):
        task = get_task_by_plane_id(issue.id)
        new_state = issue.state
        # идемпотентность: пропускаем, если есть активный job (живой webhook вот-вот придёт/в работе)
        если task and has_active_job_for_task(task.id):  continue
        # доигрываем потерянный переход ЧЕРЕЗ существующие обработчики plane.py
        if new_state == in_progress and task is None:        -> handle_status_start(issue_data, p.plane_project_id)
        elif new_state == approved and task and stage не сдвинут:  -> handle_verdict(issue_data, ..., approved=True)
        elif new_state == rejected and task and не откатана:       -> handle_verdict(issue_data, ..., approved=False)
        else: continue   # всё синхронно — тишина
  • Переиспользовать handle_issue_updated/handle_status_start/handle_verdict из webhooks/plane.py. Они async → reconciler (sync-поток) вызывает их через asyncio.run(...) либо собственный event loop. Способ — на усмотрение архитектора; дублировать логику запрещено.
  • issue_data собирается в форму, ожидаемую обработчиками ({"id", "state": {"id":...}, "project", "name", "description_stripped"}). Недостающие поля (name/description) обработчики сами дотягивают через fetch_issue_fields (как сейчас для status-only вебхука).
  • Grace для F-2: не реагировать на issue, чей статус сменился совсем недавно (вебхук мог просто задержаться). Источник «давности» — поле времени из Plane (updated_at) и/или локальный grace по tasks.updated_at. Архитектор фиксирует точный критерий «потерян, а не задержан».
  • Идемпотентность создания (In Progress без задачи): start_pipeline уже защищён (handle_status_start создаёт только если get_task_by_plane_id пуст). Гонка sweeper↔webhook на создании: оба пройдут проверку «нет задачи» одновременно → возможен дубль. Требование: использовать тот же claim-механизм / уникальность (как ensure_unique_work_item_id + проверка существования под защитой). Архитектор обязан описать atomic-claim на создании в ADR.

list_issues_by_state (новый в plane_sync.py)

  • GET {PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/ с фильтром по state (через query-параметр Plane, либо постфильтрация результата по issue.state).
  • Пагинация (results + cursor/next) — обойти все страницы.
  • Never-raise: при ошибке API/сети → [] + лог warning (Plane outage деградирует мягко, не роняет тик).

4. F-3 — Усиление sha→branch резолва (webhooks/gitea.py::handle_ci_status)

  • Текущая цепочка: branches[0].namegit branch -r --contains <sha>. Добавить fallback на БД: если branch не определён, найти task по repo среди активных (stage='development') и, при однозначности, использовать её branch; иначе — оставить неразрезолвленным.
  • Заменить logger.debug("could not determine branch...") на logger.info(...) (видимость потери). Sweeper (F-1) всё равно подберёт такую задачу — это defense-in-depth, не критпуть.
  • Не менять success/failure-семантику гейта.

5. Конфигурация (src/config.py, env-prefix ORCH_)

Поле Дефолт Назначение
reconcile_enabled True глобальный kill-switch sweeper'а (self-hosting safety, поэтапный раскат).
reconcile_interval_s 120 период фонового прохода (сек).
reconcile_plane_enabled True отдельный флаг для F-2 (опрос Plane API), чтобы можно было гасить только plane-ветку.
reconcile_grace_default_s 600 дефолтный порог «застревания» по tasks.updated_at.
reconcile_grace_overrides_json "" JSON-объект per-stage порогов, напр. {"analysis": 1800, "development": 300, "deploy": 900}. Невалидный JSON → дефолт (как agent_timeout_overrides_json).
reconcile_notify_unblock True слать Telegram при разблокировке (F-4).

grace_for_stage(stage) = override из JSON, иначе reconcile_grace_default_s.

6. БД

  • Изменения схемы НЕ требуются (предпочтительно, по образцу merge-gate ORCH-043). Стуковость определяется по существующим tasks.updated_at, tasks.stage и таблице jobs (has_active_job_for_task). update_task_stage уже обновляет updated_at.
  • Если архитектор сочтёт необходимым анти-дребезг (tasks.last_reconcile_at) — допускается идемпотентная миграция через _ensure_column (как остальные ALTER в db.py). По умолчанию — без новых колонок.

7. API (опционально, F-4)

  • Расширить GET /queue блоком "reconcile": {...} (enabled, interval, last_run_ts, unblocked_total, last_unblocked) — по образцу worker.status().
  • ИЛИ добавить GET /reconcile с теми же метриками. Выбор — архитектор. Не обязательно для прохождения AC, но крайне желательно для наблюдаемости.

8. Новые QG checks

  • Нет. Sweeper переиспускает существующие гейты из QG_CHECKS через advance_stage. Реестр QG_CHECKS и STAGE_TRANSITIONS не меняются.

9. Артефакты pipeline / документация (обязательно в ЭТОМ PR)

  • docs/architecture/README.md — раздел про reconciler (компонент + место в resilience-слое).
  • docs/work-items/ORCH-053/06-adr/ADR-001-*.md — архитектурное решение (потоки, гонки, async-вызов обработчиков, подавление спама, grace-критерий, atomic-claim на создании).
  • CHANGELOG.md — запись feat: ORCH-053 stuck-task reconciler.
  • При желании архитектора — global ADR в docs/architecture/adr/ (сквозной resilience).
  • docs/operations/INFRA.md — упомянуть kill-switch ORCH_RECONCILE_ENABLED (self-hosting).

10. Нефункциональные требования

  • Never-raise в тике: исключение в обработке одной задачи/issue не должно ронять весь проход (изолировать try/except на единицу работы, как queue_worker._drain_once).
  • Идемпотентность — см. §2/§3.
  • Restart-safe — daemon-поток + threading.Event, чистый stop() в lifespan.finally.
  • Тишина при синхронности — нет действий → нет логов уровня INFO/нотификаций.
  • Тесты — см. 04-test-plan.yaml (моки Plane/Gitea API и QG, без реальной сети).