diff --git a/.env.example b/.env.example index 9e4bf8d..9119703 100644 --- a/.env.example +++ b/.env.example @@ -121,6 +121,24 @@ ORCH_TASK_DEPS_SOURCE=db ORCH_SERIAL_GATE_ENABLED=true ORCH_SERIAL_GATE_REPOS= ORCH_SERIAL_GATE_FREEZE_ENABLED=true +# ORCH-090: STOP-status task cancellation (stop active agent + full progress reset) +# and the relaunch-hole close. A dedicated Plane "STOP" status (logical key `stop`, +# fail-closed: absent from _DEFAULT_STATES, so a board without the status -> no-op) +# routes to a cancel handler that drives the task to the system-terminal state +# `cancelled` (stop agent via the graceful SIGTERM cascade, cancel all jobs, remove +# worktree + delete the remote feature branch [never main / never force-push], +# tombstone the natural keys for a clean re-create via "To Analyse"; docs preserved). +# STOP during a critical merge/deploy window is DEFERRED until the irreversible step +# finishes honestly. The relaunch-hole gate restricts the "To Analyse" agent relaunch +# to the `analysis` stage (the sole Needs-Input owner). Additive, never-raise. +# Infra precondition: create a "STOP" status with the `cancelled` group on the ORCH +# board (07-infra-requirements.md). Leaf src/cancel.py. +# STOP_STATUS_ENABLED=false -> STOP handling AND the relaunch-hole gate are inert +# (behaviour strictly as before ORCH-090). +# STOP_STATUS_REPOS (CSV) -> scope; EMPTY = ALL repos (cancellation is meaningful +# for enduro too). +ORCH_STOP_STATUS_ENABLED=true +ORCH_STOP_STATUS_REPOS= # ORCH-071/073: merge-verify under-gate on the `deploy -> done` edge (врезка in # advance_stage, NOT a new STAGE_TRANSITIONS edge / registered QG). A deterministic # merge-actor merges the feature code-PR via the Gitea PR-merge API (never push/ diff --git a/CHANGELOG.md b/CHANGELOG.md index a0c77e1..3c4cad0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,15 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Отмена задачи: Plane-статус STOP (остановка агента + полный сброс) + закрытие дыры релонча** (ORCH-090, `feat`): выделенный Plane-статус **STOP** — единый декларативный механизм отмены задачи вместо ручной хирургии по БД/процессам. Вводит **новое системное терминальное состояние `cancelled`** (стадия `tasks.stage='cancelled'` + job-исход `jobs.status='cancelled'`), равноправное `done`. **Аддитивно, под kill-switch, never-raise, restart-safe:** `STAGE_TRANSITIONS` (exit-гейты рёбер) / `QG_CHECKS` / `check_*` / семантика существующих статусов — **не тронуты** (`cancelled` — терминальный сток, не новое ребро); enduro не затронут; при `stop_status_enabled=false` — нулевая регрессия. + - **Распознавание (fail-closed):** новый логический ключ `stop` в `_PLANE_NAME_TO_KEY` (`"STOP" → "stop"`), **намеренно отсутствует** в `_DEFAULT_STATES` (по образцу `confirm_deploy`/ORCH-059) → доска без статуса STOP резолвит `None` → ветка не активируется (нет `KeyError`, нет слепой отмены). `handle_issue_updated` маршрутизирует `stop` → `handle_stop` → `stage_engine.cancel_task` (проверяется ПЕРВЫМ, до to_analyse/approved/rejected). + - **Полный сброс (вне критичного окна, AC-1..AC-4):** graceful SIGTERM активного агента через переиспользуемый каскад `launcher.stop_process` (вынесен из `_watchdog`: SIGTERM → grace → SIGKILL) по `jobs.pid`; `db.cancel_jobs_for_task` (queued/running → терминальный `cancelled`, нигде не реквью'ится — `claim_next_job` берёт только `queued`); `git_worktree.remove_worktree` + новый never-raise `src/gitea.py::delete_remote_branch` (удаляет **только** feature-ветку; `main`/`master` — явный гард-отказ; без force-push); durable `stage='cancelled'` + `cancelled_at`; **тумбстон** натуральных ключей суффиксом `#cancelled-`. Docs-артефакты (`01..17`) сохраняются. + - **Уточнение ADR-001 D4 (при реализации):** ADR предлагал сохранить `plane_issue_id` нетронутым, но `get_task_by_plane_id`/`create_task_atomic` матчат по `plane_id OR plane_issue_id` — нетумбстоненный `plane_issue_id` заблокировал бы clean-slate re-create (BR-3/TR-4). Поэтому `plane_issue_id` тоже тумбстонится; исходный UUID (== исходный `plane_id` во всех путях создания) парсится из детерминированного суффикса для аудита. Зафиксировано в коде/`docs/architecture/README.md`/CLAUDE.md. + - **Безопасное прерывание merge/deploy (AC-7, NFR-3):** STOP в критическом окне (self-deploy `INITIATED`-sentinel ORCH-036 / держание merge-lease ORCH-043) → **отложенная отмена** (`cancel.in_critical_window` fail-CLOSED): durable `tasks.cancel_requested_at`, снимаются только `queued`-job'ы (running-актор деплоя/мержа не трогается), алерт; детерминированный `run_deploy_finalizer` доводит необратимый шаг до честного исхода и применяет отмену (`cancel_task(force=True)`; задача, дошедшая до `done`, — честный no-op, код уже в проде). STOP **никогда** не трогает `main`/force-push/прод-контейнер/detached-процесс. + - **Кросс-каттинг (adr-0026):** предикат «задача терминальна» расширен `{done}` → `{done, cancelled}` в `serial_gate.py` (ORCH-088: `repo_has_active_task`, claim-фрагмент, snapshot), `db.claim_next_job`/`get_unfinished_dependencies` (task_deps ORCH-026) и `stages.py`-сток — иначе отменённая задача заклинила бы очередь репо (TR-1); reconciler-терминал-скип уже знал `cancelled` (ORCH-086 D2). `job_reaper`/`queue_worker` ПЕРЕД авто-requeue сверяют терминал задачи → помечают job `cancelled`, не реквью'ят (закрыта гонка SIGTERM/reaper, TR-2). + - **Закрытие дыры релонча (AC-5, D6):** `handle_status_start` больше не релончит агента середины пайплайна при ручном переводе в промежуточный статус — relaunch ограничен стадией `analysis` (единственный владелец Needs Input, ORCH-066); единственный вход к запуску пайплайна остаётся «To Analyse» (`start_pipeline`). Под `stop_status_enabled=false` гейт инертен (1:1 как раньше). + - **Флаги/наблюдаемость:** `stop_status_enabled` (kill-switch, env `ORCH_STOP_STATUS_ENABLED`) + `stop_status_repos` (CSV, пусто → все репо); leaf `src/cancel.py` (`applies`/`in_critical_window`/`snapshot`, never-raise); read-only блок `stop` в `GET /queue`; лог + Telegram (кликабельный номер) + Plane-коммент + `update_task_tracker`. Аддитивные идемпотентные миграции (`_ensure_column` для `cancelled_at`/`cancel_requested_at`). **Инфра-предусловие:** создать статус **STOP** с группой `cancelled` на доске Plane проекта ORCH (его отсутствие = fail-safe no-op). + - Тесты: `tests/test_stop_status.py` (TC-01..TC-14 + D7-кейсы, 26 кейсов; SIGTERM/git/gitea замоканы — ни один тест не шлёт сигнал/не трогает сеть); обновлены анти-регресс-тесты STAGE_TRANSITIONS 5 прошлых задач (добавлен терминал-сток `cancelled`); полный регресс `tests/` зелёный (1345). Документация: `docs/architecture/README.md` (статус «реализовано» + блок `/queue` + раздел БД), `CLAUDE.md`, `README.md`, `.env.example`. ADR: `docs/work-items/ORCH-090/06-adr/ADR-001-stop-cancel-task.md`, сквозной `docs/architecture/adr/adr-0026-stop-cancel-task.md`. Откат: `ORCH_STOP_STATUS_ENABLED=false` (аддитивные колонки/терминал-набор инертны при отсутствии отменённых задач). - **Build-cache-pruner: авто-prune docker build cache на mva154** (ORCH-062, `feat`): новый фоновый daemon-поток `src/build_cache_pruner.py` (каркас `disk_watchdog`) — «вторая половина» disk-watchdog (ORCH-063): **watchdog сигналит — pruner убирает**. Устраняет корень инцидента 07.06.2026 (docker build cache ≈11 ГБ → диск mva154 100% → падение self-hosting-конвейера всех проектов) **автоматически, без оператора**. **Аддитивно, never-raise:** `STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*`/`_parse_*`/`src/stage_engine.py`/схема БД — **не тронуты**, новой миграции нет (состояние last-run/last-result — in-memory, best-effort). - **Периодическая уборка (FR-1/AC-1):** каждые `build_cache_prune_interval_s` (дефолт **21600с = 6ч**) тик выполняет **строго `docker builder prune -f --filter until=`** (BuildKit GC). Анти-частота — pure-функция `decide_prune(prev_run_ts, now, interval_s)` (юнит-тестируема без потока/таймера, время инъецируется). Дефолт `until=24h` удерживает тёплый недавний кэш (BR-2/AC-2); `-a/--all` (`build_cache_prune_all`, дефолт `False`) — **только в паре** с возрастным фильтром. - **Self-hosting безопасность (FR-3/AC-3):** команда затрагивает **только** build cache — **нет** `docker image prune`/`docker system prune`, удаления образов/контейнеров запущенных сервисов, остановки/рестарта контейнеров; прод-контейнер `orchestrator` **никогда** не рестартится. Уборка исполняется **на хосте через ssh** (`deploy_ssh_user@deploy_ssh_host`, тот же канал, что `image_freshness`/`self_deploy` — в образе нет docker CLI). Нет ssh-таргета → тик no-op (наблюдаемо в `status().last_error`). diff --git a/CLAUDE.md b/CLAUDE.md index 8c61b44..0d79278 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -111,6 +111,39 @@ created → analysis → architecture → development → review → testing → Детали — `docs/work-items/ORCH-089/06-adr/ADR-001-auto-label-gates.md`, `docs/architecture/adr/adr-0018-auto-label-gates.md`. +## Отмена задачи: статус STOP (ORCH-090) +Выделенный Plane-статус **STOP** — операторская кнопка «отменить + сбросить» задачу. Вводит +**новое системное терминальное состояние `cancelled`** (стадия `tasks.stage='cancelled'` + job-исход +`jobs.status='cancelled'`), равноправное `done`. Логический ключ `stop` — **fail-closed** (нет в +`_DEFAULT_STATES`, по образцу `confirm_deploy`/ORCH-059): доска без статуса STOP → ветка не +активируется. Маршрут `handle_issue_updated → handle_stop → stage_engine.cancel_task`: +- **Полный сброс** (вне критичного окна): graceful SIGTERM активного агента (`launcher.stop_process`, + переиспользует каскад `_watchdog`), все job'ы → терминальный `cancelled` (не реквью'ятся: + `claim_next_job` берёт только `queued`, reaper/worker сверяют терминал задачи — TR-2), удаление + worktree + **рабочей** Gitea-ветки (`gitea.delete_remote_branch`, **никогда** `main`, без + force-push), durable `stage='cancelled'` + **тумбстон** натуральных ключей (`plane_id`/ + `work_item_id`/`plane_issue_id` → суффикс `#cancelled-`; ADR-001 D4 уточнён: тумбстонится и + `plane_issue_id`, т.к. `get_task_by_plane_id`/`create_task_atomic` матчат по нему — иначе re-create + коллизирует; исходный UUID парсится из суффикса для аудита). Docs-артефакты (`01..17`) сохраняются. +- **STOP в критичном окне merge/deploy** (ADR-001 D7): `cancel.in_critical_window` (INITIATED-sentinel + self-deploy ORCH-036 / держание merge-lease ORCH-043) → **отложенная** отмена: `tasks.cancel_requested_at`, + снимаются только `queued` job'ы (running-актор деплоя/мержа не трогается), алерт; детерминированный + finalizer (`run_deploy_finalizer`) доводит необратимый шаг до честного исхода и применяет отмену + (`force=True`). STOP **никогда** не трогает `main`/force-push/прод-контейнер/detached-процесс (NFR-3). +- **Кросс-каттинг (adr-0026):** предикат «задача терминальна» расширен `{done}` → `{done, cancelled}` + в `serial_gate`/`task_deps`/`stages.py`-сток (иначе отменённая задача заклинит очередь репо); + reconciler-терминал-скип уже знал `cancelled` (ORCH-086). `STAGE_TRANSITIONS` exit-гейты рёбер / + `QG_CHECKS` / `check_*` — **не тронуты** (`cancelled` — сток, не ребро). +- **Дыра релонча закрыта (D6):** relaunch агента в `handle_status_start` ограничен стадией `analysis` + (единственный владелец Needs Input, ORCH-066); ручной перевод существующей задачи в иной промежуточный + статус больше не релончит середину пайплайна. Запуск пайплайна — только «To Analyse» → `start_pipeline`. +- Флаги `stop_status_enabled` (kill-switch; `False` → всё инертно, нулевая регрессия) / `stop_status_repos` + (CSV; пусто → все репо). Leaf `src/cancel.py` (never-raise). Read-only блок `stop` в `GET /queue`. + Аддитивные колонки `tasks.cancelled_at`/`cancel_requested_at` (`_ensure_column`). **Инфра-предусловие:** + создать статус **STOP** с группой `cancelled` на доске ORCH (его отсутствие = fail-safe no-op). Детали — + `docs/work-items/ORCH-090/06-adr/ADR-001-stop-cancel-task.md`, + `docs/architecture/adr/adr-0026-stop-cancel-task.md`. + ## Конвенции - Conventional Commits (`feat:`, `fix:`, `docs:`, `refactor:`, `test:`) - Ветки: `feature/ORCH-NNN-slug`, `fix/ORCH-NNN-slug` diff --git a/README.md b/README.md index 0b116c4..780c276 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,8 @@ uvicorn src.main:app --reload --port 8500 | `ORCH_RECONCILE_NOTIFY_UNBLOCK` | Telegram при разблокировке застрявшей задачи | `true` | | `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED` | F-1 Guard 2 (ORCH-060): пропуск задач в Plane-статусе Blocked / Needs Input; `false` глушит только сетевой Guard 2 (Guard 1 escalated всегда активен) | `true` | | `ORCH_QG0_TITLE_MAX` | Верхний лимит длины заголовка QG-0 (вход `_qg0_errors`); невалидное/пустое значение → дефолт (ORCH-069) | `200` | +| `ORCH_STOP_STATUS_ENABLED` | Kill-switch отмены задачи по Plane-статусу **STOP** + закрытия дыры релонча (ORCH-090); `false` → поведение 1:1 как до ORCH-090 | `true` | +| `ORCH_STOP_STATUS_REPOS` | CSV область репо для STOP-отмены; пусто = все репо (ORCH-090) | `""` | ## Очередь задач (ORCH-1 / F-2b) @@ -154,7 +156,30 @@ Webhook-хэндлеры больше не спавнят claude-агентов - **Ретраи.** Упавший job (exit≠0) ретраится пока `attempts < max_attempts`, потом `failed` + Telegram-нотификация. -Статусы job: `queued → running → done | failed`. Наблюдаемость — через `GET /queue`. +Статусы job: `queued → running → done | failed`; **`cancelled`** — терминальный +исход STOP-отмены (ORCH-090), нигде не реквью'ится. Наблюдаемость — через `GET /queue`. + +## Отмена задачи: статус STOP (ORCH-090) + +Перевод задачи в выделенный Plane-статус **STOP** отменяет её: оркестратор +останавливает активного агента (graceful SIGTERM-каскад), снимает все job'ы +(терминальный `cancelled`, без авто-requeue), удаляет worktree и **рабочую** +ветку в Gitea (**никогда** `main`, без force-push), сбрасывает прогресс в +durable-терминал `tasks.stage='cancelled'` и тумбстонит натуральные ключи +(`#cancelled-`), чтобы повторный «To Analyse» создал задачу **с нуля**. +Docs-артефакты (`01..17`) сохраняются. STOP во время критичного шага merge/deploy +— **откладывается** до его честного завершения (никакого half-merge / рестарта +прода). Параллельно закрыта «дыра релонча»: ручной перевод в промежуточный рабочий +статус больше не релончит агента — единственный вход к запуску пайплайна остаётся +«To Analyse» (релонч агента сменой статуса разрешён только на стадии `analysis` — +владельце Needs Input). Всё под kill-switch `ORCH_STOP_STATUS_ENABLED`, аддитивно, +never-raise. Наблюдаемость — блок `stop` в `GET /queue`. Деталь — `docs/work-items/ +ORCH-090/06-adr/ADR-001-stop-cancel-task.md` + сквозной +`docs/architecture/adr/adr-0026-stop-cancel-task.md`. + +> **Инфра-предусловие:** на доске Plane проекта ORCH создать статус **«STOP»** с +> группой `cancelled`. До создания статуса фича в fail-safe (нет UUID → ветка STOP +> не активируется). **Resilience-слой:** дешёвый preflight (CLI/net, кэш, без токенов) гейтит claim; 429/overload детектится по логу (transient vs permanent), transient ретраится с diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 8d89675..facca2a 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -278,7 +278,7 @@ Phase A ждёт ручного `Confirm Deploy`, ORCH-059). ORCH-089 снима `docs/work-items/ORCH-089/06-adr/ADR-001-auto-label-gates.md`, `docs/work-items/ORCH-089/07-infra-requirements.md`. -### STOP / отмена задачи: терминал `cancelled` + закрытие дыры релонча (ORCH-090 — design) +### STOP / отмена задачи: терминал `cancelled` + закрытие дыры релонча (ORCH-090 — реализовано) До ORCH-090 не было штатного способа отменить задачу (ручная хирургия по БД/процессам) и существовала **дыра релонча**: `handle_status_start` при существующей задаче без активного job @@ -298,9 +298,16 @@ Phase A ждёт ручного `Confirm Deploy`, ORCH-059). ORCH-089 снима - **Каскад отмены:** graceful SIGTERM активному агенту (переиспользование каскада `launcher._watchdog` по `jobs.pid`); `cancel_jobs_for_task` (queued/running → `cancelled`, не реквью'ятся); снятие таймеров/мониторов (brd-clock, post-deploy monitor, defer'ы); - `remove_worktree` + never-raise удаление **только feature-ветки** Gitea (`main` неприкосновенен, - без force-push); **тумбстон** `plane_id`/`work_item_id` (`#cancelled-`) → повторный - «To Analyse» создаёт задачу с нуля; docs-артефакты (`01..17`) сохраняются. + `remove_worktree` + never-raise удаление **только feature-ветки** Gitea (`gitea.delete_remote_branch`; + `main`/`master` неприкосновенны — явный гард; без force-push); **тумбстон** `plane_id`/`work_item_id`/ + **`plane_issue_id`** (суффикс `#cancelled-`) → `get_task_by_plane_id` возвращает None → повторный + «To Analyse» создаёт задачу с нуля; docs-артефакты (`01..17`) сохраняются. Аддитивные колонки + `tasks.cancelled_at`/`cancel_requested_at` (`_ensure_column`). + > **Уточнение ADR-001 D4 (при реализации):** ADR предлагал сохранить `plane_issue_id` нетронутым, но + > `get_task_by_plane_id`/`create_task_atomic` матчат по `plane_id OR plane_issue_id` — нетумбстоненный + > `plane_issue_id` оставил бы отменённую строку «находимой» и заблокировал бы re-create (BR-3/TR-4). + > Поэтому он тоже тумбстонится; исходный UUID (== исходный `plane_id` во всех путях создания) парсится + > из детерминированного суффикса для аудита. - **Безопасное прерывание merge/deploy:** STOP в критическом окне (self-deploy `INITIATED`-sentinel ORCH-036, держание merge-lease ORCH-043/071) → **отложенная отмена** (durable `cancel_requested_at`, отмена только `queued`-job'ов, алерт); необратимый шаг доводится до @@ -782,9 +789,9 @@ Monitoring after Deploy → Done ## База данных (SQLite) - `events` — входящие вебхуки (дедуп) -- `tasks` — задачи и их стадии +- `tasks` — задачи и их стадии; колонки `cancelled_at`/`cancel_requested_at` (ORCH-090) — durable-метки STOP-отмены (вторая — отложенная отмена в критичном окне merge/deploy). Терминальная стадия `cancelled` (сток, параллельно `done`); натуральные ключи отменённой строки тумбстонятся суффиксом `#cancelled-` (`plane_id`/`work_item_id`/`plane_issue_id`) - `agent_runs` — запуски агентов (run_id, usage, cost) -- `jobs` — очередь задач (ORCH-1); колонка `pid` (ORCH-065) — pid агентского процесса для liveness-детекции зомби job-reaper'ом +- `jobs` — очередь задач (ORCH-1); статусы `queued|running|done|failed|cancelled` (ORCH-090: `cancelled` — терминальный исход STOP, нигде не реквью'ится); колонка `pid` (ORCH-065) — pid агентского процесса для liveness-детекции зомби job-reaper'ом - `job_deps` — декларативные зависимости задач (ORCH-026, Уровень B): `(task_id, depends_on_task_id)`, аддитивная; источник истины планировщика для гейта «B ждёт A» - `repo_freeze` — durable per-repo rollback-freeze (ORCH-088, FR-5): `(id, repo, frozen_at, reason, work_item_id, cleared_at)`, аддитивная append-only; активный freeze ⇔ строка репо с `cleared_at IS NULL`. Выставляется post-deploy `DEGRADED` (`set_repo_freeze`), снимается вручную (`POST /serial-gate/unfreeze` → `cleared_at=now`). Гейтит serial-claim безусловно (деградировавшая задача уже `done`) @@ -796,7 +803,7 @@ Monitoring after Deploy → Done |--------|------|----------| | GET | `/health` | health check | | GET | `/status` | активные задачи (stage != done) | -| GET | `/queue` | очередь: counts + max_concurrency + resilience + reconcile (ORCH-053) + reaper (ORCH-065) + post_deploy (ORCH-021) + task_deps (ORCH-026) + serial_gate (ORCH-088) + последние jobs | +| GET | `/queue` | очередь: counts + max_concurrency + resilience + reconcile (ORCH-053) + reaper (ORCH-065) + post_deploy (ORCH-021) + task_deps (ORCH-026) + serial_gate (ORCH-088) + auto_labels (ORCH-089) + stop (ORCH-090) + последние jobs | | POST | `/serial-gate/unfreeze` | ORCH-088 (FR-5): ручное снятие per-repo rollback-freeze (query/body `repo=`) → `{ok, repo, cleared, frozen}`; идемпотентно. Альтернатива — `UPDATE repo_freeze SET cleared_at=datetime('now') WHERE repo=? AND cleared_at IS NULL` | | POST | `/webhook/plane` | Plane webhook | | POST | `/webhook/gitea` | Gitea webhook (push, PR, CI status) | diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 3d4f796..15eb41d 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -679,17 +679,47 @@ class AgentLauncher: if timeout is None: timeout = self._resolve_timeout(agent) time.sleep(timeout) + # ORCH-090: the SIGTERM->grace->SIGKILL cascade is now a reusable helper + # (stop_process) shared with the STOP-cancellation path. The timeout + # watchdog just sleeps the timeout, then drives the cascade. + logger.warning( + f"Agent run_id={run_id} exceeded {timeout}s timeout (pid={pid})" + ) + self.stop_process(pid, run_id, reason=f"timeout>{timeout}s") + def stop_process(self, pid: int, run_id: int | None, *, reason: str = "stop") -> bool: + """ORCH-7 / ORCH-090 (ADR-001 D2): graceful SIGTERM->grace->SIGKILL cascade. + + Extracted from ``_watchdog`` so the STOP-cancellation path + (``stage_engine.cancel_task``) stops an active agent through the SAME + graceful cascade instead of a new "dirty" kill (AC-1). Send SIGTERM, give + the process up to ``settings.agent_kill_grace_seconds`` to flush and exit, + SIGKILL only if it is still alive after the grace; stamp ``agent_runs`` + exit_code=-9 via ``_record_kill`` whenever a kill actually happened. + + never-raise; ``ProcessLookupError`` is tolerated at every step (the process + may already be gone). Returns True iff a SIGTERM was delivered to a live + process; False when the process was already gone (no record — the monitor's + ``proc.wait()`` owns that exit). + """ + if pid is None: + return False # Phase 1: SIGTERM (graceful). If the process is already gone, we're done. try: os.kill(pid, signal.SIGTERM) logger.warning( - f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM " - f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s" + f"stop_process ({reason}): sent SIGTERM to pid={pid} " + f"(run_id={run_id}), grace={settings.agent_kill_grace_seconds}s" ) except ProcessLookupError: - logger.info(f"Agent run_id={run_id} already exited before SIGTERM") - return # nothing to record: the monitor's proc.wait() owns the exit + logger.info( + f"stop_process ({reason}): pid={pid} already exited " + f"(run_id={run_id}); nothing to record" + ) + return False + except Exception as e: # noqa: BLE001 - never-raise + logger.warning(f"stop_process SIGTERM error pid={pid}: {e}") + return False # Phase 2: poll for graceful exit within the grace window. grace = settings.agent_kill_grace_seconds @@ -702,21 +732,27 @@ class AgentLauncher: os.kill(pid, 0) # signal 0 = liveness probe, does not kill except ProcessLookupError: logger.info( - f"Agent run_id={run_id} exited gracefully after SIGTERM " - f"({waited:.1f}s); no SIGKILL needed" + f"stop_process ({reason}): pid={pid} exited gracefully after " + f"SIGTERM ({waited:.1f}s); no SIGKILL needed" ) self._record_kill(run_id) - return + return True + except Exception: # noqa: BLE001 - probe error -> escalate to SIGKILL + break # Phase 3: still alive -> hard SIGKILL. try: os.kill(pid, signal.SIGKILL) logger.warning( - f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL" + f"stop_process ({reason}): pid={pid} did not exit within {grace}s " + f"grace: sent SIGKILL" ) except ProcessLookupError: - logger.info(f"Agent run_id={run_id} exited just before SIGKILL") + logger.info(f"stop_process ({reason}): pid={pid} exited just before SIGKILL") + except Exception as e: # noqa: BLE001 - never-raise + logger.warning(f"stop_process SIGKILL error pid={pid}: {e}") self._record_kill(run_id) + return True @staticmethod def _record_kill(run_id: int): diff --git a/src/cancel.py b/src/cancel.py new file mode 100644 index 0000000..f30256d --- /dev/null +++ b/src/cancel.py @@ -0,0 +1,144 @@ +"""ORCH-090 (ADR-001 D9 / adr-0026): STOP-cancellation leaf — pure decision logic. + +Leaf module mirroring ``src/serial_gate.py`` / ``src/labels.py``: pure, +unit-testable, never-raise functions over config + the existing DB / deploy-state. +Module-level imports are limited to ``config`` (and ``re``); the critical-window +probe lazily imports ``self_deploy`` / ``merge_gate`` so a cycle can never form and +an import failure degrades safely. + +What it answers: + * ``applies(repo)`` — is STOP-cancellation REAL for this repo? + * ``in_critical_window(task)``— is the task inside an irreversible merge/deploy + step where cancellation must be DEFERRED (ADR-001 D7) instead of applied now? + * ``snapshot()`` — read-only summary for ``GET /queue`` (AC-10). + +The ORCHESTRATION of a cancellation (SIGTERM, cancel-jobs, worktree/branch +cleanup, key tombstone, notifications) lives in ``stage_engine.cancel_task`` — this +leaf only decides, it never mutates. + +never-raise contract (self-hosting safety): every public function degrades +conservatively. ``applies`` -> False on error (gate inert, the kill-switch-off +default). ``in_critical_window`` -> True on doubt (fail-CLOSED: when we cannot +confirm we are OUTSIDE a critical window, DEFER cancellation rather than risk +tearing a half-merge / detached prod deploy, NFR-3 / TR-3). +""" +from __future__ import annotations + +import logging +import re + +from .config import settings + +logger = logging.getLogger("orchestrator.cancel") + +# Repo tokens in the CSV scope must match this (mirrors serial_gate._REPO_TOKEN). +_REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$") + + +def _scope_repos() -> set[str]: + """Sanitised set of in-scope repo tokens from ``stop_status_repos`` (CSV). + + Empty/blank CSV -> empty set, meaning "apply to ALL repos" (D9). Invalid tokens + (regex miss) are dropped. Never raises. + """ + try: + raw = (settings.stop_status_repos or "").strip() + except Exception: # noqa: BLE001 + return set() + if not raw: + return set() + out: set[str] = set() + for tok in raw.split(","): + t = tok.strip() + if t and _REPO_TOKEN.match(t): + out.add(t) + elif t: + logger.warning("cancel: dropping invalid repo token %r from CSV", t) + return out + + +def applies(repo: str) -> bool: + """Whether STOP-cancellation is REAL for this repo (D9 / AC-8). + + * ``stop_status_enabled=False`` -> always False (kill-switch; STOP handling and + the relaunch-hole gate are 1:1 as before ORCH-090). + * ``stop_status_repos`` (CSV) non-empty -> real only for listed repos. + * empty CSV -> real for ALL repos (cancellation is meaningful for enduro too). + Never raises -> False on error (degrade to "inert", matching kill-switch off). + """ + try: + if not getattr(settings, "stop_status_enabled", False): + return False + scope = _scope_repos() + if scope: + return (repo or "").strip() in scope + return True + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("cancel.applies error for %s: %s", repo, e) + return False + + +def in_critical_window(task: dict) -> bool: + """Is the task inside an irreversible merge/deploy step (ADR-001 D7 / AC-7)? + + A STOP that lands here must NOT tear the step apart (half-merge / detached prod + deploy / dead prod container, NFR-3). Two markers (existing, no new state): + * self-deploy Phase B initiated — the ``INITIATED`` sentinel in + ``/.deploy-state-//`` (ORCH-036); + * the task currently HOLDS the per-repo merge-lease + ``/.merge-lease-.json`` (ORCH-043), holder branch == task + branch. + + fail-CLOSED (TR-3): any error/uncertainty -> True (DEFER cancellation). Outside + the window -> False (apply the full reset immediately). + """ + if not task: + return False + repo = task.get("repo") + work_item_id = task.get("work_item_id") + branch = task.get("branch") + try: + from . import self_deploy + if self_deploy.has_marker(repo, work_item_id, self_deploy.INITIATED): + return True + except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt + logger.warning("cancel.in_critical_window self_deploy probe error: %s", e) + return True + try: + from . import merge_gate + holder = merge_gate.current_lease_holder(repo) + if holder and branch and holder == branch: + return True + except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt + logger.warning("cancel.in_critical_window merge-lease probe error: %s", e) + return True + return False + + +def snapshot() -> dict: + """Read-only STOP-cancellation summary for GET /queue (AC-10). + + Additive block; existing /queue keys are untouched. never-raise -> a minimal + dict with the flags on error. + """ + try: + enabled = bool(getattr(settings, "stop_status_enabled", False)) + except Exception: # noqa: BLE001 + enabled = False + try: + repos_cfg = getattr(settings, "stop_status_repos", "") or "" + except Exception: # noqa: BLE001 + repos_cfg = "" + try: + from . import db + stats = db.cancelled_tasks_snapshot(10) + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("cancel.snapshot error: %s", e) + stats = {"count": 0, "pending": 0, "recent": []} + return { + "enabled": enabled, + "repos": repos_cfg, + "cancelled_count": stats.get("count", 0), + "deferred_pending": stats.get("pending", 0), + "recent": stats.get("recent", []), + } diff --git a/src/config.py b/src/config.py index 8080608..bca46f8 100644 --- a/src/config.py +++ b/src/config.py @@ -605,6 +605,25 @@ class Settings(BaseSettings): serial_gate_repos: str = "" serial_gate_freeze_enabled: bool = True + # ORCH-090: STOP-status task cancellation (stop active agent + full progress + # reset) and the relaunch-hole close. A new logical Plane key `stop` (fail-closed, + # absent from _DEFAULT_STATES) routes to a cancel handler that drives the task to + # the new system-terminal state `cancelled` (stage + durable). Additive, + # never-raise, restart-safe; STAGE_TRANSITIONS / QG_CHECKS / check_* / existing + # status semantics are NOT touched. See + # docs/work-items/ORCH-090/06-adr/ADR-001-stop-cancel-task.md and the cross-cutting + # docs/architecture/adr/adr-0026-stop-cancel-task.md. + # stop_status_enabled -> kill-switch (env ORCH_STOP_STATUS_ENABLED). False -> + # STOP handling AND the relaunch-hole gate are inert + # (behaviour strictly as before ORCH-090 — zero + # regression, AC-8). + # stop_status_repos -> CSV scope (env ORCH_STOP_STATUS_REPOS). Empty -> applies + # to ALL repos (cancellation is meaningful for enduro too); + # non-empty -> only the listed repos. Tokens are sanitised + # (^[A-Za-z0-9._-]+$) by the cancel leaf. + stop_status_enabled: bool = True + stop_status_repos: str = "" + # ORCH-073 (ADR-001 Р-4): main-integrity regression guard. After the merge-verify # under-gate confirms the deployed SHA is an ancestor of origin/main (FR-1), a # secondary deterministic (no-LLM) guard checks that a declarative set of markers diff --git a/src/db.py b/src/db.py index 05bec92..513c712 100644 --- a/src/db.py +++ b/src/db.py @@ -59,7 +59,7 @@ def init_db(): repo TEXT NOT NULL, task_id INTEGER, -- FK tasks.id (nullable) task_content TEXT, -- written to the agent task_file - status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed + status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed|cancelled (ORCH-090: cancelled is a terminal outcome, never requeued) attempts INTEGER NOT NULL DEFAULT 0, max_attempts INTEGER NOT NULL DEFAULT 2, run_id INTEGER, -- agent_runs.id once started @@ -129,6 +129,17 @@ def init_db(): # tracker can show "твоё время" without recomputing from activity history. _ensure_column(conn, "tasks", "brd_review_started_at", "TEXT") _ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT") + # ORCH-090 (08-data-requirements.md): STOP-cancellation durable markers. Both are + # additive, idempotent (_ensure_column is a no-op once present) -> safe on the live + # shared prod DB (enduro untouched). The durable terminal itself is tasks.stage= + # 'cancelled' (already understood by the reconciler terminal-skip); these columns + # are audit/observability + the deferred-cancel signal. + # cancelled_at -> timestamp the task was cancelled (NULL otherwise). + # cancel_requested_at -> STOP arrived inside a critical merge/deploy window + # (ADR-001 D7): cancellation is DEFERRED until the + # irreversible step finishes honestly, then applied. + _ensure_column(conn, "tasks", "cancelled_at", "TEXT") + _ensure_column(conn, "tasks", "cancel_requested_at", "TEXT") # ORCH-026 (Level B): declarative task dependencies. job_deps stores the # directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The # scheduler gate in claim_next_job keeps B queued until every A reaches @@ -231,6 +242,13 @@ def get_active_tasks_for_reconcile() -> list[dict]: ``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC 'now', matching how ``update_task_stage`` stamps ``updated_at``). The reconciler applies the per-stage grace and active-job guard on top. + + ORCH-090 (adr-0026): a ``cancelled`` task is DELIBERATELY still returned here + and skipped by the reconciler's own terminal-skip (``stage in + ('done','cancelled')``, ORCH-086 D2) — narrowing the query to exclude + ``cancelled`` would lose the observability skip-counter increment that ORCH-086 + relies on. The terminal set is harmonised in the *scheduler* predicates + (serial_gate / task_deps), not here. """ conn = get_db() try: @@ -605,7 +623,9 @@ def claim_next_job() -> dict | None: dep_gate = ( "AND NOT EXISTS (" " SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " - " WHERE d.task_id = jobs.task_id AND t.stage != 'done'" + # ORCH-090 (adr-0026): a cancelled predecessor is TERMINAL -> the + # dependent must NOT wait on it forever. Terminal set = {done,cancelled}. + " WHERE d.task_id = jobs.task_id AND t.stage NOT IN ('done','cancelled')" ") " ) # ORCH-088 (FR-1, ADR-001 D1): per-repo serial gate. An analyst-job of a NEW @@ -683,11 +703,11 @@ def mark_job( run_id: int | None = None, error: str | None = None, ): - """Update a job's status (queued|running|done|failed). + """Update a job's status (queued|running|done|failed|cancelled). - run_id (optional): link to the agent_runs row that executed this job. - error (optional): last error message (for failed/retry). - - 'done'/'failed' also stamp finished_at. + - 'done'/'failed'/'cancelled' (ORCH-090) also stamp finished_at. - 'queued' (requeue for retry) clears started_at/finished_at so the next claim treats it as fresh. """ @@ -700,7 +720,7 @@ def mark_job( if error is not None: sets.append("error = ?") params.append(error) - if status in ("done", "failed"): + if status in ("done", "failed", "cancelled"): sets.append("finished_at = datetime('now')") elif status == "queued": sets.append("started_at = NULL") @@ -728,6 +748,178 @@ def has_active_job_for_task(task_id: int) -> bool: return row is not None +# --------------------------------------------------------------------------- +# ORCH-090: STOP-cancellation helpers (task + jobs terminal state) +# --------------------------------------------------------------------------- + +def get_task(task_id: int) -> dict | None: + """Fetch a single task row by id (None when absent).""" + conn = get_db() + try: + row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone() + finally: + conn.close() + return dict(row) if row else None + + +def get_active_jobs_for_task(task_id: int) -> list[dict]: + """ORCH-090: queued/running jobs of a task (for STOP — stop agent + cancel). + + Returns the full job rows (incl. ``pid`` / ``run_id`` / ``status``) so the + cancel orchestrator can SIGTERM the running agent by ``jobs.pid`` and then flip + every job to the terminal ``cancelled`` outcome. + """ + conn = get_db() + try: + rows = conn.execute( + "SELECT * FROM jobs WHERE task_id = ? AND status IN ('queued','running') " + "ORDER BY id", + (task_id,), + ).fetchall() + finally: + conn.close() + return [dict(r) for r in rows] + + +def cancel_jobs_for_task(task_id: int, only_queued: bool = False) -> int: + """ORCH-090 (ADR-001 D3): flip a task's jobs to the terminal ``cancelled`` outcome. + + Guarded UPDATE over ``status IN ('queued','running')`` (or only ``'queued'`` when + ``only_queued`` — the deferred-cancel path inside a critical merge/deploy window, + D7, which must NOT cancel the still-running deploy/merge actor). ``cancelled`` is + never requeued: ``claim_next_job`` only selects ``status='queued'`` and the reaper + / worker check the task's terminal stage before any requeue. Returns the number of + jobs cancelled. never-raise -> 0 on error. + """ + statuses = "('queued')" if only_queued else "('queued','running')" + try: + conn = get_db() + try: + cur = conn.execute( + f"UPDATE jobs SET status='cancelled', finished_at=datetime('now') " + f"WHERE task_id = ? AND status IN {statuses}", + (task_id,), + ) + conn.commit() + return cur.rowcount or 0 + finally: + conn.close() + except Exception: + return 0 + + +def mark_task_cancelled(task_id: int) -> bool: + """ORCH-090 (ADR-001 D4): durable terminal + natural-key tombstone for a task. + + Atomically (single UPDATE): + * ``stage='cancelled'`` (durable terminal, understood by the reconciler skip); + * ``cancelled_at=now``, ``cancel_requested_at=NULL`` (clear any deferred flag); + * TOMBSTONE the natural keys so a later "To Analyse" re-creates the task FROM + SCRATCH: ``plane_id`` / ``work_item_id`` / ``plane_issue_id`` get a + deterministic ``#cancelled-`` suffix -> ``get_task_by_plane_id`` returns + None and the anti-dup / uniqueness guards no longer collide. The row is NOT + deleted (durable audit). + + ADR-001 D4 refinement (ORCH-090): the ADR proposed keeping ``plane_issue_id`` + untouched for audit, but ``get_task_by_plane_id`` / ``create_task_atomic`` match + on ``plane_id OR plane_issue_id`` — leaving ``plane_issue_id`` matchable would + keep the cancelled row "findable" and BLOCK the clean-slate re-create (BR-3 / + TR-4). We therefore suffix it too; the ``#cancelled-`` tag is deterministic + and parseable, so the original Plane issue UUID (== the original ``plane_id`` in + every create path) is still fully recoverable for audit. + + Idempotent-safe: the suffix is only appended when not already present (a repeat + STOP on an already-cancelled row does not double-suffix). Returns True iff the + row was updated. never-raise -> False on error. + """ + try: + conn = get_db() + try: + row = conn.execute( + "SELECT plane_id, work_item_id, plane_issue_id FROM tasks WHERE id = ?", + (task_id,), + ).fetchone() + if not row: + return False + suffix = f"#cancelled-{task_id}" + + def _tomb(v): + v = v or "" + return v if suffix in v else f"{v}{suffix}" + + plane_id = _tomb(row["plane_id"]) + work_item_id = _tomb(row["work_item_id"]) + plane_issue_id = _tomb(row["plane_issue_id"]) + conn.execute( + "UPDATE tasks SET stage='cancelled', cancelled_at=datetime('now'), " + "cancel_requested_at=NULL, plane_id=?, work_item_id=?, plane_issue_id=?, " + "updated_at=datetime('now') WHERE id = ?", + (plane_id, work_item_id, plane_issue_id, task_id), + ) + conn.commit() + return True + finally: + conn.close() + except Exception: + return False + + +def set_task_cancel_requested(task_id: int) -> bool: + """ORCH-090 (ADR-001 D7): mark a deferred cancellation (STOP in critical window). + + Idempotent: only stamps ``cancel_requested_at`` the first time. The deterministic + deploy/merge finalizer reads it once the irreversible step completes and then + applies the full cancellation. never-raise -> False on error. + """ + try: + conn = get_db() + try: + conn.execute( + "UPDATE tasks SET cancel_requested_at=datetime('now') " + "WHERE id = ? AND cancel_requested_at IS NULL", + (task_id,), + ) + conn.commit() + return True + finally: + conn.close() + except Exception: + return False + + +def cancelled_tasks_snapshot(limit: int = 10) -> dict: + """ORCH-090 (AC-10): read-only cancellation summary for GET /queue. + + Returns ``{count, pending, recent}`` where ``count`` is the number of cancelled + tasks, ``pending`` the number with a deferred (not-yet-applied) cancellation, and + ``recent`` the last ``limit`` cancelled tasks. never-raise -> minimal dict. + """ + try: + conn = get_db() + try: + count = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE stage='cancelled'" + ).fetchone()[0] + pending = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE cancel_requested_at IS NOT NULL " + "AND stage != 'cancelled'" + ).fetchone()[0] + recent = [ + {"work_item_id": r["work_item_id"], "repo": r["repo"], + "cancelled_at": r["cancelled_at"]} + for r in conn.execute( + "SELECT work_item_id, repo, cancelled_at FROM tasks " + "WHERE stage='cancelled' ORDER BY cancelled_at DESC LIMIT ?", + (limit,), + ).fetchall() + ] + finally: + conn.close() + return {"count": int(count), "pending": int(pending), "recent": recent} + except Exception: + return {"count": 0, "pending": 0, "recent": []} + + def count_running_jobs() -> int: """Number of jobs currently in 'running' status (for max_concurrency).""" conn = get_db() @@ -815,7 +1007,7 @@ def reap_running_job( if error is not None: sets.append("error = ?") params.append(error) - if status in ("done", "failed"): + if status in ("done", "failed", "cancelled"): # ORCH-090: cancelled is terminal sets.append("finished_at = datetime('now')") elif status == "queued": sets.append("started_at = NULL") @@ -948,7 +1140,9 @@ def get_unfinished_dependencies(task_id: int) -> list[dict]: rows = conn.execute( "SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage " "FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " - "WHERE d.task_id = ? AND t.stage != 'done'", + # ORCH-090 (adr-0026): {done,cancelled} are both terminal -> a + # cancelled predecessor no longer blocks the dependent. + "WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled')", (task_id,), ).fetchall() finally: diff --git a/src/gitea.py b/src/gitea.py new file mode 100644 index 0000000..ec0d101 --- /dev/null +++ b/src/gitea.py @@ -0,0 +1,65 @@ +"""ORCH-090 (ADR-001 D8 / adr-0026): minimal Gitea branch helpers. + +Leaf module — a single never-raise helper used by the STOP-cancellation path to +delete a cancelled task's REMOTE feature branch. Deliberately tiny and dependency +-light (only ``config`` + ``httpx``) so it can be imported from the stage engine +without cycles. + +Self-hosting safety (NFR-3): this helper deletes ONLY the named feature branch +via the Gitea API. It NEVER touches ``main`` (a guard rejects it outright) and +NEVER force-pushes — there is no push path here at all. +""" +import logging + +import httpx + +from .config import settings + +logger = logging.getLogger("orchestrator.gitea") + +# Branches that must never be deleted by an automated cancel (self-hosting safety). +_PROTECTED_BRANCHES = {"main", "master"} + + +def delete_remote_branch(repo: str, branch: str) -> bool: + """Delete a remote feature branch in Gitea (never-raise). + + ``DELETE /api/v1/repos/{owner}/{repo}/branches/{branch}``. Used by + ``stage_engine.cancel_task`` to reset a cancelled task's progress (D8). A 404 + (branch already gone) is treated as success — the goal state (branch absent) is + reached. Returns True iff the branch is confirmed absent after the call. + + Guards: + * empty repo/branch -> no-op (False); + * a protected branch (``main``/``master``) -> refused with an error log + (NFR-3: STOP must never delete ``main``). + Any network/API error is logged and swallowed (the worktree is cleaned locally + regardless); returns False so the caller can note a best-effort miss. + """ + if not repo or not branch: + return False + if branch.strip().lower() in _PROTECTED_BRANCHES: + logger.error( + "delete_remote_branch REFUSED for protected branch %r in %s (self-hosting safety)", + branch, repo, + ) + return False + owner = settings.gitea_owner + url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches/{branch}" + headers = {"Authorization": f"token {settings.gitea_token}"} + try: + resp = httpx.delete(url, headers=headers, timeout=10) + if resp.status_code in (204, 200): + logger.info("Deleted remote branch %s in %s/%s", branch, owner, repo) + return True + if resp.status_code == 404: + logger.info("Remote branch %s already absent in %s/%s", branch, owner, repo) + return True + logger.warning( + "delete_remote_branch %s in %s/%s returned %s: %s", + branch, owner, repo, resp.status_code, resp.text[:200], + ) + return False + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("delete_remote_branch error for %s/%s/%s: %s", owner, repo, branch, e) + return False diff --git a/src/job_reaper.py b/src/job_reaper.py index f71928c..1546d48 100644 --- a/src/job_reaper.py +++ b/src/job_reaper.py @@ -325,6 +325,16 @@ class JobReaper: attempts = int(job.get("attempts") or 0) max_attempts = int(job.get("max_attempts") or 2) err = f"reaped: {reason} (run_id={run_id})" + # ORCH-090 (adr-0026 / TR-2): the source of truth for "do not revive" is the + # task's TERMINAL stage, not the job status. If the task is already terminal + # ({done,cancelled}) — e.g. STOP flipped it to 'cancelled' while this job was + # still 'running' (dead pid) — flip the job to the terminal 'cancelled' + # outcome instead of requeueing it (closes the SIGTERM/reaper requeue race). + _branch, _stage, _wid = self._task_meta(job) + if _stage in ("done", "cancelled"): + if reap_running_job(job_id, "cancelled", run_id=run_id, error=err): + self._note_reap(job, "cancelled", reason=f"{reason} (task terminal={_stage})") + return if attempts < max_attempts: if reap_running_job(job_id, "queued", run_id=run_id, error=err): self._note_reap(job, "queued", reason=reason) diff --git a/src/main.py b/src/main.py index 48b484f..fdd9fa2 100644 --- a/src/main.py +++ b/src/main.py @@ -171,6 +171,7 @@ async def queue(): from . import task_deps from . import serial_gate from . import labels + from . import cancel from .disk_watchdog import disk_watchdog from .build_cache_pruner import build_cache_pruner return { @@ -191,6 +192,10 @@ async def queue(): # ORCH-089 (D7): auto-mode-by-label observability (read-only) — kill-switch, # label names, scope. Additive block. "auto_labels": labels.snapshot(), + # ORCH-090 (AC-10): STOP-cancellation observability (read-only) — kill-switch, + # repo scope, cancelled/deferred counts, recent cancellations. Additive block; + # never-raise. + "stop": cancel.snapshot(), # ORCH-063 (FR-6 / AC-7): disk-watchdog observability (read-only) — # enabled, threshold, interval, last measurement per host-path. Additive # block; never-raise (status() returns {"enabled": ...} minimum on error). diff --git a/src/merge_gate.py b/src/merge_gate.py index 2cb78ff..1341016 100644 --- a/src/merge_gate.py +++ b/src/merge_gate.py @@ -340,6 +340,21 @@ def release_merge_lease(repo: str, branch: str | None = None) -> None: logger.warning("merge-lease release error for %s: %s", repo, e) +def current_lease_holder(repo: str) -> str | None: + """ORCH-090: branch currently holding the per-repo merge-lease, or None. + + Read-only helper used by ``cancel.in_critical_window`` to decide whether a STOP + must be DEFERRED (the task is mid-merge). Never raises -> None on missing/corrupt + lease or any error (the caller treats an error as fail-CLOSED itself). + """ + try: + existing = _read_lease(_lease_path(repo)) + return existing.get("branch") if existing else None + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("current_lease_holder error for %s: %s", repo, e) + return None + + # --------------------------------------------------------------------------- # ORCH-065: proactive stale/dead merge-lease reclaim (Problem B) # --------------------------------------------------------------------------- diff --git a/src/plane_sync.py b/src/plane_sync.py index 0bd2fd2..88f651e 100644 --- a/src/plane_sync.py +++ b/src/plane_sync.py @@ -148,6 +148,13 @@ _PLANE_NAME_TO_KEY: dict[str, str] = { # this board status (enduro / API fallback) fail-closed — no UUID, no # confirm-deploy branch, no KeyError (accessed via .get). "Confirm Deploy": "confirm_deploy", + # ORCH-090: dedicated operator "STOP" status — the cancel trigger. Like + # ORCH-059's Confirm Deploy it is INTENTIONALLY ABSENT from _DEFAULT_STATES + # (fail-closed): environments without the status (enduro / API fallback) + # resolve `stop` to None via .get -> the cancel branch simply never activates + # (no UUID, no KeyError, no blind cancel). Create a STOP status with the + # `cancelled` group on the board to enable it (07-infra-requirements.md). + "STOP": "stop", # ORCH-066: meaningful per-stage / human-input statuses (layer B). "To Analyse": "to_analyse", "Analysis": "analysis", diff --git a/src/queue_worker.py b/src/queue_worker.py index ab3984e..dd098d3 100644 --- a/src/queue_worker.py +++ b/src/queue_worker.py @@ -187,12 +187,18 @@ class QueueWorker: # launch error so the job does not wedge as 'running' forever. logger.error(f"Worker failed to launch job {job['id']}: {e}") try: - from .db import get_job, mark_job + from .db import get_job, mark_job, get_task j = get_job(job["id"]) attempts = j.get("attempts", 0) if j else 0 max_attempts = j.get("max_attempts", 2) if j else 2 - if attempts < max_attempts: + # ORCH-090 (adr-0026 / TR-2): never requeue a job whose task is + # already terminal ({done,cancelled}) — a STOP that landed between + # claim and launch must win over the retry budget. + task = get_task(job.get("task_id")) if job.get("task_id") else None + if task and task.get("stage") in ("done", "cancelled"): + mark_job(job["id"], "cancelled", error=f"launch error (task terminal): {e}") + elif attempts < max_attempts: mark_job(job["id"], "queued", error=f"launch error: {e}") else: mark_job(job["id"], "failed", error=f"launch error: {e}") diff --git a/src/serial_gate.py b/src/serial_gate.py index ae273b7..0675e98 100644 --- a/src/serial_gate.py +++ b/src/serial_gate.py @@ -110,14 +110,19 @@ def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool: try: conn = db.get_db() try: + # ORCH-090 (adr-0026): terminal set is {done,cancelled}. A cancelled + # task must NOT count as "active" or it would block the repo's serial + # gate forever. if exclude_task_id is not None: row = conn.execute( - "SELECT 1 FROM tasks WHERE repo=? AND id != ? AND stage != 'done' LIMIT 1", + "SELECT 1 FROM tasks WHERE repo=? AND id != ? " + "AND stage NOT IN ('done','cancelled') LIMIT 1", (repo, exclude_task_id), ).fetchone() else: row = conn.execute( - "SELECT 1 FROM tasks WHERE repo=? AND stage != 'done' LIMIT 1", + "SELECT 1 FROM tasks WHERE repo=? " + "AND stage NOT IN ('done','cancelled') LIMIT 1", (repo,), ).fetchone() return row is not None @@ -264,10 +269,12 @@ def build_claim_clause() -> str: repo_scope = f"AND jobs.repo IN ({repo_in}) " else: repo_scope = "" + # ORCH-090 (adr-0026): {done,cancelled} are both terminal — an EARLIER + # cancelled task no longer holds the FIFO serial gate closed. active_clause = ( "EXISTS (SELECT 1 FROM tasks t2 " "WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id " - "AND t2.stage != 'done') " + "AND t2.stage NOT IN ('done','cancelled')) " ) if _freeze_layer_enabled(): freeze_clause = ( @@ -329,9 +336,10 @@ def _per_repo_snapshot(repo: str) -> dict: try: conn = db.get_db() try: + # ORCH-090 (adr-0026): terminal set {done,cancelled}. row = conn.execute( "SELECT work_item_id, stage FROM tasks " - "WHERE repo=? AND stage != 'done' ORDER BY id LIMIT 1", + "WHERE repo=? AND stage NOT IN ('done','cancelled') ORDER BY id LIMIT 1", (repo,), ).fetchone() if row: diff --git a/src/stage_engine.py b/src/stage_engine.py index c088604..fac7819 100644 --- a/src/stage_engine.py +++ b/src/stage_engine.py @@ -1656,6 +1656,28 @@ def run_deploy_finalizer(job: dict): finished_agent="deployer", ) + # ORCH-090 (ADR-001 D7 / AC-7): a STOP that arrived during the prod deploy was + # DEFERRED (cancel_requested_at). The irreversible step has now finished honestly + # above, so apply the deferred cancellation. force=True bypasses ONLY the + # critical-window guard (the INITIATED marker may still linger) — a task that + # reached terminal 'done' (SUCCESS) is an honest no-op (code is already in prod); + # a FAILED deploy rolled back to development is fully reset now. + try: + from .db import get_task as _get_task + t = _get_task(task_id) + if t and t.get("cancel_requested_at") and t.get("stage") != "cancelled": + logger.warning( + "Task %s: applying deferred STOP after deploy finalize", task_id + ) + cancel_task( + task_id, + reason="deferred STOP applied after deploy finalize", + source="deferred", + force=True, + ) + except Exception as e: # noqa: BLE001 - never break the finalizer + logger.warning("Task %s: deferred-cancel application failed: %s", task_id, e) + def run_post_deploy_monitor(job: dict): """ORCH-021 — one post-deploy monitor tick (reserved-agent, no LLM). @@ -1825,3 +1847,182 @@ def _notify_post_deploy(work_item_id: str, message: str) -> None: plane_add_comment(work_item_id, message, author="deployer") except Exception as e: # noqa: BLE001 - never break the tick logger.warning(f"post-deploy notify plane failed for {work_item_id}: {e}") + + +# --------------------------------------------------------------------------- +# ORCH-090 (ADR-001 / adr-0026): STOP-cancellation orchestration +# --------------------------------------------------------------------------- + +def cancel_task( + task_id: int, + *, + reason: str = "", + source: str = "stop", + force: bool = False, +) -> dict: + """Cancel a task: stop the active agent + full progress reset (ORCH-090). + + The single orchestration point behind the Plane STOP status (``webhooks/plane. + handle_stop``). Drives the task to the system-terminal state ``cancelled``: + + 1. **Idempotency (BR-5 / AC-6):** an absent task or one already terminal + (``stage in {done,cancelled}``) is a no-op — no re-kill, no re-cleanup, no + duplicate notification. + 2. **Critical window (ADR-001 D7 / AC-7):** if the task is mid merge/deploy + (``cancel.in_critical_window``) and not ``force``, the cancellation is + DEFERRED: stamp ``cancel_requested_at``, cancel ONLY queued jobs (never the + running deploy/merge actor), alert, and return — the deterministic deploy + finalizer applies the cancel once the irreversible step finishes honestly. + STOP NEVER touches ``main`` / force-pushes / restarts the prod container. + 3. **Full reset:** SIGTERM the running agent through the graceful cascade + (``launcher.stop_process``), cancel all jobs (terminal ``cancelled``), + clear deploy-state + release a held merge-lease (best-effort), remove the + worktree, delete the remote feature branch, then tombstone the natural keys + + flip ``stage='cancelled'`` (durable). Docs artefacts are NOT touched. + 4. **Observability (AC-10):** log + Telegram + Plane comment + tracker update. + + ``force=True`` bypasses ONLY the critical-window guard (used by the deploy + finalizer to apply a deferred cancel after the step completes) — it never + overrides the terminal-stage idempotency. Returns a small result dict for + tests/observability. never-raise: any error is logged; a notify failure never + aborts the cancellation. + """ + from .db import ( + get_task, get_active_jobs_for_task, cancel_jobs_for_task, + mark_task_cancelled, set_task_cancel_requested, + ) + from . import cancel as cancel_mod + + result = {"ok": False, "task_id": task_id, "deferred": False, + "stopped": 0, "cancelled_jobs": 0, "note": None} + + task = get_task(task_id) + if not task: + result["note"] = "no-task" + logger.info("cancel_task: no task row for task_id=%s", task_id) + return result + + stage = task.get("stage") + repo = task.get("repo") + branch = task.get("branch") or "" + work_item_id = task.get("work_item_id") or "" + + # (1) Idempotency: already terminal -> no-op. + if stage in ("done", "cancelled"): + result["ok"] = True + result["note"] = f"already-terminal:{stage}" + logger.info( + "cancel_task: task %s (%s) already terminal (stage=%s) -> no-op", + task_id, work_item_id, stage, + ) + return result + + # (2) Critical merge/deploy window -> DEFER (unless forced by the finalizer). + if not force and cancel_mod.in_critical_window(task): + set_task_cancel_requested(task_id) + result["cancelled_jobs"] = cancel_jobs_for_task(task_id, only_queued=True) + result["deferred"] = True + result["ok"] = True + result["note"] = "deferred-critical-window" + msg = ( + f"⏸️ {link_for(work_item_id)}: STOP получен во время " + f"критичного шага (merge/deploy) — отмена ОТЛОЖЕНА до честного " + f"завершения шага. main/прод не трогаются." + ) + _notify_cancel(work_item_id, task_id, msg) + logger.warning( + "cancel_task: task %s (%s) in critical window -> deferred cancel " + "(queued jobs cancelled=%s)", task_id, work_item_id, result["cancelled_jobs"], + ) + return result + + # (3) Full reset ---------------------------------------------------------- + # 3a. Stop the active agent through the graceful cascade (AC-1). Capture the + # running jobs BEFORE cancelling them so we still know their pids. + stopped = 0 + try: + from .agents.launcher import launcher + for job in get_active_jobs_for_task(task_id): + if job.get("status") == "running" and job.get("pid"): + try: + if launcher.stop_process( + job["pid"], job.get("run_id"), reason=f"STOP cancel task {task_id}" + ): + stopped += 1 + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("cancel_task: stop_process failed for job %s: %s", + job.get("id"), e) + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("cancel_task: agent-stop step failed for task %s: %s", task_id, e) + result["stopped"] = stopped + + # 3b. Cancel ALL jobs (terminal 'cancelled', never requeued). + result["cancelled_jobs"] = cancel_jobs_for_task(task_id) + + # 3c. Clear deploy-state sentinels + release a held merge-lease (best-effort). + # Outside a critical window the task does not hold the lease / has no + # INITIATED marker, but clearing is idempotent and harmless. + try: + self_deploy.clear_state(repo, work_item_id) + except Exception as e: # noqa: BLE001 + logger.warning("cancel_task: clear deploy-state failed for %s: %s", work_item_id, e) + try: + merge_gate.release_merge_lease(repo, branch) + except Exception as e: # noqa: BLE001 + logger.warning("cancel_task: merge-lease release failed for %s: %s", branch, e) + + # 3d. Remove the worktree + delete the remote feature branch (never main). + if branch: + try: + from .git_worktree import remove_worktree + remove_worktree(repo, branch) + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("cancel_task: remove_worktree failed for %s/%s: %s", + repo, branch, e) + try: + from . import gitea + gitea.delete_remote_branch(repo, branch) + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("cancel_task: delete_remote_branch failed for %s/%s: %s", + repo, branch, e) + + # 3e. Durable terminal + natural-key tombstone (docs artefacts untouched). + mark_task_cancelled(task_id) + + # (4) Observability. + note = f" ({reason})" if reason else "" + msg = ( + f"\U0001f6d1 {link_for(work_item_id)}: задача ОТМЕНЕНА (STOP){note}. " + f"Агент остановлен, job'ы сняты ({result['cancelled_jobs']}), ветка/worktree " + f"удалены, прогресс сброшен. Docs сохранены. Перезапуск — только «To Analyse»." + ) + _notify_cancel(work_item_id, task_id, msg) + result["ok"] = True + result["note"] = "cancelled" if not force else "cancelled-deferred-applied" + logger.warning( + "cancel_task: task %s (%s, repo=%s) CANCELLED (source=%s, force=%s): " + "stopped=%s, cancelled_jobs=%s", task_id, work_item_id, repo, source, force, + stopped, result["cancelled_jobs"], + ) + return result + + +def _notify_cancel(work_item_id: str, task_id: int, message: str) -> None: + """Best-effort Telegram + Plane comment + tracker update for a cancellation. + + Never raises — a notification failure must not abort the cancel (ORCH-090 FR-8). + """ + try: + send_telegram(message) + except Exception as e: # noqa: BLE001 + logger.warning("cancel notify telegram failed for %s: %s", work_item_id, e) + if work_item_id: + try: + plane_add_comment(work_item_id, message, author="deployer") + except Exception as e: # noqa: BLE001 + logger.warning("cancel notify plane failed for %s: %s", work_item_id, e) + try: + from .notifications import update_task_tracker + update_task_tracker(task_id) + except Exception as e: # noqa: BLE001 + logger.warning("cancel notify tracker failed for task %s: %s", task_id, e) diff --git a/src/stages.py b/src/stages.py index 408e3ab..0fad74b 100644 --- a/src/stages.py +++ b/src/stages.py @@ -19,6 +19,13 @@ STAGE_TRANSITIONS = { "deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"}, "deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"}, "done": {"next": None, "agent": None, "qg": None}, + # ORCH-090 (adr-0026): system-terminal sink for a STOP-cancelled task. This is + # NOT a new pipeline edge — no exit-gate of any edge changes — it only makes + # get_next_stage('cancelled') correctly return None (parallel to 'done'). The + # scheduler terminal predicate is `stage IN ('done','cancelled')`; the points + # that recognise it carry the ORCH-090 marker (serial_gate / task_deps / + # reconciler / job_reaper). + "cancelled": {"next": None, "agent": None, "qg": None}, } diff --git a/src/task_deps.py b/src/task_deps.py index 97c1353..418e032 100644 --- a/src/task_deps.py +++ b/src/task_deps.py @@ -37,9 +37,12 @@ def is_task_ready(task_id: int) -> tuple[bool, list[str]]: """Return ``(ready, waiting_on)`` for a task. ``ready`` is True when the task has no declared dependency whose predecessor - is still un-done (``tasks.stage != 'done'``). ``waiting_on`` is the list of - predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still blocked - by — used for the Telegram waiting-line / Plane visibility. + is still un-done. ORCH-090 (adr-0026): the terminal set is + ``{done, cancelled}`` — a CANCELLED predecessor is terminal and no longer + blocks the dependent (the actual SQL predicate lives in + ``db.get_unfinished_dependencies`` / ``db.claim_next_job``). ``waiting_on`` is + the list of predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still + blocked by — used for the Telegram waiting-line / Plane visibility. never-raise: any error -> ``(True, [])`` (fail OPEN — consistent with the scheduler omitting the gate when the DB read fails; a transient error must diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 597011f..c632678 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -160,8 +160,15 @@ async def handle_issue_updated(data: dict, project_id: str = ""): # fallback) resolve to None, so the branch simply never activates (no KeyError, # no blind deploy). Checked before `approved` so the two gestures never alias. confirm_state = proj_states.get("confirm_deploy") + # ORCH-090: dedicated operator STOP status -> cancel the task (stop agent + full + # reset). fail-closed via .get (no UUID on a board without the status -> None -> + # branch never activates, exactly like confirm_deploy). Checked FIRST so a STOP + # is never aliased by to_analyse/approved/rejected. + stop_state = proj_states.get("stop") # ORCH-066: start/resume trigger is `To Analyse` (human entry-point). - if new_state == proj_states["to_analyse"]: + if stop_state and new_state == stop_state: + await handle_stop(data, project_id) + elif new_state == proj_states["to_analyse"]: await handle_status_start(data, project_id) elif confirm_state and new_state == confirm_state: await handle_confirm_deploy(data, project_id) @@ -212,6 +219,44 @@ async def handle_confirm_deploy(data: dict, project_id: str = ""): ) +async def handle_stop(data: dict, project_id: str = ""): + """ORCH-090: a human flipped the issue to the dedicated STOP status — cancel + the task (stop the active agent + full progress reset). + + Resolves the task by plane_id and delegates to the unified + ``stage_engine.cancel_task`` (run off the event loop via asyncio.to_thread — it + is synchronous and may sleep during the graceful SIGTERM cascade). Guards: + * kill-switch / repo-scope via ``cancel.applies(repo)`` (False -> no-op-log); + * idempotent — an absent / already-terminal task is a no-op inside cancel_task. + Contract is never-raise (NFR-5): any error is logged, the webhook flow never + crashes. + """ + import asyncio + from .. import cancel + from ..stage_engine import cancel_task + + plane_id = str(data.get("id") or "") + task = get_task_by_plane_id(plane_id) + if not task: + logger.info(f"STOP for {plane_id} but no task found, ignoring (no-op)") + return + + task_id = task["id"] + repo = task.get("repo", "") + if not cancel.applies(repo): + logger.info( + f"STOP for {plane_id} (task {task_id}, repo={repo}) but cancellation is " + f"not applicable (kill-switch off / out of scope); no-op" + ) + return + + logger.info(f"Task {task_id}: STOP status -> cancelling (stop agent + full reset)") + try: + await asyncio.to_thread(cancel_task, task_id, reason="Plane STOP status", source="stop") + except Exception as e: # never-raise: the webhook flow must not crash + logger.error(f"STOP handling failed for task {task_id}: {e}") + + async def handle_status_start(data: dict, project_id: str = ""): """An issue moved into In Progress. @@ -279,6 +324,36 @@ async def handle_status_start(data: dict, project_id: str = ""): ) return + # ORCH-090 (ADR-001 D6 / AC-5): close the relaunch hole. The legitimate "answer + # to Needs Input" resume is owned ONLY by the analyst (ORCH-066 — the sole + # Needs-Input setter). A manual move of an EXISTING task at any OTHER stage to + # "To Analyse" must NOT silently relaunch the mid-pipeline agent on the old + # branch (the incident pattern). Gate the relaunch to `analysis`; any other + # stage -> no-op-with-log + a best-effort Plane hint to use STOP -> To Analyse + # for a clean-slate restart. Under the kill-switch off this gate is inert + # (behaviour 1:1 as before ORCH-090). + from ..config import settings as _settings + if getattr(_settings, "stop_status_enabled", False) and current_stage != "analysis": + logger.info( + f"Status->To Analyse for {plane_id}: existing task on stage " + f"'{current_stage}' — NOT relaunching {stage_agent} (relaunch-hole closed, " + f"ORCH-090). Use STOP then To Analyse to restart from scratch." + ) + try: + _add_comment( + work_item_id, + "ℹ️ Перезапуск " + "агента сменой " + "рабочего статуса " + "отключён (ORCH-090). Для " + "перезапуска с нуля: " + "STOP → To Analyse.", + author=stage_agent, + ) + except Exception as e: + logger.error(f"Failed to post relaunch-hole comment for {work_item_id}: {e}") + return + task_desc = ( f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In " diff --git a/tests/test_auto_labels_invariants.py b/tests/test_auto_labels_invariants.py index 11d0c11..25eb05a 100644 --- a/tests/test_auto_labels_invariants.py +++ b/tests/test_auto_labels_invariants.py @@ -13,9 +13,10 @@ os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") def test_tc26_stage_transitions_unchanged(): from src.stages import STAGE_TRANSITIONS + # ORCH-090 (adr-0026): `cancelled` terminal sink added (parallel to `done`). assert set(STAGE_TRANSITIONS) == { "created", "analysis", "architecture", "development", "review", - "testing", "deploy-staging", "deploy", "done", + "testing", "deploy-staging", "deploy", "done", "cancelled", } # The two human gates still use their existing QG names (unchanged). assert STAGE_TRANSITIONS["analysis"]["qg"] == "check_analysis_approved" diff --git a/tests/test_config.py b/tests/test_config.py index ea4d0cf..697864b 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -219,11 +219,15 @@ def test_reaper_settings_env_override(monkeypatch): # check_branch_mergeable signature is intact (AC-13). # --------------------------------------------------------------------------- def test_tc19_stage_transitions_unchanged(): - """No new pipeline stage was introduced by ORCH-065.""" + """No new pipeline EDGE was introduced by ORCH-065. + + ORCH-090 (adr-0026) adds `cancelled` as a terminal SINK (parallel to `done`), + which is not a new edge — no exit-gate of any edge changed. + """ from src.stages import STAGE_TRANSITIONS assert set(STAGE_TRANSITIONS) == { "created", "analysis", "architecture", "development", "review", - "testing", "deploy-staging", "deploy", "done", + "testing", "deploy-staging", "deploy", "done", "cancelled", } diff --git a/tests/test_plane_status_model.py b/tests/test_plane_status_model.py index 268dbf1..33edd00 100644 --- a/tests/test_plane_status_model.py +++ b/tests/test_plane_status_model.py @@ -125,6 +125,8 @@ def test_tc22_stage_transitions_unchanged(): "deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"}, "deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"}, "done": {"next": None, "agent": None, "qg": None}, + # ORCH-090 (adr-0026): terminal SINK for a STOP-cancelled task. + "cancelled": {"next": None, "agent": None, "qg": None}, } diff --git a/tests/test_qg_registry_snapshot.py b/tests/test_qg_registry_snapshot.py index 0067f7b..1c8c44a 100644 --- a/tests/test_qg_registry_snapshot.py +++ b/tests/test_qg_registry_snapshot.py @@ -56,6 +56,9 @@ _EXPECTED_TRANSITIONS = { "deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"}, "deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"}, "done": {"next": None, "agent": None, "qg": None}, + # ORCH-090 (adr-0026): terminal SINK for a STOP-cancelled task (parallel to + # `done`; not a new edge — no exit-gate changed). + "cancelled": {"next": None, "agent": None, "qg": None}, } diff --git a/tests/test_serial_gate.py b/tests/test_serial_gate.py index ada6b8b..dfa8b97 100644 --- a/tests/test_serial_gate.py +++ b/tests/test_serial_gate.py @@ -180,9 +180,11 @@ def test_snapshot_shape_and_never_raises(monkeypatch): def test_registries_unchanged(): from src.stages import STAGE_TRANSITIONS from src.qg.checks import QG_CHECKS + # ORCH-090 (adr-0026): `cancelled` is added as a terminal SINK (parallel to + # `done`), NOT a new pipeline edge — serial-gate FIFO semantics are unchanged. assert set(STAGE_TRANSITIONS) == { "created", "analysis", "architecture", "development", "review", - "testing", "deploy-staging", "deploy", "done", + "testing", "deploy-staging", "deploy", "done", "cancelled", } # No serial-gate QG check was introduced (the gate is a scheduler condition). assert not any("serial" in k for k in QG_CHECKS), "no new QG check expected" diff --git a/tests/test_stages_invariants.py b/tests/test_stages_invariants.py index a800cfb..171581a 100644 --- a/tests/test_stages_invariants.py +++ b/tests/test_stages_invariants.py @@ -39,6 +39,9 @@ _EXPECTED_TRANSITIONS = { "deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"}, "deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"}, "done": {"next": None, "agent": None, "qg": None}, + # ORCH-090 (adr-0026): terminal SINK for a STOP-cancelled task (parallel to + # `done`; not a new edge — no exit-gate changed). + "cancelled": {"next": None, "agent": None, "qg": None}, } diff --git a/tests/test_stop_status.py b/tests/test_stop_status.py new file mode 100644 index 0000000..986f8b2 --- /dev/null +++ b/tests/test_stop_status.py @@ -0,0 +1,454 @@ +"""ORCH-090 — STOP-status task cancellation + relaunch-hole close (unit + integ). + +Covers 04-test-plan.yaml TC-01..TC-14 + the ADR-001 D7 deferred-cancel path: + + TC-01 STOP recognised + routed to handle_stop; unknown task -> no-op, never-raise. + TC-02 active agent stopped via launcher.stop_process by jobs.pid; idle -> no-op. + TC-03 queued+running jobs of the task -> terminal 'cancelled'; claim skips them. + TC-04 reaper does NOT requeue a job of a terminal (cancelled) task. + TC-05 full reset: remove_worktree + delete_remote_branch called; main untouched. + TC-06 docs artefacts (and the task row) survive the reset. + TC-07 idempotency: STOP on cancelled / done / missing -> no-op, no exception. + TC-08 kill-switch off -> STOP inert; relaunch-hole gate inert. + TC-09 GET /queue carries a read-only `stop` block; never-raise. + TC-10 relaunch-hole closed: manual To Analyse on a mid-pipeline task -> no job. + TC-11 To Analyse on analysis (idle) relaunches analyst; new task -> start_pipeline. + TC-12 terminal-skip / restart-safe: reconciler skips a cancelled task; cancelled + jobs are not revived by requeue_running_jobs. + TC-13 e2e STOP: agent stopped, jobs cancelled, branch/worktree removed, durable + 'cancelled', keys tombstoned, notifications fired. + TC-14 additive DB migration is idempotent (re-init_db) + columns present. + D7 STOP in a critical merge/deploy window is DEFERRED, then applied by the + deploy finalizer. +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_stop_status.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import ( # noqa: E402 + init_db, get_db, claim_next_job, get_task, + cancel_jobs_for_task, mark_task_cancelled, get_task_by_plane_id, + requeue_running_jobs, get_job, +) +from src import config as cfg # noqa: E402 +from src import cancel as cancel_mod # noqa: E402 +from src import stage_engine # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "stop.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + # STOP feature ON, all repos. Isolate repos_dir so the critical-window probe + # (deploy markers / merge-lease) sees a clean tree by default. + monkeypatch.setattr(cfg.settings, "stop_status_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "stop_status_repos", "", raising=False) + monkeypatch.setattr(cfg.settings, "repos_dir", str(tmp_path / "repos"), raising=False) + monkeypatch.setattr(cfg.settings, "host_repos_dir", str(tmp_path / "repos"), raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False) + monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) + # Silence network side effects of cancel notifications. + monkeypatch.setattr("src.stage_engine.plane_add_comment", lambda *a, **k: None, raising=False) + monkeypatch.setattr("src.notifications.update_task_tracker", lambda *a, **k: None, raising=False) + init_db() + yield + + +# --------------------------------------------------------------------------- helpers +def _make_task(plane_id, work_item_id, stage="development", repo="orchestrator", + branch=None): + branch = branch or f"feature/{work_item_id}-slug" + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (plane_id, work_item_id, repo, branch, stage, plane_id, work_item_id), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def _make_job(task_id, repo="orchestrator", agent="developer", status="running", + pid=None, run_id=None, attempts=1, max_attempts=2): + conn = get_db() + cur = conn.execute( + "INSERT INTO jobs (agent, repo, task_id, status, pid, run_id, attempts, max_attempts) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (agent, repo, task_id, status, pid, run_id, attempts, max_attempts), + ) + jid = cur.lastrowid + conn.commit() + conn.close() + return jid + + +def _job_status(job_id): + j = get_job(job_id) + return j["status"] if j else None + + +def _stub_full_reset(monkeypatch): + """Stub the side-effecting cleanup steps (signals / git / gitea) of a full reset.""" + calls = {"stop": [], "worktree": [], "branch": []} + from src.agents.launcher import launcher + + def _stop(pid, run_id, *, reason="stop"): + calls["stop"].append((pid, run_id, reason)) + return True + monkeypatch.setattr(launcher, "stop_process", _stop, raising=True) + monkeypatch.setattr("src.git_worktree.remove_worktree", + lambda repo, branch: calls["worktree"].append((repo, branch)), + raising=True) + monkeypatch.setattr("src.gitea.delete_remote_branch", + lambda repo, branch: calls["branch"].append((repo, branch)) or True, + raising=True) + return calls + + +# =========================================================================== TC-01 +@pytest.mark.asyncio +async def test_tc01_stop_routed_and_unknown_is_noop(monkeypatch): + from src.webhooks import plane as plane_wh + + proj_states = { + "stop": "STOP-UUID", "to_analyse": "TA-UUID", "approved": "AP-UUID", + "rejected": "RJ-UUID", "confirm_deploy": None, + } + monkeypatch.setattr("src.plane_sync.get_project_states", lambda pid: proj_states) + seen = [] + + async def _stub_stop(data, project_id=""): + seen.append(data.get("id")) + monkeypatch.setattr(plane_wh, "handle_stop", _stub_stop) + + # STOP state -> routed to handle_stop. + await plane_wh.handle_issue_updated({"id": "PL-1", "state": {"id": "STOP-UUID"}}, "proj") + assert seen == ["PL-1"] + + # A non-STOP state does not route to handle_stop. + await plane_wh.handle_issue_updated({"id": "PL-2", "state": {"id": "AP-UUID"}}, "proj") + assert seen == ["PL-1"] + + # Unknown task on the real handler -> no-op, never raises. + await plane_wh.handle_stop({"id": "does-not-exist"}, "proj") + + +# =========================================================================== TC-02 +def test_tc02_stop_active_agent_by_pid(monkeypatch): + calls = _stub_full_reset(monkeypatch) + tid = _make_task("PL-10", "ORCH-310", stage="development") + _make_job(tid, status="running", pid=4242, run_id=77) + + res = stage_engine.cancel_task(tid) + assert res["ok"] and not res["deferred"] + assert calls["stop"] == [(4242, 77, f"STOP cancel task {tid}")] + assert res["stopped"] == 1 + + +def test_tc02_idle_agent_no_stop(monkeypatch): + calls = _stub_full_reset(monkeypatch) + tid = _make_task("PL-11", "ORCH-311", stage="development") + _make_job(tid, status="queued", pid=None) # no running process + + res = stage_engine.cancel_task(tid) + assert res["ok"] and res["stopped"] == 0 + assert calls["stop"] == [] + + +# =========================================================================== TC-03 +def test_tc03_jobs_cancelled_and_claim_skips(monkeypatch): + _stub_full_reset(monkeypatch) + tid = _make_task("PL-20", "ORCH-320", stage="development") + jq = _make_job(tid, status="queued") + jr = _make_job(tid, status="running", pid=None) + + stage_engine.cancel_task(tid) + assert _job_status(jq) == "cancelled" + assert _job_status(jr) == "cancelled" + # claim_next_job selects only status='queued' -> a cancelled job is never claimed. + assert claim_next_job() is None + + +def test_tc03_cancel_jobs_helper_only_queued(monkeypatch): + tid = _make_task("PL-21", "ORCH-321") + jq = _make_job(tid, status="queued") + jr = _make_job(tid, status="running", pid=None) + n = cancel_jobs_for_task(tid, only_queued=True) + assert n == 1 + assert _job_status(jq) == "cancelled" + assert _job_status(jr) == "running" # the running deploy/merge actor is left alone + + +# =========================================================================== TC-04 +def test_tc04_reaper_does_not_requeue_terminal_task(monkeypatch): + from src.job_reaper import JobReaper + tid = _make_task("PL-30", "ORCH-330", stage="development") + jid = _make_job(tid, status="running", pid=999999, attempts=1, max_attempts=2) + # Task is flipped to cancelled (as STOP would) while the job is still running. + mark_task_cancelled(tid) + + reaper = JobReaper() + job = get_job(jid) + reaper._reap_unknown_outcome(job, reason="dead pid") + # NOT requeued (attempts terminal 'cancelled'. + assert _job_status(jid) == "cancelled" + + +# =========================================================================== TC-05 +def test_tc05_full_reset_removes_branch_and_worktree(monkeypatch): + calls = _stub_full_reset(monkeypatch) + tid = _make_task("PL-40", "ORCH-340", stage="review", branch="feature/ORCH-340-x") + + stage_engine.cancel_task(tid) + assert calls["worktree"] == [("orchestrator", "feature/ORCH-340-x")] + assert calls["branch"] == [("orchestrator", "feature/ORCH-340-x")] + + +def test_tc05_delete_remote_branch_refuses_main(): + from src import gitea + # main is never deletable by the cancel path (self-hosting safety, NFR-3). + assert gitea.delete_remote_branch("orchestrator", "main") is False + assert gitea.delete_remote_branch("orchestrator", "master") is False + + +# =========================================================================== TC-06 +def test_tc06_docs_and_task_row_survive(monkeypatch, tmp_path): + _stub_full_reset(monkeypatch) + tid = _make_task("PL-50", "ORCH-350", stage="development") + # A stand-in docs artefact: cancel must not delete it. + docs = tmp_path / "docs" / "work-items" / "ORCH-350" + docs.mkdir(parents=True) + (docs / "02-trz.md").write_text("trz") + + stage_engine.cancel_task(tid) + assert (docs / "02-trz.md").exists(), "docs artefacts must be preserved" + # The task ROW is kept (durable audit), flipped to cancelled. + assert get_task(tid)["stage"] == "cancelled" + + +# =========================================================================== TC-07 +def test_tc07_idempotent_on_cancelled_done_missing(monkeypatch): + calls = _stub_full_reset(monkeypatch) + # already cancelled + tid = _make_task("PL-60", "ORCH-360", stage="cancelled") + res = stage_engine.cancel_task(tid) + assert res["ok"] and res["note"].startswith("already-terminal") + assert calls["stop"] == [] and calls["branch"] == [] + # done + tid2 = _make_task("PL-61", "ORCH-361", stage="done") + res2 = stage_engine.cancel_task(tid2) + assert res2["note"].startswith("already-terminal") + # missing + res3 = stage_engine.cancel_task(999999) + assert res3["note"] == "no-task" + + +# =========================================================================== TC-08 +def test_tc08_kill_switch_off_inert(monkeypatch): + monkeypatch.setattr(cfg.settings, "stop_status_enabled", False, raising=False) + assert cancel_mod.applies("orchestrator") is False + + +@pytest.mark.asyncio +async def test_tc08_kill_switch_off_handle_stop_noop(monkeypatch): + monkeypatch.setattr(cfg.settings, "stop_status_enabled", False, raising=False) + calls = _stub_full_reset(monkeypatch) + from src.webhooks import plane as plane_wh + tid = _make_task("PL-70", "ORCH-370", stage="development") + _make_job(tid, status="running", pid=4242) + await plane_wh.handle_stop({"id": "PL-70"}, "proj") + # Nothing was cancelled (kill-switch off -> applies() False -> no-op). + assert calls["stop"] == [] + assert get_task(tid)["stage"] == "development" + + +def test_tc08_scope_csv(monkeypatch): + monkeypatch.setattr(cfg.settings, "stop_status_repos", "enduro-trails", raising=False) + assert cancel_mod.applies("enduro-trails") is True + assert cancel_mod.applies("orchestrator") is False + + +# =========================================================================== TC-09 +def test_tc09_queue_has_stop_block_and_keeps_keys(monkeypatch): + import asyncio + from src import main + payload = asyncio.run(main.queue()) + for key in ("counts", "serial_gate", "task_deps", "auto_labels", "recent"): + assert key in payload, f"existing /queue key '{key}' preserved" + assert "stop" in payload + blk = payload["stop"] + assert blk["enabled"] is True + assert "repos" in blk and "cancelled_count" in blk and "recent" in blk + + +def test_tc09_snapshot_never_raises(monkeypatch): + # Force a DB error inside the snapshot -> minimal dict, no raise. + monkeypatch.setattr("src.db.cancelled_tasks_snapshot", + lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom"))) + snap = cancel_mod.snapshot() + assert snap["enabled"] is True and snap["cancelled_count"] == 0 + + +# =========================================================================== TC-10 +@pytest.mark.asyncio +async def test_tc10_relaunch_hole_closed_midpipeline(monkeypatch): + from src.webhooks import plane as plane_wh + monkeypatch.setattr("src.plane_sync.add_comment", lambda *a, **k: None, raising=False) + monkeypatch.setattr("src.plane_sync.set_issue_analysis", lambda *a, **k: None, raising=False) + tid = _make_task("PL-80", "ORCH-380", stage="development") + + await plane_wh.handle_status_start({"id": "PL-80"}, "proj") + # No stage agent was relaunched (no job created) for a mid-pipeline task. + conn = get_db() + n = conn.execute("SELECT COUNT(*) FROM jobs WHERE task_id=?", (tid,)).fetchone()[0] + conn.close() + assert n == 0 + + +# =========================================================================== TC-11 +@pytest.mark.asyncio +async def test_tc11_analysis_idle_relaunches_analyst(monkeypatch): + from src.webhooks import plane as plane_wh + monkeypatch.setattr("src.plane_sync.add_comment", lambda *a, **k: None, raising=False) + monkeypatch.setattr("src.plane_sync.set_issue_analysis", lambda *a, **k: None, raising=False) + tid = _make_task("PL-90", "ORCH-390", stage="analysis") + + await plane_wh.handle_status_start({"id": "PL-90"}, "proj") + conn = get_db() + rows = conn.execute("SELECT agent FROM jobs WHERE task_id=?", (tid,)).fetchall() + conn.close() + assert [r[0] for r in rows] == ["analyst"], "analyst resume is still legitimate" + + +@pytest.mark.asyncio +async def test_tc11_new_task_starts_pipeline(monkeypatch): + from src.webhooks import plane as plane_wh + started = [] + + async def _stub_start(data, project_id=""): + started.append(data.get("id")) + monkeypatch.setattr(plane_wh, "start_pipeline", _stub_start) + await plane_wh.handle_status_start({"id": "PL-NEW"}, "proj") + assert started == ["PL-NEW"] # the ONLY pipeline-start entry point + + +# =========================================================================== TC-12 +def test_tc12_reconciler_skips_cancelled(monkeypatch): + from src.reconciler import Reconciler + # Avoid any Plane network in the gate pass. + monkeypatch.setattr("src.reconciler.fetch_issue_state", + lambda *a, **k: (_ for _ in ()).throw(AssertionError("no net")), + raising=False) + tid = _make_task("PL-100", "ORCH-400", stage="development") + mark_task_cancelled(tid) + rec = Reconciler() + rec.reconcile_gate_once() + assert rec.skipped_terminal_total == 1 + + +def test_tc12_requeue_running_does_not_revive_cancelled(monkeypatch): + tid = _make_task("PL-101", "ORCH-401", stage="development") + jc = _make_job(tid, status="running", pid=None) + cancel_jobs_for_task(tid) # -> cancelled + assert _job_status(jc) == "cancelled" + # Startup recovery flips only 'running' jobs; a cancelled job is untouched. + requeue_running_jobs() + assert _job_status(jc) == "cancelled" + + +# =========================================================================== TC-13 +def test_tc13_end_to_end_stop(monkeypatch): + calls = _stub_full_reset(monkeypatch) + tid = _make_task("PL-110", "ORCH-410", stage="review", branch="feature/ORCH-410-e2e") + jr = _make_job(tid, status="running", pid=5555, run_id=11) + jq = _make_job(tid, status="queued") + + res = stage_engine.cancel_task(tid, reason="Plane STOP status") + assert res["ok"] and not res["deferred"] + # agent stopped + assert calls["stop"] and calls["stop"][0][0] == 5555 + # jobs cancelled + assert _job_status(jr) == "cancelled" and _job_status(jq) == "cancelled" + # worktree + branch removed + assert calls["worktree"] and calls["branch"] + # durable terminal + key tombstone (re-create via To Analyse no longer collides) + t = get_task(tid) + assert t["stage"] == "cancelled" and t["cancelled_at"] + assert t["plane_id"].endswith(f"#cancelled-{tid}") + assert t["work_item_id"].endswith(f"#cancelled-{tid}") + # plane_issue_id is tombstoned too (the lookup ORs on it) but the original UUID + # remains recoverable from the parseable suffix (audit link preserved). + assert t["plane_issue_id"] == f"PL-110#cancelled-{tid}" + assert t["plane_issue_id"].split("#cancelled-")[0] == "PL-110" + assert get_task_by_plane_id("PL-110") is None # freed for a fresh start + + +# =========================================================================== TC-14 +def test_tc14_migration_idempotent_and_columns_present(): + # Re-running init_db must not fail (idempotent _ensure_column). + init_db() + init_db() + conn = get_db() + cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()} + conn.close() + assert "cancelled_at" in cols and "cancel_requested_at" in cols + + +def test_tc14_existing_contracts_intact(): + # The additive job status set still has the original statuses working. + tid = _make_task("PL-120", "ORCH-420") + jid = _make_job(tid, status="queued") + # A queued job is still claimable when no gate blocks it. + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == jid + + +# =========================================================================== D7 +def test_d7_stop_in_critical_window_defers(monkeypatch): + calls = _stub_full_reset(monkeypatch) + from src import self_deploy + tid = _make_task("PL-130", "ORCH-430", stage="deploy", branch="feature/ORCH-430-d") + # self-deploy Phase B initiated -> critical window. + self_deploy.write_marker("orchestrator", "ORCH-430", self_deploy.INITIATED, content="1") + jq = _make_job(tid, status="queued") + jr = _make_job(tid, status="running", pid=7777) # the deploy actor + + res = stage_engine.cancel_task(tid) + assert res["deferred"] is True and res["ok"] + # Only queued jobs cancelled; the running deploy actor is NOT killed. + assert _job_status(jq) == "cancelled" + assert _job_status(jr) == "running" + assert calls["stop"] == [] and calls["branch"] == [] + # The deferred flag is durable; the task is NOT yet terminal. + t = get_task(tid) + assert t["cancel_requested_at"] and t["stage"] == "deploy" + + +def test_d7_in_critical_window_detection(monkeypatch): + from src import self_deploy + task = {"repo": "orchestrator", "work_item_id": "ORCH-431", "branch": "feature/x"} + assert cancel_mod.in_critical_window(task) is False + self_deploy.write_marker("orchestrator", "ORCH-431", self_deploy.INITIATED, content="1") + assert cancel_mod.in_critical_window(task) is True + + +def test_d7_deferred_applied_by_finalizer(monkeypatch): + """After the irreversible step finishes, the finalizer applies the deferred cancel.""" + calls = _stub_full_reset(monkeypatch) + tid = _make_task("PL-140", "ORCH-440", stage="development", branch="feature/ORCH-440-d") + # Mark a deferred cancellation pending (as the critical-window path would). + db.set_task_cancel_requested(tid) + + # force=True is what run_deploy_finalizer uses once the step completed honestly. + res = stage_engine.cancel_task(tid, force=True, source="deferred") + assert res["ok"] and not res["deferred"] + assert get_task(tid)["stage"] == "cancelled" + assert calls["branch"], "deferred cancel applies the full reset"