feat(reconciler): sweeper for lost webhooks (reconciliation of stuck stages)

The pipeline advances only on incoming webhooks; a lost event (502 during a rebuild, no retries from Plane/Gitea, an unresolved sha→branch) leaves the task silently stuck (incident class ORCH-044). A new background daemon thread, src/reconciler.py (queue_worker pattern), replays the missed transition through the same standard gates/handlers a webhook would use:

- F-1 gate-side: for tasks with stage≠done, no active job and age(updated_at) ≥ grace_for_stage(stage), a read-only pre-evaluation of the canonical QG; green → stage_engine.advance_stage(..., finished_agent=None); red → silence (notification spam is structurally impossible). F-1 does not touch analysis (human gate).
- F-2 plane-side: polls the Plane API per project (plane_sync.list_issues_by_state, cursor pagination, never-raise) → replays In Progress/Approved/Rejected through the existing handle_status_start/handle_verdict (async handlers called from a sync thread via asyncio.run).
- F-3: hardens sha→branch in handle_ci_status: DB fallback to the single development task of the repo (ambiguity → not resolved), debug→info.
- Anti-duplicate on creation (db.create_task_atomic under a process-wide Lock): a reconcile↔webhook race does not spawn a second task/branch/worktree/analyst job (AC-4).
- F-4 observability: an unblock log line + Telegram + a reconcile block in /queue.

Start/stop in main.lifespan (after worker.start() / before worker.stop()), restart-safe, never-raise per unit of work. Kill-switches ORCH_RECONCILE_ENABLED / ORCH_RECONCILE_PLANE_ENABLED plus grace settings. The DB schema and the STAGE_TRANSITIONS/QG_CHECKS registries are unchanged.

Tests: test_reconciler.py, test_reconciler_plane.py, test_gitea_sha_resolve.py, test_config.py (33 new, 563 total, all green).
Documentation updated (golden source): architecture/README.md, INFRA.md, README.md, CHANGELOG.md, adr-0007 → accepted.

Refs: ORCH-053
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
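
The F-1 eligibility rule from the commit message can be sketched as a plain filter. This is a minimal illustration, not the code in src/reconciler.py: the `Task` dataclass, `f1_candidates` and both constants are hypothetical names, and only `grace_for_stage` and the 600-second default are taken from this commit (its real signature may differ).

```python
from dataclasses import dataclass

TERMINAL_STAGE = "done"        # terminal stage, never reconciled
HUMAN_GATED = {"analysis"}     # F-1 leaves the human gate alone


@dataclass
class Task:
    work_item_id: str
    stage: str
    age_s: int            # seconds since tasks.updated_at
    has_active_job: bool  # a queued/running job already owns this task


def grace_for_stage(stage, default_s=600, overrides=None):
    """Per-stage "stuck" threshold; falls back to the default."""
    return (overrides or {}).get(stage, default_s)


def f1_candidates(tasks, default_s=600, overrides=None):
    """Return the tasks whose gate is worth pre-evaluating on this sweep."""
    out = []
    for t in tasks:
        if t.stage == TERMINAL_STAGE or t.stage in HUMAN_GATED:
            continue  # finished, or owned by a human decision
        if t.has_active_job:
            continue  # active-job guard: something is already working on it
        if t.age_s < grace_for_stage(t.stage, default_s, overrides):
            continue  # not old enough to count as stuck
        out.append(t)
    return out


tasks = [
    Task("ORCH-1", "development", 900, False),  # stuck -> candidate
    Task("ORCH-2", "development", 120, False),  # still within grace
    Task("ORCH-3", "review", 900, True),        # has an active job
    Task("ORCH-4", "analysis", 9000, False),    # human gate
    Task("ORCH-5", "done", 9000, False),        # terminal
]
candidates = [t.work_item_id for t in f1_candidates(tasks)]
```

Only candidates reach the gate pre-evaluation; a red gate then produces no advance and no notification, which is why the sweep stays silent when nothing is out of sync.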
17
.env.example
@@ -36,3 +36,20 @@ ORCH_MERGE_RETEST_TARGET=tests/
ORCH_MERGE_LOCK_TIMEOUT_S=300
ORCH_MERGE_DEFER_DELAY_S=60
ORCH_MERGE_DEFER_MAX_ATTEMPTS=5

# ORCH-053: stuck-task reconciler (sweeper for lost webhooks). A background daemon
# replays a missed stage transition through the SAME gates/handlers a webhook would,
# fixing tasks that got stuck on a dropped event (502 on rebuild, no Plane/Gitea
# retries, unresolved sha->branch).
# ENABLED -> global kill-switch (self-hosting safety / staged rollout).
# PLANE_ENABLED -> separate flag for the F-2 Plane-API poll (mute only F-2).
# INTERVAL_S -> background sweep period (seconds).
# GRACE_DEFAULT_S -> default "stuck" threshold on tasks.updated_at (seconds).
# GRACE_OVERRIDES_JSON -> per-stage thresholds, e.g. {"development":300}; bad JSON -> default.
# NOTIFY_UNBLOCK -> send a Telegram message when a stuck task is unblocked.
ORCH_RECONCILE_ENABLED=true
ORCH_RECONCILE_PLANE_ENABLED=true
ORCH_RECONCILE_INTERVAL_S=120
ORCH_RECONCILE_GRACE_DEFAULT_S=600
ORCH_RECONCILE_GRACE_OVERRIDES_JSON=
ORCH_RECONCILE_NOTIFY_UNBLOCK=true
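
The "bad JSON -> default" behaviour of `ORCH_RECONCILE_GRACE_OVERRIDES_JSON` could look roughly like the sketch below. The commit's real parser (`_parse_grace_overrides`) is not shown in full here, so the function name, the choice to skip non-numeric or negative entries, and the return shape are all assumptions for illustration.

```python
import json


def parse_grace_overrides(raw: str) -> dict:
    """Parse per-stage thresholds; any malformed input yields {} (use the default)."""
    if not raw or not raw.strip():
        return {}  # an empty env value means "no overrides"
    try:
        data = json.loads(raw)
    except (ValueError, TypeError):
        return {}  # invalid JSON never raises, it just disables the overrides
    if not isinstance(data, dict):
        return {}  # e.g. a JSON list or number is not a stage->seconds map
    out = {}
    for stage, value in data.items():
        # Assumption: silently skip entries that are not non-negative numbers.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        if value < 0:
            continue
        out[str(stage)] = int(value)
    return out


overrides = parse_grace_overrides('{"development":300}')
threshold = overrides.get("development", 600)  # 600 mirrors GRACE_DEFAULT_S
```

Returning an empty mapping on every failure path keeps a typo in the env file from stopping the sweeper: every stage simply falls back to `ORCH_RECONCILE_GRACE_DEFAULT_S`.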
@@ -5,6 +5,7 @@
## [Unreleased]

### Added
- **Sweeper for lost webhooks (reconciliation of stuck stages)** (ORCH-053): a background daemon thread, `src/reconciler.py` (`queue_worker` pattern), that eliminates silent task stalls when the pipeline stops moving because of a lost event (502 during an instance rebuild, no retries from Plane/Gitea, an unresolved `sha→branch`; incident class ORCH-044). The reconciler periodically (`reconcile_interval_s`) replays the missed transition **through the same standard gates/handlers** as a webhook, without duplicating pipeline logic: **F-1 gate-side** (`reconcile_gate_once`): for tasks with `stage≠done`, no active job and `age(updated_at) ≥ grace_for_stage(stage)`, it runs a read-only pre-evaluation of the stage's canonical QG; green → advance strictly through the unchanged `stage_engine.advance_stage(..., finished_agent=None)`; red → silence (notification spam is structurally impossible because `advance_stage` is never called on a red gate); F-1 does not touch `analysis` (human gate). **F-2 plane-side** (`reconcile_plane_once`): polls the Plane API per project (new `plane_sync.list_issues_by_state`, cursor pagination, never-raise) and replays In Progress / Approved / Rejected through the existing `webhooks.plane.handle_status_start` / `handle_verdict` (async handlers are called from the sync thread via `asyncio.run`). **F-3**: hardens `sha→branch` in `handle_ci_status`: for an unresolved sha, a DB fallback to the single development task of the repo (`db.get_development_tasks_by_repo`; ambiguity → not resolved, so there is no false match), plus `logger.debug`→`logger.info` to make a lost CI event visible. Anti-duplicate on task creation (`db.create_task_atomic` under a process-wide `threading.Lock`: SELECT-exists→INSERT; the loser of a reconcile↔webhook race does not spawn a second task/branch/worktree/starter analyst job). Start/stop in `main.lifespan` (after `worker.start()` / before `worker.stop()`), restart-safe, never-raise per unit of work. Observability (F-4): on unblock, the log line `reconciler: <wi> <stage> разблокирована (потерян webhook)` + Telegram (`reconcile_notify_unblock`) and a `reconcile` block in `GET /queue`. Kill-switches: `ORCH_RECONCILE_ENABLED` (global), `ORCH_RECONCILE_PLANE_ENABLED` (mutes only F-2); other settings: `ORCH_RECONCILE_INTERVAL_S` (120), `ORCH_RECONCILE_GRACE_DEFAULT_S` (600), `ORCH_RECONCILE_GRACE_OVERRIDES_JSON` (per-stage), `ORCH_RECONCILE_NOTIFY_UNBLOCK` (true). The DB schema and the registries (`STAGE_TRANSITIONS`/`QG_CHECKS`) are NOT changed. ADR `docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md`, global `docs/architecture/adr/adr-0007-reconciler.md`. Tests: `tests/test_reconciler.py`, `tests/test_reconciler_plane.py`, `tests/test_gitea_sha_resolve.py`, `tests/test_config.py`.
- **Merge-gate: auto-rebase onto the current `origin/main` + test re-run + merge serialization** (ORCH-043): a deterministic (no LLM) sub-gate on the `deploy-staging → deploy` edge, executed BEFORE the deployer merges the PR. It closes the race class "two green branches in one repo break `main`": the pipeline validates a branch against the `main` it forked from, not against `main` at merge time; between "branch is green" and "branch is merged" a parallel task can move `main` (a semantic conflict: git merges without a textual conflict, but the combined `main` is red). For the self-hosting `orchestrator` repository this would mean a red `main` of the tool that serves ALL projects. New module `src/merge_gate.py` ("never raise" contract, all git operations run in a per-branch worktree, ORCH-2/S-4): `branch_is_behind_main` (`git merge-base --is-ancestor origin/main HEAD`), `auto_rebase_onto_main` (rebase + `git push --force-with-lease` of the task branch ONLY; `main` is NEVER pushed; a textual conflict → `rebase --abort` + a clean worktree), `retest_branch` (`python -m pytest <target>` in the caught-up worktree, budget `merge_retest_timeout_s`), a file-based merge lease (`acquire_merge_lease`/`release_merge_lease`, atomic `O_CREAT|O_EXCL`, holder-aware release, reclaim of a stale/corrupt lease, with no DB schema change). The new quality gate `check_branch_mergeable` (`src/qg/checks.py`, registered in `QG_CHECKS`) composes the primitives under the lease: kill-switch/out-of-scope → no-op pass; lock busy → `(False, "merge-lock busy")` (a DEFER signal, not a code fault); branch up to date → pass (the lease is HELD until the merge); behind → rebase → conflict = fail+release, clean → retest → green = pass (lease held) / red|timeout = fail+release. Integration in `src/stage_engine.py` (a sub-gate on `deploy-staging`, WITHOUT a new stage in `STAGE_TRANSITIONS`): pass → advance to `deploy`; "merge-lock busy" → DEFER (the deployer is re-enqueued on `deploy-staging` with an `available_at` delay, anti-deadlock under `max_concurrency=1`, a restart-safe counter keyed by `task_content`, limit `merge_defer_max_attempts` → block+Telegram); conflict/red retest → ROLLBACK to `development` + a developer retry (capped by `MAX_DEVELOPER_RETRIES`, no infinite bounce). The lease is released on `deploy→done`, on rollback and on the merged-PR webhook (`src/webhooks/gitea.py`). New parameter `enqueue_job(..., available_at_delay_s=...)` (`src/db.py`): delayed enqueue without a schema change. Conditional rollout (mirrors ORCH-35): `merge_gate_repos` (CSV) or, by default, only the self-hosting `orchestrator`; global kill-switch `merge_gate_enabled`. New settings `ORCH_MERGE_GATE_ENABLED` (true), `ORCH_MERGE_GATE_REPOS` (""), `ORCH_MERGE_RETEST_TIMEOUT_S` (600), `ORCH_MERGE_RETEST_TARGET` (tests/), `ORCH_MERGE_LOCK_TIMEOUT_S` (300), `ORCH_MERGE_DEFER_DELAY_S` (60), `ORCH_MERGE_DEFER_MAX_ATTEMPTS` (5). ADR `docs/work-items/ORCH-043/06-adr/ADR-001-merge-gate.md`, global `docs/architecture/adr/adr-0006-merge-gate.md`. Tests: `tests/test_merge_gate.py`, `tests/test_qg_merge_gate.py`, `tests/test_merge_gate_race.py`, `tests/test_stage_engine.py::TestMergeGate`, `tests/test_config.py`.
- **`bump` mode for the live Telegram tracker** (ORCH-042): the new `ORCH_TRACKER_MODE` (`Settings.tracker_mode`, default `edit`) selects the behaviour of the task card. `edit` (as before): the card is edited in place (`editMessageText`). `bump`: on every update the old message is deleted and the card is re-sent at the bottom of the chat (best-effort `delete_telegram(old_id)` → `send_telegram(text, disable_notification=True)` → `set_tracker_message_id(new_id)`), so the current status is always the last message in the chat during active conversation. The "one card per task" invariant holds in both modes: one `update_task_tracker` call sends ≤1 new message; `set_tracker_message_id` is called ONLY after a successful send (a transient `None` does not overwrite the pointer); the delete result does NOT block sending the new card (delete failed for a message older than 48h → the new one is sent anyway). Mode resolution in `notifications` (case-insensitive, trimmed): anything ≠ `"bump"` (including empty/garbage) → `edit`, so there is zero regression and the orchestrator does not crash on any flag value. New low-level helper `delete_telegram(message_id) -> bool` ("never raises" contract, markers `_DELETE_GONE_MARKERS`): `ok:true` or "already gone / cannot be deleted" → `True`; unknown `ok:false`/5xx/exception → `False`; no credentials → `False` without an HTTP call. The signatures of `send_telegram`/`edit_telegram`/`update_task_tracker` and the DB schema (`tasks.tracker_message_id`) are unchanged. ADR `docs/work-items/ORCH-042/06-adr/ADR-001-tracker-bump-mode.md`. Tests: `tests/test_tracker_bump.py`, `tests/test_config.py`.
- **Verbatim reviewer/tester findings are embedded in the rollback `task_desc`** (ORCH-046): on rollback to `development`, the `task_desc` string (which ends up in the developer agent's `.task-dev.md`) now carries the substance of the complaints rather than just a link to a file. This removes the "telephone game" in which the agent went off to "read the file", lost the key P0/P1 items or the FAIL reason and got bounced again, burning `MAX_DEVELOPER_RETRIES` and tokens. New defensive module `src/review_parse.py` ("never raise" contract, like `src/frontmatter.py`): `extract_review_findings(path)` returns the verbatim P0/P1 items from the `## Findings` section of `12-review.md`; `extract_test_failures(path)` returns the relevant fragment of the `13-test-report.md` body (priority: `## Вывод pytest` → FAIL lines of `## Результаты` → `## Итог`). Both functions truncate the result to `MAX_FINDINGS_CHARS`/`MAX_FAILURES_CHARS` (≈2000) with the marker `…(truncated)`. The two rollback branches in `src/stage_engine.py` (reviewer REQUEST_CHANGES, tester `check_tests_passed` FAIL) embed the extracted text and **keep the link** to the full file ("Полный контекст"); for an empty/corrupt artifact there is a graceful fallback to the previous link-only string (no exceptions in `advance_stage`). The tester branch additionally always includes the gate's `reason`. The rollback sequence, `_developer_retry_count`, the `AdvanceResult` fields and the `QG_CHECKS` registry are unchanged. ADR `docs/work-items/ORCH-046/06-adr/ADR-001-embed-findings-in-task-desc.md`. Tests: `tests/test_review_parse.py`, `tests/test_stage_engine.py::TestRollbackTaskDescEmbedding`.
@@ -129,6 +129,12 @@ uvicorn src.main:app --reload --port 8500
| `ORCH_TRANSIENT_MAX_ATTEMPTS` | Retries for 429/unavailability | `5` |
| `ORCH_BREAKER_THRESHOLD` | Consecutive transient failures before the breaker opens | `3` |
| `ORCH_BREAKER_PAUSE_SECONDS` | Pause while the breaker is open | `300` |
| `ORCH_RECONCILE_ENABLED` | Kill-switch for the lost-webhook sweeper (ORCH-053) | `true` |
| `ORCH_RECONCILE_PLANE_ENABLED` | Separate flag for F-2 (Plane API poll) | `true` |
| `ORCH_RECONCILE_INTERVAL_S` | Period of the background reconciler sweep, s | `120` |
| `ORCH_RECONCILE_GRACE_DEFAULT_S` | "Stuck" threshold on `tasks.updated_at`, s | `600` |
| `ORCH_RECONCILE_GRACE_OVERRIDES_JSON` | Per-stage thresholds, e.g. `{"development":300}` | `""` |
| `ORCH_RECONCILE_NOTIFY_UNBLOCK` | Telegram message when a stuck task is unblocked | `true` |

## Task queue (ORCH-1 / F-2b)

@@ -11,7 +11,7 @@
- **Quality Gates** (`src/qg/checks.py`): stage-exit checks, the `QG_CHECKS` registry.
- **Agent Launcher** (`src/agents/launcher.py`): launches Claude CLI agents in an isolated git worktree, monitoring, auto-advance.
- **Queue** (`src/queue_worker.py`, ORCH-1): persistent job queue (SQLite `jobs`), atomic claim, max_concurrency, retries, restart-safe.
- **Reconciler** (`src/reconciler.py`, ORCH-053, design, [adr-0007](adr/adr-0007-reconciler.md)): a background daemon thread (`queue_worker` pattern) that reconciles the "source of truth ≠ task stage" drift left by a lost webhook. F-1 gate-side (advances a stuck stage from the local DB through the standard `advance_stage`), F-2 plane-side (Plane API poll → `handle_*` from `plane.py`). The source of truth is the gate/Plane, not the event; idempotency (active-job guard + atomic-claim + grace); kill-switch `ORCH_RECONCILE_ENABLED`. F-1 does not touch `analysis` (human gate).
- **Reconciler** (`src/reconciler.py`, ORCH-053, implemented, [adr-0007](adr/adr-0007-reconciler.md)): a background daemon thread (`queue_worker` pattern), started/stopped in `main.lifespan` (after `worker.start()` / before `worker.stop()`). It reconciles the "source of truth ≠ task stage" drift left by a lost webhook. F-1 gate-side (advances a stuck stage from the local DB through the standard `advance_stage(..., finished_agent=None)`), F-2 plane-side (Plane API poll → `handle_*` from `plane.py`), F-3 (DB fallback for `sha→branch` in `handle_ci_status`). The source of truth is the gate/Plane, not the event; idempotency (active-job guard + atomic-claim + grace); kill-switch `ORCH_RECONCILE_ENABLED`. F-1 does not touch `analysis` (human gate). Observability: the `reconcile` block in `GET /queue`.
- **Project Registry** (`src/projects.py`, ORCH-6): Plane project id → repo + prefix; webhook filtering by project.
- **Plane Sync** (`src/plane_sync.py`): syncs statuses/comments to Plane.

@@ -53,7 +53,7 @@ created → analysis → architecture → development → review → testing →
See also: [adr-0006](adr/adr-0006-merge-gate.md); details in `docs/work-items/ORCH-043/06-adr/ADR-001-merge-gate.md`.

### Reconciler: reconciliation of lost webhooks (ORCH-053, design)
### Reconciler: reconciliation of lost webhooks (ORCH-053, implemented)
The pipeline advances only on incoming webhooks; a lost event (502 during a rebuild,
no retries from Plane/Gitea, an unresolved `sha→branch`) leaves the task silently stuck
(incident ORCH-044). The background `reconciler` thread periodically (`reconcile_interval_s`)
@@ -65,7 +65,15 @@ created → analysis → architecture → development → review → testing →
silence (notification spam is structurally impossible). `analysis` is not reconciled.
- **F-2 plane-side:** polls the Plane API per project → `handle_status_start` /
  `handle_verdict` from `webhooks/plane.py` (no logic is duplicated).
- **F-3:** hardens `sha→branch` in `handle_ci_status` (DB fallback).
- **F-3:** hardens `sha→branch` in `handle_ci_status` (DB fallback to the single
  development task of the repo; ambiguity → not resolved).
- **F-4 observability:** on unblock, the log line `reconciler: <wi> <stage>
  разблокирована (потерян webhook)` + Telegram (`reconcile_notify_unblock`); a state
  snapshot in `GET /queue` (the `reconcile` block).

Implementation: `src/reconciler.py` (a daemon thread modelled on `queue_worker`), started in
`main.lifespan` **after** `worker.start()` and stopped in `finally` **before**
`worker.stop()`.

Invariants: the source of truth is the gate/Plane, not the event; idempotency (active-job
guard + atomic claim on creation under a process-wide Lock + grace + `max_concurrency=1`);
@@ -117,7 +125,7 @@ never-raise на единицу работы; тишина при синхрон
|--------|------|----------|
| GET | `/health` | health check |
| GET | `/status` | active tasks (stage != done) |
| GET | `/queue` | queue: counts + max_concurrency + recent jobs |
| GET | `/queue` | queue: counts + max_concurrency + resilience + reconcile (ORCH-053) + recent jobs |
| POST | `/webhook/plane` | Plane webhook |
| POST | `/webhook/gitea` | Gitea webhook (push, PR, CI status) |

@@ -131,4 +139,4 @@ never-raise на единицу работы; тишина при синхрон
DB schema, data flows, resilience layer, Dockerfile details: [internals.md](internals.md).

---
*Current as of 2026-06-06. Update when src/stages.py, src/qg/checks.py or src/main.py change. ORCH-043: merge-gate, design (see adr-0006), implementation in branch feature/ORCH-043. ORCH-053: reconciler, design (see adr-0007), implementation in branch feature/ORCH-053.*
*Current as of 2026-06-06. Update when src/stages.py, src/qg/checks.py or src/main.py change. ORCH-043: merge-gate, design (see adr-0006), implementation in branch feature/ORCH-043. ORCH-053: reconciler, implemented (see adr-0007, src/reconciler.py).*

@@ -11,7 +11,7 @@ Per-work-item решения живут в `docs/work-items/<id>/06-adr/ADR-NNN-
| adr-0004 | Polling with retry in check_ci_green (CI-race fix) | accepted | 2026-06-05 | ORCH-045 |
| adr-0005 | Containers run under the host uid:gid (1000:1000) | accepted | 2026-06-06 | ORCH-040 |
| adr-0006 | Merge-gate (catch up with main + re-test + merge serialization) | proposed | 2026-06-06 | ORCH-043 |
| adr-0007 | Reconciler for stuck stages (lost-webhook sweeper) | proposed | 2026-06-06 | ORCH-053 |
| adr-0007 | Reconciler for stuck stages (lost-webhook sweeper) | accepted | 2026-06-06 | ORCH-053 |

## Format
**Context → Decision → Alternatives → Consequences → Links.** Status: proposed / accepted / superseded.

@@ -1,6 +1,6 @@
# adr-0007: Reconciler for stuck stages (lost-webhook sweeper)

- **Status:** proposed
- **Status:** accepted (implemented in `src/reconciler.py`)
- **Date:** 2026-06-06
- **Task:** ORCH-053
- **Detailed ADR:** `docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md`

@@ -75,6 +75,12 @@ ADR `docs/work-items/ORCH-040/06-adr/ADR-001-run-agents-as-host-uid.md` и гл
| `ORCH_AGENT_EFFORT_DEFAULT` | default `--effort` mode (ORCH-41): low\|medium\|high\|xhigh\|max; default `high` |
| `ORCH_AGENT_EFFORT_<AGENT>` | per-agent effort; default: thinking agents → high, tester/deployer → medium |
| `ORCH_AGENT_FALLBACK_MODEL` | optional fallback model on overloaded (`--fallback-model`); empty → no flag |
| `ORCH_RECONCILE_ENABLED` | kill-switch for the lost-webhook sweeper (ORCH-053); default `true`. **During an incident/rollout**, `false` silences the whole background reconciler |
| `ORCH_RECONCILE_PLANE_ENABLED` | separate flag for F-2 (Plane API poll); `false` mutes only the plane branch, F-1 keeps running; default `true` |
| `ORCH_RECONCILE_INTERVAL_S` | period of the background reconciler sweep, s; default `120` |
| `ORCH_RECONCILE_GRACE_DEFAULT_S` | "stuck" threshold on `tasks.updated_at`, s; default `600` |
| `ORCH_RECONCILE_GRACE_OVERRIDES_JSON` | per-stage thresholds, e.g. `{"development":300}`; invalid JSON → default |
| `ORCH_RECONCILE_NOTIFY_UNBLOCK` | send a Telegram message when a stuck task is unblocked; default `true` |
| `DEPLOY_SSH_USER` / `_HOST` / `DEPLOY_HOOK_SCRIPT` | deploy-hook parameters |

**Secrets live only in `.env` / `.env.staging` on the host and are NOT committed to git.** The canonical reference is `.env.example`, `.env.staging.example`.

@@ -152,6 +152,28 @@ class Settings(BaseSettings):
    merge_defer_delay_s: int = 60
    merge_defer_max_attempts: int = 5

    # ORCH-053: stuck-task reconciler (sweeper for lost webhooks). A background
    # daemon thread reconciles the "source of truth (gate / Plane) != task stage"
    # drift left behind by a dropped webhook (502 on rebuild, no Plane/Gitea
    # retries, unresolved sha->branch). See docs/architecture/adr/adr-0007-reconciler.md.
    # reconcile_enabled -> global kill-switch (self-hosting safety,
    # staged rollout, env ORCH_RECONCILE_ENABLED).
    # reconcile_interval_s -> background sweep period (seconds).
    # reconcile_plane_enabled -> separate flag for the F-2 Plane-API poll so
    # only the plane branch can be muted.
    # reconcile_grace_default_s -> default "stuck" threshold on tasks.updated_at.
    # reconcile_grace_overrides_json -> JSON object of per-stage thresholds, e.g.
    # {"analysis": 1800, "development": 300}. Invalid
    # JSON -> default (mirrors agent_timeout_overrides_json).
    # reconcile_notify_unblock -> send a Telegram message when a stuck task is
    # unblocked (F-4 observability).
    reconcile_enabled: bool = True
    reconcile_interval_s: int = 120
    reconcile_plane_enabled: bool = True
    reconcile_grace_default_s: int = 600
    reconcile_grace_overrides_json: str = ""
    reconcile_notify_unblock: bool = True

    # Telegram notifications
    telegram_bot_token: str = ""
    telegram_chat_id: str = ""

93
src/db.py
@@ -1,6 +1,15 @@
import sqlite3
import threading
from .config import settings

# ORCH-053 (F-2 anti-dup): process-wide lock guarding the SELECT-exists -> INSERT
# task-creation claim. The prod topology is a single uvicorn process per DB
# (staging/prod isolated), with the webhook running in uvicorn's asyncio thread
# and the reconciler in its own thread of the SAME process -> a threading.Lock
# covers both sides of the create race without a schema migration. See
# docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md §4.
_CREATE_TASK_LOCK = threading.Lock()


def get_db() -> sqlite3.Connection:
    conn = sqlite3.connect(settings.db_path)
@@ -145,6 +154,90 @@ def get_task_by_repo_branch(repo: str, branch: str) -> dict | None:
    return None


def get_active_tasks_for_reconcile() -> list[dict]:
    """ORCH-053 (F-1): tasks eligible for the gate-side sweeper.

    Returns every task whose stage is not terminal ('done'), each augmented with
    ``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC
    'now', matching how ``update_task_stage`` stamps ``updated_at``). The
    reconciler applies the per-stage grace and active-job guard on top.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT *, "
            "CAST(strftime('%s','now') - strftime('%s', updated_at) AS INTEGER) AS age_s "
            "FROM tasks WHERE stage != 'done'"
        ).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
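
To see what the `age_s` expression yields, here is a small self-contained sketch against an in-memory SQLite database. The three-column `tasks` table is a simplified stand-in (the real schema is not part of this diff), and it assumes `updated_at` is stored as UTC text in the format produced by SQLite's `datetime('now')`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Hypothetical minimal schema: only the columns the query needs.
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, stage TEXT, updated_at TEXT)"
)
# One task last touched about 700 seconds ago, one already finished.
conn.execute(
    "INSERT INTO tasks (stage, updated_at) "
    "VALUES ('development', datetime('now', '-700 seconds'))"
)
conn.execute(
    "INSERT INTO tasks (stage, updated_at) "
    "VALUES ('done', datetime('now', '-9000 seconds'))"
)

rows = conn.execute(
    "SELECT *, "
    "CAST(strftime('%s','now') - strftime('%s', updated_at) AS INTEGER) AS age_s "
    "FROM tasks WHERE stage != 'done'"
).fetchall()
conn.close()

# Map stage -> age in whole seconds for the non-terminal tasks.
ages = {row["stage"]: row["age_s"] for row in rows}
```

`strftime('%s', ...)` returns the Unix timestamp as text; the subtraction coerces both sides to numbers, so the cast only normalises the result to an integer.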


def get_development_tasks_by_repo(repo: str) -> list[dict]:
    """ORCH-053 (F-3): tasks of a repo currently on the 'development' stage.

    Used as the sha->branch DB fallback in handle_ci_status: a CI-status webhook
    whose branch could not be resolved (no branches[], empty
    ``git branch -r --contains``) is matched to the unique development task of
    the repo (ambiguity -> caller leaves it unresolved).
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT * FROM tasks WHERE repo = ? AND stage = 'development'", (repo,)
        ).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
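
On the caller side, the "unique development task" rule might be applied as in the sketch below. The helper name `resolve_branch_fallback` is hypothetical; the actual code in `handle_ci_status` is not shown in this part of the diff.

```python
from typing import Optional


def resolve_branch_fallback(dev_tasks: list) -> Optional[str]:
    """Return a branch only when exactly one development task exists."""
    if len(dev_tasks) == 1:
        return dev_tasks[0].get("branch")
    # Zero matches: nothing to resolve. Several matches: ambiguous, so leave
    # the sha unresolved instead of risking a false match.
    return None


single = resolve_branch_fallback([{"branch": "feature/ORCH-053"}])
ambiguous = resolve_branch_fallback(
    [{"branch": "feature/ORCH-053"}, {"branch": "feature/ORCH-043"}]
)
missing = resolve_branch_fallback([])
```

Refusing to guess in the ambiguous case trades a possibly missed CI event for the guarantee that a status is never attributed to the wrong task.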


def create_task_atomic(
    plane_id: str,
    work_item_id: str,
    repo: str,
    branch: str,
    stage: str,
    title: str,
) -> tuple[dict, bool]:
    """ORCH-053 (AC-4): atomically claim creation of a task for a plane_id.

    Performs SELECT-exists -> INSERT under the process-wide ``_CREATE_TASK_LOCK``
    so a race between the live Plane webhook and the F-2 reconciler (both seeing
    "no task yet" for the same plane_id) cannot create two task rows / branches /
    worktrees / starter analyst jobs.

    Returns ``(row, created)``:
      * ``created=True`` -> THIS caller inserted the row and owns the follow-up
        work (branch / docs / analyst enqueue);
      * ``created=False`` -> a task for this plane_id already existed (the other
        racer won); ``row`` is the existing task and the caller must NOT duplicate
        the follow-up work.
    """
    with _CREATE_TASK_LOCK:
        conn = get_db()
        try:
            existing = conn.execute(
                "SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?",
                (plane_id, plane_id),
            ).fetchone()
            if existing:
                return dict(existing), False
            cur = conn.execute(
                "INSERT INTO tasks "
                "(plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (plane_id, work_item_id, repo, branch, stage, plane_id, title),
            )
            conn.commit()
            row = conn.execute(
                "SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)
            ).fetchone()
            return dict(row), True
        finally:
            conn.close()
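
The create race that this function guards against can be reproduced in isolation. The sketch below is a simplified stand-in, not the project's code: it uses a temporary SQLite file, a two-column-plus-id `tasks` table and a reduced `create_task_atomic(plane_id, title)` signature, all of which are assumptions made for the example.

```python
import os
import sqlite3
import tempfile
import threading

DB_PATH = os.path.join(tempfile.mkdtemp(), "demo.db")
_CREATE_LOCK = threading.Lock()  # process-wide, shared by both racers


def _connect():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


# Hypothetical minimal schema for the demonstration.
_init = _connect()
_init.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, plane_id TEXT, title TEXT)")
_init.commit()
_init.close()


def create_task_atomic(plane_id, title):
    """SELECT-exists -> INSERT under one lock; returns (row, created)."""
    with _CREATE_LOCK:
        conn = _connect()
        try:
            row = conn.execute(
                "SELECT * FROM tasks WHERE plane_id = ?", (plane_id,)
            ).fetchone()
            if row:
                return dict(row), False  # the other racer already won
            cur = conn.execute(
                "INSERT INTO tasks (plane_id, title) VALUES (?, ?)",
                (plane_id, title),
            )
            conn.commit()
            row = conn.execute(
                "SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)
            ).fetchone()
            return dict(row), True
        finally:
            conn.close()


results = []


def racer(name):
    results.append(create_task_atomic("plane-123", name))


threads = [threading.Thread(target=racer, args=(n,)) for n in ("webhook", "reconciler")]
for t in threads:
    t.start()
for t in threads:
    t.join()

created_flags = sorted(created for _, created in results)
check = _connect()
row_count = check.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
check.close()
```

Whichever thread takes the lock first inserts the row; the other sees it on its SELECT and backs off, so exactly one caller owns the follow-up work. As the comment in the diff notes, this relies on both racers living in the same process.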


def update_task_stage(task_id: int, stage: str):
    """Update task stage and timestamp."""
    conn = get_db()

14
src/main.py
@@ -80,11 +80,19 @@ async def lifespan(app: FastAPI):
    from .queue_worker import worker
    worker.start()

    # ORCH-053: start the stuck-task reconciler AFTER the worker so its active-job
    # guard sees a fully-initialised queue. Kill-switch: ORCH_RECONCILE_ENABLED.
    from .reconciler import reconciler
    reconciler.start()

    try:
        yield
    finally:
        # Graceful shutdown of the worker (running agents keep going; their jobs
        # are requeued on next start via queue-recovery if the process dies).
        # Graceful shutdown order mirrors startup in reverse: stop the reconciler
        # first (it must not enqueue new work while the worker is winding down),
        # then the worker. Running agents keep going; their jobs are requeued on
        # next start via queue-recovery if the process dies.
        reconciler.stop()
        worker.stop()
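
The start/stop ordering can be illustrated without FastAPI. In this sketch the `Loop` class and the plain `contextmanager` are hypothetical stand-ins for the real worker, reconciler and `lifespan`; it only shows the pattern of a daemon thread with a stop event and a reverse-order shutdown.

```python
import threading
from contextlib import contextmanager

events = []  # records the order of lifecycle calls


class Loop:
    """A tiny background loop: sweep, sleep, repeat until stopped."""

    def __init__(self, name, interval_s=0.01):
        self.name = name
        self.interval_s = interval_s
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        events.append(f"{self.name}.start")
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout, True once stop() sets the event.
        while not self._stop_event.wait(self.interval_s):
            pass  # one unit of work would go here, wrapped in try/except

    def stop(self):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=1)
        events.append(f"{self.name}.stop")


@contextmanager
def lifespan():
    worker, reconciler = Loop("worker"), Loop("reconciler")
    worker.start()
    reconciler.start()      # after the worker, as in the diff
    try:
        yield
    finally:
        reconciler.stop()   # first: no new work while the worker winds down
        worker.stop()


with lifespan():
    pass
```

Waiting on an event instead of calling `time.sleep` lets `stop()` interrupt the pause immediately, so shutdown does not have to wait out a full sweep interval.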
@@ -114,10 +122,12 @@ async def queue():
    """ORCH-1: job-queue observability — status counts + recent jobs."""
    from .db import job_status_counts, recent_jobs
    from .queue_worker import worker
    from .reconciler import reconciler
    return {
        "counts": job_status_counts(),
        "max_concurrency": worker.max_concurrency,
        "poll_interval": worker.poll_interval,
        "resilience": worker.status(),
        "reconcile": reconciler.status(),
        "recent": recent_jobs(10),
    }
@@ -356,6 +356,62 @@ def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
    return "", ""


def list_issues_by_state(project_id: str, state_uuids: list[str]) -> list[dict]:
    """ORCH-053 (F-2): list a project's issues whose state is in ``state_uuids``.

    GETs ``/workspaces/{ws}/projects/{pid}/issues/`` and walks ALL pages
    (Plane's cursor pagination: ``results`` + ``next_cursor`` /
    ``next_page_results``), keeping only issues whose state uuid is one of the
    requested ones. The filter is applied client-side on ``issue.state`` (a dict
    ``{id,...}`` or a bare uuid string) so it works regardless of whether Plane's
    query-param state filter is honoured.

    Never raises: on any network / API / shape error it logs a warning and
    returns ``[]`` so a Plane outage degrades the F-2 tick softly instead of
    crashing it.
    """
    if not project_id or not state_uuids:
        return []
    wanted = set(state_uuids)
    out: list[dict] = []
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
    try:
        cursor = None
        pages = 0
        while True:
            params: dict = {"per_page": 100}
            if cursor:
                params["cursor"] = cursor
            resp = httpx.get(url, headers=PLANE_HEADERS, params=params, timeout=10)
            resp.raise_for_status()
            body = resp.json()
            if isinstance(body, dict):
                items = body.get("results", [])
            else:
                items = body if isinstance(body, list) else []
            for issue in items:
                state = issue.get("state")
                sid = state.get("id") if isinstance(state, dict) else state
                if sid in wanted:
                    out.append(issue)
            # Pagination: continue only while Plane reports more pages.
            pages += 1
            if not isinstance(body, dict):
                break
            has_more = bool(body.get("next_page_results"))
            next_cursor = body.get("next_cursor")
            if not has_more or not next_cursor or pages >= 100:
                break
            cursor = next_cursor
        return out
    except Exception as e:
        logger.warning(
            f"list_issues_by_state: API failed for project {project_id[:8]}..., "
            f"returning []. Error: {e}"
        )
        return []
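
The pagination loop can be exercised without a network by injecting a fake page fetcher. In the sketch below, `collect_pages`, `fake_fetch` and the page payloads are invented for illustration; the field names `results`, `next_cursor` and `next_page_results` are the ones the function above reads.

```python
def collect_pages(fetch_page, wanted, max_pages=100):
    """Walk cursor-paginated pages and keep issues whose state id is wanted."""
    out, cursor, pages = [], None, 0
    while True:
        body = fetch_page(cursor)
        for issue in body.get("results", []):
            state = issue.get("state")
            # The state may be a dict with an id or a bare uuid string.
            sid = state.get("id") if isinstance(state, dict) else state
            if sid in wanted:
                out.append(issue)
        pages += 1
        next_cursor = body.get("next_cursor")
        # Stop on the last page, on a missing cursor, or at the safety cap.
        if not body.get("next_page_results") or not next_cursor or pages >= max_pages:
            break
        cursor = next_cursor
    return out


# Two fake pages keyed by the cursor that requests them.
PAGES = {
    None: {
        "results": [
            {"id": "i1", "state": {"id": "s-approved"}},
            {"id": "i2", "state": "s-backlog"},
        ],
        "next_cursor": "c1",
        "next_page_results": True,
    },
    "c1": {
        "results": [{"id": "i3", "state": "s-approved"}],
        "next_cursor": "c2",
        "next_page_results": False,
    },
}


def fake_fetch(cursor):
    return PAGES[cursor]


found_ids = [issue["id"] for issue in collect_pages(fake_fetch, {"s-approved"})]
```

The page cap bounds the loop even if an API keeps returning a cursor with `next_page_results` set, which matters for a background thread that must never hang a sweep.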


def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
    """Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
    project_id = _resolve_project_id(work_item_id, project_id)

332
src/reconciler.py
Normal file
@@ -0,0 +1,332 @@
"""ORCH-053: stuck-task reconciler (sweeper for lost webhooks).

The pipeline advances ONLY on incoming webhooks (Plane status / Gitea CI/PR). A
dropped event (502 on a rebuilding instance, no Plane/Gitea retries, an
unresolved ``sha->branch``) leaves the source of truth (the gate / the Plane
status) changed while the task stays put — a silently stuck task (incident
ORCH-044). None of the existing resilience layers (``requeue_running_jobs``,
orphan-recovery, events de-dup, ``ci_poll``) reconcile this
"source-of-truth != task-stage" drift; they all work at the jobs/agent_runs
level, not the stage transition.

This module is a background daemon thread (modelled on ``queue_worker``) that
periodically replays the missed transition through the SAME standard gates /
handlers a webhook would use:

* **F-1 gate-side** (``reconcile_gate_once``): for each task with
  ``stage != 'done'``, no active job and ``age(updated_at) >=
  grace_for_stage(stage)``, do a read-only pre-evaluation of the stage's
  canonical quality gate; green -> advance through the unchanged
  ``stage_engine.advance_stage(..., finished_agent=None)``; red -> silence
  (no advance, no notification). ``analysis`` is NOT reconciled here (human
  gate; owned by F-2).

* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
  project (``list_issues_by_state``) and replay In Progress / Approved /
  Rejected through ``webhooks.plane.handle_status_start`` /
  ``handle_verdict`` (no logic duplicated).

Invariants: source of truth is the gate / Plane (not the event); advance only
via ``advance_stage``; idempotency (active-job guard + atomic create-claim +
grace + ``max_concurrency=1``); never-raise per unit of work; silence when in
sync; restart-safe; kill-switch ``ORCH_RECONCILE_ENABLED``
(+ ``ORCH_RECONCILE_PLANE_ENABLED`` mutes only F-2). The DB schema and the
registries (``STAGE_TRANSITIONS`` / ``QG_CHECKS``) are unchanged.

See docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md and the
cross-cutting docs/architecture/adr/adr-0007-reconciler.md.
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from .config import settings
|
||||
from .db import (
|
||||
get_active_tasks_for_reconcile,
|
||||
get_task_by_plane_id,
|
||||
has_active_job_for_task,
|
||||
)
|
||||
from .stage_engine import advance_if_gate_passed
|
||||
from .stages import get_qg_for_stage
|
||||
from .plane_sync import get_project_states, list_issues_by_state
|
||||
from .webhooks.plane import handle_status_start, handle_verdict
|
||||
from .notifications import send_telegram
|
||||
from . import projects
|
||||
|
||||
logger = logging.getLogger("orchestrator.reconciler")
|
||||
|
||||
|
||||
def _parse_grace_overrides(raw: str) -> dict[str, int]:
    """Parse ``reconcile_grace_overrides_json`` into {stage: seconds}.

    Invalid / non-object JSON -> {} (caller falls back to the default grace),
    mirroring the never-raise contract of ``agent_timeout_overrides_json``.
    """
    if not raw or not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.warning(f"reconcile_grace_overrides_json is not valid JSON, ignoring: {e}")
        return {}
    if not isinstance(data, dict):
        logger.warning("reconcile_grace_overrides_json must be a JSON object, ignoring")
        return {}
    out: dict[str, int] = {}
    for k, v in data.items():
        try:
            out[str(k)] = int(v)
        except (ValueError, TypeError):
            logger.warning(f"reconcile_grace_overrides_json[{k}] is not an int, ignoring")
    return out


def grace_for_stage(stage: str) -> int:
    """Per-stage "stuck" threshold (seconds): override from JSON, else default."""
    overrides = _parse_grace_overrides(settings.reconcile_grace_overrides_json)
    return overrides.get(stage, settings.reconcile_grace_default_s)
|
||||
|
||||
|
||||
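The sketch below shows how the per-stage grace feeds the "stuck, not just busy" decision. It is a standalone illustration: `parse_overrides` and `is_stuck` are hypothetical names, and the 600-second default mirrors the value asserted in test_config.py rather than reading the real settings object.

```python
import json


def parse_overrides(raw: str) -> dict[str, int]:
    """Tolerant parse of a {"stage": seconds} JSON object; bad input -> {}."""
    if not raw or not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError):
        return {}
    if not isinstance(data, dict):
        return {}
    out: dict[str, int] = {}
    for key, value in data.items():
        try:
            out[str(key)] = int(value)
        except (ValueError, TypeError):
            continue  # skip a single bad entry, keep the rest
    return out


def is_stuck(stage: str, age_s: float, raw_overrides: str = "", default_s: int = 600) -> bool:
    """A task counts as stuck once its age reaches the grace for its stage."""
    grace = parse_overrides(raw_overrides).get(stage, default_s)
    return age_s >= grace
```

With `ORCH_RECONCILE_GRACE_OVERRIDES_JSON='{"development": 300}'`, a development task idle for 400 seconds would be eligible, while a review task of the same age would still be inside the default grace.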
def _age_seconds_iso(ts: str) -> float | None:
    """Age in seconds of a Plane ISO-8601 timestamp (e.g. issue.updated_at).

    Returns None when the value is missing / unparseable (caller decides the
    fallback). Handles a trailing 'Z' and treats naive timestamps as UTC.
    """
    if not ts:
        return None
    try:
        text = ts.strip()
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return (datetime.now(timezone.utc) - dt).total_seconds()
    except Exception:
        return None
|
||||
|
||||
|
||||
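To see how the timestamp handling behaves on the three input shapes it mentions (trailing `Z`, naive, explicit offset), here is a small variant with an injectable clock. The `now` parameter and the name `age_seconds` are assumptions added for determinism; the module itself reads the wall clock.

```python
from datetime import datetime, timezone


def age_seconds(ts, now):
    """Age of an ISO-8601 timestamp relative to `now` (an aware datetime).

    Returns None for a missing or unparseable value.
    """
    if not ts:
        return None
    try:
        text = ts.strip()
        # Older datetime.fromisoformat versions reject a trailing 'Z'.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # naive -> assume UTC
        return (now - dt).total_seconds()
    except (ValueError, TypeError, AttributeError):
        return None


# A fixed reference instant used only for the demonstration.
NOW = datetime(2024, 1, 1, 12, 10, 0, tzinfo=timezone.utc)
```

All three spellings of 12:00 UTC give the same age against `NOW`, and a malformed value gives `None`, which the F-2 caller treats as "old enough".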
class Reconciler:
|
||||
"""Background daemon that reconciles webhook-induced stage drift.
|
||||
|
||||
Modelled on ``QueueWorker``: a plain ``threading.Thread(daemon=True)`` +
|
||||
``threading.Event`` for a clean stop. No correctness-critical state is held
|
||||
in memory — every tick re-reads the DB / Plane; the observability counters
|
||||
(``last_run_ts`` / ``unblocked_total`` / ``last_unblocked``) are best-effort
|
||||
and may reset on restart (AC-11 allows this).
|
||||
"""
|
||||
|
||||
def __init__(self, interval_s: float | None = None):
|
||||
self.interval_s = (
|
||||
interval_s if interval_s is not None else settings.reconcile_interval_s
|
||||
)
|
||||
self._stop = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
# Best-effort observability (F-4).
|
||||
self.last_run_ts: float | None = None
|
||||
self.unblocked_total: int = 0
|
||||
self.last_unblocked: str | None = None
|
||||
|
||||
# -- F-1: gate-side ----------------------------------------------------
|
||||
def reconcile_gate_once(self) -> None:
|
||||
"""One F-1 pass over all non-terminal tasks (per-task never-raise)."""
|
||||
if not settings.reconcile_enabled:
|
||||
return
|
||||
for task in get_active_tasks_for_reconcile():
|
||||
try:
|
||||
self._reconcile_gate_task(task)
|
||||
except Exception as e: # noqa: BLE001 - isolate one task's failure
|
||||
logger.error(
|
||||
f"reconciler F-1: task {task.get('id')} "
|
||||
f"(stage={task.get('stage')}) failed: {e}"
|
||||
)
|
||||
|
||||
def _reconcile_gate_task(self, task: dict) -> None:
|
||||
task_id = task["id"]
|
||||
stage = task["stage"]
|
||||
# AC-16: analysis is a human gate -> owned by F-2, never F-1.
|
||||
if stage == "analysis":
|
||||
return
|
||||
# created / done have no gate to evaluate.
|
||||
if get_qg_for_stage(stage) is None:
|
||||
return
|
||||
# AC-3: a queued/running job means the task is legitimately in flight (or
|
||||
# a live webhook just enqueued one) -> do not touch it.
|
||||
if has_active_job_for_task(task_id):
|
||||
return
|
||||
# AC-5: respect the per-stage grace ("stuck", not just busy).
|
||||
age_s = task.get("age_s") or 0
|
||||
if age_s < grace_for_stage(stage):
|
||||
return
|
||||
result = advance_if_gate_passed(
|
||||
task_id,
|
||||
stage,
|
||||
task["repo"],
|
||||
task.get("work_item_id") or "",
|
||||
task.get("branch") or "",
|
||||
)
|
||||
if result is not None and getattr(result, "advanced", False):
|
||||
self._note_unblock(task.get("work_item_id") or str(task_id), stage)
|
||||
|
||||
# -- F-2: plane-side ---------------------------------------------------
|
||||
def reconcile_plane_once(self) -> None:
|
||||
"""One F-2 pass: poll Plane per project and replay missed transitions."""
|
||||
if not settings.reconcile_enabled or not settings.reconcile_plane_enabled:
|
||||
return
|
||||
for proj in projects.PROJECTS:
|
||||
try:
|
||||
self._reconcile_plane_project(proj)
|
||||
except Exception as e: # noqa: BLE001 - isolate one project's failure
|
||||
logger.error(f"reconciler F-2: project {proj.repo} failed: {e}")
|
||||
|
||||
def _reconcile_plane_project(self, proj) -> None:
|
||||
pid = proj.plane_project_id
|
||||
# Resolve the actionable state uuids per-project (never hardcode).
|
||||
states = get_project_states(pid)
|
||||
in_progress = states["in_progress"]
|
||||
approved = states["approved"]
|
||||
rejected = states["rejected"]
|
||||
issues = list_issues_by_state(pid, [in_progress, approved, rejected])
|
||||
for issue in issues:
|
||||
try:
|
||||
self._reconcile_plane_issue(
|
||||
issue, pid, in_progress, approved, rejected
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - isolate one issue's failure
|
||||
logger.error(
|
||||
f"reconciler F-2: issue {issue.get('id')} failed: {e}"
|
||||
)
|
||||
|
||||
def _reconcile_plane_issue(
|
||||
self, issue: dict, project_id: str,
|
||||
in_progress: str, approved: str, rejected: str,
|
||||
) -> None:
|
||||
issue_id = str(issue.get("id") or "")
|
||||
if not issue_id:
|
||||
return
|
||||
state = issue.get("state")
|
||||
new_state = state.get("id") if isinstance(state, dict) else state
|
||||
|
||||
# Grace ("lost, not merely delayed"): use the issue's own updated_at age.
|
||||
# A missing/unparseable timestamp is treated as old enough (the active-job
|
||||
# guard + atomic create-claim still prevent doubling).
|
||||
age = _age_seconds_iso(issue.get("updated_at") or "")
|
||||
if age is not None and age < settings.reconcile_grace_default_s:
|
||||
return
|
||||
|
||||
task = get_task_by_plane_id(issue_id)
|
||||
# AC-3/AC-4: a live webhook is in flight for this task -> skip.
|
||||
if task is not None and has_active_job_for_task(task["id"]):
|
||||
return
|
||||
|
||||
# issue_data in the shape the plane handlers expect; missing name /
|
||||
# description are pulled by the handlers themselves (fetch_issue_fields).
|
||||
issue_data = {
|
||||
"id": issue_id,
|
||||
"state": {"id": new_state},
|
||||
"project": project_id,
|
||||
"name": issue.get("name", ""),
|
||||
"description_stripped": issue.get("description_stripped", ""),
|
||||
}
|
||||
|
||||
if new_state == in_progress and task is None:
|
||||
# In Progress without a task -> start the pipeline (lost start webhook).
|
||||
self._dispatch(handle_status_start, issue_data, project_id)
|
||||
self._note_unblock(issue_id, "analysis")
|
||||
elif new_state == approved and task is not None:
|
||||
# Approved but the stage never advanced -> replay the verdict.
|
||||
self._dispatch(handle_verdict, issue_data, project_id, approved=True)
|
||||
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
|
||||
elif new_state == rejected and task is not None:
|
||||
# Rejected but never rolled back -> replay the verdict.
|
||||
self._dispatch(handle_verdict, issue_data, project_id, approved=False)
|
||||
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
|
||||
# else: everything is in sync -> silence (AC-10).
|
||||
|
||||
@staticmethod
|
||||
def _dispatch(coro_fn, *args, **kwargs) -> None:
|
||||
"""Run an async plane handler from this sync thread.
|
||||
|
||||
``asyncio.run`` spins a fresh event loop per call, which is required
|
||||
because ``handle_verdict -> _try_advance_stage`` uses
|
||||
``asyncio.to_thread`` (needs a running loop). The handlers are
|
||||
REUSED verbatim — no pipeline logic is duplicated here.
|
||||
"""
|
||||
asyncio.run(coro_fn(*args, **kwargs))
|
||||
|
||||
# -- observability (F-4) ----------------------------------------------
|
||||
def _note_unblock(self, work_item_id: str, stage: str) -> None:
|
||||
"""Record + announce that a stuck task was unblocked (AC-12).
|
||||
|
||||
Fires only on an actual state change (an advance / replayed transition),
|
||||
never per idle tick, so it does not conflict with AC-9 / AC-10.
|
||||
"""
|
||||
self.unblocked_total += 1
|
||||
self.last_unblocked = work_item_id
|
||||
logger.info(
|
||||
f"reconciler: {work_item_id} {stage} разблокирована (потерян webhook)"
|
||||
)
|
||||
if settings.reconcile_notify_unblock:
|
||||
try:
|
||||
send_telegram(
|
||||
f"\U0001f527 reconciler: {work_item_id} {stage} "
|
||||
f"разблокирована (потерян webhook)"
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - never break the tick
|
||||
logger.warning(f"reconciler: unblock telegram failed: {e}")
|
||||
|
||||
# -- loop / lifecycle --------------------------------------------------
|
||||
def _tick(self) -> None:
|
||||
if settings.reconcile_enabled:
|
||||
self.reconcile_gate_once() # F-1
|
||||
if settings.reconcile_plane_enabled:
|
||||
self.reconcile_plane_once() # F-2
|
||||
self.last_run_ts = datetime.now(timezone.utc).timestamp()
|
||||
|
||||
def _run(self) -> None:
|
||||
logger.info(
|
||||
f"Reconciler started (interval={self.interval_s}s, "
|
||||
f"enabled={settings.reconcile_enabled}, "
|
||||
f"plane_enabled={settings.reconcile_plane_enabled})"
|
||||
)
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
self._tick()
|
||||
except Exception as e: # noqa: BLE001 - outer never-raise
|
||||
logger.error(f"Reconciler loop error: {e}")
|
||||
self._stop.wait(self.interval_s)
|
||||
logger.info("Reconciler stopped")
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start the daemon thread (idempotent: a live thread is a no-op)."""
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._stop.clear()
|
||||
self._thread = threading.Thread(
|
||||
target=self._run, name="reconciler", daemon=True
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self, timeout: float = 5.0) -> None:
|
||||
self._stop.set()
|
||||
if self._thread:
|
||||
self._thread.join(timeout=timeout)
|
||||
|
||||
def status(self) -> dict:
|
||||
"""Reconcile snapshot for /queue observability."""
|
||||
return {
|
||||
"enabled": settings.reconcile_enabled,
|
||||
"plane_enabled": settings.reconcile_plane_enabled,
|
||||
"interval": self.interval_s,
|
||||
"last_run_ts": self.last_run_ts,
|
||||
"unblocked_total": self.unblocked_total,
|
||||
"last_unblocked": self.last_unblocked,
|
||||
}
|
||||
|
||||
|
||||
# Module-level singleton used by the FastAPI lifespan.
|
||||
reconciler = Reconciler()
|
||||
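The lifecycle and dispatch pattern used by `Reconciler` (daemon thread, `threading.Event` for the stop signal, `asyncio.run` to call async handlers from the sync thread) can be sketched in a reduced form. `Sweeper`, `handler` and `calls` are illustrative names that do not exist in the codebase.

```python
import asyncio
import threading


class Sweeper:
    """Reduced stand-in for a periodic daemon that calls an async handler."""

    def __init__(self, handler, interval_s=0.01):
        self._handler = handler          # async callable with no arguments
        self.interval_s = interval_s
        self._stop = threading.Event()
        self._thread = None

    def _run(self):
        while not self._stop.is_set():
            try:
                # A fresh event loop per call; the handler may rely on
                # asyncio.to_thread, which needs a running loop.
                asyncio.run(self._handler())
            except Exception:
                pass  # outer never-raise: one bad tick must not kill the loop
            # Event.wait doubles as an interruptible sleep.
            self._stop.wait(self.interval_s)

    def start(self):
        # Idempotent: a live thread makes this a no-op.
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, name="sweeper", daemon=True)
        self._thread.start()

    def stop(self, timeout=5.0):
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)


calls = []


async def handler():
    # Offload blocking work the way an async webhook handler might.
    value = await asyncio.to_thread(lambda: "handled")
    calls.append(value)
```

Because the loop sleeps on `Event.wait` rather than `time.sleep`, `stop()` returns promptly instead of waiting out a full interval.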
@@ -318,6 +318,75 @@ def advance_stage(
|
||||
return result
|
||||
|
||||
|
||||
def advance_if_gate_passed(
|
||||
task_id: int,
|
||||
current_stage: str,
|
||||
repo: str,
|
||||
work_item_id: str,
|
||||
branch: str,
|
||||
) -> AdvanceResult | None:
|
||||
"""ORCH-053 (F-1): reconcile a stuck stage by advancing it ONLY if its
|
||||
quality gate is already green — without spamming failure notifications.
|
||||
|
||||
This is the thin wrapper the reconciler uses so that:
|
||||
|
||||
* The source of truth stays the GATE, and the advance path stays the
|
||||
UNCHANGED unified ``advance_stage(..., finished_agent=None)`` (the same
|
||||
path the Plane Approved-webhook uses). The reconciler never duplicates
|
||||
``update_task_stage`` / ``enqueue_job`` (AC-2).
|
||||
|
||||
* On a stable-RED gate the sweeper is structurally silent: we do a cheap
|
||||
read-only pre-evaluation of the gate and, if it fails, return ``None``
|
||||
WITHOUT ever calling ``advance_stage`` — so the QG-failure notification
|
||||
branch inside ``advance_stage`` (``agent is None`` ->
|
||||
``notify_qg_failure`` + ``plane_notify_qg``) cannot fire on any tick
|
||||
(AC-9). Spam is impossible by construction.
|
||||
|
||||
``analysis`` is intentionally NOT reconciled here: its gate
|
||||
(``check_analysis_approved``) is a HUMAN gate; with ``finished_agent=None``
|
||||
``advance_stage`` would treat it as approved-via-status and could advance an
|
||||
unapproved BRD. The analysis advance is owned by the Plane-side reconciler
|
||||
(F-2), which checks the real Plane status (AC-16).
|
||||
|
||||
Returns the ``AdvanceResult`` from ``advance_stage`` when the gate passed,
|
||||
or ``None`` when the stage is not eligible / the gate is red / on any error
|
||||
(never raises — the caller isolates per-task failures).
|
||||
"""
|
||||
try:
|
||||
# AC-16: F-1 never reconciles the human analysis gate.
|
||||
if current_stage == "analysis":
|
||||
return None
|
||||
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
if not qg_name:
|
||||
# created / done -> no gate to evaluate.
|
||||
return None
|
||||
|
||||
# Read-only pre-evaluation with the SAME dispatcher the webhook path uses.
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
if not passed:
|
||||
# Stable-red -> stay silent (no advance_stage call -> no QG-failure
|
||||
# notification on this or any later tick).
|
||||
logger.debug(
|
||||
f"reconciler: task {task_id} gate '{qg_name}' still red "
|
||||
f"({reason}); leaving on '{current_stage}'"
|
||||
)
|
||||
return None
|
||||
|
||||
# Gate is green: advance via the unchanged unified path. It re-runs the
|
||||
# (idempotent, read-only) gate, advances the stage, sends the STANDARD
|
||||
# advance notifications and enqueues the next agent.
|
||||
return advance_stage(
|
||||
task_id, current_stage, repo, work_item_id, branch, finished_agent=None
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - never-raise per ORCH-053 NFR
|
||||
logger.error(
|
||||
f"advance_if_gate_passed failed for task_id={task_id} "
|
||||
f"stage={current_stage}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
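The "silent on red" property described in the docstring above can be illustrated with stand-ins. In this sketch `advance` plays the role of `advance_stage` (it notifies when the gate fails) and `advance_if_green` plays the role of the wrapper; both names, the `notifications` list and the gate callables are assumptions for the example only.

```python
notifications = []


def advance(stage, gate):
    """Stand-in for the unified advance path: notifies on a failed gate."""
    passed, reason = gate(stage)
    if not passed:
        notifications.append(f"QG failed on {stage}: {reason}")
        return None
    return {"advanced": True, "from": stage}


def advance_if_green(stage, gate):
    """Pre-evaluate the gate; call `advance` only when it is already green."""
    try:
        passed, _reason = gate(stage)
        if not passed:
            # Red gate: return before the notifying path is ever entered.
            return None
        return advance(stage, gate)
    except Exception:
        return None  # never raise; the caller isolates per-task failures


def red_gate(stage):
    return (False, "CI red")


def green_gate(stage):
    return (True, "CI green")
```

Calling `advance_if_green("development", red_gate)` on every tick leaves `notifications` empty, whereas calling `advance` directly with the same gate would append one message per call.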
def _build_analyst_ready_comment(
|
||||
repo: str, work_item_id: str, branch: str, task_id: int | None = None
|
||||
) -> str:
|
||||
|
||||
@@ -144,6 +144,36 @@ async def handle_push(payload: dict):
|
||||
logger.info(f"Task {task_id}: source push detected on '{branch}', waiting for CI")
|
||||
|
||||
|
||||
def _resolve_branch_via_db(repo_name: str) -> str:
    """ORCH-053 (F-3): resolve a CI-status SHA to a branch via the tasks DB.

    Returns the branch of the SINGLE development-stage task for ``repo_name``.
    If there are zero or several such tasks the match is ambiguous -> return ""
    (the caller leaves the branch unresolved; never a false match). Logged at
    INFO for visibility. Never raises.
    """
    try:
        from ..db import get_development_tasks_by_repo
        devs = get_development_tasks_by_repo(repo_name)
    except Exception as e:  # noqa: BLE001 - defensive, never break the webhook
        logger.info(f"CI status: sha->branch DB fallback errored for {repo_name}: {e}")
        return ""
    if len(devs) == 1:
        branch = devs[0].get("branch") or ""
        if branch:
            logger.info(
                f"CI status: sha->branch resolved via DB fallback to '{branch}' "
                f"(unique development task in {repo_name})"
            )
            return branch
    if len(devs) > 1:
        logger.info(
            f"CI status: sha->branch DB fallback ambiguous "
            f"({len(devs)} development tasks in {repo_name}), leaving unresolved"
        )
    return ""
|
||||
|
||||
|
||||
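The unique-match rule of the F-3 fallback reduces to a few lines once the DB is replaced by an in-memory list. The sketch below is only an illustration of that rule: `resolve_branch` and the row dictionaries are hypothetical and do not reflect the real `get_development_tasks_by_repo` query.

```python
def resolve_branch(tasks, repo):
    """Return the branch of the single development task in `repo`, else ""."""
    devs = [
        t for t in tasks
        if t.get("repo") == repo and t.get("stage") == "development"
    ]
    # Zero or several candidates are ambiguous: refuse to guess.
    if len(devs) != 1:
        return ""
    return devs[0].get("branch") or ""


ONE_DEV = [
    {"repo": "enduro-trails", "stage": "development", "branch": "feature/ET-050-x"},
    {"repo": "enduro-trails", "stage": "review", "branch": "feature/ET-049-y"},
]
TWO_DEVS = ONE_DEV + [
    {"repo": "enduro-trails", "stage": "development", "branch": "feature/ET-051-a"},
]
```

With `ONE_DEV` the fallback resolves to `feature/ET-050-x`; with `TWO_DEVS` it returns an empty string, which matches the behaviour exercised by TC-18 and TC-19.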
async def handle_ci_status(payload: dict):
|
||||
"""
|
||||
CI status update:
|
||||
@@ -178,7 +208,15 @@ async def handle_ci_status(payload: dict):
|
||||
except Exception:
|
||||
pass
|
||||
if not branch:
|
||||
logger.debug(f"CI status event: could not determine branch for sha={sha}")
|
||||
# ORCH-053 (F-3): DB fallback — when the SHA cannot be resolved to a
|
||||
# branch (lost on a 502 rebuild, etc.), match it to the UNIQUE
|
||||
# development-stage task of this repo. Ambiguity (more than one) is
|
||||
# left unresolved to avoid a false match; the F-1 sweeper still picks
|
||||
# such a task up later (defense-in-depth, not the critical path).
|
||||
branch = _resolve_branch_via_db(repo_name)
|
||||
if not branch:
|
||||
# logger.info (was debug) so a lost CI event is VISIBLE in the logs.
|
||||
logger.info(f"CI status event: could not determine branch for sha={sha}")
|
||||
return
|
||||
|
||||
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
|
||||
|
||||
@@ -17,6 +17,7 @@ from ..db import (
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
create_task_atomic,
|
||||
)
|
||||
from ._dedup import plane_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||
@@ -496,15 +497,21 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
f"branch collision for {repo}; disambiguated to unique branch {branch}"
|
||||
)
|
||||
|
||||
# Insert task into DB
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
|
||||
# Insert task into DB — ORCH-053 (AC-4): atomic anti-dup claim under a
|
||||
# process-wide lock. If the F-2 reconciler and this live webhook race on the
|
||||
# same plane_id, exactly one wins (created=True); the loser sees the existing
|
||||
# task and returns WITHOUT creating a second branch / worktree / analyst job.
|
||||
task_row, created = create_task_atomic(
|
||||
plane_id, work_item_id, repo, branch, "analysis", name
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
if not created:
|
||||
logger.info(
|
||||
f"start_pipeline: task for plane_id={plane_id} already exists "
|
||||
f"(id={task_row['id']}, work_item_id={task_row.get('work_item_id')}), "
|
||||
f"skipping duplicate creation"
|
||||
)
|
||||
return
|
||||
task_id = task_row["id"]
|
||||
|
||||
# Create branch in Gitea
|
||||
try:
|
||||
@@ -523,20 +530,17 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
|
||||
logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis")
|
||||
|
||||
# Launch analyst agent
|
||||
# Launch analyst agent (task_id from the atomic create above).
|
||||
try:
|
||||
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
|
||||
if task_row:
|
||||
task_id = task_row[0]
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
|
||||
)
|
||||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||
# Post start comment to Plane
|
||||
from ..plane_sync import add_comment as _add_comment
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
|
||||
)
|
||||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||
# Post start comment to Plane
|
||||
from ..plane_sync import add_comment as _add_comment
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
||||
|
||||
|
||||
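The anti-duplicate claim that `start_pipeline` now relies on can be sketched as a check-then-insert performed under a process-wide lock. The body of `db.create_task_atomic` is not part of this excerpt, so everything below is an assumption: `create_task_once`, `make_db`, the reduced table schema and the in-memory SQLite connection are illustrative only.

```python
import sqlite3
import threading

# One lock for the whole process: serialises the check-then-insert.
_create_lock = threading.Lock()


def make_db():
    """In-memory DB with a reduced, hypothetical tasks table."""
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute(
        "CREATE TABLE tasks ("
        "id INTEGER PRIMARY KEY, plane_id TEXT, work_item_id TEXT, branch TEXT)"
    )
    conn.commit()
    return conn


def create_task_once(conn, plane_id, work_item_id, branch):
    """Return (task_row, created); only the first caller gets created=True."""
    with _create_lock:
        row = conn.execute(
            "SELECT id, work_item_id FROM tasks WHERE plane_id = ?", (plane_id,)
        ).fetchone()
        if row is not None:
            # Loser of the race: report the existing task, create nothing.
            return {"id": row[0], "work_item_id": row[1]}, False
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, branch) VALUES (?, ?, ?)",
            (plane_id, work_item_id, branch),
        )
        conn.commit()
        return {"id": cur.lastrowid, "work_item_id": work_item_id}, True
```

A lock of this kind only protects callers inside one process. Several worker processes would presumably need a database-level guard as well, such as a unique index on `plane_id`.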
@@ -37,6 +37,10 @@ def _no_telegram(monkeypatch):
|
||||
monkeypatch.setattr("src.webhooks.plane.send_telegram", _noop, raising=False)
|
||||
monkeypatch.setattr("src.agents.launcher.send_telegram", _noop, raising=False)
|
||||
monkeypatch.setattr("src.queue_worker.send_telegram", _noop, raising=False)
|
||||
# ORCH-053: the reconciler binds send_telegram as a MODULE-LEVEL name
|
||||
# (from .notifications import send_telegram), so the source patch alone would
|
||||
# not intercept its unblock notification — patch it here too.
|
||||
monkeypatch.setattr("src.reconciler.send_telegram", _noop, raising=False)
|
||||
yield
|
||||
|
||||
|
||||
|
||||
@@ -72,3 +72,46 @@ def test_merge_gate_settings_env_override(monkeypatch):
|
||||
assert s.merge_lock_timeout_s == 90
|
||||
assert s.merge_defer_delay_s == 5
|
||||
assert s.merge_defer_max_attempts == 9
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-053 / TC-22: reconcile_* settings defaults + env override.
|
||||
# ---------------------------------------------------------------------------
|
||||
_RECONCILE_ENV = (
|
||||
"ORCH_RECONCILE_ENABLED",
|
||||
"ORCH_RECONCILE_INTERVAL_S",
|
||||
"ORCH_RECONCILE_PLANE_ENABLED",
|
||||
"ORCH_RECONCILE_GRACE_DEFAULT_S",
|
||||
"ORCH_RECONCILE_GRACE_OVERRIDES_JSON",
|
||||
"ORCH_RECONCILE_NOTIFY_UNBLOCK",
|
||||
)
|
||||
|
||||
|
||||
def test_reconcile_settings_defaults(monkeypatch):
|
||||
"""TC-22 / AC-13: documented defaults when no env is set."""
|
||||
for name in _RECONCILE_ENV:
|
||||
monkeypatch.delenv(name, raising=False)
|
||||
s = Settings()
|
||||
assert s.reconcile_enabled is True
|
||||
assert s.reconcile_interval_s == 120
|
||||
assert s.reconcile_plane_enabled is True
|
||||
assert s.reconcile_grace_default_s == 600
|
||||
assert s.reconcile_grace_overrides_json == ""
|
||||
assert s.reconcile_notify_unblock is True
|
||||
|
||||
|
||||
def test_reconcile_settings_env_override(monkeypatch):
|
||||
"""TC-22 / AC-13: each field is read from its ORCH_* env var."""
|
||||
monkeypatch.setenv("ORCH_RECONCILE_ENABLED", "false")
|
||||
monkeypatch.setenv("ORCH_RECONCILE_INTERVAL_S", "300")
|
||||
monkeypatch.setenv("ORCH_RECONCILE_PLANE_ENABLED", "false")
|
||||
monkeypatch.setenv("ORCH_RECONCILE_GRACE_DEFAULT_S", "900")
|
||||
monkeypatch.setenv("ORCH_RECONCILE_GRACE_OVERRIDES_JSON", '{"development": 300}')
|
||||
monkeypatch.setenv("ORCH_RECONCILE_NOTIFY_UNBLOCK", "false")
|
||||
s = Settings()
|
||||
assert s.reconcile_enabled is False
|
||||
assert s.reconcile_interval_s == 300
|
||||
assert s.reconcile_plane_enabled is False
|
||||
assert s.reconcile_grace_default_s == 900
|
||||
assert s.reconcile_grace_overrides_json == '{"development": 300}'
|
||||
assert s.reconcile_notify_unblock is False
|
||||
|
||||
119
tests/test_gitea_sha_resolve.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""ORCH-053 (F-3): sha->branch resolution hardening in handle_ci_status.
|
||||
|
||||
When a CI-status webhook carries no ``branches[]`` and the SHA cannot be
|
||||
resolved to a feature branch via ``git branch -r --contains`` (lost on a 502
|
||||
rebuild, shallow clone, etc.), handle_ci_status now falls back to the tasks DB
|
||||
and matches the UNIQUE development-stage task of the repo. Ambiguity (more than
|
||||
one development task) is deliberately left unresolved so it can never make a
|
||||
false match.
|
||||
|
||||
The git subprocess and the network QG / Plane / Telegram side effects are mocked
|
||||
so the handler runs offline against a real isolated sqlite DB.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_gitea_sha.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import MagicMock # noqa: E402
|
||||
|
||||
import src.db as _db # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src.webhooks import gitea as gitea_mod # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(monkeypatch):
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def silence_and_stub_git(monkeypatch):
|
||||
# git branch -r --contains <sha> resolves to nothing (forces the DB fallback).
|
||||
monkeypatch.setattr(
|
||||
gitea_mod.subprocess, "run",
|
||||
lambda *a, **k: SimpleNamespace(stdout="", returncode=0),
|
||||
)
|
||||
# Mute the network side effects bound module-level in gitea.
|
||||
for name in ("notify_stage_change", "notify_qg_failure", "notify_error",
|
||||
"plane_notify_stage"):
|
||||
monkeypatch.setattr(gitea_mod, name, MagicMock(), raising=False)
|
||||
|
||||
|
||||
def _make_dev_task(branch, wi, repo="enduro-trails"):
|
||||
conn = get_db()
|
||||
cur = conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||
"VALUES (?, ?, ?, ?, 'development')",
|
||||
(f"plane-{wi}", wi, repo, branch),
|
||||
)
|
||||
tid = cur.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return tid
|
||||
|
||||
|
||||
def _stage_of(task_id):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT stage FROM tasks WHERE id = ?", (task_id,)).fetchone()
|
||||
conn.close()
|
||||
return row["stage"]
|
||||
|
||||
|
||||
def _ci_payload(sha="deadbeef", repo="enduro-trails", state="success"):
|
||||
return {
|
||||
"state": state,
|
||||
"sha": sha,
|
||||
"branches": [], # no branch in the event -> forces resolution
|
||||
"repository": {"name": repo},
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TC-18: unique development task -> DB fallback resolves the branch, advances.
|
||||
# ---------------------------------------------------------------------------
|
||||
def test_tc18_db_fallback_unique_match_advances(monkeypatch):
|
||||
ci = MagicMock(return_value=(True, "CI green"))
|
||||
monkeypatch.setattr(gitea_mod, "check_ci_green", ci)
|
||||
|
||||
tid = _make_dev_task("feature/ET-050-x", "ET-050")
|
||||
|
||||
asyncio.run(gitea_mod.handle_ci_status(_ci_payload()))
|
||||
|
||||
assert _stage_of(tid) == "review"
|
||||
ci.assert_called_once()
|
||||
# The fallback resolved to the unique dev task's branch.
|
||||
assert ci.call_args.args[1] == "feature/ET-050-x"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TC-19: several development tasks -> ambiguous -> no false match, no advance.
|
||||
# ---------------------------------------------------------------------------
|
||||
def test_tc19_db_fallback_ambiguous_no_match(monkeypatch, caplog):
|
||||
ci = MagicMock(return_value=(True, "CI green"))
|
||||
monkeypatch.setattr(gitea_mod, "check_ci_green", ci)
|
||||
|
||||
t1 = _make_dev_task("feature/ET-051-a", "ET-051")
|
||||
t2 = _make_dev_task("feature/ET-052-b", "ET-052")
|
||||
|
||||
with caplog.at_level("INFO", logger="orchestrator.webhooks.gitea"):
|
||||
asyncio.run(gitea_mod.handle_ci_status(_ci_payload()))
|
||||
|
||||
# Ambiguity -> branch unresolved -> handler returns before touching the gate.
|
||||
assert _stage_of(t1) == "development"
|
||||
assert _stage_of(t2) == "development"
|
||||
ci.assert_not_called()
|
||||
assert "could not determine branch" in caplog.text
|
||||
379
tests/test_reconciler.py
Normal file
@@ -0,0 +1,379 @@
|
||||
"""ORCH-053: tests for the gate-side stuck-task reconciler (F-1) + lifecycle.
|
||||
|
||||
These cover the F-1 sweeper (``Reconciler.reconcile_gate_once``), the per-stage
|
||||
grace / config (``grace_for_stage``), the no-spam guarantee, the analysis carve-
|
||||
out (AC-16), never-raise isolation, the kill-switch, the unblock observability
|
||||
(AC-12 / F-4) and the restart-safe daemon thread (AC-11).
|
||||
|
||||
Everything that touches the network (the quality gate, Plane sync, Telegram) is
|
||||
mocked at the src.stage_engine / src.reconciler level so the reconciler runs
|
||||
against a real isolated sqlite DB (same convention as test_stage_engine.py).
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
# Isolated test DB (set BEFORE importing src.* so settings picks it up).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_reconciler.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import MagicMock # noqa: E402
|
||||
|
||||
import src.db as _db # noqa: E402
|
||||
from src.db import init_db, get_db, enqueue_job # noqa: E402
|
||||
from src import stage_engine # noqa: E402
|
||||
from src import reconciler as reconciler_mod # noqa: E402
|
||||
from src.reconciler import Reconciler, grace_for_stage # noqa: E402


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
    """Fresh isolated DB per test."""
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    yield


@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
    """No-op every Plane/Telegram/notification side effect in the engine so the
    real advance_stage runs deterministically and offline."""
    for name in (
        "notify_stage_change",
        "notify_qg_failure",
        "notify_approve_requested",
        "notify_error",
        "send_telegram",
        "plane_notify_stage",
        "plane_notify_qg",
        "plane_add_comment",
        "set_issue_in_review",
        "set_issue_needs_input",
        "set_issue_in_progress",
        "set_issue_blocked",
        "set_issue_done",
    ):
        monkeypatch.setattr(stage_engine, name, MagicMock(), raising=False)


def _make_task(stage, *, repo="enduro-trails", branch="feature/ET-001-x",
               wi="ET-001", age_s=None):
    """Insert a task; if age_s is given, backdate updated_at by that many secs."""
    conn = get_db()
    cur = conn.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
        "VALUES (?, ?, ?, ?, ?)",
        (f"plane-{wi}", wi, repo, branch, stage),
    )
    task_id = cur.lastrowid
    if age_s is not None:
        conn.execute(
            "UPDATE tasks SET updated_at = datetime('now', ?) WHERE id = ?",
            (f"-{int(age_s)} seconds", task_id),
        )
    conn.commit()
    conn.close()
    return task_id


def _stage_of(task_id):
    conn = get_db()
    row = conn.execute("SELECT stage FROM tasks WHERE id = ?", (task_id,)).fetchone()
    conn.close()
    return row["stage"]


def _jobs_for(task_id, agent=None):
    conn = get_db()
    if agent:
        rows = conn.execute(
            "SELECT * FROM jobs WHERE task_id = ? AND agent = ?", (task_id, agent)
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT * FROM jobs WHERE task_id = ?", (task_id,)
        ).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def _green_ci(monkeypatch, value=(True, "CI green")):
    """Patch the check_ci_green entry in QG_CHECKS; return the mock."""
    m = MagicMock(return_value=value)
    monkeypatch.setitem(stage_engine.QG_CHECKS, "check_ci_green", m)
    return m


# ---------------------------------------------------------------------------
# TC-01: happy path — stuck development task is advanced to review
# ---------------------------------------------------------------------------
def test_tc01_advances_stuck_development_task(monkeypatch):
    _green_ci(monkeypatch)
    task_id = _make_task("development", age_s=3600)  # well past grace

    Reconciler().reconcile_gate_once()

    assert _stage_of(task_id) == "review"
    reviewer_jobs = _jobs_for(task_id, "reviewer")
    assert len(reviewer_jobs) == 1


# ---------------------------------------------------------------------------
# TC-02: source of truth is the gate — advance goes through advance_stage
# with finished_agent=None (no own update_task_stage/enqueue_job).
# ---------------------------------------------------------------------------
def test_tc02_advances_via_advance_stage_finished_agent_none(monkeypatch):
    _green_ci(monkeypatch)
    spy = MagicMock(wraps=stage_engine.advance_stage)
    # advance_if_gate_passed resolves advance_stage as a module global.
    monkeypatch.setattr(stage_engine, "advance_stage", spy)

    task_id = _make_task("development", age_s=3600)
    Reconciler().reconcile_gate_once()

    assert spy.call_count == 1
    # finished_agent must be None (the webhook path).
    _args, kwargs = spy.call_args
    assert kwargs.get("finished_agent", "MISSING") is None
    assert spy.call_args.args[0] == task_id


# ---------------------------------------------------------------------------
# TC-03: task with an active job is skipped — gate not evaluated, no advance.
# ---------------------------------------------------------------------------
def test_tc03_active_job_skipped(monkeypatch):
    ci = _green_ci(monkeypatch)
    spy = MagicMock(wraps=stage_engine.advance_stage)
    monkeypatch.setattr(stage_engine, "advance_stage", spy)

    task_id = _make_task("development", age_s=3600)
    enqueue_job("reviewer", "enduro-trails", task_id=task_id)  # active (queued)

    Reconciler().reconcile_gate_once()

    assert _stage_of(task_id) == "development"
    ci.assert_not_called()
    spy.assert_not_called()


# ---------------------------------------------------------------------------
# TC-04: per-stage grace — fresh task untouched, at-threshold task eligible.
# ---------------------------------------------------------------------------
def test_tc04_grace_boundary(monkeypatch):
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_grace_default_s", 600)
    _green_ci(monkeypatch)

    fresh = _make_task("development", branch="feature/ET-002-fresh",
                       wi="ET-002", age_s=10)  # < grace -> untouched
    stuck = _make_task("development", branch="feature/ET-003-stuck",
                       wi="ET-003", age_s=3600)  # >= grace -> advanced

    Reconciler().reconcile_gate_once()

    assert _stage_of(fresh) == "development"
    assert _stage_of(stuck) == "review"


# ---------------------------------------------------------------------------
# TC-05: grace_for_stage reads overrides JSON; bad JSON -> default, no crash.
# ---------------------------------------------------------------------------
def test_tc05_grace_for_stage_overrides(monkeypatch):
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_grace_default_s", 600)
    monkeypatch.setattr(
        reconciler_mod.settings,
        "reconcile_grace_overrides_json",
        '{"development": 30, "review": 7200}',
    )
    assert grace_for_stage("development") == 30
    assert grace_for_stage("review") == 7200
    # missing key -> default
    assert grace_for_stage("testing") == 600


def test_tc05_grace_for_stage_invalid_json_falls_back(monkeypatch):
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_grace_default_s", 600)
    monkeypatch.setattr(
        reconciler_mod.settings, "reconcile_grace_overrides_json", "{not valid json"
    )
    # Must not raise, must fall back to the default.
    assert grace_for_stage("development") == 600


# ---------------------------------------------------------------------------
# TC-06: no spam — a stable-red gate never advances and never notifies, even
# across many ticks.
# ---------------------------------------------------------------------------
def test_tc06_red_gate_no_spam(monkeypatch):
    _green_ci(monkeypatch, value=(False, "CI red"))
    task_id = _make_task("development", age_s=3600)

    rec = Reconciler()
    for _ in range(5):
        rec.reconcile_gate_once()

    assert _stage_of(task_id) == "development"
    # The QG-failure notification branch inside advance_stage must never fire,
    # because advance_if_gate_passed returns None on a red gate (no advance call).
    stage_engine.notify_qg_failure.assert_not_called()
    stage_engine.plane_notify_qg.assert_not_called()
    assert rec.unblocked_total == 0


# ---------------------------------------------------------------------------
# TC-07: silence when in sync — done / busy / within-grace tasks => no advance.
# ---------------------------------------------------------------------------
def test_tc07_silence_when_in_sync(monkeypatch):
    _green_ci(monkeypatch)
    spy = MagicMock(wraps=stage_engine.advance_stage)
    monkeypatch.setattr(stage_engine, "advance_stage", spy)

    _make_task("done", branch="feature/ET-010-done", wi="ET-010", age_s=3600)
    fresh = _make_task("development", branch="feature/ET-011-fresh",
                       wi="ET-011", age_s=5)
    busy = _make_task("development", branch="feature/ET-012-busy",
                      wi="ET-012", age_s=3600)
    enqueue_job("reviewer", "enduro-trails", task_id=busy)

    rec = Reconciler()
    rec.reconcile_gate_once()

    spy.assert_not_called()
    assert rec.unblocked_total == 0
    assert _stage_of(fresh) == "development"


# ---------------------------------------------------------------------------
# TC-08 (AC-16): F-1 never advances the human analysis gate.
# ---------------------------------------------------------------------------
def test_tc08_analysis_not_advanced_by_f1(monkeypatch):
    # Even if the analysis gate would "pass", F-1 must not touch analysis.
    monkeypatch.setitem(
        stage_engine.QG_CHECKS, "check_analysis_approved",
        MagicMock(return_value=(True, "approved")),
    )
    spy = MagicMock(wraps=stage_engine.advance_stage)
    monkeypatch.setattr(stage_engine, "advance_stage", spy)

    task_id = _make_task("analysis", age_s=3600)
    Reconciler().reconcile_gate_once()

    assert _stage_of(task_id) == "analysis"
    spy.assert_not_called()


# ---------------------------------------------------------------------------
# TC-09: never-raise — one task blowing up does not stop the others.
# ---------------------------------------------------------------------------
def test_tc09_never_raise_isolates_failure(monkeypatch):
    calls = []

    def boom(task_id, stage, repo, wi, branch):
        calls.append(task_id)
        raise RuntimeError("boom")

    monkeypatch.setattr(reconciler_mod, "advance_if_gate_passed", boom)

    t1 = _make_task("development", branch="feature/ET-020-a", wi="ET-020", age_s=3600)
    t2 = _make_task("development", branch="feature/ET-021-b", wi="ET-021", age_s=3600)

    # Must not raise despite both tasks raising inside advance_if_gate_passed.
    Reconciler().reconcile_gate_once()

    assert set(calls) == {t1, t2}  # both attempted


# ---------------------------------------------------------------------------
# TC-10: kill-switches.
# ---------------------------------------------------------------------------
def test_tc10_kill_switch_disables_gate(monkeypatch):
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_enabled", False)
    spy = MagicMock(wraps=stage_engine.advance_stage)
    monkeypatch.setattr(stage_engine, "advance_stage", spy)
    _green_ci(monkeypatch)

    task_id = _make_task("development", age_s=3600)
    Reconciler().reconcile_gate_once()

    assert _stage_of(task_id) == "development"
    spy.assert_not_called()


def test_tc10_plane_switch_mutes_only_f2(monkeypatch):
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_enabled", True)
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_plane_enabled", False)

    plane_pass = MagicMock()
    monkeypatch.setattr(reconciler_mod.Reconciler, "_reconcile_plane_project", plane_pass)
    # F-2 muted -> reconcile_plane_once is a no-op.
    Reconciler().reconcile_plane_once()
    plane_pass.assert_not_called()

    # F-1 still runs.
    _green_ci(monkeypatch)
    task_id = _make_task("development", age_s=3600)
    Reconciler().reconcile_gate_once()
    assert _stage_of(task_id) == "review"


# ---------------------------------------------------------------------------
# TC-20: observability — explicit unblock log line + telegram (AC-12 / F-4).
# ---------------------------------------------------------------------------
def test_tc20_unblock_logs_and_notifies(monkeypatch, caplog):
    _green_ci(monkeypatch)
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_notify_unblock", True)
    tg = MagicMock()
    monkeypatch.setattr(reconciler_mod, "send_telegram", tg)

    _make_task("development", wi="ET-042", age_s=3600)

    rec = Reconciler()
    with caplog.at_level("INFO", logger="orchestrator.reconciler"):
        rec.reconcile_gate_once()

    # Exact AC-12 contract string.
    assert "reconciler: ET-042 development разблокирована (потерян webhook)" in caplog.text
    assert rec.unblocked_total == 1
    assert rec.last_unblocked == "ET-042"
    tg.assert_called_once()


def test_tc20_no_telegram_when_disabled(monkeypatch):
    _green_ci(monkeypatch)
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_notify_unblock", False)
    tg = MagicMock()
    monkeypatch.setattr(reconciler_mod, "send_telegram", tg)

    _make_task("development", wi="ET-043", age_s=3600)
    Reconciler().reconcile_gate_once()

    tg.assert_not_called()


# ---------------------------------------------------------------------------
# TC-21: restart-safe daemon thread — start/stop/idempotent start.
# ---------------------------------------------------------------------------
def test_tc21_daemon_thread_lifecycle(monkeypatch):
    # Avoid any real work in the loop: disable both branches, big interval.
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_enabled", False)
    rec = Reconciler(interval_s=60)

    rec.start()
    assert rec._thread is not None and rec._thread.is_alive()
    first_thread = rec._thread

    # Idempotent: a second start does not spawn a new thread.
    rec.start()
    assert rec._thread is first_thread

    rec.stop(timeout=5.0)
    assert not first_thread.is_alive()
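TC-05 in this file pins down the contract of `grace_for_stage`: a per-stage override from a JSON setting, the default for a missing key, and the default again when the JSON is malformed. A minimal sketch of a function satisfying that contract is shown here; the explicit `default_s` / `overrides_json` parameters are an assumption made for illustration (the real function reads them from `settings`), and `grace_for_stage_sketch` is a hypothetical name:

```python
import json


def grace_for_stage_sketch(stage: str, default_s: int, overrides_json: str) -> int:
    """Hypothetical stand-in for grace_for_stage: never raises, always
    returns a grace period in seconds."""
    try:
        overrides = json.loads(overrides_json) if overrides_json else {}
    except (TypeError, ValueError):
        # json.JSONDecodeError subclasses ValueError: bad JSON -> default.
        return default_s
    if not isinstance(overrides, dict):
        # Valid JSON but not an object (e.g. a list): ignore it.
        return default_s
    try:
        return int(overrides.get(stage, default_s))
    except (TypeError, ValueError):
        # Non-numeric override value: fall back rather than crash the sweeper.
        return default_s


overrides = '{"development": 30, "review": 7200}'
dev_grace = grace_for_stage_sketch("development", 600, overrides)
missing_grace = grace_for_stage_sketch("testing", 600, overrides)
bad_json_grace = grace_for_stage_sketch("development", 600, "{not valid json")
```

Falling back to the default on every malformed input matches the never-raise stance of the sweeper: a typo in the overrides setting degrades to the default grace instead of stopping the daemon thread.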
297
tests/test_reconciler_plane.py
Normal file
@@ -0,0 +1,297 @@
"""ORCH-053: tests for the Plane-side reconciler (F-2) + sha-resolve helpers.

F-2 polls the Plane API per project (``list_issues_by_state``) and REPLAYS a
missed In Progress / Approved / Rejected transition through the EXISTING
``webhooks.plane.handle_status_start`` / ``handle_verdict`` handlers — it never
duplicates pipeline logic. These tests mock those handlers (AsyncMock) and the
Plane API helpers, and verify the dispatch / idempotency / multi-project rules.

TC-15 is the AC-4 anti-dup integration test for ``create_task_atomic`` against a
real isolated sqlite DB under concurrency.
TC-16 exercises ``plane_sync.list_issues_by_state`` directly (pagination + the
never-raise contract).
"""

import os
import tempfile
import threading
from types import SimpleNamespace

import pytest

_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_reconciler_plane.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

from unittest.mock import AsyncMock, MagicMock  # noqa: E402

import src.db as _db  # noqa: E402
from src.db import init_db, get_db, enqueue_job, create_task_atomic  # noqa: E402
from src import reconciler as reconciler_mod  # noqa: E402
from src import plane_sync  # noqa: E402
from src.reconciler import Reconciler  # noqa: E402

_IN_PROGRESS = "uuid-in-progress"
_APPROVED = "uuid-approved"
_REJECTED = "uuid-rejected"
_OLD_TS = "2020-01-01T00:00:00Z"  # well past any grace


@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    yield


@pytest.fixture
def single_project(monkeypatch):
    """Restrict F-2 to a single fake project and stub its state resolution."""
    proj = SimpleNamespace(
        plane_project_id="proj-1", repo="enduro-trails", work_item_prefix="ET",
    )
    monkeypatch.setattr(reconciler_mod.projects, "PROJECTS", [proj])
    monkeypatch.setattr(
        reconciler_mod, "get_project_states",
        lambda pid: {
            "in_progress": _IN_PROGRESS,
            "approved": _APPROVED,
            "rejected": _REJECTED,
        },
    )
    return proj


def _make_task(plane_id, stage="review", repo="enduro-trails",
               branch="feature/ET-001-x", wi="ET-001"):
    conn = get_db()
    cur = conn.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (plane_id, wi, repo, branch, stage, plane_id),
    )
    tid = cur.lastrowid
    conn.commit()
    conn.close()
    return tid


def _patch_handlers(monkeypatch):
    start = AsyncMock()
    verdict = AsyncMock()
    monkeypatch.setattr(reconciler_mod, "handle_status_start", start)
    monkeypatch.setattr(reconciler_mod, "handle_verdict", verdict)
    return start, verdict


def _patch_issues(monkeypatch, issues):
    monkeypatch.setattr(
        reconciler_mod, "list_issues_by_state", lambda pid, states: list(issues)
    )


# ---------------------------------------------------------------------------
# TC-11: In Progress without a task -> handle_status_start once.
# ---------------------------------------------------------------------------
def test_tc11_in_progress_without_task_starts_pipeline(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_issues(monkeypatch, [
        {"id": "iss-1", "state": {"id": _IN_PROGRESS}, "updated_at": _OLD_TS,
         "name": "Some issue"},
    ])

    Reconciler().reconcile_plane_once()

    assert start.call_count == 1
    issue_data, project_id = start.call_args.args
    assert issue_data["id"] == "iss-1"
    assert issue_data["state"]["id"] == _IN_PROGRESS
    assert project_id == "proj-1"
    verdict.assert_not_called()


# ---------------------------------------------------------------------------
# TC-12: Approved with an existing task, no active job -> handle_verdict(True).
# ---------------------------------------------------------------------------
def test_tc12_approved_replays_verdict(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _make_task("iss-2", stage="review")
    _patch_issues(monkeypatch, [
        {"id": "iss-2", "state": {"id": _APPROVED}, "updated_at": _OLD_TS},
    ])

    Reconciler().reconcile_plane_once()

    assert verdict.call_count == 1
    assert verdict.call_args.kwargs.get("approved") is True
    start.assert_not_called()


# ---------------------------------------------------------------------------
# TC-13: Rejected with an existing task -> handle_verdict(False).
# ---------------------------------------------------------------------------
def test_tc13_rejected_replays_verdict(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _make_task("iss-3", stage="review")
    _patch_issues(monkeypatch, [
        {"id": "iss-3", "state": {"id": _REJECTED}, "updated_at": _OLD_TS},
    ])

    Reconciler().reconcile_plane_once()

    assert verdict.call_count == 1
    assert verdict.call_args.kwargs.get("approved") is False
    start.assert_not_called()


# ---------------------------------------------------------------------------
# TC-14: idempotency — an active job means a live webhook is in flight -> skip.
# ---------------------------------------------------------------------------
def test_tc14_active_job_skips(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    tid = _make_task("iss-4", stage="review")
    enqueue_job("reviewer", "enduro-trails", task_id=tid)  # active
    _patch_issues(monkeypatch, [
        {"id": "iss-4", "state": {"id": _APPROVED}, "updated_at": _OLD_TS},
    ])

    Reconciler().reconcile_plane_once()

    start.assert_not_called()
    verdict.assert_not_called()


# ---------------------------------------------------------------------------
# TC-14b: within-grace issue is left alone (lost, not merely delayed).
# ---------------------------------------------------------------------------
def test_tc14b_within_grace_skipped(monkeypatch, single_project):
    from datetime import datetime, timezone
    start, verdict = _patch_handlers(monkeypatch)
    _make_task("iss-5", stage="review")
    fresh_ts = datetime.now(timezone.utc).isoformat()
    _patch_issues(monkeypatch, [
        {"id": "iss-5", "state": {"id": _APPROVED}, "updated_at": fresh_ts},
    ])

    Reconciler().reconcile_plane_once()

    start.assert_not_called()
    verdict.assert_not_called()


# ---------------------------------------------------------------------------
# TC-15 (AC-4): atomic anti-dup — concurrent create_task_atomic for one
# plane_id yields exactly ONE row and ONE created=True.
# ---------------------------------------------------------------------------
def test_tc15_create_task_atomic_no_duplicate():
    results = []
    barrier = threading.Barrier(8)

    def worker():
        barrier.wait()  # maximise the race
        row, created = create_task_atomic(
            "plane-dup", "ET-099", "enduro-trails",
            "feature/ET-099-x", "analysis", "Dup race",
        )
        results.append((row["id"], created))

    threads = [threading.Thread(target=worker) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    created_flags = [c for _, c in results]
    assert created_flags.count(True) == 1  # exactly one winner
    assert created_flags.count(False) == 7  # the rest see the existing row

    conn = get_db()
    n = conn.execute(
        "SELECT COUNT(*) FROM tasks WHERE plane_id = 'plane-dup'"
    ).fetchone()[0]
    conn.close()
    assert n == 1  # only one task row ever created

    # All callers see the same row id (the single task).
    assert len({rid for rid, _ in results}) == 1


# ---------------------------------------------------------------------------
# TC-16: list_issues_by_state — never-raise on API error, filter+paginate on OK.
# ---------------------------------------------------------------------------
def test_tc16_list_issues_never_raises_on_error(monkeypatch):
    def boom(*a, **k):
        raise RuntimeError("plane down")

    monkeypatch.setattr(plane_sync.httpx, "get", boom)
    out = plane_sync.list_issues_by_state("proj-1", [_APPROVED])
    assert out == []


def test_tc16_list_issues_paginates_and_filters(monkeypatch):
    page1 = {
        "results": [
            {"id": "a", "state": {"id": _APPROVED}},
            {"id": "b", "state": {"id": "other"}},
        ],
        "next_page_results": True,
        "next_cursor": "cur2",
    }
    page2 = {
        "results": [
            {"id": "c", "state": _APPROVED},  # bare-uuid state shape
            {"id": "d", "state": {"id": _REJECTED}},
        ],
        "next_page_results": False,
        "next_cursor": None,
    }
    pages = iter([page1, page2])

    def fake_get(url, headers=None, params=None, timeout=None):
        resp = MagicMock()
        resp.json.return_value = next(pages)
        resp.raise_for_status.return_value = None
        return resp

    monkeypatch.setattr(plane_sync.httpx, "get", fake_get)

    out = plane_sync.list_issues_by_state("proj-1", [_APPROVED, _REJECTED])
    ids = {i["id"] for i in out}
    assert ids == {"a", "c", "d"}  # 'b' filtered out (state 'other')


# ---------------------------------------------------------------------------
# TC-17: F-2 polls EVERY registry project and resolves states per-project.
# ---------------------------------------------------------------------------
def test_tc17_polls_all_projects_resolves_states_per_project(monkeypatch):
    _patch_handlers(monkeypatch)
    from src import projects as projects_mod
    projects_mod.reload_projects()
    expected_ids = {p.plane_project_id for p in projects_mod.PROJECTS}
    assert len(expected_ids) >= 2  # enduro + orchestrator in the default registry

    states_calls = []
    issues_calls = []

    def fake_states(pid):
        states_calls.append(pid)
        return {"in_progress": _IN_PROGRESS, "approved": _APPROVED, "rejected": _REJECTED}

    def fake_issues(pid, states):
        issues_calls.append((pid, tuple(states)))
        return []

    monkeypatch.setattr(reconciler_mod, "get_project_states", fake_states)
    monkeypatch.setattr(reconciler_mod, "list_issues_by_state", fake_issues)

    Reconciler().reconcile_plane_once()

    assert set(states_calls) == expected_ids
    assert {pid for pid, _ in issues_calls} == expected_ids
    # state uuids are resolved per-project (not hardcoded): each call carries them.
    for _pid, states in issues_calls:
        assert set(states) == {_IN_PROGRESS, _APPROVED, _REJECTED}