16 KiB
16 KiB
ТЗ — ORCH-053: Sweeper потерянных webhook (реконсиляция застрявших стадий)
Work Item ID: ORCH-053
Базовая ветка: feature/ORCH-053-sweeper-webhook-stuck-task
Это ТЗ фиксирует конкретные изменения кода/конфига/доки. Архитектурные развилки (потокобезопасность, точная схема дампинга нотификаций, способ вызова async-обработчиков из sync-потока) фиксирует архитектор в
06-adr/. Если ТЗ окажется негодным — возврат в Анализ (не комментировать задним числом).
0. Живая разведка ПЕРЕД реализацией (обязательна)
Перед кодом разработчик обязан вживую проверить (как сейчас webhook продвигает стадию):
src/webhooks/gitea.py::handle_ci_status(success-ветка ~стр.199–217) иhandle_pr;src/webhooks/plane.py::handle_issue_updated / handle_status_start / handle_verdict / start_pipeline;src/stage_engine.py::advance_stage(унифицированный путь,finished_agent=None= webhook-путь);src/queue_worker.py(образец фонового daemon-потока +threading.Event+ atomic claim);src/db.py::has_active_job_for_task / claim_next_job / update_task_stage(updated_at).
1. Задействованные модули src/
| Модуль | Изменение |
|---|---|
src/reconciler.py |
НОВЫЙ. Фоновый sweeper/reconciler (класс + module-singleton, паттерн queue_worker). Обе ветки F-1 (gate-side) и F-2 (plane-side). |
src/config.py |
Новые настройки reconcile_* (интервал, kill-switch, per-stage grace, plane-poll flag). |
src/main.py |
Старт/стоп reconciler в lifespan (после worker.start() / перед worker.stop()). |
src/stage_engine.py |
Тонкий хелпер advance_if_gate_passed(...) (или reconcile_advance) — обёртка над advance_stage(..., finished_agent=None), подавляющая повторный спам нотификаций при провале гейта (продвижение — переиспользуется как есть). |
src/plane_sync.py |
НОВЫЙ хелпер list_issues_by_state(project_id, state_uuids) -> list[dict] (GET issues с пагинацией, фильтр по state). Используется F-2. |
src/webhooks/gitea.py |
F-3: усилить sha→branch резолв в handle_ci_status (fallback на БД-поиск task), логировать неразрезолв на уровне INFO (видимость). |
src/webhooks/plane.py |
F-2 переиспользует handle_issue_updated / handle_status_start / handle_verdict без дублирования логики (возможно, лёгкий рефактор для вызова из reconciler). |
src/main.py (API) |
F-4 (опц.): расширить /queue блоком reconcile-метрик или добавить GET /reconcile. |
2. F-1 — Gate-side sweeper (реконсиляция по локальной БД)
Алгоритм одного прохода (reconcile_gate_once())
для каждой task где stage NOT IN ('done',) :
если has_active_job_for_task(task.id): continue # в работе — не трогаем
если get_qg_for_stage(task.stage) is None: continue # created/done — нет гейта
grace = grace_for_stage(task.stage)
если age(task.updated_at) < grace: continue # ещё не «застряла»
# источник истины — гейт; путь продвижения — штатный
advance_if_gate_passed(task.id, task.stage, task.repo, task.work_item_id, task.branch)
- Продвижение идёт через
stage_engine.advance_stage(task_id, stage, repo, work_item_id, branch, finished_agent=None)— это тот же путь, которым пользуется Plane Approved-webhook (webhooks/plane._try_advance_stage). Никакой параллельной логики advance. - Для
development→advance_stageпрогонитcheck_ci_green; passed →review+ enqueuereviewer. Дляreview→check_reviewer_verdict(канонический гейт стадии изSTAGE_TRANSITIONS, читаетverdict:из12-review.md). Дляtesting→check_tests_passed. Дляdeploy→check_deploy_status. Дляdeploy-staging→check_staging_status(+ merge-gate sub-gate отрабатывает внутриadvance_stageкак обычно). - Стадия
analysis(gQGcheck_analysis_approved): это человеческий гейт. Вadvance_stageприfinished_agent=Noneон трактуется какapproved-via-statusи продвинет задачу — чего при потере именно Approved-webhka мы и хотим только если Plane реально в статусе Approved. Поэтому F-1 НЕ реконсилируетanalysis(advance для analysis отдаётся F-2, которая сверяется с реальным статусом Plane). Архитектор фиксирует это решение в ADR (защита от ложного продвижения неодобренного BRD).
Подавление спама нотификаций (advance_if_gate_passed)
- Если гейт passed →
advance_stageпродвигает и шлёт штатные нотификации advance. - Если гейт failed → НЕ повторять
notify_qg_failure/plane_notify_qgна каждом тике. Хелпер вызываетadvance_stageтак, чтобы при провале была тишина (логINFO/DEBUG), либо реализует продвижение, минуя ветку нотификации провала. Точную форму (флаг вadvance_stagevs отдельный путь оценки гейта) выбирает архитектор; контракт: на застрявшей-но-красной задаче sweeper не спамит.
Защита от гонки
has_active_job_for_task+update_task_stageобновляетupdated_at→ следующий тик увидит свежийupdated_atи не сработает повторно.- Если в момент тика прилетел живой webhook и поставил job — sweeper увидит активный job и пропустит задачу.
max_concurrency=1: новый enqueued job встанет в общую очередь (без двойного запуска).
3. F-2 — Plane-side reconciler (опрос Plane API)
Алгоритм одного прохода (reconcile_plane_once())
для каждого проекта p в projects.PROJECTS:
states = get_project_states(p.plane_project_id)
for issue in list_issues_by_state(p.plane_project_id,
[states['in_progress'], states['approved'], states['rejected']]):
task = get_task_by_plane_id(issue.id)
new_state = issue.state
# идемпотентность: пропускаем, если есть активный job (живой webhook вот-вот придёт/в работе)
если task and has_active_job_for_task(task.id): continue
# доигрываем потерянный переход ЧЕРЕЗ существующие обработчики plane.py
if new_state == in_progress and task is None: -> handle_status_start(issue_data, p.plane_project_id)
elif new_state == approved and task and stage не сдвинут: -> handle_verdict(issue_data, ..., approved=True)
elif new_state == rejected and task and не откатана: -> handle_verdict(issue_data, ..., approved=False)
else: continue # всё синхронно — тишина
- Переиспользовать
handle_issue_updated/handle_status_start/handle_verdictизwebhooks/plane.py. Ониasync→ reconciler (sync-поток) вызывает их черезasyncio.run(...)либо собственный event loop. Способ — на усмотрение архитектора; дублировать логику запрещено. issue_dataсобирается в форму, ожидаемую обработчиками ({"id", "state": {"id":...}, "project", "name", "description_stripped"}). Недостающие поля (name/description) обработчики сами дотягивают черезfetch_issue_fields(как сейчас для status-only вебхука).- Grace для F-2: не реагировать на issue, чей статус сменился совсем недавно (вебхук мог
просто задержаться). Источник «давности» — поле времени из Plane (
updated_at) и/или локальный grace поtasks.updated_at. Архитектор фиксирует точный критерий «потерян, а не задержан». - Идемпотентность создания (In Progress без задачи):
start_pipelineуже защищён (handle_status_startсоздаёт только еслиget_task_by_plane_idпуст). Гонка sweeper↔webhook на создании: оба пройдут проверку «нет задачи» одновременно → возможен дубль. Требование: использовать тот же claim-механизм / уникальность (какensure_unique_work_item_id+ проверка существования под защитой). Архитектор обязан описать atomic-claim на создании в ADR.
list_issues_by_state (новый в plane_sync.py)
GET {PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/с фильтром по state (через query-параметр Plane, либо постфильтрация результата поissue.state).- Пагинация (
results+ cursor/next) — обойти все страницы. - Never-raise: при ошибке API/сети →
[]+ логwarning(Plane outage деградирует мягко, не роняет тик).
4. F-3 — Усиление sha→branch резолва (webhooks/gitea.py::handle_ci_status)
- Текущая цепочка:
branches[0].name→git branch -r --contains <sha>. Добавить fallback на БД: если branch не определён, найти task поrepoсреди активных (stage='development') и, при однозначности, использовать её branch; иначе — оставить неразрезолвленным. - Заменить
logger.debug("could not determine branch...")наlogger.info(...)(видимость потери). Sweeper (F-1) всё равно подберёт такую задачу — это defense-in-depth, не критпуть. - Не менять success/failure-семантику гейта.
5. Конфигурация (src/config.py, env-prefix ORCH_)
| Поле | Дефолт | Назначение |
|---|---|---|
reconcile_enabled |
True |
глобальный kill-switch sweeper'а (self-hosting safety, поэтапный раскат). |
reconcile_interval_s |
120 |
период фонового прохода (сек). |
reconcile_plane_enabled |
True |
отдельный флаг для F-2 (опрос Plane API), чтобы можно было гасить только plane-ветку. |
reconcile_grace_default_s |
600 |
дефолтный порог «застревания» по tasks.updated_at. |
reconcile_grace_overrides_json |
"" |
JSON-объект per-stage порогов, напр. {"analysis": 1800, "development": 300, "deploy": 900}. Невалидный JSON → дефолт (как agent_timeout_overrides_json). |
reconcile_notify_unblock |
True |
слать Telegram при разблокировке (F-4). |
grace_for_stage(stage) = override из JSON, иначе reconcile_grace_default_s.
6. БД
- Изменения схемы НЕ требуются (предпочтительно, по образцу merge-gate ORCH-043).
Стуковость определяется по существующим
tasks.updated_at,tasks.stageи таблицеjobs(has_active_job_for_task).update_task_stageуже обновляетupdated_at. - Если архитектор сочтёт необходимым анти-дребезг (
tasks.last_reconcile_at) — допускается идемпотентная миграция через_ensure_column(как остальные ALTER вdb.py). По умолчанию — без новых колонок.
7. API (опционально, F-4)
- Расширить
GET /queueблоком"reconcile": {...}(enabled, interval, last_run_ts, unblocked_total, last_unblocked) — по образцуworker.status(). - ИЛИ добавить
GET /reconcileс теми же метриками. Выбор — архитектор. Не обязательно для прохождения AC, но крайне желательно для наблюдаемости.
8. Новые QG checks
- Нет. Sweeper переиспускает существующие гейты из
QG_CHECKSчерезadvance_stage. РеестрQG_CHECKSиSTAGE_TRANSITIONSне меняются.
9. Артефакты pipeline / документация (обязательно в ЭТОМ PR)
docs/architecture/README.md— раздел про reconciler (компонент + место в resilience-слое).docs/work-items/ORCH-053/06-adr/ADR-001-*.md— архитектурное решение (потоки, гонки, async-вызов обработчиков, подавление спама, grace-критерий, atomic-claim на создании).CHANGELOG.md— записьfeat: ORCH-053 stuck-task reconciler.- При желании архитектора — global ADR в
docs/architecture/adr/(сквозной resilience). docs/operations/INFRA.md— упомянуть kill-switchORCH_RECONCILE_ENABLED(self-hosting).
10. Нефункциональные требования
- Never-raise в тике: исключение в обработке одной задачи/issue не должно ронять весь
проход (изолировать try/except на единицу работы, как
queue_worker._drain_once). - Идемпотентность — см. §2/§3.
- Restart-safe — daemon-поток +
threading.Event, чистыйstop()вlifespan.finally. - Тишина при синхронности — нет действий → нет логов уровня INFO/нотификаций.
- Тесты — см.
04-test-plan.yaml(моки Plane/Gitea API и QG, без реальной сети).