fix(stage-engine): durable transition-ownership lease + expected-stage CAS (ORCH-114)
All checks were successful
CI / test (push) Successful in 1m11s
CI / test (pull_request) Successful in 1m8s

Close the root class of the ORCH-110/111/112/113 incident chain: side-effectful
stage transitions had no single ownership. `advance_stage` is re-enterable and wrote
the stage with a bare `UPDATE ... WHERE id=?` (no compare-and-swap), while >=5 actors
(monitor / Plane-webhook / reconciler F-1 / job-reaper / deploy-finalizer) enter the
same transition independently. A concurrent or post-restart re-entry therefore
re-applied irreversible effects (merge_pr / coverage-ratchet / image-rebuild /
prod-deploy initiation) and produced a contradictory rollback<->done (incident
ORCH-111, job 1914 / PR #130).

Two complementary layers, both additive, under one kill-switch, never-raise:
  1. Durable transition-lease (new table `transition_lease`) — owner-exclusion on
     ENTRY to the side-effectful region: a second actor that sees a LIVE owner does
     not start the heavy sub-gates at all (prevention, not post-hoc repair).
  2. Expected-stage CAS (`db.update_task_stage_cas`) — atomicity on the stage WRITE:
     a lost race aborts with NO side effect. Also closes the 6 paths that write the
     stage in bypass of advance_stage (gitea x5 + plane rollback).

Owner liveness = owner_pid + owner_boot_id (NOT a heartbeat — a blocking 900s merge
re-test cannot beat one; ADR-001 D3), making restart recovery free (a fresh boot_id
renders every prior lease stale -> reclaimed by recover_on_startup). The lease has no
own TTL: its hard age ceiling is the reaper Tier-3 backstop reaper_max_running_s, so
the cross-cutting budget invariant ORCH-065/109/110/113 is untouched.

Generalises ORCH-113 finalizer-liveness (process-local, Tier-2, deploy-staging) to a
durable cross-path lease: the reaper consults it on all relevant paths (defer live,
reclaim dead; Tier-3 ignores the marker -> bounded; a reap force-releases the lease);
reconciler F-1 and the Plane webhook defer on an active lease; main.lifespan calls
recover_on_startup() after requeue_running_jobs. finalizer_liveness.py is unchanged
(it remains the kill-switch-off fallback).

Scope self-hosting (transition_lease_repos="" -> orchestrator only; enduro untouched).
Kill-switch ORCH_TRANSITION_LEASE_ENABLED=false -> CAS degenerates to the prior
unconditional update_task_stage, lease inert, reaper -> ORCH-113 fallback (byte-for-
byte pre-ORCH-114). STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict keys /
existing table schemas — byte-for-byte (one additive table, no epoch column on tasks).

Observability: read-only `transition_lease` block in GET /queue + a Telegram alert on
forced/stale reclaim + optional POST /transition-lease/release?work_item=<id>.

Coverage: tests/test_orch114_transition_ownership.py (TC-01 mandatory regression of
the ORCH-111 class — red before fix, green after; TC-02..TC-14). Full suite green
(2048 passed); the 4 webhook tests that spied on the removed gitea.update_task_stage
were updated to spy on the new commit_stage_cas write path.

ADR: docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md
Cross-cutting: docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md

Refs: ORCH-114
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 17:37:11 +03:00
parent 0939893c70
commit 0bda3c145b
15 changed files with 1591 additions and 82 deletions

View File

@@ -3,6 +3,9 @@
Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу.
## [Unreleased]
- **Ownership-lease для side-effectful переходов стадий + умное восстановление при старте** (ORCH-114, `fix`, bug→escalate full-cycle): закрыт **корневой класс** инцидент-цепочки ORCH-110/111/112/113 — у side-effectful переходов стадий не было единого владения. `advance_stage` ре-ентерабельна и пишет стадию «голым» `UPDATE … WHERE id=?` (без compare-and-swap), а ≥5 акторов (монитор / Plane-webhook / reconciler F-1 / job-reaper / deploy-finalizer) входят в один переход независимо → конкурентный или после-рестартовый повторный вход **дважды** применял необратимые эффекты (merge_pr / coverage-ratchet / image-rebuild / инициация прод-деплоя) и давал **противоречие rollback↔done** (инцидент ORCH-111, job 1914 / PR #130). Два комплементарных слоя, оба аддитивные, под единым kill-switch, never-raise: **(1) durable transition-lease** (новая таблица `transition_lease`) — владение на ВХОДЕ в side-effectful регион (второй актор, увидев живого владельца, не стартует тяжёлые под-гейты вовсе — предотвращение, не починка постфактум); **(2) expected-stage CAS** (`update_task_stage_cas`) — на ЗАПИСИ стадии (проигравший гонку — аборт без побочных эффектов), что закрывает и **6 путей записи стадии в обход `advance_stage`** (gitea×5 + plane rollback). Liveness владельца = `owner_pid` + `owner_boot_id` (НЕ heartbeat: блокирующий 900s merge re-test не может бить heartbeat — довод самого ORCH-113), что делает рестарт-recovery бесплатным (новый процесс → новый boot-id → все прежние lease мгновенно устаревшие → реклеймятся). Lease без собственного TTL: его потолок возраста = Tier-3 backstop `reaper_max_running_s` (5400) → сквозной бюджет ORCH-065/109/110/113 не тронут. `STAGE_TRANSITIONS` / реестр `QG_CHECKS` / семантика и имена `check_*` / machine-verdict-ключи / **схемы существующих таблиц** — байт-в-байт (одна аддитивная таблица, без epoch-колонки на `tasks`). Скоуп self-hosting (`transition_lease_repos=""` → только `orchestrator`; enduro не затронут); kill-switch `ORCH_TRANSITION_LEASE_ENABLED=false` → CAS вырождается в прежний безусловный `update_task_stage`, lease инертен → поведение байт-в-байт до ORCH-114. ADR: `docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md`, сквозной `docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md`.
- **Leaf `src/transition_lease.py` (новый, чистый never-raise):** по образцу `serial_gate`/`coverage_gate`/`finalizer_liveness` (импортирует только `db`+`config`, лениво `merge_gate.pid_alive`/`qg.checks`/`notifications`; НЕ импортирует `stage_engine`/`launcher`) — `applies(repo)` / `acquire(task_id, owner, run_id, stage)` (атомарный rowcount-guard `INSERT … ON CONFLICT DO NOTHING` после очистки stale-строки) / `is_held_by_live_owner(task_id)` (fail-closed → defer на сомнении) / `release(task_id, force=False)` (holder-aware по boot) / `reclaim_if_stale` / `recover_on_startup` / `commit_stage_cas(task_id, expected, new, repo)` (flag-off → unconditional `update_task_stage`; flag-on → CAS) / `snapshot()`.
- **Интеграция:** `advance_stage` захватывает lease на входе в side-effectful ребро (`deploy-staging`/`deploy`), пишет стадию через CAS, освобождает lease в `try/finally` (на любом исходе, включая исключение/откат); job-reaper `_finalizer_owns` обобщён с процесс-локального ORCH-113 (Tier-2/`deploy-staging`) на **durable cross-path** lease (defer при живом владельце; Tier-3 backstop игнорирует маркер → bounded reclaim; реап force-освобождает lease); reconciler F-1 и Plane-webhook (`_try_advance_stage`) делают **defer** при активном lease; `main.lifespan` зовёт `recover_on_startup()` после `requeue_running_jobs`. Наблюдаемость — read-only блок `transition_lease` в `GET /queue` + Telegram-алерт на форсированный/устаревший реклейм + опциональный `POST /transition-lease/release?work_item=<id>`. Покрытие — `tests/test_orch114_transition_ownership.py` (TC-01 обязательный регресс класса ORCH-111: красный до фикса, зелёный после; TC-02…TC-14). Флаги (`config.py`, дефолт = боевое): `transition_lease_enabled` (env `ORCH_TRANSITION_LEASE_ENABLED`), `transition_lease_repos` (env `ORCH_TRANSITION_LEASE_REPOS`).
- **Гигиена shared deploy-базы: устойчивый self-deploy `git pull` к грязному дереву** (ORCH-112, `fix`, bug→escalate full-cycle): устранён инцидент ORCH-111 — self-deploy падал на шаге `git pull origin main` хост-хука с `error: Your local changes to the following files would be overwritten by merge: src/config.py` (грязь от неуспешной/отменённой/брошенной задачи ORCH-104 в общем main checkout) → деплой вставал → ручное вмешательство (на self-hosting — групповой риск). Решение — **resilient-pull, встроенный в прод-deploy-хук** (`--deploy`): перед `git pull` хук при обнаружении грязи приводит deploy-базу к чистому актуальному `origin/main` (`git fetch` + `git reset --hard origin/main` + **скоупленный** `git clean -fd`). Аддитивно, под kill-switch, never-raise, скоуп self-hosting; `STAGE_TRANSITIONS` / реестр `QG_CHECKS` / семантика и имена `check_*` / machine-verdict-ключи / схема БД / exit-code-контракт хука (0/1/2, ORCH-036) — **байт-в-байт не тронуты** (это устойчивость deploy-пути, **не** Quality Gate и **не** стадия). ADR: `docs/work-items/ORCH-112/06-adr/ADR-001-deploy-base-checkout-hygiene.md`, сквозной `docs/architecture/adr/adr-0044-deploy-base-checkout-hygiene.md`.
- **Leaf `src/checkout_hygiene.py` (новый, чистый never-raise):** по образцу `serial_gate`/`cancel`/`self_deploy` (импортирует только `config`, лениво `self_deploy`/`qg.checks`/`notifications`) — `applies(repo)` (kill-switch `checkout_hygiene_enabled` + скоуп `checkout_hygiene_repos`, пусто → self-hosting only, локально и ПЕРВЫМ), `hook_env(repo, work_item_id)` (env-префикс `CHECKOUT_HYGIENE=1 HYGIENE_REPORT=<host-path>`, инжектится в detached-команду хука только при `applies==True`, иначе `""` → голый pull 1:1), `read_report`/`alert_dirty` (наблюдаемость), `snapshot()` (read-only блок `GET /queue`).
- **Хук-блок «2a. Resilient pull» (`scripts/orchestrator-deploy-hook.sh`):** между шагом «1. Capture PREV_IMG» и «2. Pull», под `if [[ "${CHECKOUT_HYGIENE:-0}" == "1" ]]`. **Сохранность (NFR-2, жёсткий контракт):** `git clean`**только `-fd`, НИКОГДА `-x`** (иначе удалил бы gitignored `.env`/прод-секреты, `data/*.db`/БД, `build/`); явные `-e '.deploy-prev-image-*'` и `-e 'deploy-hook.log'` (untracked-но-НЕ-ignored — иначе сломался бы rollback `do_rollback`); sibling `<repos_dir>/.deploy-state-*`/`.merge-lease-*.json` (под родителем `$REPO`) и `.git/worktrees/*` (внутри `.git/`) — вне области `git clean` в `$REPO`. Каждый git-шаг — `|| log "...continuing"` (never-break): сбой гигиены не ухудшает исход относительно текущего голого pull; на чистой базе блок — no-op (happy-path и exit-коды байт-в-байт). `--build-staging` (build из worktree, без pull) не затронут.

View File

@@ -12,6 +12,7 @@
- **Agent Launcher** (`src/agents/launcher.py`) — запуск Claude CLI агентов в изолированном git worktree, мониторинг, auto-advance. Модель/эффорт каждого агента резолвятся из config (`resolve_agent_model`/`resolve_agent_effort`, ORCH-41), а не из frontmatter промпта. **ORCH-74:** имя модели валидируется форматом `^claude-…$` (`is_valid_model`) перед `--model`; невалидное → лог + откат на следующий уровень/CLI-дефолт (never-break, как `VALID_EFFORTS` для эффорта). Тот же предикат гардит inline-чтение `--fallback-model`. **ORCH-109 ([adr-0040](adr/adr-0040-agent-timeout-budgets-and-launch-model-stamp.md)):** (1) резолвенная **модель стампится в `agent_runs.model` в момент launch** (`_spawn`, объединённый `UPDATE … SET model=?, effort=?` рядом со стампом эффорта ORCH-087; пустой резолв → `NULL`; never-raise) → модель видна не-`null` при любом исходе прогона, включая timeout-kill (`exit_code=-9`), и in-flight в `GET /metrics`/`GET /queue` (`get_running_agents` уже отдаёт `model`); постфактум `record_usage` (`model=COALESCE(?, model)`) остаётся **обогащением**, не единственным источником истины. (2) **Per-role wall-clock бюджеты** через выделенные ключи `agent_timeout_developer_s=3600`/`agent_timeout_reviewer_s=3000` (лестница `_resolve_timeout`: `agent_timeout_overrides_json` → выделенный ключ роли → `agent_timeout_seconds=1800`; прочие роли — байт-в-байт; малформный/вне-диапазонный конфиг → дефолт + WARNING). Инвариант reaper ORCH-065 сохранён синхронным поднятием `reaper_max_running_s` 3600→**5400** (`5400 > max(timeout)3600 + grace20`). FR-5 анти-salvage — структурно: продвижение гейтится `if exit_code==0`, timeout-kill → `_finalize_job` (retry/fail), не advance. `STAGE_TRANSITIONS`/`QG_CHECKS`/схема БД не тронуты.
- **Queue** (`src/queue_worker.py`, ORCH-1) — персистентная очередь задач (SQLite `jobs`), atomic claim, max_concurrency, ретраи, restart-safe. **ORCH-026:** `claim_next_job` гейтит задачи с незавершёнными зависимостями (`job_deps`, `NOT EXISTS`) без занятия слота; декларации/циклы — leaf `src/task_deps.py`.
- **Job-reaper** (`src/job_reaper.py`, ORCH-065 — [adr-0011](adr/adr-0011-job-reaper-lease-reclaim.md)) — фоновый daemon-поток (каркас `reconciler`), стартует/останавливается в `main.lifespan` (после `reconciler.start()` / перед `worker.stop()`). Детектирует «мёртвый» `running`-job **без рестарта** процесса (Tier-1 мёртвый `jobs.pid` после `reaper_dead_ticks` тиков; Tier-2 `agent_runs.exit_code` записан, а job ещё `running`; Tier-3 backstop `reaper_max_running_s`) и приводит строку к корректному статусу через те же контракты (`_try_advance_stage`/`_finalize_job`, gate-driven; exit≠0/неизвестно → `attempts<max``queued`, иначе `failed`+Telegram). Атомарный reap-claim (guard `status='running'`) совместим со стартовым `requeue_running_jobs`. Тот же поток периодически делает проактивный реклейм stale/dead merge-lease (см. ниже). never-raise; kill-switch `ORCH_REAPER_ENABLED`; снимок в `GET /queue` (блок `reaper`). **ORCH-113 ([adr-0043](adr/adr-0043-reaper-finalizer-liveness-ownership.md)):** на ребре `deploy-staging → deploy` тяжёлые edge-под-гейты (security/merge-gate re-test/coverage/image-freshness) исполняются в потоке монитора **после** штампа `finished_at` и **до** `_finalize_job` — минуты, а Tier-2 `finished_age_s` меряется от `finished_at`, поэтому живой долго финализирующий монитор ошибочно реапился (инцидент ORCH-111: повторный re-test → ложный откат `deploy-staging → development` параллельно успешному deploy). Фикс — процесс-локальный реестр владения финализацией (leaf `src/finalizer_liveness.py`, never-raise): монитор `mark()`/`clear()` (try/finally), reaper в Tier-2 при `stage=="deploy-staging"` И активном владении делает **defer** (не повторяет advance); Tier-3 backstop маркер игнорирует (мёртвый/застрявший finalizer добивается в ограниченное время). In-memory restart-safe через `requeue_running_jobs` (вызов до старта reaper); схема БД, `reaper_finalize_grace_s`/`reaper_max_running_s` и сквозной бюджет не тронуты. Kill-switch `ORCH_REAPER_FINALIZER_LIVENESS_ENABLED`.
- **Transition-ownership lease** (`src/transition_lease.py` + таблица `transition_lease`, ORCH-114 — реализовано, [adr-0045](adr/adr-0045-transition-ownership-lease-and-stage-cas.md)) — чистый **never-raise** leaf (паттерн `serial_gate`/`coverage_gate`/`finalizer_liveness`; импортирует только `db`+`config`, лениво `merge_gate.pid_alive`/`qg.checks`/`notifications`; НЕ импортирует `stage_engine`/`launcher`), закрывающий корневой класс инцидент-цепочки ORCH-110/111/112/113 — у side-effectful переходов стадий не было единого владения. Два слоя, оба под единым kill-switch: **(1) durable transition-lease** (владение на ВХОДЕ в side-effectful регион — аддитивная таблица `transition_lease`, `task_id PK`; `acquire` атомарным rowcount-guard `INSERT … ON CONFLICT DO NOTHING` после очистки stale-строки; liveness владельца = `owner_pid`+`owner_boot_id`, НЕ heartbeat → рестарт-recovery бесплатен) и **(2) expected-stage CAS** `db.update_task_stage_cas` (на ЗАПИСИ стадии; проигравший гонку — аборт без побочных эффектов; покрывает и 6 путей записи в обход `advance_stage`). `advance_stage` захватывает lease на side-effectful рёбрах (`deploy-staging`/`deploy`) и освобождает в `try/finally`; job-reaper `_finalizer_owns` обобщён до durable cross-path lease (defer при живом, реклейм мёртвого; Tier-3 backstop игнорирует маркер → bounded; реап force-освобождает lease); reconciler F-1 и Plane-webhook делают defer; `main.lifespan` зовёт `recover_on_startup()` после `requeue_running_jobs`. Lease без своего TTL (потолок = Tier-3 `reaper_max_running_s`) → сквозной бюджет ORCH-065/109/110/113 цел. Скоуп self-hosting (`transition_lease_repos=""` → только `orchestrator`); kill-switch `ORCH_TRANSITION_LEASE_ENABLED=false` → CAS вырождается в прежний `update_task_stage`, lease инертен, reaper → ORCH-113 fallback (байт-в-байт). Наблюдаемость — блок `transition_lease` в `GET /queue` + Telegram-алерт на форсированный/устаревший реклейм + опц. `POST /transition-lease/release?work_item=<id>`. `STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*`/machine-verdict/схемы существующих таблиц — байт-в-байт. Подробнее ниже (§ «Единое владение side-effectful переходами»). Детали — `docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md`.
- **Reconciler** (`src/reconciler.py`, ORCH-053 — реализовано, [adr-0007](adr/adr-0007-reconciler.md)) — фоновый daemon-поток (паттерн `queue_worker`), стартует/останавливается в `main.lifespan` (после `worker.start()` / перед `worker.stop()`). Реконсилирует рассинхрон «источник истины ≠ стадия задачи» при потерянном webhook. F-1 gate-side (продвигает застрявшую стадию по локальной БД через штатный `advance_stage(..., finished_agent=None)`), F-2 plane-side (опрос Plane API → `handle_*` из `plane.py`), F-3 (БД-fallback `sha→branch` в `handle_ci_status`). Источник истины — гейт/Plane, не событие; идемпотентность (active-job guard + atomic-claim + grace); kill-switch `ORCH_RECONCILE_ENABLED`. `analysis` F-1 не трогает (человеческий гейт). F-1 также пропускает escalated (retry≥лимита) и Blocked/Needs-Input задачи (ORCH-060). Наблюдаемость — блок `reconcile` в `GET /queue`.
- **Disk-watchdog** (`src/disk_watchdog.py`, ORCH-063 — [adr-0024](adr/adr-0024-disk-watchdog.md)) — фоновый daemon-поток (каркас `reconciler`/`job_reaper`), стартует/останавливается в `main.lifespan` (старт последним — после `reaper.start()`; стоп первым в reverse-порядке; гард `disk_monitor_enabled`). Каждые `disk_monitor_interval_s` (дефолт 300с) меряет заполнение **хост-ФС** по смонтированным bind-путям (`/repos`, `/app/data`) через stdlib `shutil.disk_usage` (не overlay `/` контейнера, не субпроцесс `df`; дедуп путей по `st_dev`). Решение об алерте — pure-функция `decide_action(used_pct, threshold, prev_state, now, realert_s)`: алерт на пересечении порога (дефолт **85%**), cooldown-повтор `disk_monitor_realert_s` (анти-спам, не на каждом тике), однократный recovery при возврате ниже порога. Алерт — `send_telegram` (notifying, best-effort). Состояние анти-спама — in-memory (без миграции БД). never-raise (per-path/per-tick/per-send); только читает и уведомляет — не трогает диск/контейнер, не рестартит прод (self-hosting безопасность). Kill-switch `ORCH_DISK_MONITOR_ENABLED`; снимок — блок `disk_monitor` в `GET /queue` (`enabled`/`threshold_pct`/`interval_s`/`realert_s`/`paths`[`used_pct`/`free_gb`/`alerting`/`last_alert_at`]). `STAGE_TRANSITIONS`/`QG_CHECKS`/схема БД — не тронуты. Детали — `docs/work-items/ORCH-063/06-adr/ADR-001-disk-watchdog.md`.
- **Build-cache-pruner** (`src/build_cache_pruner.py`, ORCH-062 — [adr-0025](adr/adr-0025-build-cache-pruner.md)) — фоновый daemon-поток (каркас `disk_watchdog`), стартует/останавливается в `main.lifespan` (старт последним — после `disk_watchdog.start()`; стоп первым в reverse; гард `build_cache_prune_enabled`). «Вторая половина» disk-watchdog: **watchdog сигналит — pruner убирает**. Каждые `build_cache_prune_interval_s` (дефолт 21600с = 6ч) выполняет **строго `docker builder prune -f --filter until=<until>`** (BuildKit GC; дефолт `until=24h` — удаляет build cache старше суток, тёплый кэш сохраняет; `-a` опционально, только в паре с фильтром). Затрагивает **только** build cache — НЕ образы/контейнеры; рестарт docker daemon/прода не выполняется (self-hosting безопасность). В контейнере нет `docker` CLI (`Dockerfile:11`), поэтому уборка идёт **на хосте через ssh** каналом `deploy_ssh_user@deploy_ssh_host` (как `image_freshness`/`self_deploy`); пустой `deploy_ssh_host` → тик no-op (скоуп на self-host). never-raise (per-команда/per-tick); учёт результата in-memory (без миграции БД). Kill-switch `ORCH_BUILD_CACHE_PRUNE_ENABLED`; снимок — блок `build_cache_prune` в `GET /queue` (`enabled`/`interval_s`/`until`/`last_run_ts`/`last_reclaimed`/`last_error`). `STAGE_TRANSITIONS`/`QG_CHECKS`/схема БД — не тронуты. Детали — `docs/work-items/ORCH-062/06-adr/ADR-001-build-cache-pruner.md`.
@@ -1255,7 +1256,7 @@ finalizer-liveness ownership); детально —
`docs/work-items/ORCH-065/06-adr/ADR-001-job-reaper-and-lease-reclaim.md`,
`docs/work-items/ORCH-113/06-adr/ADR-001-reaper-finalizer-liveness-ownership.md`.
### Единое владение side-effectful переходами: durable transition-lease + stage-CAS (ORCH-114 — design)
### Единое владение side-effectful переходами: durable transition-lease + stage-CAS (ORCH-114 — реализовано)
Корневой класс инцидент-цепочки ORCH-110/111/112/113: **у side-effectful переходов стадий
нет единого владения**. `db.update_task_stage` — голый `UPDATE … WHERE id=?` без CAS;
`advance_stage` ре-ентерабельна и исполняет минуты-длинные необратимые под-гейты

View File

@@ -188,6 +188,21 @@ CREATE TABLE events (
payload TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- ORCH-114 (adr-0045): durable transition-ownership lease. ОДНА аддитивная таблица
-- (CREATE TABLE IF NOT EXISTS, паттерн repo_freeze/coverage_baseline/lessons) — одна
-- строка = ≤1 активный владелец side-effectful перехода задачи. Живость владельца =
-- owner_boot_id (нонс старта процесса; рестарт ⇒ смена ⇒ прежний lease мёртв) +
-- pid_alive(owner_pid). БЕЗ epoch/version-колонки на tasks (стадия = версия CAS).
CREATE TABLE transition_lease (
task_id INTEGER PRIMARY KEY,
owner TEXT NOT NULL, -- monitor|reaper|reconciler|webhook|finalizer|engine
owner_pid INTEGER,
owner_boot_id TEXT,
run_id INTEGER,
stage TEXT, -- from-стадия захвата (контекст/наблюдаемость)
acquired_at TEXT DEFAULT (datetime('now'))
);
```
## Deployment
@@ -369,7 +384,13 @@ status='queued'` и проверяет `rowcount`. При гонке двух т
В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`:
jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`.
Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join).
**ORCH-114 (adr-0045):** сразу следом вызывается `transition_lease.recover_on_startup()`
новый процесс имеет свежий `boot_id`, поэтому ВСЕ записанные ранее `transition_lease`
устарели (boot-id mismatch) → реклеймятся, и только что requeued-jobs переисполняют свои
side-effectful переходы **последовательно** (один владелец), без двойного необратимого
эффекта. Идемпотентность самого re-drive обеспечивают существующие авторитетные факты
(SHA-in-main ORCH-071/073, маркер `INITIATED` ORCH-036, coverage-ratchet CAS ORCH-027) —
НЕ новый recovery-мозг. Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join).
### Job-reaper (ORCH-065, рестарт НЕ требуется)

View File

@@ -590,6 +590,43 @@ class Settings(BaseSettings):
lease_reclaim_enabled: bool = True
reaper_finalizer_liveness_enabled: bool = True
# ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS for
# side-effectful stage transitions. Generalises the process-local ORCH-113
# finalizer-liveness to a DURABLE, cross-path owner-exclusion (additive table
# `transition_lease`) so a concurrent OR post-restart re-entry into a side-effectful
# transition (reaper / reconciler / webhook / startup-requeue) is deferred or a
# no-op instead of re-applying an irreversible effect (merge_pr / coverage-ratchet /
# image-rebuild / prod-deploy initiation / contradictory rollback↔done). Two
# complementary layers, both gated by the SINGLE kill-switch below:
# (1) durable lease on ENTRY to the side-effectful region (a second actor seeing a
# live owner does not start the heavy sub-gates at all — prevention, not repair);
# (2) expected-stage CAS on the stage WRITE (update_task_stage_cas: a lost race ->
# abort with NO side effect), which also closes the 6 paths that write the
# stage in bypass of advance_stage (gitea/plane direct update_task_stage).
# Liveness of the owner = owner_pid + owner_boot_id (NOT a heartbeat — a blocking
# 900s merge re-test cannot beat a heartbeat; ADR-001 D3), which makes restart
# recovery free (a new process -> new boot_id -> all prior leases are instantly
# stale -> reclaimed). The lease has NO own TTL: its hard age ceiling IS the reaper
# Tier-3 backstop reaper_max_running_s (5400), so the cross-cutting budget invariant
# ORCH-065/109/110/113 is untouched. STAGE_TRANSITIONS / QG_CHECKS / check_* /
# machine-verdict keys / existing table schemas — byte-for-byte. never-raise:
# hot-path guard fail-open (never wedge the shared queue), prod-safety fail-closed.
# See docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md
# and the cross-cutting docs/architecture/adr/adr-0045-…md.
# transition_lease_enabled -> SINGLE kill-switch (env ORCH_TRANSITION_LEASE_ENABLED).
# False -> the lease is neither written nor read AND the
# CAS degenerates to the prior unconditional
# update_task_stage -> behaviour byte-for-byte as before
# ORCH-114 (reaper -> ORCH-113 in-memory fallback,
# reconciler/webhook skip-guard inert). Default True.
# transition_lease_repos -> CSV scope (env ORCH_TRANSITION_LEASE_REPOS). Empty ->
# applies ONLY to the self-hosting repo (orchestrator),
# where the irreversible side-effectful edges live;
# non-empty -> only the listed repos. Mirrors
# coverage_gate_repos -> enduro untouched at the default.
transition_lease_enabled: bool = True
transition_lease_repos: str = ""
# ORCH-063: disk-watchdog — background heartbeat that measures host-FS fill via
# the mounted bind-paths and Telegram-alerts the operator at >= threshold. On
# 07.06.2026 the mva154 host disk silently hit 100% and stalled the WHOLE

View File

@@ -263,6 +263,28 @@ def init_db():
_ensure_column(conn, "lessons", "attribution", "TEXT")
_ensure_column(conn, "lessons", "target_repo", "TEXT")
_ensure_column(conn, "lessons", "target_domain", "TEXT")
# ORCH-114 (adr-0045 / 08-data-requirements.md): durable transition-ownership
# lease. ONE additive object (CREATE TABLE IF NOT EXISTS, pattern repo_freeze/
# coverage_baseline/lessons) -> idempotent, restart-safe on the shared prod DB;
# existing tables (tasks/jobs/agent_runs/...) untouched byte-for-byte (NFR-3,
# AC-11). One row per task = at most one active owner of a side-effectful
# transition. Liveness of the holder = owner_boot_id (this process's start nonce)
# + owner_pid (os.getpid of the holding process); a row from a previous boot is
# instantly stale on restart -> reclaimed (ADR-001 D3). No index needed (access by
# PK task_id; snapshot() is a full-scan over a tiny table). The src/transition_lease.py
# leaf wraps all access in its never-raise contract. NO epoch/version column (D2:
# for the one-process model the stage IS the CAS version).
conn.executescript("""
CREATE TABLE IF NOT EXISTS transition_lease (
task_id INTEGER PRIMARY KEY,
owner TEXT NOT NULL,
owner_pid INTEGER,
owner_boot_id TEXT,
run_id INTEGER,
stage TEXT,
acquired_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""")
conn.commit()
conn.close()
@@ -679,6 +701,39 @@ def update_task_stage(task_id: int, stage: str):
conn.close()
def update_task_stage_cas(task_id: int, expected_stage: str, new_stage: str) -> bool:
"""ORCH-114 (adr-0045 / FR-2): compare-and-swap variant of update_task_stage.
Writes the stage ONLY when the task is still at ``expected_stage`` (the value the
caller read before running the side-effectful region) — ``UPDATE … SET stage=?
WHERE id=? AND stage=?`` — and reports whether THIS writer won. Returns:
* ``True`` -> ``rowcount == 1``: the CAS succeeded, the stage moved exactly once.
* ``False`` -> ``rowcount == 0``: the task is no longer at ``expected_stage``
(another actor already advanced/rolled it back, or the row is gone) -> the
caller MUST abort WITHOUT applying any side effect (merge_pr / ratchet /
rebuild / deploy-init / enqueue) — it lost the race.
In the current one-process model each side-effectful edge leads to a DISTINCT
next stage, so the stage itself is a complete version for the compare-and-swap;
no separate epoch/version column is needed (ADR-001 D2). The plain
``update_task_stage`` above is kept unchanged for the kill-switch-off path and
for non-side-effectful writes. Mirrors the atomic rowcount-guard idiom of
``claim_next_job`` / ``reap_running_job``.
"""
conn = get_db()
try:
cur = conn.execute(
"UPDATE tasks SET stage = ?, updated_at = datetime('now') "
"WHERE id = ? AND stage = ?",
(new_stage, task_id, expected_stage),
)
conn.commit()
return cur.rowcount == 1
finally:
conn.close()
# ---------------------------------------------------------------------------
# ORCH-019: bug-fast-track task type (tasks.track) helpers
# ---------------------------------------------------------------------------

View File

@@ -434,18 +434,35 @@ class JobReaper:
return None, None, None
def _finalizer_owns(self, job: dict) -> bool:
"""ORCH-113 (adr-0043 / D3): True iff a LIVE monitor still owns this job's
``deploy-staging`` finalization, so the Tier-2 reap must be deferred.
"""True iff a LIVE actor still owns this job's side-effectful finalization, so
the Tier-2 reap must be deferred.
Order matters for the zero-regression contract: the kill-switch is checked
FIRST (disabled -> ``False`` with no DB lookup, so the path is byte-for-byte
prior); then the stage is scoped to ``deploy-staging`` only (the sole edge
whose in-thread finalization runs for minutes — every other stage is left
untouched); only then is the process-local ownership marker consulted. Never
raises -> ``False`` on any error (conservative: never block reaping when
ownership is unknowable, so the Tier-3 backstop is never neutered).
ORCH-114 (adr-0045 / D6) GENERALISES the ORCH-113 process-local, Tier-2,
``deploy-staging``-only marker to a DURABLE, cross-path lease: when the
transition-lease applies to this repo, consult ``transition_lease`` keyed on
the task (covers EVERY relevant edge — deploy-staging AND deploy->done — and
survives restart). Otherwise (kill-switch off) fall back to the unchanged
ORCH-113 in-memory ``finalizer_liveness`` (Tier-2 / ``deploy-staging`` only),
so the disabled path is byte-for-byte prior.
Either way the Tier-3 backstop (``reaper_max_running_s``) IGNORES this marker
(it does not call here), so a stuck/dead finalizer is still reaped in bounded
time. Never raises -> ``False`` on any error (conservative: never block reaping
when ownership is unknowable, so the backstop is never neutered).
"""
try:
repo = job.get("repo")
# ORCH-114: durable cross-path lease (when enabled for this repo).
try:
from . import transition_lease
if transition_lease.applies(repo):
return transition_lease.is_held_by_live_owner(job.get("task_id"))
except Exception as e: # noqa: BLE001 - fall back to ORCH-113 on any error
logger.warning(
"reaper: transition-lease check failed for job %s: %s",
job.get("id"), e,
)
# ORCH-113 fallback (kill-switch off): process-local, Tier-2/deploy-staging.
if not settings.reaper_finalizer_liveness_enabled:
return False
_branch, stage, _wid = self._task_meta(job)
@@ -472,6 +489,18 @@ class JobReaper:
def _note_reap(self, job: dict, outcome: str, reason: str) -> None:
"""Record + log one successful reap (Р-6 observability)."""
# ORCH-114 (adr-0045 / D6): a reap reclaims the job, so its durable
# transition-lease must NOT outlive it — force-release (any owner/boot) so a
# requeued job can re-acquire cleanly. never-raise; no-op when the lease is
# disabled / no row exists.
try:
from . import transition_lease
transition_lease.release(job.get("task_id"), force=True)
except Exception as e: # noqa: BLE001 - never break the reap
logger.warning(
"reaper: transition-lease force-release failed for job %s: %s",
job.get("id"), e,
)
self.reaped_total += 1
self.last_reaped = {
"job_id": job.get("id"),

View File

@@ -60,6 +60,25 @@ async def lifespan(app: FastAPI):
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# ORCH-114 (adr-0045 / D7 / FR-4): clear durable transition-leases left by the
# PREVIOUS process boot. This process has a fresh boot_id, so every prior lease is
# stale by construction -> reclaim it so the just-requeued jobs can re-drive their
# side-effectful transitions cleanly. Idempotency of the re-drive comes from the
# authoritative durable facts (SHA-in-main / the INITIATED self-deploy marker /
# the coverage-ratchet CAS), NOT from a new recovery brain — the lease only
# guarantees the re-drive runs SEQUENTIALLY (one owner), never concurrently. Runs
# AFTER requeue_running_jobs and BEFORE the reaper starts. never raises.
try:
from . import transition_lease
cleared_leases = transition_lease.recover_on_startup()
if cleared_leases:
log.warning(
f"Transition-lease recovery: cleared {cleared_leases} stale lease(s) "
f"from a previous boot"
)
except Exception as e:
log.warning(f"Transition-lease recovery skipped: {e}")
# ORCH-065: proactive startup reclaim of dead/stale merge-leases, next to the
# queue-recovery above. A lease held by the previous (now dead) process pid is
# released at once instead of waiting for the TTL / a foreign acquire so the
@@ -215,6 +234,7 @@ async def queue():
from . import bug_fast_track
from . import lessons
from . import checkout_hygiene
from . import transition_lease
from .disk_watchdog import disk_watchdog
from .build_cache_pruner import build_cache_pruner
return {
@@ -258,6 +278,11 @@ async def queue():
# ORCH-112 (D3): deploy-base checkout-hygiene observability (read-only) —
# kill-switch + scope. Additive block; never-raise.
"checkout_hygiene": checkout_hygiene.snapshot(),
# ORCH-114 (adr-0045 / D10 / FR-6): durable transition-ownership lease
# observability (read-only) — kill-switch, scope, boot_id, active holders
# (owner/stage/age/live) + defer/reclaim/CAS-lost counters. Additive block;
# never-raise.
"transition_lease": transition_lease.snapshot(),
# ORCH-098 (FR-4 / AC-4): lessons-journal observability (read-only) —
# kill-switch + counts by type/status + last N lessons. Additive block;
# never-raise (snapshot() returns {"enabled": ...} minimum on error).
@@ -324,6 +349,39 @@ async def serial_gate_unfreeze(repo: str = ""):
return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen}
@app.post("/transition-lease/release")
async def transition_lease_release(work_item: str = ""):
"""ORCH-114 (adr-0045 / D10): operator manual reclaim of a stuck transition-lease.
By образцу ``POST /serial-gate/unfreeze``: if a lease somehow outlives its owner
(the normal try/finally release + the reaper force-release + the Tier-3 backstop
should make this unnecessary), an operator can force-release it by work-item id so
a re-approve / the reconciler can re-drive the transition. Idempotent: releasing a
free task reports ``released: false``. Read-only/never-raise otherwise.
"""
from . import transition_lease
from .db import get_task_by_work_item_id
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
task = get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "task not found", "work_item": work_item}
task_id = task["id"]
held_before = transition_lease.is_held_by_live_owner(task_id)
transition_lease.release(task_id, force=True)
if held_before:
try:
from .notifications import send_telegram, link_for
send_telegram(
f"🔓 {link_for(work_item)}: transition-lease сброшен вручную "
f"(task {task_id}). Переход может быть пере-исполнен."
)
except Exception:
pass
return {"ok": True, "work_item": work_item, "task_id": task_id, "released": held_before}
@app.post("/fs-normalize/check")
async def fs_normalize_check(normalize: bool = False):
"""ORCH-057 (D6 / AC-4): force a fresh legacy-ownership detect (bypass the TTL

View File

@@ -70,6 +70,7 @@ from .webhooks.plane import handle_status_start, handle_verdict
from .notifications import send_telegram, link_for
from . import projects
from . import task_deps
from . import transition_lease
logger = logging.getLogger("orchestrator.reconciler")
@@ -153,6 +154,10 @@ class Reconciler:
# ORCH-068 observability: terminal-state skips and dedup suppressions.
self.skipped_terminal_total: int = 0
self.deduped_total: int = 0
# ORCH-114 (adr-0045 / FR-5): F-1 advances deferred because a live actor owns
# the task's side-effectful transition (transition-lease active). Reset on
# restart (safe: a live lease is itself recovered/reclaimed on restart).
self.transition_lease_defers_total: int = 0
# ORCH-068 (TR-3): in-memory dedup guard {issue_id -> last unblocked
# state uuid}. Best-effort (resets on restart, like unblocked_total);
# suppresses a repeat unblock notification for the same issue+state.
@@ -246,6 +251,19 @@ class Reconciler:
if cyc:
task_deps.handle_cycle(cyc)
return
# ORCH-114 (adr-0045 / FR-5, AC-7): a live actor already owns this task's
# side-effectful transition -> F-1 must NOT advance it in parallel. Silent
# defer (mirrors the escalated/Blocked/task-deps skip-guards above); the owner
# finishes the transition or, on death, the reaper reclaims it in bounded time.
# fail-safe: is_held_by_live_owner is conservative (True on doubt -> defer).
# never raises; no-op (False) when the lease is disabled / repo out of scope.
if transition_lease.is_held_by_live_owner(task_id):
self.transition_lease_defers_total += 1
logger.debug(
f"reconciler F-1: task {task_id} has an active transition-lease — "
f"deferring advance to its owner"
)
return
result = advance_if_gate_passed(
task_id,
stage,
@@ -596,6 +614,8 @@ class Reconciler:
# ORCH-068 observability.
"skipped_terminal_total": self.skipped_terminal_total,
"deduped_total": self.deduped_total,
# ORCH-114 observability: F-1 advances deferred to a live lease owner.
"transition_lease_defers_total": self.transition_lease_defers_total,
}

View File

@@ -41,6 +41,7 @@ from . import self_deploy
from . import post_deploy
from . import labels
from . import bug_fast_track
from . import transition_lease
from .notifications import (
notify_stage_change,
notify_qg_failure,
@@ -173,6 +174,20 @@ def developer_retry_count(task_id: int) -> int:
_developer_retry_count = developer_retry_count
def _is_side_effectful_edge(current_stage: str | None, next_stage: str | None) -> bool:
"""ORCH-114 (adr-0045 D4): does this ``advance_stage`` edge run IRREVERSIBLE work
that must be owned by exactly one actor (lease on entry)?
* ``deploy-staging`` (-> deploy): the heavy edge sub-gates (security / merge-gate
re-test / coverage / image-freshness rebuild) + Phase A.
* ``deploy`` (-> done OR Phase B): merge_pr / coverage-ratchet / proof-of-merge,
or the detached prod-deploy initiation (confirm_deploy).
Every other edge (created -> … -> testing) is reversible and is protected by the
CAS-on-write alone (no lease). Pure, never raises.
"""
return current_stage in ("deploy-staging", "deploy")
def advance_stage(
task_id: int,
current_stage: str,
@@ -210,6 +225,12 @@ def advance_stage(
"""
result = AdvanceResult(from_stage=current_stage)
agent = finished_agent
# ORCH-114 (adr-0045): set True once we acquire the durable transition-lease on a
# side-effectful edge, so the finally below ALWAYS releases it (on success, on a
# lost CAS, on a sub-gate rollback, and on ANY exception caught by the outer
# except). Released holder-aware (this process only) so a reaper reclaim + reacquire
# in between is never clobbered.
_lease_held = False
try:
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
@@ -240,6 +261,28 @@ def advance_stage(
result.note = "terminal"
return result
# --- ORCH-114 transition-ownership lease: acquire on ENTRY (ADR-001 D5) ----
# On a side-effectful edge (deploy-staging / deploy) acquire the DURABLE
# owner-exclusion lease BEFORE the Phase B / sub-gate / merge-verify region. A
# second concurrent actor (reaper / reconciler / webhook / a re-driven startup
# job) that sees a live owner gets a clean "busy" defer here and does NOT start
# the heavy region at all — this is what kills the double-effect class
# (incident ORCH-111) at the root. Released in the `finally` below. Kill-switch
# off / repo out of scope -> applies() False -> no lease, byte-for-byte prior.
if _is_side_effectful_edge(current_stage, next_stage) and transition_lease.applies(repo):
if not transition_lease.acquire(
task_id, finished_agent or "engine", run_id=None, stage=current_stage
):
logger.info(
f"Task {task_id}: transition-lease busy on "
f"{current_stage}->{next_stage} — deferring (another actor owns "
f"this transition)"
)
result.note = "transition-lease-busy"
result.advanced = False
return result
_lease_held = True
# --- ORCH-036/059 Phase B: "Confirm Deploy" on `deploy` -> initiate ----
# ORCH-059: the prod-deploy trigger is now the DEDICATED "Confirm Deploy"
# status (confirm_deploy=True), NOT the overloaded "Approved". On the
@@ -399,7 +442,23 @@ def advance_stage(
return result
# --- Advance ---------------------------------------------------------
update_task_stage(task_id, next_stage)
# ORCH-114 (adr-0045 / FR-2): expected-stage compare-and-swap. Writes the
# stage only if the task is STILL at current_stage (the value we read on
# entry); a lost race (another writer advanced/rolled back first) returns
# False -> abort here WITHOUT any side effect (no notify / no arm / no
# terminal-sync / no enqueue). Kill-switch off / repo out of scope ->
# degenerates to the prior unconditional update_task_stage (returns True) ->
# byte-for-byte prior behaviour. Defense-in-depth: under the lease acquired
# above this CAS practically always wins; it also covers the narrow
# consult->acquire window and any bypass writer (TR-5).
if not transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo):
logger.info(
f"Task {task_id}: stage-CAS lost on {current_stage}->{next_stage}"
f"aborting without side effects (another writer advanced first)"
)
result.note = "stage-cas-lost"
result.advanced = False
return result
# Telegram live tracker: the analysis->architecture advance is the human
# Approved gate clearing -> stamp the END of "Ревью БРД" (the only
# human time). Idempotent: only the first stamp counts.
@@ -510,6 +569,16 @@ def advance_stage(
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
result.note = f"error: {e}"
return result
finally:
# ORCH-114 (adr-0045 / AC-3): release the transition-lease on EVERY exit —
# normal advance, lost CAS, sub-gate rollback, Phase A/B early return, and any
# exception caught above — so the lease never "leaks" and wedges the task.
# holder-aware (force=False): only releases a row this process owns.
if _lease_held:
try:
transition_lease.release(task_id)
except Exception as e: # noqa: BLE001 - never-raise (Tier-3 backstop bounds it)
logger.warning(f"Task {task_id}: transition-lease release failed: {e}")
def advance_if_gate_passed(
@@ -1482,7 +1551,21 @@ def _handle_self_deploy_phase_a(
restart-safe `approve-requested` marker records that Phase A ran. The merge
lease stays HELD.
"""
update_task_stage(task_id, "deploy")
# ORCH-114 (adr-0045 / D4): this IS the deploy-staging -> deploy stage write on
# the self-hosting path (advance_stage's line-402 CAS is not reached — Phase A
# returns first). Use the same expected-stage CAS. It runs under the transition-
# lease acquired by advance_stage, so it practically always wins; a lost CAS
# (a concurrent writer despite the lease) -> abort Phase A WITHOUT initiating the
# prod-deploy ask / autoDeploy (no double effect). Kill-switch off / repo out of
# scope -> unconditional update (byte-for-byte).
if not transition_lease.commit_stage_cas(task_id, current_stage, "deploy", repo):
logger.info(
f"Task {task_id}: Phase A stage-CAS lost ({current_stage}->deploy) — "
f"aborting Phase A without side effects"
)
result.note = "phase-a-cas-lost"
result.advanced = False
return
notify_stage_change(task_id, current_stage, "deploy")
result.advanced = True
result.to_stage = "deploy"

471
src/transition_lease.py Normal file
View File

@@ -0,0 +1,471 @@
"""ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS.
Leaf module — pure, never-raise (pattern of ``serial_gate`` / ``coverage_gate`` /
``finalizer_liveness``: imports only ``db`` + ``config`` and lazily
``merge_gate.pid_alive`` / ``qg.checks.is_self_hosting_repo`` / ``notifications``;
it NEVER imports ``stage_engine`` / ``launcher`` and talks to no network).
The bug class it closes
-----------------------
``stage_engine.advance_stage`` is the single entry to side-effectful transitions
(the heavy ``deploy-staging -> deploy`` edge sub-gates — security / merge-gate
re-test / coverage / image-freshness — and the ``deploy -> done`` merge-verify:
``merge_pr`` / coverage-ratchet / proof-of-merge). It is RE-ENTERABLE: at least
five actors (monitor / Plane-webhook / reconciler F-1 / job-reaper / deploy
finalizer) can enter the SAME transition independently, and the stage write was a
bare ``UPDATE … WHERE id=?`` with no compare-and-swap. Two concurrent — or a
post-restart re-driven — entry therefore re-applied irreversible effects and
produced contradictory outcomes (one path rolled back to ``development`` while
another merged + finished — incident ORCH-111, job 1914 / PR #130). ORCH-113
closed only the in-memory, Tier-2, ``deploy-staging``-only slice of this; it is
lost on restart.
Two complementary layers (ADR-001 D1), both gated by one kill-switch:
1. **Durable lease (owner-exclusion on ENTRY).** A row in the additive
``transition_lease`` table (one per task) records "an actor owns this task's
side-effectful transition". A second actor that sees a LIVE owner does not
start the heavy sub-gates AT ALL (prevention, not post-hoc repair).
2. **Expected-stage CAS (atomicity on the WRITE).** ``update_task_stage_cas``
writes the stage only when the task is still at the expected stage; a lost
race aborts with NO side effect. It also closes the six paths that write the
stage in BYPASS of ``advance_stage`` (gitea / plane direct ``update_task_stage``).
Liveness without a heartbeat (ADR-001 D3)
-----------------------------------------
An owner is LIVE ⇔ ``owner_boot_id == <this process's boot id>`` AND
``merge_gate.pid_alive(owner_pid)``. There is NO heartbeat (a blocking 900 s merge
re-test cannot beat one — the very argument ORCH-113 used to reject heartbeats).
This makes restart recovery free: a new process has a new ``boot_id`` so every row
written by a previous process is instantly stale and reclaimed
(``recover_on_startup``). Within the one-process model every live owner shares the
SAME boot id and pid, so a same-boot row is by definition owned by the (alive)
current process; only a different-boot row can be stale — which is why the
acquire/recover logic keys staleness on the boot id.
No own TTL (ADR-001 D8): the lease's hard age ceiling IS the reaper Tier-3 backstop
``reaper_max_running_s`` (the reaper force-releases the lease when it reaps), so the
cross-cutting budget invariant ORCH-065/109/110/113 is untouched.
never-raise (ADR-001 D9 / NFR-1): every public function is isolated. The
directional defaults:
* ``acquire`` error -> ``False`` (busy): the caller DEFERS/aborts a side-effectful
transition rather than risk a double effect (fail-CLOSED to no-double-effect).
* ``is_held_by_live_owner`` error -> ``True`` (treat as held): the consulting
reconciler / webhook / reaper conservatively DEFERS (the safe action; the reaper
Tier-3 backstop still bounds a genuinely stuck task).
* ``commit_stage_cas`` error on the CAS path -> ``False``: abort the write, never a
blind overwrite.
The hot claim path (``db.claim_next_job``) is deliberately NOT touched, so a lease
bug can never wedge the shared queue of all projects (AC-8 ORCH-088 intact).
See docs/work-items/ORCH-114/06-adr/ADR-001-transition-ownership-lease-and-stage-cas.md
and the cross-cutting docs/architecture/adr/adr-0045-transition-ownership-lease-and-stage-cas.md.
"""
from __future__ import annotations
import logging
import os
import secrets
import threading
from . import db
from .config import settings
logger = logging.getLogger("orchestrator.transition_lease")
# Per-process boot nonce (ADR-001 D3). Generated ONCE at import: every lease row a
# previous process wrote carries a DIFFERENT boot id and is therefore instantly
# stale after a restart -> reclaimed by recover_on_startup / acquire. Not derived
# from the clock so it cannot collide across a fast restart.
_BOOT_ID = secrets.token_hex(16)
# Best-effort observability counters (reset on restart, like the reaper's). Guarded
# by a lock because the monitor / reaper / reconciler / webhook threads all touch
# them. Never a source of truth — purely for GET /queue.
_LOCK = threading.Lock()
_COUNTERS: dict[str, int] = {
"acquired_total": 0, # leases successfully acquired
"busy_total": 0, # acquire deferred — a live owner already held it
"released_total": 0, # normal try/finally releases
"cas_lost_total": 0, # stage-CAS lost the race (aborted without side effect)
"stale_reclaims_total": 0, # rows reclaimed because the owner was not live
"force_reclaims_total": 0, # rows force-released (reaper / operator)
}
def _bump(key: str, n: int = 1) -> None:
try:
with _LOCK:
_COUNTERS[key] = _COUNTERS.get(key, 0) + n
except Exception: # noqa: BLE001 - counters never break a caller
pass
def boot_id() -> str:
"""This process's boot nonce (exposed for tests / observability)."""
return _BOOT_ID
# ---------------------------------------------------------------------------
# Conditionality (mirrors coverage_gate_applies — self-hosting-only by default)
# ---------------------------------------------------------------------------
def _enabled() -> bool:
try:
return bool(getattr(settings, "transition_lease_enabled", False))
except Exception: # noqa: BLE001
return False
def applies(repo: str) -> bool:
"""Whether the transition-lease + CAS are REAL for this repo (ADR-001 D10).
* ``transition_lease_enabled=False`` -> always False (kill-switch; the lease is
neither written nor read AND ``commit_stage_cas`` degenerates to the prior
unconditional ``update_task_stage`` -> behaviour byte-for-byte as before
ORCH-114).
* ``transition_lease_repos`` (CSV) non-empty -> real only for the listed repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``), where the
irreversible side-effectful edges live (mirrors coverage_gate_repos -> enduro
untouched at the default).
Never raises -> False on error (the safe "mechanism inert" default == kill-switch
off).
"""
try:
if not _enabled():
return False
raw = (getattr(settings, "transition_lease_repos", "") or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("transition_lease.applies error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# Liveness
# ---------------------------------------------------------------------------
def _pid_alive(pid) -> bool:
"""Probe ``pid`` liveness via ``merge_gate.pid_alive`` (ADR-001 references it for
a single shared semantics). Lazy import keeps this module a leaf; on import error
fall back to a conservative ``True`` (a lease whose pid we cannot probe is treated
as live — the boot-id check below + the Tier-3 backstop still bound it).
"""
try:
from .merge_gate import pid_alive
return pid_alive(pid)
except Exception: # noqa: BLE001
return True
def _row_is_live(owner_boot_id, owner_pid) -> bool:
"""True iff the lease owner is LIVE (this process's boot AND a live pid).
A row from a DIFFERENT boot id (a previous process) is dead by construction
(ADR-001 D3); a same-boot row is owned by the current — alive — process, but we
still probe the pid for forward-compatibility with a future multi-process model.
"""
if owner_boot_id != _BOOT_ID:
return False
return _pid_alive(owner_pid)
def is_held_by_live_owner(task_id) -> bool:
"""True iff an active lease row for ``task_id`` is owned by a LIVE actor.
Consulted by the reconciler F-1 / Plane-webhook DEFER guards and the reaper.
Returns ``False`` when there is no row or the owner is stale. Fail-CLOSED on any
error -> ``True`` (treat as held): the consulting caller DEFERS, which is always
the safe-against-double-effect action (the reaper Tier-3 backstop still bounds a
truly stuck task). When the mechanism is disabled -> ``False`` (no defer).
"""
if task_id is None:
return False
if not _enabled():
return False
try:
conn = db.get_db()
try:
row = conn.execute(
"SELECT owner_boot_id, owner_pid FROM transition_lease WHERE task_id=?",
(task_id,),
).fetchone()
finally:
conn.close()
if row is None:
return False
return _row_is_live(row["owner_boot_id"], row["owner_pid"])
except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt (conservative defer)
logger.warning(
"transition_lease.is_held_by_live_owner error for task %s -> "
"fail-CLOSED (defer): %s", task_id, e,
)
return True
# ---------------------------------------------------------------------------
# Acquire / release / reclaim
# ---------------------------------------------------------------------------
def _clear_stale_row(conn, task_id) -> int:
"""Delete the lease row for ``task_id`` IFF its owner is not live. Returns the
rowcount. Uses the caller's connection (same transaction as the INSERT in
``acquire``). Raises on a real DB fault (the caller swallows)."""
row = conn.execute(
"SELECT owner_boot_id, owner_pid FROM transition_lease WHERE task_id=?",
(task_id,),
).fetchone()
if row is None:
return 0
if _row_is_live(row["owner_boot_id"], row["owner_pid"]):
return 0
cur = conn.execute("DELETE FROM transition_lease WHERE task_id=?", (task_id,))
return cur.rowcount or 0
def acquire(task_id, owner: str, run_id=None, stage: str | None = None) -> bool:
"""Acquire the side-effectful-transition lease for ``task_id`` (ADR-001 D5).
Atomic rowcount-guard (pattern ``claim_next_job`` / ``reap_running_job``): a stale
row (owner from a previous boot / dead pid) is cleared first, then an
``INSERT … ON CONFLICT(task_id) DO NOTHING`` competes only with LIVE same-process
owners. ``rowcount == 1`` -> WE won. ``rowcount == 0`` -> a live owner already
holds it -> ``False`` (the caller DEFERS without starting the heavy region).
Kill-switch off -> ``True`` (no-op acquire; the caller proceeds exactly as before
ORCH-114; ``release`` is then an idempotent no-op). ``task_id is None`` -> ``True``
(a job with no task cannot be leased — legacy direct ``launch()``; proceed).
never-raise: any error -> ``False`` (busy) so the caller DEFERS a side-effectful
transition rather than risk a double effect (fail-CLOSED to no-double-effect,
ADR-001 D9).
"""
if not _enabled():
return True
if task_id is None:
return True
try:
conn = db.get_db()
try:
_clear_stale_row(conn, task_id)
cur = conn.execute(
"INSERT INTO transition_lease "
"(task_id, owner, owner_pid, owner_boot_id, run_id, stage) "
"VALUES (?, ?, ?, ?, ?, ?) "
"ON CONFLICT(task_id) DO NOTHING",
(task_id, owner or "engine", os.getpid(), _BOOT_ID, run_id, stage),
)
conn.commit()
won = (cur.rowcount == 1)
finally:
conn.close()
if won:
_bump("acquired_total")
return True
_bump("busy_total")
logger.info(
"transition_lease: task %s busy (a live owner holds the transition); "
"%s defers", task_id, owner,
)
return False
except Exception as e: # noqa: BLE001 - fail-CLOSED (busy) to avoid double effects
logger.warning("transition_lease.acquire error for task %s: %s", task_id, e)
return False
def release(task_id, force: bool = False) -> None:
"""Release the lease for ``task_id`` (ADR-001 D5). Idempotent, never raises.
* ``force=False`` (normal try/finally release in ``advance_stage``): delete only
a row owned by THIS process (``owner_boot_id == boot``), so a release delayed
past a reaper-reclaim-then-reacquire can never delete a lease a DIFFERENT
process/owner acquired afterwards (holder-aware, mirrors ``release_merge_lease``).
* ``force=True`` (reaper reap / operator endpoint): delete unconditionally.
"""
if task_id is None:
return
if not _enabled():
return
try:
conn = db.get_db()
try:
if force:
cur = conn.execute(
"DELETE FROM transition_lease WHERE task_id=?", (task_id,)
)
else:
cur = conn.execute(
"DELETE FROM transition_lease WHERE task_id=? AND owner_boot_id=?",
(task_id, _BOOT_ID),
)
conn.commit()
n = cur.rowcount or 0
finally:
conn.close()
if n:
_bump("force_reclaims_total" if force else "released_total", n)
except Exception as e: # noqa: BLE001 - never-raise (a leaked lease is bounded by Tier-3)
logger.warning("transition_lease.release error for task %s: %s", task_id, e)
def reclaim_if_stale(task_id) -> bool:
"""Reclaim (delete) the lease row for ``task_id`` IFF its owner is not live.
Returns True iff a stale row was reclaimed. Used by the operator endpoint and as
a backstop. never-raise -> False on error.
"""
if task_id is None or not _enabled():
return False
try:
conn = db.get_db()
try:
n = _clear_stale_row(conn, task_id)
conn.commit()
finally:
conn.close()
if n:
_bump("stale_reclaims_total", n)
logger.warning("transition_lease: reclaimed stale lease for task %s", task_id)
return n > 0
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("transition_lease.reclaim_if_stale error for task %s: %s", task_id, e)
return False
def recover_on_startup() -> int:
"""Clear every lease left by a PREVIOUS process boot (ADR-001 D7).
Called from ``main.lifespan`` right after ``requeue_running_jobs`` and BEFORE the
reaper starts. A fresh process boot id means every existing row predates this
process -> stale -> deleted, so the requeued jobs re-drive their transitions
cleanly (idempotency comes from the authoritative durable facts — SHA-in-main,
the INITIATED self-deploy marker, the coverage-ratchet CAS — NOT from a new
recovery brain). Returns the number of rows cleared. never-raise -> 0 on error.
"""
if not _enabled():
return 0
try:
conn = db.get_db()
try:
cur = conn.execute(
"DELETE FROM transition_lease "
"WHERE owner_boot_id IS NULL OR owner_boot_id != ?",
(_BOOT_ID,),
)
conn.commit()
n = cur.rowcount or 0
finally:
conn.close()
if n:
_bump("stale_reclaims_total", n)
logger.warning(
"transition_lease.recover_on_startup: cleared %d stale lease(s) from a "
"previous boot", n,
)
# FR-6 / AC-12: a forced/stale reclaim is observable (Telegram alert). A
# restart-time bulk reclaim is summarised (per-task clickable alerts come
# from the operator endpoint). best-effort, never-raise.
try:
from .notifications import send_telegram
send_telegram(
f"♻️ Transition-lease recovery: сброшено {n} устаревш"
f"(ий/их) lease после рестарта (переходы будут пере-исполнены "
f"последовательно)."
)
except Exception: # noqa: BLE001 - alert is best-effort
pass
return n
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("transition_lease.recover_on_startup error: %s", e)
return 0
# ---------------------------------------------------------------------------
# Stage write — compare-and-swap wrapper (ADR-001 D5 / FR-2)
# ---------------------------------------------------------------------------
def commit_stage_cas(task_id, expected_stage: str, new_stage: str, repo: str) -> bool:
"""Write the task stage under the ORCH-114 contract. Returns True iff the write
was applied (and the caller may proceed with side effects), False iff the writer
lost the CAS race (the caller MUST abort WITHOUT side effects).
* ``applies(repo)`` False (kill-switch off / repo out of scope) -> the prior
unconditional ``update_task_stage`` (byte-for-byte) -> always True. Not wrapped
in a swallowing try, so a DB error propagates EXACTLY as before ORCH-114.
* ``applies(repo)`` True -> ``update_task_stage_cas`` (expected-stage compare-and-
swap). A lost race -> False (no side effect). never-raise on the CAS path: a DB
error -> False (abort the write; never a blind overwrite).
"""
try:
scoped = applies(repo)
except Exception: # noqa: BLE001 - applies already never-raises; belt-and-suspenders
scoped = False
if not scoped:
db.update_task_stage(task_id, new_stage)
return True
try:
won = db.update_task_stage_cas(task_id, expected_stage, new_stage)
if not won:
_bump("cas_lost_total")
return won
except Exception as e: # noqa: BLE001 - abort the write (no blind overwrite)
logger.warning(
"transition_lease.commit_stage_cas error for task %s (%s->%s): %s",
task_id, expected_stage, new_stage, e,
)
return False
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (ADR-001 D10 / FR-6)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
"""Read-only transition-lease summary for GET /queue. Additive block; existing
/queue keys untouched. never-raise -> a minimal dict on error.
"""
try:
enabled = _enabled()
except Exception: # noqa: BLE001
enabled = False
try:
repos_cfg = getattr(settings, "transition_lease_repos", "") or ""
except Exception: # noqa: BLE001
repos_cfg = ""
holders: list[dict] = []
try:
conn = db.get_db()
try:
rows = conn.execute(
"SELECT task_id, owner, owner_pid, owner_boot_id, run_id, stage, "
"acquired_at, "
"CAST(strftime('%s','now') - strftime('%s', acquired_at) AS INTEGER) "
" AS age_s "
"FROM transition_lease ORDER BY task_id"
).fetchall()
finally:
conn.close()
for r in rows:
holders.append({
"task_id": r["task_id"],
"owner": r["owner"],
"stage": r["stage"],
"run_id": r["run_id"],
"age_s": r["age_s"],
"live": _row_is_live(r["owner_boot_id"], r["owner_pid"]),
})
except Exception as e: # noqa: BLE001 - never break /queue
logger.warning("transition_lease.snapshot error: %s", e)
try:
with _LOCK:
counters = dict(_COUNTERS)
except Exception: # noqa: BLE001
counters = {}
return {
"enabled": enabled,
"repos": repos_cfg,
"boot_id": _BOOT_ID,
"active": len(holders),
"holders": holders,
"counters": counters,
}

View File

@@ -13,7 +13,6 @@ from ..config import settings
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
@@ -24,6 +23,7 @@ from ..notifications import notify_stage_change, notify_qg_failure, notify_error
from ..agents.launcher import launcher
from ..plane_sync import notify_stage_change as plane_notify_stage
from ..projects import get_project_by_repo
from .. import transition_lease
logger = logging.getLogger("orchestrator.webhooks.gitea")
@@ -124,18 +124,25 @@ async def handle_push(payload: dict):
if has_adr:
# Advance to development
next_stage = "development"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# ORCH-114 (adr-0045 / D4, TR-4): this push-driven advance writes the stage
# in BYPASS of advance_stage -> route through the expected-stage CAS so it
# cannot clobber a concurrent authoritative write; a lost race skips the
# notify + enqueue (no duplicate agent). Kill-switch off -> unconditional
# (byte-for-byte).
if transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo_name):
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
logger.info(f"Task {task_id}: push-advance stage-CAS lost ({current_stage}->{next_stage}); another writer moved it")
elif current_stage == "development":
# Source files pushed — just log, wait for CI
@@ -239,18 +246,22 @@ async def handle_ci_status(payload: dict):
passed, reason = check_ci_green(repo_name, branch)
if passed:
next_stage = "review"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# ORCH-114 (adr-0045 / D4, TR-4): CI-green advance in BYPASS of
# advance_stage -> expected-stage CAS; a lost race skips notify + enqueue.
if transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo_name):
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
logger.info(f"Task {task_id}: CI-green stage-CAS lost ({current_stage}->{next_stage}); another writer moved it")
else:
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
@@ -330,18 +341,22 @@ async def handle_pr(payload: dict):
passed, reason = check_review_approved(repo_name, pr_number)
if passed:
next_stage = "testing"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# ORCH-114 (adr-0045 / D4, TR-4): PR-approved advance in BYPASS of
# advance_stage -> expected-stage CAS; a lost race skips notify + enqueue.
if transition_lease.commit_stage_cas(task_id, current_stage, next_stage, repo_name):
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
logger.info(f"Task {task_id}: PR-approved stage-CAS lost ({current_stage}->{next_stage}); another writer moved it")
else:
notify_qg_failure(task_id, current_stage, "check_review_approved", reason)
@@ -355,18 +370,24 @@ async def handle_pr(payload: dict):
conn.close()
if retry_count < MAX_DEV_RETRIES:
# Back to development, relaunch developer
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
# Back to development, relaunch developer.
# ORCH-114 (adr-0045 / D4, TR-4): REQUEST_CHANGES rollback writes the
# stage in BYPASS of advance_stage -> expected-stage CAS so it cannot
# clobber a concurrent authoritative write (e.g. a task that already
# advanced); a lost race skips the rollback + developer relaunch.
if transition_lease.commit_stage_cas(task_id, current_stage, "development", repo_name):
notify_stage_change(task_id, current_stage, "development")
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
else:
logger.info(f"Task {task_id}: REQUEST_CHANGES rollback stage-CAS lost ({current_stage}->development); another writer moved it")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached, escalating")
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
@@ -395,6 +416,11 @@ async def handle_pr(payload: dict):
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
)
return
update_task_stage(task_id, "done")
notify_stage_change(task_id, current_stage, "done")
logger.info(f"Task {task_id}: PR merged, stage → done")
# ORCH-114 (adr-0045 / D4, TR-4): merge-driven done writes the stage in BYPASS
# of advance_stage -> expected-stage CAS so a concurrent authoritative writer
# is not clobbered; a lost race skips the (idempotent) notify.
if transition_lease.commit_stage_cas(task_id, current_stage, "done", repo_name):
notify_stage_change(task_id, current_stage, "done")
logger.info(f"Task {task_id}: PR merged, stage → done")
else:
logger.info(f"Task {task_id}: merge-driven done stage-CAS lost ({current_stage}->done); another writer moved it")

View File

@@ -14,7 +14,6 @@ from ..db import (
get_task_by_plane_id,
get_next_work_item_id,
ensure_unique_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
create_task_atomic,
@@ -35,6 +34,7 @@ from ..projects import (
get_project_by_repo,
known_plane_project_ids,
)
from .. import transition_lease
logger = logging.getLogger("orchestrator.webhooks.plane")
@@ -803,7 +803,17 @@ async def _rollback_stage(
if not prev_stage:
logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
return
update_task_stage(task_id, prev_stage)
# ORCH-114 (adr-0045 / D4, TR-4): this Rejected-rollback writes the stage in
# BYPASS of advance_stage. Route it through the expected-stage CAS so it can never
# clobber an authoritative write made by a concurrent owner (e.g. a deploy->done
# finalizer) — a lost race aborts the rollback WITHOUT its side effects. Kill-switch
# off / repo out of scope -> unconditional update (byte-for-byte).
if not transition_lease.commit_stage_cas(task_id, current_stage, prev_stage, repo):
logger.info(
f"Task {task_id}: rollback stage-CAS lost ({current_stage}->{prev_stage}) "
f"— task already moved by another writer; skipping rollback"
)
return
notify_stage_change(task_id, current_stage, prev_stage)
# Feature 3: plane_notify_stage moves the board to the prev stage's status.
plane_notify_stage(work_item_id, current_stage, prev_stage)
@@ -857,10 +867,25 @@ async def _try_advance_stage(
advance_stage). It is True ONLY on the "Confirm Deploy" path
(handle_confirm_deploy) and gates Phase B of the self-hosting prod deploy; the
plain Approved path (handle_verdict) leaves it at the default False.
ORCH-114 (adr-0045 / FR-5, AC-8): if a live actor already owns this task's
side-effectful transition (transition-lease active), DEFER — do not re-enter the
transition in parallel. The late legitimate signal is not lost: once the owner
releases (or dies and the reaper reclaims), a re-approve / the reconciler re-drives
it, or advance_stage becomes an idempotent no-op against the authoritative facts
(SHA-in-main / INITIATED). never raises; no-op when the lease is disabled / repo
out of scope.
"""
import asyncio
from ..stage_engine import advance_stage
if transition_lease.is_held_by_live_owner(task_id):
logger.info(
f"Task {task_id}: transition-lease active — deferring webhook advance "
f"from {current_stage} (confirm_deploy={confirm_deploy})"
)
return
await asyncio.to_thread(
advance_stage,
task_id,

View File

@@ -133,3 +133,28 @@ def _disable_merge_verify(monkeypatch):
_cfg.settings, "merge_verify_autocreate_pr_enabled", False, raising=False
)
yield
@pytest.fixture(autouse=True)
def _disable_transition_lease(monkeypatch):
"""ORCH-114: disable the transition-ownership lease + expected-stage CAS by
default in ALL tests.
The prod default is ON for the self-hosting repo (``transition_lease_enabled=True``,
``transition_lease_repos=""`` -> orchestrator only). Left ON, the expected-stage
CAS (``update_task_stage_cas``) would change the stage-write semantics for every
existing test that calls ``advance_stage`` / the gitea-plane webhook handlers with
repo ``orchestrator`` (a CAS write needs the task row to actually BE at the
expected stage; the bare ``update_task_stage`` did not). We therefore default the
kill-switch OFF for the whole suite (mirrors ``_disable_merge_verify`` /
``_disable_*`` precedent), which makes ``commit_stage_cas`` degenerate to the prior
unconditional ``update_task_stage`` and the lease inert -> the existing 2000+ tests
stay byte-for-byte (AC-9). The dedicated ORCH-114 test module
(``test_orch114_transition_ownership.py``) re-enables it via its own monkeypatch,
scoping the feature ON to just those tests.
"""
from src import config as _cfg
monkeypatch.setattr(
_cfg.settings, "transition_lease_enabled", False, raising=False
)
yield

View File

@@ -0,0 +1,645 @@
"""ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS.
Covers FR-1…FR-7 / AC-1…AC-13 (TC-01..TC-14, see 04-test-plan.yaml). The mechanism
prevents a concurrent OR post-restart re-entry into a side-effectful stage transition
(``deploy-staging -> deploy`` sub-gates, ``deploy -> done`` merge-verify, Phase C
finalize) from re-applying an irreversible effect or producing a contradictory
rollback↔done — incident ORCH-111.
No network / no real git / no docker / no prod: the heavy edge sub-gates and the
finalization handlers are stubbed with call-counters and the DB is driven directly
(the same convention as test_orch113_reaper_finalizer_liveness.py).
The autouse conftest fixture defaults the kill-switch OFF for the whole suite; this
module re-enables it per test (``_enable``) so the feature is scoped ON here.
"""
import inspect
import os
import tempfile
import pytest
os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch114.db"))
os.environ.setdefault("ORCH_REPOS_DIR", tempfile.gettempdir())
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db
from src.db import init_db, get_db, get_job, update_task_stage_cas
import src.transition_lease as tl
import src.stage_engine as se
from src.job_reaper import JobReaper
_REPO = "orchestrator" # self-hosting -> transition_lease.applies(repo) is True
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "orch114.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
# Reset the leaf's in-memory counters between tests (process-local module state).
with tl._LOCK:
for k in tl._COUNTERS:
tl._COUNTERS[k] = 0
yield
def _enable(monkeypatch, repos: str = ""):
"""Turn the ORCH-114 mechanism ON (it is OFF by default via conftest)."""
monkeypatch.setattr(db.settings, "transition_lease_enabled", True, raising=False)
monkeypatch.setattr(db.settings, "transition_lease_repos", repos, raising=False)
def _disable(monkeypatch):
monkeypatch.setattr(db.settings, "transition_lease_enabled", False, raising=False)
# --- helpers ----------------------------------------------------------------
def _make_task(stage="deploy-staging", repo=_REPO, branch="feature/orch114",
work_item_id="ORCH-114"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _task_stage(tid):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (tid,)).fetchone()
conn.close()
return row[0] if row else None
def _make_running_job(agent="deployer", repo=_REPO, task_id=None, pid=None,
age_s=0, attempts=0, max_attempts=2, run_id=None,
exit_code=0, finished_age_s=600):
conn = get_db()
if run_id is None and exit_code is not None:
cur = conn.execute(
"INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
"VALUES (?, ?, datetime('now', ?), ?)",
(task_id, agent, f"-{int(finished_age_s)} seconds", exit_code),
)
run_id = cur.lastrowid
cur = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
"run_id, pid, started_at) "
"VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
(agent, repo, task_id, attempts, max_attempts, run_id, pid,
f"-{int(age_s)} seconds"),
)
job_id = cur.lastrowid
conn.commit()
conn.close()
return job_id
def _stub_side_effects(monkeypatch):
"""Patch the deploy-staging edge sub-gates + Phase A with call-counters.
Each sub-gate returns False (no intervention) so advance_stage proceeds to Phase
A; Phase A is stubbed to a counter that does NOT touch the network/prod. Returns
the shared ``counts`` dict.
"""
counts = {"security": 0, "merge": 0, "coverage": 0, "image": 0, "phase_a": 0}
def _mk(key):
def _fake(task_id, current_stage, repo, work_item_id, branch, agent, result):
counts[key] += 1
return False # no intervention -> advance continues
return _fake
monkeypatch.setattr(se, "_handle_security_gate", _mk("security"))
monkeypatch.setattr(se, "_handle_merge_gate", _mk("merge"))
monkeypatch.setattr(se, "_handle_coverage_gate", _mk("coverage"))
monkeypatch.setattr(se, "_handle_image_freshness", _mk("image"))
def _fake_phase_a(task_id, current_stage, repo, work_item_id, branch, result):
counts["phase_a"] += 1
result.advanced = True
result.to_stage = "deploy"
monkeypatch.setattr(se, "_handle_self_deploy_phase_a", _fake_phase_a)
# The QG (check_staging_status) is the entry gate; force it green so we reach the
# side-effectful sub-gates instead of rolling back.
monkeypatch.setattr(se, "_run_qg", lambda *a, **k: (True, "ok"))
return counts
# ===========================================================================
# TC-01 — MANDATORY regression: no double effect on concurrent entry (AC-1)
# ===========================================================================
def test_tc01_concurrent_entry_no_double_effect(monkeypatch):
_enable(monkeypatch)
counts = _stub_side_effects(monkeypatch)
tid = _make_task(stage="deploy-staging")
# Actor A — a LIVE finalizer — owns the transition (acquired on entry).
assert tl.acquire(tid, "monitor", stage="deploy-staging") is True
# Actor B (reaper/reconciler/webhook re-drive) enters the SAME transition.
res_b = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent=None)
# Busy -> deferred WITHOUT any side effect, stage unchanged.
assert res_b.note == "transition-lease-busy"
assert res_b.advanced is False
assert counts == {"security": 0, "merge": 0, "coverage": 0, "image": 0, "phase_a": 0}
assert _task_stage(tid) == "deploy-staging"
# The owner finishes (release), then drives the transition exactly once.
tl.release(tid, force=True)
res_a = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
# Each side-effectful step ran EXACTLY once (one consistent outcome).
assert counts == {"security": 1, "merge": 1, "coverage": 1, "image": 1, "phase_a": 1}
assert res_a.advanced is True
def test_tc01_red_before_fix_demonstration(monkeypatch):
"""With the kill-switch OFF (== before ORCH-114) the second actor is NOT blocked
and re-runs every sub-gate -> the double-effect bug. This is the RED that the
lease turns GREEN."""
_disable(monkeypatch)
counts = _stub_side_effects(monkeypatch)
tid = _make_task(stage="deploy-staging")
# acquire is a no-op when disabled -> no owner-exclusion.
assert tl.acquire(tid, "monitor", stage="deploy-staging") is True
se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent=None)
# Without the lease the "second" actor ran the side effects (the bug).
assert counts["merge"] == 1 and counts["security"] == 1
# ===========================================================================
# TC-02 — expected-stage CAS on the stage write (AC-2)
# ===========================================================================
def test_tc02_cas_first_wins_second_lost(monkeypatch):
tid = _make_task(stage="review")
# First writer with the correct expectation wins.
assert update_task_stage_cas(tid, "review", "testing") is True
assert _task_stage(tid) == "testing"
# Second writer with the now-stale expectation loses; stage is NOT re-mutated.
assert update_task_stage_cas(tid, "review", "development") is False
assert _task_stage(tid) == "testing"
def test_tc02_commit_cas_killswitch_off_unconditional(monkeypatch):
"""Kill-switch off / repo out of scope -> commit_stage_cas degenerates to the
prior unconditional update_task_stage (byte-for-byte: the expected_stage is
ignored, the write always lands)."""
_disable(monkeypatch)
tid = _make_task(stage="review")
# Even a WRONG expected stage writes unconditionally when the mechanism is off.
assert tl.commit_stage_cas(tid, "totally-wrong", "testing", _REPO) is True
assert _task_stage(tid) == "testing"
def test_tc02_commit_cas_enabled_does_real_cas(monkeypatch):
_enable(monkeypatch)
tid = _make_task(stage="review")
# Wrong expectation -> CAS lost, no write.
assert tl.commit_stage_cas(tid, "wrong", "testing", _REPO) is False
assert _task_stage(tid) == "review"
# Correct expectation -> CAS won.
assert tl.commit_stage_cas(tid, "review", "testing", _REPO) is True
assert _task_stage(tid) == "testing"
# ===========================================================================
# TC-03 — ownership lifecycle: acquire / release / reclaim (AC-3)
# ===========================================================================
def test_tc03_acquire_release_visible_durably(monkeypatch):
_enable(monkeypatch)
tid = _make_task()
assert tl.is_held_by_live_owner(tid) is False
assert tl.acquire(tid, "monitor", run_id=7, stage="deploy-staging") is True
assert tl.is_held_by_live_owner(tid) is True
# Durable: a fresh DB read (snapshot) sees the holder.
snap = tl.snapshot()
assert snap["active"] == 1
assert snap["holders"][0]["task_id"] == tid
assert snap["holders"][0]["owner"] == "monitor"
assert snap["holders"][0]["live"] is True
# A second acquire by another actor is busy while the live owner holds it.
assert tl.acquire(tid, "reaper", stage="deploy-staging") is False
tl.release(tid, force=True)
assert tl.is_held_by_live_owner(tid) is False
def test_tc03_release_in_finally_on_exception(monkeypatch):
"""advance_stage must release the lease even when a sub-gate raises (try/finally)."""
_enable(monkeypatch)
monkeypatch.setattr(se, "_run_qg", lambda *a, **k: (True, "ok"))
def _boom(*a, **k):
raise RuntimeError("sub-gate exploded")
monkeypatch.setattr(se, "_handle_security_gate", _boom)
tid = _make_task(stage="deploy-staging")
res = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
# The outer except swallowed the error; the finally released the lease.
assert res.advanced is False
assert tl.is_held_by_live_owner(tid) is False
# ===========================================================================
# TC-04 — reaper defers on a live lease, cross-path (beyond deploy-staging) (AC-4)
# ===========================================================================
def test_tc04_reaper_defers_on_deploy_edge(monkeypatch):
"""ORCH-114 generalises ORCH-113 beyond Tier-2/deploy-staging: a live lease on the
deploy->done edge also defers the reaper."""
_enable(monkeypatch)
monkeypatch.setattr(JobReaper, "_gate_is_green",
lambda self, stage, job, branch, wid: True)
calls = []
import src.agents.launcher as L
monkeypatch.setattr(L.launcher, "_try_advance_stage",
lambda *a, **k: calls.append(a))
tid = _make_task(stage="deploy") # NOT deploy-staging -> proves generalisation
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=600)
assert tl.acquire(tid, "finalizer", stage="deploy") is True
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "running" # not reaped
assert calls == [] # no second advance
assert r.finalizer_defers_total == 1
# ===========================================================================
# TC-05 — reaper reaps a dead/stale lease in bounded time (Tier-3) (AC-5)
# ===========================================================================
def test_tc05_tier3_backstop_reaps_and_releases_lease(monkeypatch):
_enable(monkeypatch)
monkeypatch.setattr(db.settings, "reaper_max_running_s", 1000)
tid = _make_task(stage="deploy")
jid = _make_running_job(task_id=tid, exit_code=0, finished_age_s=10,
age_s=2000, attempts=0, max_attempts=2)
assert tl.acquire(tid, "finalizer", stage="deploy") is True
r = JobReaper()
r.reap_once()
# Backstop reaps regardless of the marker; the lease is force-released with the job.
assert get_job(jid)["status"] == "queued"
assert tl.is_held_by_live_owner(tid) is False
def test_tc05_reclaim_if_stale_removes_dead_boot_row(monkeypatch):
_enable(monkeypatch)
tid = _make_task()
# A row from a PREVIOUS process boot (a dead owner) is stale.
conn = get_db()
conn.execute(
"INSERT INTO transition_lease (task_id, owner, owner_pid, owner_boot_id) "
"VALUES (?, 'monitor', 1, 'OLD-DEAD-BOOT')",
(tid,),
)
conn.commit()
conn.close()
assert tl.is_held_by_live_owner(tid) is False # stale -> not live
assert tl.reclaim_if_stale(tid) is True
assert tl.snapshot()["active"] == 0
def test_tc05_budget_invariant_preserved():
"""The lease introduced no new TTL; the cross-cutting reaper budget is untouched."""
s = db.settings
assert s.reaper_max_running_s == 5400
assert s.reaper_finalize_grace_s == 300
sigma = s.merge_retest_timeout_s + s.coverage_run_timeout_s
assert s.reaper_max_running_s > sigma + s.reaper_finalize_grace_s
# ===========================================================================
# TC-06 — smart restart recovery (AC-6)
# ===========================================================================
def test_tc06_recover_on_startup_clears_previous_boot_lease(monkeypatch):
_enable(monkeypatch)
tid = _make_task(stage="deploy")
# Simulate a process that died MID-finalization: a lease row with a DIFFERENT boot.
conn = get_db()
conn.execute(
"INSERT INTO transition_lease (task_id, owner, owner_pid, owner_boot_id) "
"VALUES (?, 'finalizer', 999999, 'PREVIOUS-BOOT')",
(tid,),
)
conn.commit()
conn.close()
# Before recovery the row is stale (boot mismatch) -> not a live owner.
assert tl.is_held_by_live_owner(tid) is False
# Startup recovery (after requeue_running_jobs) clears it deterministically.
assert tl.recover_on_startup() == 1
assert tl.snapshot()["active"] == 0
# The requeued job can now re-drive the transition cleanly (no stale owner blocks).
assert tl.acquire(tid, "monitor", stage="deploy") is True
def test_tc06_recovery_does_not_touch_current_boot_lease(monkeypatch):
"""A lease this very process holds must NOT be cleared by recovery (only previous
boots are stale)."""
_enable(monkeypatch)
tid = _make_task()
assert tl.acquire(tid, "monitor", stage="deploy-staging") is True
assert tl.recover_on_startup() == 0 # current-boot lease is live, kept
assert tl.is_held_by_live_owner(tid) is True
# ===========================================================================
# TC-07 — reconciler F-1 defers on an active lease (AC-7)
# ===========================================================================
def test_tc07_reconciler_f1_defers(monkeypatch):
_enable(monkeypatch)
from src.reconciler import Reconciler
import src.reconciler as rec
# Spy on the advance path; it must NOT be called while the lease is held.
advanced = []
monkeypatch.setattr(rec, "advance_if_gate_passed",
lambda *a, **k: advanced.append(a))
# Pass the cheap local guards so we reach the lease check.
monkeypatch.setattr(rec, "has_active_job_for_task", lambda *a, **k: False)
monkeypatch.setattr(rec, "developer_retry_count", lambda *a, **k: 0)
monkeypatch.setattr(rec, "MAX_DEVELOPER_RETRIES", 3, raising=False)
monkeypatch.setattr(rec, "grace_for_stage", lambda *a, **k: 0)
r = Reconciler()
monkeypatch.setattr(r, "_resolve_issue_status", lambda task: ({}, {}, None))
monkeypatch.setattr(r, "_is_terminal_state", lambda *a, **k: False)
monkeypatch.setattr(r, "_is_blocked_or_needs_input", lambda *a, **k: False)
tid = _make_task(stage="review")
assert tl.acquire(tid, "monitor", stage="review") is True
r._reconcile_gate_task({
"id": tid, "stage": "review", "repo": _REPO,
"work_item_id": "ORCH-114", "branch": "feature/orch114", "age_s": 10_000,
})
assert advanced == [] # F-1 deferred
assert r.transition_lease_defers_total == 1
# ===========================================================================
# TC-08 — webhook path defers on an active lease (AC-8)
# ===========================================================================
def test_tc08_plane_webhook_defers(monkeypatch):
_enable(monkeypatch)
import asyncio
from src.webhooks.plane import _try_advance_stage
called = []
monkeypatch.setattr(se, "advance_stage", lambda *a, **k: called.append(a))
tid = _make_task(stage="deploy")
assert tl.acquire(tid, "finalizer", stage="deploy") is True
# Lease held -> the webhook advance is deferred (advance_stage NOT invoked).
asyncio.run(_try_advance_stage(tid, "deploy", _REPO, "ORCH-114", "feature/orch114"))
assert called == []
# The late legitimate signal is not lost: after release it advances.
tl.release(tid, force=True)
asyncio.run(_try_advance_stage(tid, "deploy", _REPO, "ORCH-114", "feature/orch114"))
assert len(called) == 1
# ===========================================================================
# TC-09 — kill-switch off -> byte-for-byte prior (AC-9)
# ===========================================================================
def test_tc09_killswitch_off_inert(monkeypatch):
_disable(monkeypatch)
tid = _make_task(stage="review")
# Lease neither written nor read.
assert tl.acquire(tid, "monitor", stage="review") is True # no-op True
assert tl.is_held_by_live_owner(tid) is False
assert tl.snapshot()["enabled"] is False
assert tl.snapshot()["active"] == 0
# CAS degenerates to the unconditional update (expected ignored).
assert tl.commit_stage_cas(tid, "anything", "testing", _REPO) is True
assert _task_stage(tid) == "testing"
def test_tc09_applies_scope(monkeypatch):
_enable(monkeypatch) # empty repos CSV -> self-hosting only
assert tl.applies("orchestrator") is True
assert tl.applies("enduro-trails") is False
# Explicit CSV scope.
_enable(monkeypatch, repos="enduro-trails")
assert tl.applies("enduro-trails") is True
assert tl.applies("orchestrator") is False
# ===========================================================================
# TC-10 — never-raise + fail-open (hot path) / fail-closed (prod safety) (AC-10)
# ===========================================================================
def test_tc10_never_raise_on_db_error(monkeypatch):
_enable(monkeypatch)
def _boom(*a, **k):
raise RuntimeError("DB exploded")
monkeypatch.setattr(tl.db, "get_db", _boom)
# acquire -> fail-CLOSED (busy) so a side-effectful caller DEFERS (no double effect).
assert tl.acquire(123, "monitor", stage="deploy") is False
# is_held_by_live_owner -> fail-CLOSED (treat as held -> conservative defer).
assert tl.is_held_by_live_owner(123) is True
# release / reclaim / recover / snapshot never raise.
tl.release(123, force=True)
assert tl.reclaim_if_stale(123) is False
assert tl.recover_on_startup() == 0
assert isinstance(tl.snapshot(), dict)
def test_tc10_commit_cas_error_aborts_write(monkeypatch):
_enable(monkeypatch)
monkeypatch.setattr(tl.db, "update_task_stage_cas",
lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")))
# CAS error -> abort the write (never a blind overwrite) -> False, no raise.
assert tl.commit_stage_cas(1, "review", "testing", _REPO) is False
def test_tc10_hot_claim_path_not_touched():
"""AC-8 ORCH-088 intact: the hot claim path does NOT consult the transition-lease,
so a lease bug can never wedge the shared queue (fail-open by construction)."""
src_claim = inspect.getsource(db.claim_next_job)
assert "transition_lease" not in src_claim
# ===========================================================================
# TC-11 — structural audit: pipeline invariants untouched, storage additive (AC-11)
# ===========================================================================
def test_tc11_stage_transitions_and_qg_untouched():
from src.stages import STAGE_TRANSITIONS
from src.qg.checks import QG_CHECKS
# The canonical edge order is intact (no new stages/edges).
assert STAGE_TRANSITIONS["deploy-staging"]["next"] == "deploy"
assert STAGE_TRANSITIONS["deploy-staging"]["qg"] == "check_staging_status"
assert STAGE_TRANSITIONS["deploy"]["next"] == "done"
# The QG registry still carries the machine-verdict gates byte-for-byte.
for name in ("check_staging_status", "check_deploy_status", "check_coverage_gate"):
assert name in QG_CHECKS
def test_tc11_storage_additive_existing_tables_unchanged():
conn = get_db()
# The additive table exists (CREATE TABLE IF NOT EXISTS).
row = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transition_lease'"
).fetchone()
assert row is not None
# `tasks` schema is byte-for-byte: NO epoch/version column was added (ADR D2).
cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
conn.close()
assert "epoch" not in cols and "version" not in cols
assert {"id", "stage", "repo", "branch", "work_item_id"} <= cols
def test_tc11_bypass_paths_use_cas_not_unconditional_write():
"""The 6 bypass writers (gitea x5 + plane rollback) + the main advance write route
through commit_stage_cas; none does an unconditional update_task_stage on the
concurrent path (TR-4)."""
import src.webhooks.gitea as g
import src.webhooks.plane as p
gsrc = inspect.getsource(g)
assert "commit_stage_cas" in gsrc
# The gitea handlers no longer import / call the bare update_task_stage.
assert "update_task_stage(" not in gsrc
psrc = inspect.getsource(p._rollback_stage)
assert "commit_stage_cas" in psrc
assert "update_task_stage(" not in psrc
# The main advance write uses CAS.
asrc = inspect.getsource(se.advance_stage)
assert "commit_stage_cas(task_id, current_stage, next_stage" in asrc
# ===========================================================================
# TC-12 — observability (AC-12)
# ===========================================================================
def test_tc12_snapshot_shape_and_counters(monkeypatch):
_enable(monkeypatch)
tid = _make_task(stage="deploy-staging")
tl.acquire(tid, "monitor", run_id=3, stage="deploy-staging")
snap = tl.snapshot()
assert snap["enabled"] is True
assert snap["active"] == 1
assert set(snap.keys()) >= {"enabled", "repos", "boot_id", "active", "holders", "counters"}
h = snap["holders"][0]
assert {"task_id", "owner", "stage", "age_s", "live"} <= set(h.keys())
assert snap["counters"]["acquired_total"] >= 1
def test_tc12_forced_reclaim_emits_telegram(monkeypatch):
_enable(monkeypatch)
sent = []
monkeypatch.setattr("src.notifications.send_telegram",
lambda *a, **k: sent.append(a), raising=False)
tid = _make_task()
# A previous-boot (stale) lease that recovery force-reclaims at startup.
conn = get_db()
conn.execute(
"INSERT INTO transition_lease (task_id, owner, owner_pid, owner_boot_id) "
"VALUES (?, 'finalizer', 1, 'PREV-BOOT')",
(tid,),
)
conn.commit()
conn.close()
assert tl.recover_on_startup() == 1
assert len(sent) == 1 # forced/stale reclaim is observable via Telegram
def test_tc12_queue_block_wired():
"""GET /queue carries the additive transition_lease block (read-only)."""
import src.main as main_mod
qsrc = inspect.getsource(main_mod.queue)
assert '"transition_lease": transition_lease.snapshot()' in qsrc
# ===========================================================================
# TC-13 — self-hosting safety (AC-13)
# ===========================================================================
def _code_only(module) -> str:
"""Return the module source with comments AND string literals stripped, so a
structural audit scans EXECUTABLE code only (not docstring prose). Mirrors the
tokenize approach of tests/test_no_host_hardcodes.py."""
import io
import tokenize
src = inspect.getsource(module)
out = []
for tok in tokenize.generate_tokens(io.StringIO(src).readline):
if tok.type in (tokenize.COMMENT, tokenize.STRING):
continue
out.append(tok.string)
return " ".join(out)
def test_tc13_leaf_has_no_dangerous_side_effects():
"""The ownership mechanism never restarts the prod container, never pushes /
force-pushes main, never spawns a subprocess and never touches the detached
deploy process. Scans EXECUTABLE code only (docstring prose is excluded)."""
code = _code_only(tl)
forbidden = ["subprocess", "system", "docker", "force_push", "Popen",
"os.kill", "restart", "rmtree", "remove"]
for token in forbidden:
assert token not in code, f"transition_lease must not reference {token!r} in code"
def test_tc13_leaf_imports_only_safe_modules():
"""The leaf imports only db + config at module load (lazily merge_gate / qg /
notifications) — it never imports stage_engine / launcher / self_deploy."""
src_tl = inspect.getsource(tl)
assert "import stage_engine" not in src_tl
assert "from .stage_engine" not in src_tl
assert "import launcher" not in src_tl
assert "self_deploy" not in src_tl
# ===========================================================================
# TC-14 — full pipeline happy-path with the mechanism ON (BR-8)
# ===========================================================================
def test_tc14_single_actor_happy_path_one_set_of_effects(monkeypatch):
"""A single advance on deploy-staging with the mechanism ON runs each sub-gate
exactly once and leaves NO lease behind (clean acquire+release)."""
_enable(monkeypatch)
counts = _stub_side_effects(monkeypatch)
tid = _make_task(stage="deploy-staging")
res = se.advance_stage(tid, "deploy-staging", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
assert counts == {"security": 1, "merge": 1, "coverage": 1, "image": 1, "phase_a": 1}
assert res.advanced is True
# The lease was released in the finally (no leak).
assert tl.is_held_by_live_owner(tid) is False
def test_tc14_deploy_to_done_finalize_advances_via_cas(monkeypatch):
"""The deploy->done finalize path (Phase C) reaches the terminal write via the CAS
and releases the lease (single consistent done)."""
_enable(monkeypatch)
monkeypatch.setattr(se, "_run_qg", lambda *a, **k: (True, "ok"))
# merge-verify CONFIRMED (no HOLD) so advance proceeds to done.
monkeypatch.setattr(se, "_handle_merge_verify", lambda *a, **k: False)
# Avoid post-deploy / plane side effects on the done write.
monkeypatch.setattr(se.post_deploy, "post_deploy_applies", lambda *a, **k: False)
monkeypatch.setattr(se, "set_issue_done", lambda *a, **k: None, raising=False)
monkeypatch.setattr(se.merge_gate, "release_merge_lease", lambda *a, **k: None)
monkeypatch.setattr(se, "enqueue_job", lambda *a, **k: 1, raising=False)
tid = _make_task(stage="deploy")
res = se.advance_stage(tid, "deploy", _REPO, "ORCH-114", "feature/orch114",
finished_agent="deployer")
assert res.advanced is True
assert _task_stage(tid) == "done"
assert tl.is_held_by_live_owner(tid) is False

View File

@@ -396,15 +396,19 @@ def _mock_db_with_retry_count(count):
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.transition_lease.commit_stage_cas")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_retries_developer_under_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_proj, mock_task, mock_get_db, mock_commit_cas,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched.
ORCH-114: the CI-failure path never writes the stage (no advance) -> the
expected-stage CAS write helper is never invoked.
"""
from src.webhooks.gitea import handle_ci_status
mock_proj.return_value = {"repo": "enduro-trails"}
@@ -423,19 +427,19 @@ def test_ci_failure_development_retries_developer_under_limit(
assert mock_enqueue.call_args[0][0] == "developer"
# No escalation.
assert not mock_err.called
# Stage stays on development — no update_task_stage in the CI-failure path.
assert not mock_update_stage.called
# Stage stays on development — no stage write in the CI-failure path.
assert not mock_commit_cas.called
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.transition_lease.commit_stage_cas")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_escalates_at_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_proj, mock_task, mock_get_db, mock_commit_cas,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
@@ -458,8 +462,8 @@ def test_ci_failure_development_escalates_at_limit(
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
assert "Max developer retries" in err_msg
assert "after CI failure" in err_msg
# Stage untouched.
assert not mock_update_stage.called
# Stage untouched (no stage write).
assert not mock_commit_cas.called
# ---------------------------------------------------------------------------
@@ -483,11 +487,11 @@ def _merged_pr_payload(branch="feature/ET-012-x"):
@patch("src.webhooks.gitea.notify_stage_change")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.transition_lease.commit_stage_cas")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_merge_on_deploy_stage_does_not_set_done(
mock_proj, mock_task, mock_update_stage, mock_notify,
mock_proj, mock_task, mock_commit_cas, mock_notify,
):
"""FIX 1: merge at deploy stage is ignored — done is gated by deployer verdict."""
from src.webhooks.gitea import handle_pr
@@ -499,28 +503,34 @@ def test_merge_on_deploy_stage_does_not_set_done(
asyncio.run(handle_pr(_merged_pr_payload()))
# The merge-driven done path must NOT run on deploy.
assert not mock_update_stage.called
# The merge-driven done path must NOT run on deploy (no stage write).
assert not mock_commit_cas.called
assert not mock_notify.called
@patch("src.webhooks.gitea.notify_stage_change")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.transition_lease.commit_stage_cas")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_merge_on_non_deploy_stage_sets_done(
mock_proj, mock_task, mock_update_stage, mock_notify,
mock_proj, mock_task, mock_commit_cas, mock_notify,
):
"""FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review)."""
"""FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review).
ORCH-114: the merge-driven done write now goes through the expected-stage CAS
helper (commit_stage_cas(task_id, current_stage, "done", repo)); on a won CAS the
notify still fires.
"""
from src.webhooks.gitea import handle_pr
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 2, "stage": "review", "work_item_id": "ET-013",
}
mock_commit_cas.return_value = True
asyncio.run(handle_pr(_merged_pr_payload(branch="feature/ET-013-x")))
# Non-deploy stages still get the merge-driven done.
mock_update_stage.assert_called_once_with(2, "done")
# Non-deploy stages still get the merge-driven done (review -> done via CAS).
mock_commit_cas.assert_called_once_with(2, "review", "done", "enduro-trails")
assert mock_notify.called