diff --git a/.env.example b/.env.example index 94d3a65..aa74772 100644 --- a/.env.example +++ b/.env.example @@ -107,6 +107,20 @@ ORCH_PREMERGE_REBASE_ALWAYS=true # cache them into job_deps (the scheduler then reads only the DB). ORCH_TASK_DEPS_ENABLED=true ORCH_TASK_DEPS_SOURCE=db +# ORCH-088 (Stage 1, serial e2e): per-repo serial gate. A NEW task's analyst-job does +# NOT enter analysis (no branch cut, no analyst) while the same repo has an EARLIER +# unfinished task (FIFO, tasks.id < the job's task) OR the repo is frozen. The branch +# cut is DEFERRED from start_pipeline to the analyst-job claim so its base is a fresh +# origin/main already containing the predecessor (anti-stale-base). Gate lives in +# claim_next_job (offline hot-path, fail-OPEN on error); freeze (FR-5) is a durable +# repo_freeze row set on post-deploy DEGRADED, cleared manually via +# POST /serial-gate/unfreeze?repo=. Leaf src/serial_gate.py (never-raise). +# SERIAL_GATE_ENABLED=false -> claim AND start_pipeline are 1:1 as before ORCH-088. +# SERIAL_GATE_REPOS (CSV) -> scope; EMPTY = ALL repos (not self-hosting-only). +# SERIAL_GATE_FREEZE_ENABLED=false -> the rollback-freeze layer is off (not set/read). +ORCH_SERIAL_GATE_ENABLED=true +ORCH_SERIAL_GATE_REPOS= +ORCH_SERIAL_GATE_FREEZE_ENABLED=true # ORCH-071/073: merge-verify under-gate on the `deploy -> done` edge (врезка in # advance_stage, NOT a new STAGE_TRANSITIONS edge / registered QG). A deterministic # merge-actor merges the feature code-PR via the Gitea PR-merge API (never push/ diff --git a/.task-dev.md b/.task-dev.md index ff21934..295f511 100644 --- a/.task-dev.md +++ b/.task-dev.md @@ -1,4 +1,4 @@ -Work item: ORCH-061 +Work item: ORCH-088 Repo: orchestrator -Branch: feature/ORCH-061-bug-deploy-staging-development +Branch: feature/ORCH-088-orch-88-10-20 Stage: development \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 522fc1d..f4cc4c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,11 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Per-repo serial gate: пакетный автономный режим (Этап 1, serial e2e)** (ORCH-088, `feat`): закрыт **логический** stale-анализ — ветка задачи N+1 срезалась на входе в анализ (`start_pipeline._create_gitea_branch`) от `main`, ещё не содержащего код предшественника N (физическое затирание уже закрыто ORCH-026). Новая задача репо не входит в `analysis` (не режет ветку, не запускает analyst), пока в репо есть незавершённая задача или репо заморожен. Аддитивно, под kill-switch, область репо, never-raise, restart-safe; `STAGE_TRANSITIONS`/`QG_CHECKS`/`check_*` — **без изменений**. + - **Gate-в-claim** (`db.claim_next_job`): analyst-job (`jobs.agent='analyst'`) применимого репо не выбирается, если `EXISTS` **более ранняя** незавершённая задача репо (`t2.id < jobs.task_id`) ИЛИ активна строка `repo_freeze`. Фрагмент строится в leaf `src/serial_gate.py::build_claim_clause` (санитизация repo-токенов `^[A-Za-z0-9._-]+$`, **fail-OPEN** на любой ошибке построения — не заклинить очередь всех проектов, AC-8); только локальная БД (offline hot-path, NFR-2). Job'ы уже активной задачи проходят свободно. **FIFO-уточнение реализации (FR-2):** ADR-001 D1 фиксировал псевдо-SQL `t2.id != jobs.task_id`; при `!=` пакет одновременно созданных свежих задач (все в `analysis`) взаимно блокировался бы → дедлок всей serial-очереди (воспроизведено). `<` допускает ровно самую раннюю задачу и сериализует остальные за ней (строго по одной, FIFO по `jobs.id`), сохраняя AC-1 и не блокируя rework-analyst собственной задачи (R-7). + - **Отложенный срез ветки (анти-stale-base, AC-6):** для применимого репо `start_pipeline` создаёт task-row + enqueue analyst, но **не** создаёт Gitea-ветку/docs; срез релоцирован в `launcher._spawn` (новый `_materialize_deferred_branch`, sync через `asyncio.run` в worker-потоке, R-4) на момент claim analyst-job, когда `origin/main` уже содержит предшественника (`done` ⇔ SHA-в-main, ORCH-071/073). `ensure_worktree` режет от свежего `origin/main` ⇒ AC-6 структурно. Идемпотентно (`_create_gitea_branch` 409 / `_create_initial_docs` 422 = no-op) → безопасно при реклейме/рестарте. Ожидающая задача = `queued` analyst-job без ветки; `tasks.branch` хранится как имя (R-5). + - **Durable per-repo freeze (FR-5):** новая аддитивная append-only таблица `repo_freeze(id, repo, frozen_at, reason, work_item_id, cleared_at)` (`CREATE TABLE/INDEX IF NOT EXISTS` в `init_db`, идемпотентно, restart-safe). Post-deploy `DEGRADED` (`stage_engine.run_post_deploy_monitor`) → `serial_gate.set_repo_freeze` + Telegram-алерт «пакет заморожен»; gate закрыт безусловно (деградировавшая задача уже `done`, BR-7 ⇒ отдельный сигнал, независимый от `stage`) до **ручного** снятия — новый эндпоинт `POST /serial-gate/unfreeze?repo=` (`clear_repo_freeze`, идемпотентно, + Telegram-подтверждение; альтернатива — `UPDATE repo_freeze SET cleared_at=datetime('now') …`). freeze в Python-слое (`is_repo_frozen`) → **fail-CLOSED** (безопасность прода, AC-9). Независимый тумблер `serial_gate_freeze_enabled`. + - **Конфигурация (`src/config.py`):** `serial_gate_enabled` (kill-switch, `ORCH_SERIAL_GATE_ENABLED`, дефолт true → claim+start_pipeline 1:1 как сейчас при false), `serial_gate_repos` (CSV, `ORCH_SERIAL_GATE_REPOS`; **пусто ⇒ все репо**, в отличие от self-hosting-only ORCH-35/43/58; оператор может сузить), `serial_gate_freeze_enabled` (`ORCH_SERIAL_GATE_FREEZE_ENABLED`). Наблюдаемость — аддитивный блок `serial_gate` в `GET /queue` (per-repo `active_task`/`waiting`/`frozen`+reason+at); существующие ключи не меняются. **NFR-6:** freeze — пассивная остановка стартов, прод-контейнер не рестартится/не роняется. Cross-repo параллелизм сохранён (FR-3/AC-4); при выключенном флаге — нулевая регрессия (enduro не затронут, AC-7). ADR `docs/work-items/ORCH-088/06-adr/ADR-001-serial-gate.md`, данные `08-data-requirements.md`, сквозной `adr-0017`. Документация: `docs/architecture/README.md` (раздел serial gate + `/queue` + таблица API + раздел БД), `CLAUDE.md`. Тесты: `tests/test_serial_gate.py` (TC-01/02/03/08/15/16/17/19/21), `tests/test_serial_gate_e2e.py` (TC-04/05/06), `tests/test_serial_gate_freeze.py` (TC-07/09/10/11/12/18/22), `tests/test_serial_gate_branch.py` (TC-13/14), `tests/test_queue_endpoint.py` (TC-20). - **CI-фикс: per-run путь логов из хардкода `/app/data/runs` в `settings.runs_dir`** (ORCH-087, `fix`): тест `tests/test_launcher.py::TestEffortStamp::test_spawn_stamps_resolved_effort` падал в CI (`PermissionError: [Errno 13] … '/app'`) — зелёный локально-в-контейнере (где `/app` есть), красный на CI-хосте (act_runner hostexecutor, юзер без доступа к `/app`). **Корень:** `launcher._spawn` хардкодил `output_path="/app/data/runs/{run_id}.log"` + `os.makedirs('/app/data/runs')`, а тест дёргал `_spawn`, не замокав путь → makedirs на недоступном `/app` бросал. **Фикс (корень, не только тест):** базовый каталог per-run логов вынесен в `Settings.runs_dir` (env `ORCH_RUNS_DIR`, дефолт `/app/data/runs` — прод-layout 1:1); новый хелпер `launcher._run_log_path(run_id)` = `/{run_id}.log` стал единым источником пути (использован в `_spawn` + три прежних inline-строки логов/алертов). Тест `monkeypatch`-ит `settings.runs_dir` на `tmp_path` → окружение-независим (подтверждено прогоном с принудительно недоступным `/app`). `STAGE_TRANSITIONS`/`QG_CHECKS`/схема БД — без изменений. Документация: `README.md` (таблица env), `CHANGELOG.md`. - **Live-трекер: зачистка осиротевших карточек + эффорт в строке стадии + честное итоговое время** (ORCH-087, `fix`): в чат периодически попадали «замёрзшие» сироты — старая карточка с заголовком `📍 To Analyse` висела на задаче, реально дошедшей до `deploy` (скриншот ORCH-082). **Корень (G0/ADR-001):** указатель `tasks.tracker_message_id` — скаляр (знает лишь ПОСЛЕДНИЙ `message_id`), поэтому при рассинхроне bump-режима (доминанты: гонка двух `update_task_tracker` и `delete`-fail+`send`-ok) ссылка на прежнюю карточку терялась навсегда → сирота не удалялась и больше не обновлялась (рендер исправен — застывал именно потерянный mid). **Фикс (bump сохранён дефолтом — фича «карточка внизу» ORCH-042/067):** - **G1 — полный учёт mid:** аддитивная таблица-леджер `tracker_messages(task_id, message_id, created_at, deleted_at)` (`src/db.py`) + хелперы `add_tracker_message`/`get_open_tracker_messages`/`mark_tracker_message_deleted`. На каждом bump зачищаются ВСЕ незакрытые mid (`deleted_at IS NULL`), а не только скаляр: успех/«already gone» (`_DELETE_GONE_MARKERS`) → `deleted_at`; transient-`delete` → остаётся для ретрая; новый mid в леджер + `set_tracker_message_id` ТОЛЬКО при успешном `send` (R-3/BR-6). Остаточная гонка самозалечивается за один переход (лок не вводится). Скаляр `tracker_message_id` сохранён (BC). Known-limitation: Telegram 48ч (сироты старше неудаляемы). diff --git a/CLAUDE.md b/CLAUDE.md index f375f01..f366544 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -7,7 +7,7 @@ - Backend: FastAPI + uvicorn (Python 3.12) - БД: SQLite (`src/db.py`) - Агенты: Claude CLI (`ORCH_CLAUDE_BIN`), по одному промпту на роль в `.openclaw/agents/`. **ORCH-74:** модель/эффорт агента берутся ТОЛЬКО из config (`resolve_agent_model`/`resolve_agent_effort`, ORCH-41) — frontmatter `model:` удалён как мёртвый, frontmatter описательный; имя модели валидируется форматом `^claude-…$` перед `--model` (never-break). -- Очередь задач: собственная (SQLite `jobs`, `src/queue_worker.py`, ORCH-1). **ORCH-026:** `claim_next_job` гейтит задачи с незавершёнными зависимостями (`job_deps`, `NOT EXISTS`) без занятия слота `max_concurrency`; декларации/детект циклов — leaf `src/task_deps.py` (kill-switch `ORCH_TASK_DEPS_ENABLED`). Сериализация мержа одного репо — безусловный pre-merge rebase под merge-lease (`ORCH_PREMERGE_REBASE_ALWAYS`). +- Очередь задач: собственная (SQLite `jobs`, `src/queue_worker.py`, ORCH-1). **ORCH-026:** `claim_next_job` гейтит задачи с незавершёнными зависимостями (`job_deps`, `NOT EXISTS`) без занятия слота `max_concurrency`; декларации/детект циклов — leaf `src/task_deps.py` (kill-switch `ORCH_TASK_DEPS_ENABLED`). Сериализация мержа одного репо — безусловный pre-merge rebase под merge-lease (`ORCH_PREMERGE_REBASE_ALWAYS`). **ORCH-088 (serial gate, Этап 1):** новая задача репо не входит в `analysis` (analyst-job не выбирается, ветка не режется), пока в репо есть **более ранняя** незавершённая задача (`t2.id < jobs.task_id`, FIFO) ИЛИ репо заморожен (`repo_freeze`). Срез ветки **отложен** со `start_pipeline` на момент claim analyst-job (`launcher._materialize_deferred_branch`) — база = свежий `origin/main` с кодом предшественника (анти-stale-base). Post-deploy `DEGRADED` → durable per-repo freeze (`repo_freeze`, `cleared_at IS NULL` = активен) + Telegram; снятие — вручную `POST /serial-gate/unfreeze?repo=…`. Leaf `src/serial_gate.py` (claim — fail-OPEN, freeze — fail-CLOSED); флаги `ORCH_SERIAL_GATE_ENABLED` (kill-switch), `ORCH_SERIAL_GATE_REPOS` (CSV; пусто = все репо), `ORCH_SERIAL_GATE_FREEZE_ENABLED`. Блок `serial_gate` в `GET /queue`. `STAGE_TRANSITIONS`/`QG_CHECKS` не тронуты. - Контейнеризация: Docker + Compose - CI/CD: Gitea Actions (`.gitea/workflows/`) - Деплой: docker compose на mva154 diff --git a/docs/architecture/README.md b/docs/architecture/README.md index a8bee3d..6729dfa 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -92,7 +92,7 @@ Self-hosting зацикливался на `deploy-staging`: `scripts/staging_ch Подробнее: [adr-0015](adr/adr-0015-task-deps-and-merge-serialization.md), детально — `docs/work-items/ORCH-026/06-adr/ADR-001-merge-serialization-and-task-deps.md`. -### Per-repo serial gate: пакетный автономный режим (ORCH-088 — design) +### Per-repo serial gate: пакетный автономный режим (ORCH-088 — реализовано) Эпик «10–20 задач за ночь», Этап 1 (serial e2e). Закрывает **stale-анализ**: ветка задачи N+1 срезалась на входе в анализ (`start_pipeline._create_gitea_branch`) от `main`, ещё не содержащего код предшественника N (физическое код-затирание уже закрыто ORCH-026; ORCH-088 — **логический** разрыв). @@ -100,9 +100,14 @@ Self-hosting зацикливался на `deploy-staging`: `scripts/staging_ch есть незавершённая задача (`stage != 'done'`) или репо заморожен. Аддитивно, под kill-switch, область репо, never-raise, restart-safe; `STAGE_TRANSITIONS` / `QG_CHECKS` / `check_*` — **без изменений**. - **Gate-в-claim** (`db.claim_next_job`) — analyst-job (`jobs.agent='analyst'`) применимого репо не - выбирается, если `EXISTS` другая незавершённая задача репо (`t2.id != jobs.task_id` — rework-analyst - не блокирует себя) ИЛИ активна строка `repo_freeze`. По образцу `task_deps` `NOT EXISTS` (ORCH-026); - только локальная БД (offline hot-path, NFR-2). Job'ы уже активной задачи проходят свободно. + выбирается, если `EXISTS` **более ранняя** незавершённая задача репо (`t2.id < jobs.task_id`) ИЛИ + активна строка `repo_freeze`. По образцу `task_deps` `NOT EXISTS` (ORCH-026); только локальная БД + (offline hot-path, NFR-2). Job'ы уже активной задачи проходят свободно. **FIFO-уточнение реализации + (FR-2):** ADR-001 D1 фиксировал псевдо-SQL `t2.id != jobs.task_id`; при `!=` пакет одновременно + созданных свежих задач (все в `analysis`) взаимно блокировался бы (каждая — «другая незавершённая» + для остальных) ⇒ дедлок всей serial-очереди. `<` допускает ровно самую раннюю задачу и сериализует + остальные за ней (строго по одной, FIFO по `jobs.id`), при этом по-прежнему не блокирует rework-analyst + собственной задачи (R-7) и сохраняет AC-1. - **Отложенный срез ветки (анти-stale-base, AC-6):** для применимого репо `start_pipeline` создаёт task-row + enqueue analyst, но **не** создаёт Gitea-ветку/docs; срез релоцируется на момент claim analyst-job (launcher), когда `origin/main` уже содержит предшественника (`done` ⇔ SHA-в-main, @@ -594,6 +599,7 @@ Monitoring after Deploy → Done - `agent_runs` — запуски агентов (run_id, usage, cost) - `jobs` — очередь задач (ORCH-1); колонка `pid` (ORCH-065) — pid агентского процесса для liveness-детекции зомби job-reaper'ом - `job_deps` — декларативные зависимости задач (ORCH-026, Уровень B): `(task_id, depends_on_task_id)`, аддитивная; источник истины планировщика для гейта «B ждёт A» +- `repo_freeze` — durable per-repo rollback-freeze (ORCH-088, FR-5): `(id, repo, frozen_at, reason, work_item_id, cleared_at)`, аддитивная append-only; активный freeze ⇔ строка репо с `cleared_at IS NULL`. Выставляется post-deploy `DEGRADED` (`set_repo_freeze`), снимается вручную (`POST /serial-gate/unfreeze` → `cleared_at=now`). Гейтит serial-claim безусловно (деградировавшая задача уже `done`) ## Изоляция (git worktree, ORCH-2) Каждая задача исполняется в отдельном git worktree, ветки не пересекаются. Репозитории проектов разделены под `/repos/`. @@ -603,7 +609,8 @@ Monitoring after Deploy → Done |--------|------|----------| | GET | `/health` | health check | | GET | `/status` | активные задачи (stage != done) | -| GET | `/queue` | очередь: counts + max_concurrency + resilience + reconcile (ORCH-053) + reaper (ORCH-065) + post_deploy (ORCH-021) + последние jobs | +| GET | `/queue` | очередь: counts + max_concurrency + resilience + reconcile (ORCH-053) + reaper (ORCH-065) + post_deploy (ORCH-021) + task_deps (ORCH-026) + serial_gate (ORCH-088) + последние jobs | +| POST | `/serial-gate/unfreeze` | ORCH-088 (FR-5): ручное снятие per-repo rollback-freeze (query/body `repo=`) → `{ok, repo, cleared, frozen}`; идемпотентно. Альтернатива — `UPDATE repo_freeze SET cleared_at=datetime('now') WHERE repo=? AND cleared_at IS NULL` | | POST | `/webhook/plane` | Plane webhook | | POST | `/webhook/gitea` | Gitea webhook (push, PR, CI status) | @@ -621,3 +628,4 @@ Monitoring after Deploy → Done *Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py (консультируется merge-актором — промпт `.openclaw/agents/deployer.md`); флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест); ORCH-059 (выделенный статус-триггер прод-деплоя «Confirm Deploy», ADR `docs/work-items/ORCH-059/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-059 (маппинг `"Confirm Deploy"→"confirm_deploy"` в src/plane_sync.py `_PLANE_NAME_TO_KEY`, НЕ в `_DEFAULT_STATES` = fail-closed; ветка `handle_confirm_deploy` + fail-closed `.get("confirm_deploy")` в src/webhooks/plane.py `handle_issue_updated`; keyword-only `confirm_deploy` в src/stage_engine.py `advance_stage` — Фаза B деплоит ТОЛЬКО при `confirm_deploy=True`, иначе `Approved`-на-`deploy` = no-op; CTA Фазы A просит «Confirm Deploy»; эксплуатация — статус доски «Confirm Deploy» в Plane-проекте ORCH, `docs/work-items/ORCH-059/07-infra-requirements.md`).* *Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py (консультируется merge-актором — промпт `.openclaw/agents/deployer.md`); флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест); ORCH-066 (осмысленная статусная модель Plane — слой B, `docs/work-items/ORCH-066/06-adr/ADR-001-plane-status-model.md`) — реализовано в ветке feature/ORCH-066-plane (только Plane-индикация: новые ключи `to_analyse`/`analysis`/`code_review`/`awaiting_deploy`/`deploying`/`monitoring` в `_PLANE_NAME_TO_KEY`/`_DEFAULT_STATES` + project-relative `_STATE_ALIAS_FALLBACK` в get_project_states + `_STAGE_TO_STATE_KEY` analysis/review + 5 новых `set_issue_*` в src/plane_sync.py; триггер `in_progress`→`to_analyse` и `set_issue_analysis` в src/webhooks/plane.py; Phase A→Awaiting Deploy / Phase B→Deploying / terminal-sync split monitoring↔done / post-deploy monitor HEALTHY→Done DEGRADED→Blocked в src/stage_engine.py; F-2 триггер `to_analyse` + Guard 2 skip-set с вычитанием base_working в src/reconciler.py; `STAGE_TRANSITIONS`/QG/схема БД НЕ трогаются; без kill-switch — раскат гейтится созданием 6 Plane-статусов оператором, `docs/work-items/ORCH-066/07-infra-requirements.md`; обновлять при изменении этих мест).* *Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py (консультируется merge-актором — промпт `.openclaw/agents/deployer.md`); флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест); ORCH-068 (livelock-fix reconciler F-2: терминал-исключение по группе состояния + `_note_unblock` только при подтверждённом state change + дедуп; TTL `_STATES_CACHE`, `docs/work-items/ORCH-068/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-068 (D1 терминал-гард по группе `_is_terminal_state` + `get_project_state_groups` в src/plane_sync.py; D2 сравнение стадии до/после `_dispatch` + дедуп-словарь в src/reconciler.py; TTL-запись `_STATES_CACHE` + флаг `plane_states_ttl_s` в src/config.py; счётчики `skipped_terminal_total`/`deduped_total` в `/queue`; обновлять также при изменении src/reconciler.py F-2, src/plane_sync.py `get_project_states`/`get_project_state_groups`/`_STATES_CACHE`).* +*Актуально на 2026-06-09. Статус доработки: ORCH-088 (per-repo serial gate, Этап 1 serial e2e, adr-0017, `docs/work-items/ORCH-088/06-adr/ADR-001-serial-gate.md`) — реализовано в ветке feature/ORCH-088 (leaf src/serial_gate.py never-raise: gate-фрагмент в src/db.py `claim_next_job` fail-OPEN c FIFO-условием `t2.id < jobs.task_id` + freeze `repo_freeze.cleared_at IS NULL`, freeze-решения fail-CLOSED; отложенный срез ветки src/webhooks/plane.py `start_pipeline` → src/agents/launcher.py `_materialize_deferred_branch` (sync `asyncio.run` в worker-потоке) при claim analyst-job; durable freeze таблица `repo_freeze` (idempotent миграция в init_db) + `set_repo_freeze` в src/stage_engine.py DEGRADED-ветке `run_post_deploy_monitor` + ручное снятие `POST /serial-gate/unfreeze` в src/main.py; флаги `serial_gate_enabled`/`serial_gate_repos`/`serial_gate_freeze_enabled` в src/config.py; блок `serial_gate` в `GET /queue`; `STAGE_TRANSITIONS`/`QG_CHECKS` НЕ трогаются; обновлять также при изменении этих мест).* diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 2675e21..3d4f796 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -418,6 +418,32 @@ class AgentLauncher: pass return None + def _materialize_deferred_branch( + self, repo: str, branch: str, work_item_id: str | None, title: str | None + ) -> None: + """ORCH-088 (ADR-001 D1): create the deferred Gitea branch + initial docs. + + Relocated from ``webhooks.plane.start_pipeline``: the two coroutines are run + SYNCHRONOUSLY here (this method executes in the worker THREAD — no running + event loop — so ``asyncio.run`` is safe, R-4). Sequence mirrors the original + start_pipeline order so the downstream worktree/PR flow is identical: + ``_create_gitea_branch`` (from a fresh ``main``) -> ``_create_initial_docs``. + Both are idempotent (409/422 -> no-op) so a re-claim after a restart is safe. + A transient Gitea error PROPAGATES so the caller (_spawn) fails the launch and + the queue worker requeues the job for a later tick (never a half-cut state). + """ + import asyncio + from ..webhooks.plane import _create_gitea_branch, _create_initial_docs + + name = title or work_item_id or branch + logger.info( + f"ORCH-088: materialising deferred branch '{branch}' for {repo} " + f"({work_item_id}) at analyst-job claim" + ) + asyncio.run(_create_gitea_branch(repo, branch)) + if work_item_id: + asyncio.run(_create_initial_docs(repo, branch, work_item_id, name)) + def _spawn(self, agent: str, repo: str, task_content: str = None, task_id: int = None, job_id: int = None) -> int: """Shared spawn implementation for launch() and launch_job(). @@ -437,9 +463,33 @@ class AgentLauncher: raise FileNotFoundError(f"Repo not found: {local_repo_path}") # Determine branch (needed before we touch the worktree / task file). - _br_row = get_db().execute("SELECT branch FROM tasks WHERE id=?", (task_id,)).fetchone() if task_id else None + _br_row = ( + get_db().execute( + "SELECT branch, work_item_id, title FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if task_id else None + ) agent_branch = _br_row[0] if _br_row else "main" + # ORCH-088 (FR-1/AC-6, ADR-001 D1): materialise a DEFERRED branch cut. When + # the serial gate applies, start_pipeline did NOT create the Gitea branch / + # initial docs — they were deferred to this claim so the cut happens from a + # fresh origin/main that already contains the predecessor. We only reach this + # claim because the gate is OPEN (predecessor done), so it is now safe. This + # runs ONLY for the analyst-job (pipeline entry); every later stage reuses the + # existing branch. Idempotent (409/422 -> no-op) so a re-claim is safe. On a + # transient Gitea error this raises -> _drain_once requeues the job (R-4). + if agent == "analyst" and _br_row is not None: + try: + from .. import serial_gate + _applies = serial_gate.serial_gate_applies(repo) + except Exception: # noqa: BLE001 - never let the gate check block a launch + _applies = False + if _applies: + self._materialize_deferred_branch( + repo, agent_branch, _br_row[1], _br_row[2] + ) + # ORCH-41: resolve the Plane project uuid for this repo so per-project # model/effort overrides apply. Unknown repo -> None (env/default only). from ..projects import get_project_by_repo diff --git a/src/config.py b/src/config.py index b650a46..f0eb05e 100644 --- a/src/config.py +++ b/src/config.py @@ -433,6 +433,31 @@ class Settings(BaseSettings): task_deps_enabled: bool = True task_deps_source: str = "db" + # ORCH-088 (Этап 1, serial e2e): per-repo serial gate. A new task's analyst-job + # does NOT enter analysis (no branch cut, no analyst agent) while the same repo + # has another unfinished task (tasks.stage != 'done') OR the repo is frozen + # (repo_freeze). The gate lives in claim_next_job (offline-safe hot path, like + # the ORCH-026 dep-gate) + the branch cut is deferred from start_pipeline to the + # analyst-job claim (launcher) so the branch base is always a fresh origin/main + # that already contains the predecessor (anti-stale-base, AC-6). All additive, + # never-raise, restart-safe; STAGE_TRANSITIONS / QG_CHECKS unchanged. See + # docs/work-items/ORCH-088/06-adr/ADR-001-serial-gate.md. + # serial_gate_enabled -> kill-switch (env ORCH_SERIAL_GATE_ENABLED). + # False -> claim_next_job AND start_pipeline are 1:1 + # as before ORCH-088 (clause omitted, branch cut in + # start_pipeline) — zero regression (AC-7). + # serial_gate_repos -> CSV scope (env ORCH_SERIAL_GATE_REPOS). Empty -> + # applies to ALL registered repos (D5); non-empty -> + # only the listed repos. Repo tokens are sanitised + # (^[A-Za-z0-9._-]+$) before being embedded in SQL. + # serial_gate_freeze_enabled-> independent tumbler for the FR-5 rollback-freeze + # layer (env ORCH_SERIAL_GATE_FREEZE_ENABLED). False + # -> freeze is neither set (post-deploy DEGRADED) nor + # consulted in the claim gate. + serial_gate_enabled: bool = True + serial_gate_repos: str = "" + serial_gate_freeze_enabled: bool = True + # ORCH-073 (ADR-001 Р-4): main-integrity regression guard. After the merge-verify # under-gate confirms the deployed SHA is an ancestor of origin/main (FR-1), a # secondary deterministic (no-LLM) guard checks that a declarative set of markers diff --git a/src/db.py b/src/db.py index 967f387..05bec92 100644 --- a/src/db.py +++ b/src/db.py @@ -168,6 +168,26 @@ def init_db(): CREATE INDEX IF NOT EXISTS idx_tracker_messages_open ON tracker_messages(task_id) WHERE deleted_at IS NULL; """) + # ORCH-088 (FR-5, ADR-001 D2): durable per-repo rollback-freeze. After a + # post-deploy DEGRADED verdict the repo is frozen so the serial gate stays + # CLOSED unconditionally (the degraded task is already stage='done' — BR-7 — so + # the ordinary active-task gate would not hold it) until an operator clears it + # via POST /serial-gate/unfreeze. Append-only journal: an ACTIVE freeze for repo + # R ⇔ a row with repo=R AND cleared_at IS NULL. Purely ADDITIVE (CREATE + # TABLE/INDEX IF NOT EXISTS) -> idempotent, restart-safe on the live shared prod + # DB (enduro-trails data untouched). See 08-data-requirements.md. + conn.executescript(""" + CREATE TABLE IF NOT EXISTS repo_freeze ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + repo TEXT NOT NULL, + frozen_at TEXT NOT NULL DEFAULT (datetime('now')), + reason TEXT, + work_item_id TEXT, + cleared_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_repo_freeze_active + ON repo_freeze (repo, cleared_at); + """) conn.commit() conn.close() @@ -588,6 +608,19 @@ def claim_next_job() -> dict | None: " WHERE d.task_id = jobs.task_id AND t.stage != 'done'" ") " ) + # ORCH-088 (FR-1, ADR-001 D1): per-repo serial gate. An analyst-job of a NEW + # task is NOT claimable while the same repo has another unfinished task OR is + # frozen. The fragment is built in the serial_gate leaf (sanitised repo scope, + # fail-OPEN on any build error so a transient fault never wedges the queue of + # ALL projects — AC-8). Jobs of an already-active task (architect/.../deployer) + # are unaffected — the gate keys on jobs.agent='analyst' only. Reads only the + # local DB (offline-safe hot path, NFR-2). + serial_gate = "" + try: + from . import serial_gate as _serial_gate + serial_gate = _serial_gate.build_claim_clause() + except Exception: # noqa: BLE001 - fail-OPEN: never wedge the claim + serial_gate = "" conn = get_db() try: while True: @@ -595,6 +628,7 @@ def claim_next_job() -> dict | None: "SELECT id FROM jobs WHERE status='queued' " "AND (available_at IS NULL OR available_at <= datetime('now')) " f"{dep_gate}" + f"{serial_gate}" "ORDER BY id LIMIT 1" ).fetchone() if not row: diff --git a/src/main.py b/src/main.py index a602b24..fa7cf8c 100644 --- a/src/main.py +++ b/src/main.py @@ -149,6 +149,7 @@ async def queue(): from . import post_deploy from . import merge_gate from . import task_deps + from . import serial_gate return { "counts": job_status_counts(), "max_concurrency": worker.max_concurrency, @@ -161,5 +162,37 @@ async def queue(): # ORCH-026 (G-2): declarative task-dependency observability (read-only, # NOT a source of truth) — declared edges, blocked tasks, detected cycle. "task_deps": task_deps.snapshot(), + # ORCH-088 (D9 / AC-10): per-repo serial-gate observability (read-only) — + # active task, queued/waiting analyst-jobs, freeze state. Additive block. + "serial_gate": serial_gate.snapshot(), "recent": recent_jobs(10), } + + +@app.post("/serial-gate/unfreeze") +async def serial_gate_unfreeze(repo: str = ""): + """ORCH-088 (FR-5, ADR-001 D4): manually clear a per-repo rollback-freeze. + + A freeze set by the post-deploy monitor (DEGRADED) keeps the serial gate CLOSED + for the repo until an operator explicitly clears it here. Idempotent: clearing + an already-clear repo reports ``cleared: 0``. The next queued analyst-job is then + claimable on the next scheduler tick (no restart needed). Alternative manual path + (documented in README): ``UPDATE repo_freeze SET cleared_at=datetime('now') + WHERE repo=? AND cleared_at IS NULL``. + """ + from . import serial_gate + if not repo or not repo.strip(): + return {"ok": False, "error": "missing 'repo'", "repo": repo, "cleared": 0} + repo = repo.strip() + cleared = serial_gate.clear_repo_freeze(repo) + frozen = serial_gate.is_repo_frozen(repo) + if cleared: + try: + from .notifications import send_telegram + send_telegram( + f"🔥 {repo}: пакет РАЗМОРОЖЕН вручную ({cleared} запис(ь/и) снято). " + f"Следующая задача репо стартует на ближайшем цикле." + ) + except Exception: + pass + return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen} diff --git a/src/serial_gate.py b/src/serial_gate.py new file mode 100644 index 0000000..ae273b7 --- /dev/null +++ b/src/serial_gate.py @@ -0,0 +1,404 @@ +"""ORCH-088 (Этап 1, serial e2e): per-repo serial gate + durable rollback-freeze. + +Leaf module — pure, unit-testable logic over the existing ``tasks`` / ``jobs`` +tables and the additive ``repo_freeze`` table (see src/db.py / +08-data-requirements.md). Mirrors the leaf pattern of ``src/task_deps.py`` / +``src/post_deploy.py``: imports only ``db`` + ``config`` (and lazily +``projects`` for the snapshot), never ``stage_engine`` / ``launcher``. + +What it enforces (ADR-001): + * A NEW task's analyst-job does NOT enter analysis (no branch cut, no analyst + agent) while the same repo has ANOTHER unfinished task (``tasks.stage != + 'done'``) OR the repo is frozen. The gate is a SQL fragment spliced into + ``db.claim_next_job`` (offline hot path) — ``build_claim_clause``. + * After a post-deploy ``DEGRADED`` verdict the repo is frozen + (``set_repo_freeze``); the gate stays CLOSED until an operator clears it + (``clear_repo_freeze``). The degraded task is already ``stage='done'`` (BR-7) + so freeze is a SEPARATE durable signal, not derived from a stage. + +never-raise contract (self-hosting safety): every public function degrades +conservatively and NEVER propagates into the worker / webhook / stage engine. +Two deliberately different failure directions (ADR-001 D10, NFR-1): + * hot-claim clause build -> fail-OPEN ("" fragment): a transient DB/build error + must not wedge the queue of ALL projects (AC-8). + * freeze decision (``is_repo_frozen``) -> fail-CLOSED (``True``): when we cannot + confirm the ABSENCE of a freeze we keep the gate closed for prod safety (AC-9). +""" +from __future__ import annotations + +import logging +import re + +from . import db +from .config import settings + +logger = logging.getLogger("orchestrator.serial_gate") + +# Repo tokens embedded into the claim SQL ``IN (...)`` list must match this — a +# guard against a broken/injected ORCH_SERIAL_GATE_REPOS CSV (R-6). The CSV is an +# operator config (not user input), but the guard is mandatory; an invalid token +# is silently dropped. +_REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$") + + +# --------------------------------------------------------------------------- +# Conditionality (mirrors post_deploy_applies / _merge_gate_applies) +# --------------------------------------------------------------------------- +def _scope_repos() -> set[str]: + """Sanitised set of in-scope repo tokens from ``serial_gate_repos`` (CSV). + + Empty/blank CSV -> empty set, meaning "apply to ALL repos" (D5). Invalid + tokens (regex miss) are dropped. Never raises. + """ + try: + raw = (settings.serial_gate_repos or "").strip() + except Exception: # noqa: BLE001 + return set() + if not raw: + return set() + out: set[str] = set() + for tok in raw.split(","): + t = tok.strip() + if t and _REPO_TOKEN.match(t): + out.add(t) + elif t: + logger.warning("serial_gate: dropping invalid repo token %r from CSV", t) + return out + + +def serial_gate_applies(repo: str) -> bool: + """Whether the serial gate is REAL for this repo (D5 / AC-7). + + * ``serial_gate_enabled=False`` -> always False (kill-switch; claim and + start_pipeline are 1:1 as before ORCH-088). + * ``serial_gate_repos`` (CSV) non-empty -> real only for listed repos. + * empty CSV -> real for ALL repos (serial e2e + anti-stale-base help every + repo, unlike the self-hosting-only ORCH-35/43/58 gates). + Never raises -> False on error (degrade to "gate inert", the safe-for-flow + default that matches the kill-switch off behaviour). + """ + try: + if not getattr(settings, "serial_gate_enabled", False): + return False + scope = _scope_repos() + if scope: + return (repo or "").strip() in scope + return True + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("serial_gate_applies error for %s: %s", repo, e) + return False + + +def _freeze_layer_enabled() -> bool: + """Whether the FR-5 freeze layer is active (independent tumbler, D7).""" + try: + return bool(getattr(settings, "serial_gate_freeze_enabled", False)) + except Exception: # noqa: BLE001 + return False + + +# --------------------------------------------------------------------------- +# Read helpers (active task + freeze) — only the local DB +# --------------------------------------------------------------------------- +def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool: + """True iff repo has a task with ``stage != 'done'`` (excluding one task). + + ``exclude_task_id`` is the task being evaluated (a new/rework task must not + count ITSELF as the active task that blocks it — R-7). Observability/Python + mirror of the SQL gate; never raises -> False on error. + """ + try: + conn = db.get_db() + try: + if exclude_task_id is not None: + row = conn.execute( + "SELECT 1 FROM tasks WHERE repo=? AND id != ? AND stage != 'done' LIMIT 1", + (repo, exclude_task_id), + ).fetchone() + else: + row = conn.execute( + "SELECT 1 FROM tasks WHERE repo=? AND stage != 'done' LIMIT 1", + (repo,), + ).fetchone() + return row is not None + finally: + conn.close() + except Exception as e: # noqa: BLE001 - never-raise + logger.warning("repo_has_active_task error for %s: %s", repo, e) + return False + + +def _active_freeze_row(repo: str) -> dict | None: + """Most-recent active (``cleared_at IS NULL``) freeze row for repo, or None. + + Raises on a real DB error (the caller decides fail-open vs fail-closed) — this + private helper does NOT swallow so ``is_repo_frozen`` can fail CLOSED. + """ + conn = db.get_db() + try: + row = conn.execute( + "SELECT repo, frozen_at, reason, work_item_id FROM repo_freeze " + "WHERE repo=? AND cleared_at IS NULL ORDER BY id DESC LIMIT 1", + (repo,), + ).fetchone() + return dict(row) if row else None + finally: + conn.close() + + +def is_repo_frozen(repo: str) -> bool: + """True iff repo currently has an active freeze (FR-5). + + fail-CLOSED (AC-9): when the freeze layer is enabled and we CANNOT confirm the + absence of a freeze (DB error), return True — keep the gate closed for prod + safety. When the freeze layer is disabled the repo is never considered frozen. + """ + if not _freeze_layer_enabled(): + return False + try: + return _active_freeze_row(repo) is not None + except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt (AC-9) + logger.warning("is_repo_frozen error for %s -> fail-CLOSED (frozen): %s", repo, e) + return True + + +# --------------------------------------------------------------------------- +# Freeze mutators (FR-5) +# --------------------------------------------------------------------------- +def set_repo_freeze(repo: str, reason: str = "", work_item_id: str | None = None) -> bool: + """Insert a durable freeze row for repo (no-op when the freeze layer is off). + + Append-only: a repeated DEGRADED while already frozen simply adds another row + (history); ``is_repo_frozen``'s EXISTS is idempotent. Returns True iff a row + was inserted. never-raise -> False on error (a freeze write failure must not + crash the post-deploy monitor tick). + """ + if not _freeze_layer_enabled(): + logger.info("set_repo_freeze: freeze layer disabled, skipping for %s", repo) + return False + if not repo: + return False + try: + conn = db.get_db() + try: + conn.execute( + "INSERT INTO repo_freeze (repo, reason, work_item_id) VALUES (?, ?, ?)", + (repo, reason or None, work_item_id), + ) + conn.commit() + finally: + conn.close() + logger.warning( + "serial_gate: repo %s FROZEN (reason=%r, work_item=%s) — next task will " + "NOT start until manual unfreeze", repo, reason, work_item_id, + ) + return True + except Exception as e: # noqa: BLE001 - never-raise + logger.error("set_repo_freeze error for %s: %s", repo, e) + return False + + +def clear_repo_freeze(repo: str) -> int: + """Clear ALL active freeze rows for repo (operator unfreeze, D4). + + Sets ``cleared_at=now`` on every row with ``cleared_at IS NULL``. Idempotent + (a repeat clears 0 rows). Returns the number of rows cleared. never-raise -> 0 + on error. + """ + if not repo: + return 0 + try: + conn = db.get_db() + try: + cur = conn.execute( + "UPDATE repo_freeze SET cleared_at=datetime('now') " + "WHERE repo=? AND cleared_at IS NULL", + (repo,), + ) + conn.commit() + n = cur.rowcount or 0 + finally: + conn.close() + if n: + logger.warning("serial_gate: repo %s UNFROZEN (%d row(s) cleared)", repo, n) + return n + except Exception as e: # noqa: BLE001 - never-raise + logger.error("clear_repo_freeze error for %s: %s", repo, e) + return 0 + + +# --------------------------------------------------------------------------- +# Hot-claim SQL fragment (fail-OPEN) — ADR-001 D1 +# --------------------------------------------------------------------------- +def build_claim_clause() -> str: + """Build the ``AND NOT (...)`` fragment spliced into ``claim_next_job``. + + Blocks an analyst-job whose repo (a) has an EARLIER-queued unfinished task or + (b) is frozen. Only ``jobs.agent='analyst'`` is gated — jobs of an + already-active task pass freely (else the single active task could never + advance). + + Ordering term — ``t2.id < jobs.task_id`` (FIFO, ADR-001 D1 / FR-2): a task is + blocked only by EARLIER tasks (lower ``tasks.id``) that are not yet done. This + is the FIFO refinement of the ADR's pseudo-SQL ``t2.id != jobs.task_id``: with + ``!=`` a BATCH of fresh tasks all sitting in ``analysis`` would mutually block + (each is "another unfinished task" for the others) -> the whole serial queue + deadlocks, contradicting FR-2 ("строго по одной, FIFO по jobs.id"). ``<`` admits + exactly the oldest unfinished task and serialises the rest behind it, while + still never self-blocking a new/rework analyst-job on its OWN row (R-7) and + keeping AC-1 (a newer task is held by the older active one) intact. + + Repo scope: empty CSV -> no repo filter (all repos); non-empty CSV -> ``AND + jobs.repo IN ('a','b')`` with sanitised tokens (R-6). + + fail-OPEN (AC-8): kill-switch off OR any build error -> ``""`` (claim behaves + exactly as before ORCH-088). The trailing space keeps the spliced SQL valid. + """ + try: + if not getattr(settings, "serial_gate_enabled", False): + return "" + scope = _scope_repos() + if scope: + # All tokens already passed the _REPO_TOKEN regex -> safe to embed. + repo_in = ", ".join(f"'{t}'" for t in sorted(scope)) + repo_scope = f"AND jobs.repo IN ({repo_in}) " + else: + repo_scope = "" + active_clause = ( + "EXISTS (SELECT 1 FROM tasks t2 " + "WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id " + "AND t2.stage != 'done') " + ) + if _freeze_layer_enabled(): + freeze_clause = ( + "OR EXISTS (SELECT 1 FROM repo_freeze f " + "WHERE f.repo = jobs.repo AND f.cleared_at IS NULL) " + ) + else: + freeze_clause = "" + return ( + "AND NOT ( jobs.agent = 'analyst' " + f"{repo_scope}" + f"AND ( {active_clause}{freeze_clause}) " + ") " + ) + except Exception as e: # noqa: BLE001 - fail-OPEN: never wedge the queue + logger.warning("build_claim_clause error -> fail-OPEN (no gate): %s", e) + return "" + + +# --------------------------------------------------------------------------- +# Observability snapshot for GET /queue (D9 / AC-10) +# --------------------------------------------------------------------------- +def _known_repos() -> list[str]: + """Registered repo names (best-effort) plus any repo with live gate state.""" + repos: set[str] = set() + try: + from . import projects + for p in projects.PROJECTS: + if getattr(p, "repo", None): + repos.add(p.repo) + except Exception: # noqa: BLE001 + pass + # Also surface repos that have an active freeze or a queued analyst-job even if + # they are not in the static registry (defensive — never hide a frozen repo). + try: + conn = db.get_db() + try: + for (r,) in conn.execute( + "SELECT DISTINCT repo FROM repo_freeze WHERE cleared_at IS NULL" + ).fetchall(): + if r: + repos.add(r) + for (r,) in conn.execute( + "SELECT DISTINCT repo FROM jobs WHERE status='queued' AND agent='analyst'" + ).fetchall(): + if r: + repos.add(r) + finally: + conn.close() + except Exception: # noqa: BLE001 + pass + return sorted(repos) + + +def _per_repo_snapshot(repo: str) -> dict: + """Per-repo gate state for the /queue snapshot (never raises here).""" + active_task = None + waiting: list[dict] = [] + try: + conn = db.get_db() + try: + row = conn.execute( + "SELECT work_item_id, stage FROM tasks " + "WHERE repo=? AND stage != 'done' ORDER BY id LIMIT 1", + (repo,), + ).fetchone() + if row: + active_task = {"work_item_id": row["work_item_id"], "stage": row["stage"]} + for j in conn.execute( + "SELECT j.id AS job_id, t.work_item_id AS work_item_id, t.stage AS stage " + "FROM jobs j LEFT JOIN tasks t ON t.id = j.task_id " + "WHERE j.repo=? AND j.status='queued' AND j.agent='analyst' " + "ORDER BY j.id", + (repo,), + ).fetchall(): + waiting.append({ + "job_id": j["job_id"], + "work_item_id": j["work_item_id"], + "stage": j["stage"], + }) + finally: + conn.close() + except Exception as e: # noqa: BLE001 + logger.warning("serial_gate per-repo snapshot error for %s: %s", repo, e) + frozen = is_repo_frozen(repo) + frozen_reason = None + frozen_at = None + if frozen: + try: + fr = _active_freeze_row(repo) + if fr: + frozen_reason = fr.get("reason") + frozen_at = fr.get("frozen_at") + except Exception: # noqa: BLE001 + pass + return { + "active_task": active_task, + "waiting": waiting, + "frozen": frozen, + "frozen_reason": frozen_reason, + "frozen_at": frozen_at, + } + + +def snapshot() -> dict: + """Read-only serial-gate summary for GET /queue (D9 / AC-10). + + Additive block; existing /queue keys are untouched. never-raise: any error -> + a minimal dict with the flags and empty per-repo data. + """ + try: + enabled = bool(getattr(settings, "serial_gate_enabled", False)) + except Exception: # noqa: BLE001 + enabled = False + try: + repos_cfg = getattr(settings, "serial_gate_repos", "") or "" + except Exception: # noqa: BLE001 + repos_cfg = "" + try: + per_repo = {r: _per_repo_snapshot(r) for r in _known_repos()} + return { + "enabled": enabled, + "freeze_enabled": _freeze_layer_enabled(), + "repos": repos_cfg, + "per_repo": per_repo, + } + except Exception as e: # noqa: BLE001 - never-raise -> minimal dict + logger.warning("serial_gate snapshot error: %s", e) + return { + "enabled": enabled, + "freeze_enabled": False, + "repos": repos_cfg, + "per_repo": {}, + } diff --git a/src/stage_engine.py b/src/stage_engine.py index e76af93..a48ae72 100644 --- a/src/stage_engine.py +++ b/src/stage_engine.py @@ -1708,6 +1708,25 @@ def run_post_deploy_monitor(job: dict): except Exception as e: # noqa: BLE001 - never break the tick logger.warning(f"post-deploy: set Blocked failed for {work_item_id}: {e}") + # ORCH-088 (FR-5, ADR-001 D3): durable per-repo rollback-freeze. The degraded + # task is already stage='done' (BR-7), so the ordinary active-task gate would + # NOT hold the next task — we need a separate durable signal. Freeze the repo so + # the serial gate stays CLOSED until an operator clears it (POST + # /serial-gate/unfreeze). never-raise (set_repo_freeze swallows its own errors); + # the freeze is a PASSIVE start-block, it does NOT touch the prod container (NFR-6). + try: + from . import serial_gate + reason = f"post-deploy DEGRADED ({checks_failed}/{checks_total}) action={action_taken}" + if serial_gate.set_repo_freeze(repo, reason, work_item_id): + _notify_post_deploy( + work_item_id, + f"🧊 {repo}: пакет ЗАМОРОЖЕН после пост-деплой DEGRADED " + f"({work_item_id}). Следующая задача репо НЕ стартует до ручного " + f"снятия: POST /serial-gate/unfreeze?repo={repo}.", + ) + except Exception as e: # noqa: BLE001 - never break the tick + logger.warning(f"post-deploy: set_repo_freeze failed for {repo}: {e}") + post_deploy.write_post_deploy_log( repo, work_item_id, branch, post_deploy.DEGRADED, action_taken, settings.post_deploy_window_s, checks_total, checks_failed, diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index c56bbbd..597011f 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -573,20 +573,36 @@ async def start_pipeline(data: dict, project_id: str = ""): return task_id = task_row["id"] - # Create branch in Gitea - try: - await _create_gitea_branch(repo, branch) - except Exception as e: - logger.error(f"Failed to create branch '{branch}': {e}") - # Task is created, branch creation failed — log but don't crash - notify_error(0, f"Branch creation failed: {e}") - return + # ORCH-088 (FR-1/AC-6, ADR-001 D1): DEFER the branch cut for an applicable repo. + # Creating the Gitea branch here (T0, issue -> analysis) would cut it from `main` + # BEFORE the predecessor is merged -> stale base. When the serial gate applies we + # do NOT create the branch / initial docs now; the analyst-job sits in the queue + # (status='queued', no branch) and the gate keeps it there until the predecessor + # reaches stage='done'. The branch + docs are then materialised at claim time in + # launcher._spawn from a fresh origin/main (anti-stale-base). The task row already + # stores `branch` as a NAME (R-5) — only the git ref is deferred. + from .. import serial_gate + defer_branch = serial_gate.serial_gate_applies(repo) + if not defer_branch: + # Create branch in Gitea + try: + await _create_gitea_branch(repo, branch) + except Exception as e: + logger.error(f"Failed to create branch '{branch}': {e}") + # Task is created, branch creation failed — log but don't crash + notify_error(0, f"Branch creation failed: {e}") + return - # Create initial docs structure via Gitea API (create file) - try: - await _create_initial_docs(repo, branch, work_item_id, name) - except Exception as e: - logger.error(f"Failed to create initial docs: {e}") + # Create initial docs structure via Gitea API (create file) + try: + await _create_initial_docs(repo, branch, work_item_id, name) + except Exception as e: + logger.error(f"Failed to create initial docs: {e}") + else: + logger.info( + f"Task {work_item_id}: serial gate applies for {repo} -> deferring branch " + f"cut to analyst-job claim (anti-stale-base, ORCH-088)" + ) logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis") diff --git a/tests/test_plane_webhook.py b/tests/test_plane_webhook.py index ec73c30..1f5a6b1 100644 --- a/tests/test_plane_webhook.py +++ b/tests/test_plane_webhook.py @@ -79,6 +79,11 @@ def setup(monkeypatch): monkeypatch.setattr(P.settings, "db_path", _test_db) import src.db as _db monkeypatch.setattr(_db.settings, "db_path", _test_db) + # ORCH-088: these are pre-ORCH-088 repo-routing tests that assert the branch is + # cut DURING start_pipeline. With the serial gate ON (default) the branch cut is + # deferred to the analyst-job claim, so pin them to the kill-switch-off (legacy) + # path — branch timing is out of scope here (covered by test_serial_gate_branch). + monkeypatch.setattr(_db.settings, "serial_gate_enabled", False, raising=False) if os.path.exists(_test_db): os.unlink(_test_db) init_db() diff --git a/tests/test_queue_endpoint.py b/tests/test_queue_endpoint.py new file mode 100644 index 0000000..9e51d74 --- /dev/null +++ b/tests/test_queue_endpoint.py @@ -0,0 +1,61 @@ +"""ORCH-088 — GET /queue additive serial_gate block (AC-10 / TC-20). + +The /queue payload must gain an additive ``serial_gate`` block WITHOUT changing +any pre-existing key (counts/max_concurrency/reconcile/reaper/post_deploy/ +task_deps/recent ...). +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_queue_endpoint.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db # noqa: E402 +from src import config as cfg # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "q.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) + init_db() + yield + + +def test_queue_has_serial_gate_block_and_keeps_existing_keys(): + import asyncio + from src import main + + payload = asyncio.run(main.queue()) + + # Pre-existing keys are all still present (no contract break). + for key in ( + "counts", "max_concurrency", "poll_interval", "resilience", "reconcile", + "reaper", "post_deploy", "merge_verify", "task_deps", "recent", + ): + assert key in payload, f"existing /queue key '{key}' must be preserved" + + # New additive block. + assert "serial_gate" in payload + sg = payload["serial_gate"] + assert sg["enabled"] is True + assert "repos" in sg and "freeze_enabled" in sg + assert isinstance(sg["per_repo"], dict) + + +def test_queue_serial_gate_reflects_freeze(): + import asyncio + from src import main + from src import serial_gate + + serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-900") + payload = asyncio.run(main.queue()) + per = payload["serial_gate"]["per_repo"] + assert "orchestrator" in per + assert per["orchestrator"]["frozen"] is True + assert per["orchestrator"]["frozen_reason"] == "DEGRADED" diff --git a/tests/test_serial_gate.py b/tests/test_serial_gate.py new file mode 100644 index 0000000..ada6b8b --- /dev/null +++ b/tests/test_serial_gate.py @@ -0,0 +1,188 @@ +"""ORCH-088 — per-repo serial gate, unit tests (real tmp SQLite). + +Covers (04-test-plan.yaml): + TC-01 claim_next_job does NOT claim an analyst-job of a NEW task B while the + repo has an unfinished task A (gate closed). + TC-02 serial_gate_applies: enabled + empty CSV -> True for any repo; CSV + membership -> True; repo outside CSV -> False; disabled -> False. + TC-03 jobs of an ALREADY-active task (architect/developer/.../deployer) are + never gated — the single active task advances freely. + TC-08 per-repo: an active orchestrator task does NOT gate an enduro analyst-job. + TC-15 kill-switch off -> claim is 1:1 as before ORCH-088. + TC-16 repo outside a non-empty CSV -> gate inert for that repo. + TC-17 DB/build error in the gate -> fail-OPEN: claim does not crash, still claims. + TC-19 snapshot() shape + never-raise. + TC-21 STAGE_TRANSITIONS / QG_CHECKS registries unchanged (no new QG check). +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 +from src import serial_gate # noqa: E402 +from src import config as cfg # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "sg.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + # Feature ON by default; freeze layer ON; empty CSV (all repos). + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) + # Keep the unrelated dep-gate inert so claim semantics isolate the serial gate. + monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="analysis", repo="orchestrator"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) " + "VALUES (?, ?, ?, ?, ?, ?)", + (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def _set_stage(task_id, stage): + conn = get_db() + conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id)) + conn.commit() + conn.close() + + +# --------------------------------------------------------------- TC-01 +def test_gate_closed_when_repo_has_active_task(): + _make_task("ORCH-201", stage="development") # active predecessor + b = _make_task("ORCH-202", stage="analysis") # new task + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + # A is unfinished -> the analyst-job of B is NOT claimable. + assert claim_next_job() is None, "analyst-job must be gated by active task A" + # /queue shows B waiting + an active task for the repo. + snap = serial_gate.snapshot() + per = snap["per_repo"]["orchestrator"] + assert per["active_task"]["work_item_id"] == "ORCH-201" + assert any(w["job_id"] == job_b for w in per["waiting"]) + + +# --------------------------------------------------------------- TC-02 +def test_serial_gate_applies_scopes(monkeypatch): + # enabled + empty CSV -> all repos. + assert serial_gate.serial_gate_applies("orchestrator") is True + assert serial_gate.serial_gate_applies("enduro-trails") is True + # CSV membership. + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "orchestrator", raising=False) + assert serial_gate.serial_gate_applies("orchestrator") is True + assert serial_gate.serial_gate_applies("enduro-trails") is False + # kill-switch off -> never applies. + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False) + assert serial_gate.serial_gate_applies("orchestrator") is False + + +# --------------------------------------------------------------- TC-03 +def test_non_analyst_job_of_active_task_passes(): + a = _make_task("ORCH-210", stage="development") + # an unrelated unfinished task in the same repo (would close the gate for analyst) + _make_task("ORCH-211", stage="analysis") + for role in ("architect", "developer", "reviewer", "tester", "deployer"): + jid = enqueue_job(role, "orchestrator", role, task_id=a) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == jid, ( + f"{role}-job of an active task must never be gated" + ) + # finish it so the next role's job is the only queued one. + db.mark_job(jid, "done") + + +# --------------------------------------------------------------- TC-08 +def test_per_repo_isolation(): + # orchestrator busy; enduro gets a brand-new analyst-job. + _make_task("ORCH-220", stage="development", repo="orchestrator") + b = _make_task("ET-220", stage="analysis", repo="enduro-trails") + job_b = enqueue_job("analyst", "enduro-trails", "B", task_id=b) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "orchestrator's active task must not gate enduro's analyst-job" + ) + + +# --------------------------------------------------------------- TC-15 +def test_kill_switch_off_is_inert(monkeypatch): + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False) + _make_task("ORCH-230", stage="development") # active task + b = _make_task("ORCH-231", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "with the kill-switch off the gate must be inert (claims as before)" + ) + + +# --------------------------------------------------------------- TC-16 +def test_repo_outside_csv_not_gated(monkeypatch): + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "enduro-trails", raising=False) + _make_task("ORCH-240", stage="development") # active orchestrator task + b = _make_task("ORCH-241", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "orchestrator is outside the CSV scope -> gate must not apply" + ) + + +# --------------------------------------------------------------- TC-17 +def test_build_clause_error_fails_open(monkeypatch): + """A build error in the gate clause must fail-OPEN (claim still proceeds).""" + _make_task("ORCH-250", stage="development") # would close the gate + b = _make_task("ORCH-251", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + + def _boom(): + raise RuntimeError("clause build down") + + monkeypatch.setattr(serial_gate, "build_claim_clause", _boom, raising=True) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "a gate build error must fail-OPEN, not wedge the queue (AC-8)" + ) + + +# --------------------------------------------------------------- TC-19 +def test_snapshot_shape_and_never_raises(monkeypatch): + snap = serial_gate.snapshot() + assert snap["enabled"] is True + assert "repos" in snap and "freeze_enabled" in snap + assert isinstance(snap["per_repo"], dict) + # never-raise: a DB failure -> minimal dict with flags, empty per_repo. + monkeypatch.setattr( + serial_gate, "_known_repos", + lambda: (_ for _ in ()).throw(RuntimeError("db down")), + raising=True, + ) + snap2 = serial_gate.snapshot() + assert snap2["per_repo"] == {} + assert snap2["enabled"] is True + + +# --------------------------------------------------------------- TC-21 +def test_registries_unchanged(): + from src.stages import STAGE_TRANSITIONS + from src.qg.checks import QG_CHECKS + assert set(STAGE_TRANSITIONS) == { + "created", "analysis", "architecture", "development", "review", + "testing", "deploy-staging", "deploy", "done", + } + # No serial-gate QG check was introduced (the gate is a scheduler condition). + assert not any("serial" in k for k in QG_CHECKS), "no new QG check expected" diff --git a/tests/test_serial_gate_branch.py b/tests/test_serial_gate_branch.py new file mode 100644 index 0000000..b4aeb96 --- /dev/null +++ b/tests/test_serial_gate_branch.py @@ -0,0 +1,153 @@ +"""ORCH-088 — deferred branch cut / anti-stale-base (FR-1/AC-6). + +Covers (04-test-plan.yaml): + TC-13 while the serial gate applies, start_pipeline does NOT create the Gitea + branch / initial docs (the cut is deferred to the analyst-job claim); + with the kill-switch off it creates them immediately (1:1 as before). + TC-14 a branch cut at claim time (ensure_worktree on a not-yet-existing branch) + is based on a FRESH origin/main that already contains the predecessor: + git merge-base --is-ancestor is true. +""" +import os +import subprocess +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_branch.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db # noqa: E402 +from src import config as cfg # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "branch.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) + monkeypatch.setattr(cfg.settings, "task_deps_source", "db", raising=False) + init_db() + yield + + +# --------------------------------------------------------------- TC-13 +async def _drive_start_pipeline(monkeypatch, gate_applies: bool): + from src.webhooks import plane + from src import plane_sync + from src.projects import ProjectConfig + + proj = ProjectConfig( + plane_project_id="proj-uuid", + repo="orchestrator", + work_item_prefix="ORCH", + name="orch", + ) + monkeypatch.setattr(plane, "get_project_by_plane_id", lambda pid: proj) + monkeypatch.setattr(plane, "_qg0_errors", lambda name, desc: []) + monkeypatch.setattr(plane, "ensure_unique_work_item_id", lambda wid, repo: wid) + monkeypatch.setattr( + plane, "create_task_atomic", + lambda *a, **k: ({"id": 1, "work_item_id": "ORCH-500"}, True), + ) + monkeypatch.setattr(plane_sync, "fetch_issue_sequence_id", lambda *a, **k: 500) + monkeypatch.setattr(plane_sync, "set_issue_analysis", lambda *a, **k: None) + monkeypatch.setattr(plane_sync, "add_comment", lambda *a, **k: None) + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", gate_applies, raising=False) + + enq = [] + monkeypatch.setattr(plane, "enqueue_job", lambda *a, **k: (enq.append((a, k)) or 99)) + + branch_calls, docs_calls = [], [] + + async def _branch_spy(repo, branch): + branch_calls.append((repo, branch)) + + async def _docs_spy(repo, branch, wi, name): + docs_calls.append((repo, branch, wi, name)) + + monkeypatch.setattr(plane, "_create_gitea_branch", _branch_spy) + monkeypatch.setattr(plane, "_create_initial_docs", _docs_spy) + + data = { + "id": "issue-uuid-1", + "name": "Add serial gate", + "description_stripped": "A sufficiently long description for QG-0 to pass.", + "project": "proj-uuid", + } + await plane.start_pipeline(data, project_id="proj-uuid") + return branch_calls, docs_calls, enq + + +def test_branch_cut_deferred_when_gate_applies(monkeypatch): + import asyncio + branch_calls, docs_calls, enq = asyncio.run( + _drive_start_pipeline(monkeypatch, gate_applies=True) + ) + assert branch_calls == [], "branch must NOT be cut in start_pipeline while gated" + assert docs_calls == [], "initial docs must NOT be created while gated" + # The analyst-job is still enqueued (it waits in the queue without a branch). + assert any(a[0] == "analyst" for a, k in enq), "analyst-job must still be enqueued" + + +def test_branch_cut_immediate_when_kill_switch_off(monkeypatch): + import asyncio + branch_calls, docs_calls, enq = asyncio.run( + _drive_start_pipeline(monkeypatch, gate_applies=False) + ) + assert branch_calls, "with the gate off the branch is cut in start_pipeline (1:1)" + assert docs_calls, "with the gate off initial docs are created in start_pipeline" + + +# --------------------------------------------------------------- TC-14 +def _git(*args, cwd): + env = { + **os.environ, + "GIT_AUTHOR_NAME": "t", "GIT_AUTHOR_EMAIL": "t@t", + "GIT_COMMITTER_NAME": "t", "GIT_COMMITTER_EMAIL": "t@t", + } + return subprocess.run(["git", *args], cwd=cwd, env=env, + capture_output=True, text=True, check=True) + + +def test_deferred_branch_base_contains_predecessor(tmp_path, monkeypatch): + """A branch cut at claim time is based on a fresh origin/main with A's code.""" + from src import git_worktree + + origin = tmp_path / "origin.git" + origin.mkdir() + _git("init", "--bare", "-b", "main", str(origin), cwd=tmp_path) + + repos_dir = tmp_path / "repos" + wt_dir = tmp_path / "wt" + repos_dir.mkdir() + wt_dir.mkdir() + repo = "orchestrator" + clone = repos_dir / repo + _git("clone", str(origin), str(clone), cwd=tmp_path) + + # Predecessor A: commit on main + push to origin (== "A merged at its done"). + (clone / "a.txt").write_text("A's code\n") + _git("add", "a.txt", cwd=clone) + _git("commit", "-m", "task A", cwd=clone) + _git("push", "origin", "main", cwd=clone) + sha_a = _git("rev-parse", "HEAD", cwd=clone).stdout.strip() + + monkeypatch.setattr(git_worktree.settings, "repos_dir", str(repos_dir), raising=False) + monkeypatch.setattr(git_worktree.settings, "worktrees_dir", str(wt_dir), raising=False) + + # Branch B does not exist yet -> ensure_worktree cuts it from fresh origin/main. + wt = git_worktree.ensure_worktree(repo, "feature/ORCH-B") + head_b = _git("rev-parse", "HEAD", cwd=wt).stdout.strip() + + # AC-6: A's commit is an ancestor of B's base. + r = subprocess.run( + ["git", "-C", wt, "merge-base", "--is-ancestor", sha_a, head_b], + capture_output=True, + ) + assert r.returncode == 0, "branch B base must contain predecessor A's commit (AC-6)" diff --git a/tests/test_serial_gate_e2e.py b/tests/test_serial_gate_e2e.py new file mode 100644 index 0000000..c12fd3a --- /dev/null +++ b/tests/test_serial_gate_e2e.py @@ -0,0 +1,113 @@ +"""ORCH-088 — serial gate end-to-end queue behaviour (real tmp SQLite). + +Covers (04-test-plan.yaml): + TC-04 after A.stage='done' the waiting analyst-job of B is claimed (gate opens + automatically — no manual action). + TC-05 a queue of 3 tasks of one repo is processed strictly one-at-a-time, FIFO + by jobs.id: while A is unfinished neither B nor C starts. + TC-06 restart-safe: the active task is derived from the DB (tasks.repo + + stage!='done'), not in-memory — re-reading state keeps the gate closed. +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_e2e.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 +from src import config as cfg # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "e2e.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="analysis", repo="orchestrator"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES (?, ?, ?, ?, ?)", + (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def _set_stage(task_id, stage): + conn = get_db() + conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id)) + conn.commit() + conn.close() + + +# --------------------------------------------------------------- TC-04 +def test_next_starts_automatically_when_predecessor_done(): + a = _make_task("ORCH-301", stage="development") + b = _make_task("ORCH-302", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + + assert claim_next_job() is None, "B gated while A unfinished" + + # A reaches done -> the gate opens on the NEXT claim tick, no manual action. + _set_stage(a, "done") + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + + +# --------------------------------------------------------------- TC-05 +def test_three_tasks_processed_one_at_a_time_fifo(): + a = _make_task("ORCH-310", stage="analysis") + b = _make_task("ORCH-311", stage="analysis") + c = _make_task("ORCH-312", stage="analysis") + job_a = enqueue_job("analyst", "orchestrator", "A", task_id=a) + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + job_c = enqueue_job("analyst", "orchestrator", "C", task_id=c) + + # Only the FIFO-first task (A, lowest id) is claimable. + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_a + assert claim_next_job() is None, "B and C must wait while A is unfinished" + + # A runs through to done; now B (next) is claimable, C still waits. + db.mark_job(job_a, "done") + _set_stage(a, "done") + claimed_b = claim_next_job() + assert claimed_b is not None and claimed_b["id"] == job_b + assert claim_next_job() is None, "C must wait while B is unfinished" + + # B done -> C claimable last (strict FIFO order preserved). + db.mark_job(job_b, "done") + _set_stage(b, "done") + claimed_c = claim_next_job() + assert claimed_c is not None and claimed_c["id"] == job_c + + +# --------------------------------------------------------------- TC-06 +def test_restart_safe_active_task_from_db(): + a = _make_task("ORCH-320", stage="development") + b = _make_task("ORCH-321", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + assert claim_next_job() is None + + # Simulate a restart: there is NO in-memory state — the gate recomputes purely + # from the DB. Re-running init_db (idempotent) + a fresh claim must still gate B. + init_db() + assert claim_next_job() is None, "after restart the gate is still closed (DB-derived)" + + _set_stage(a, "done") + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b diff --git a/tests/test_serial_gate_freeze.py b/tests/test_serial_gate_freeze.py new file mode 100644 index 0000000..d58071d --- /dev/null +++ b/tests/test_serial_gate_freeze.py @@ -0,0 +1,160 @@ +"""ORCH-088 — rollback-freeze layer (FR-5) tests (real tmp SQLite). + +Covers (04-test-plan.yaml): + TC-07 freeze survives a restart (durable in DB) — next task stays gated. + TC-09 freeze of orchestrator does NOT affect enduro-trails (per-repo). + TC-10 post-deploy DEGRADED -> durable freeze row + Telegram alert attempted. + TC-11 an active freeze gates the next analyst-job even with NO unfinished task + (the degraded task is already done — BR-7). + TC-12 manual clear_repo_freeze -> next task is claimable again. + TC-18 is_repo_frozen fails CLOSED on a read error (frozen=True on doubt). + TC-22 repo_freeze migration is idempotent (re-init does not dup / crash). +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_freeze.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 +from src import serial_gate # noqa: E402 +from src import config as cfg # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "freeze.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="analysis", repo="orchestrator"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES (?, ?, ?, ?, ?)", + (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +# --------------------------------------------------------------- TC-07 +def test_freeze_survives_restart(): + b = _make_task("ORCH-401", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + assert serial_gate.set_repo_freeze("orchestrator", "post-deploy DEGRADED", "ORCH-400") is True + + assert claim_next_job() is None, "frozen repo gates the analyst-job" + # Simulate restart: no in-memory state, re-init (idempotent) -> still frozen. + init_db() + assert serial_gate.is_repo_frozen("orchestrator") is True + assert claim_next_job() is None, "freeze is durable across restart" + assert job_b # referenced + + +# --------------------------------------------------------------- TC-09 +def test_freeze_is_per_repo(): + serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-410") + b = _make_task("ET-410", stage="analysis", repo="enduro-trails") + job_b = enqueue_job("analyst", "enduro-trails", "B", task_id=b) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "an orchestrator freeze must not gate enduro-trails" + ) + assert serial_gate.is_repo_frozen("enduro-trails") is False + + +# --------------------------------------------------------------- TC-10 +def test_post_deploy_degraded_sets_freeze_and_alerts(tmp_path, monkeypatch): + from src import stage_engine, post_deploy + + # Sandbox the post-deploy sentinel state dir so a prior DONE marker can't + # short-circuit the tick (state lives under settings.repos_dir). + monkeypatch.setattr(post_deploy.settings, "repos_dir", str(tmp_path), raising=False) + + a = _make_task("ORCH-420", stage="done", repo="orchestrator") + job = {"task_id": a, "repo": "orchestrator"} + + # Avoid network / git / worktree; force a DEGRADED verdict. + monkeypatch.setattr(post_deploy, "probe_signals", + lambda *a, **k: post_deploy.ProbeResult(False, 2, 2, "down")) + monkeypatch.setattr(post_deploy, "classify", lambda *a, **k: post_deploy.DEGRADED) + monkeypatch.setattr(post_deploy, "write_post_deploy_log", lambda *a, **k: True) + monkeypatch.setattr(stage_engine, "set_issue_blocked", lambda *a, **k: None) + + alerts = [] + monkeypatch.setattr(stage_engine, "_notify_post_deploy", + lambda wi, msg: alerts.append(msg)) + + stage_engine.run_post_deploy_monitor(job) + + # Durable freeze row written + a freeze alert attempted. + assert serial_gate.is_repo_frozen("orchestrator") is True + assert any("ЗАМОРОЖЕН" in m for m in alerts), f"freeze alert missing: {alerts}" + + +# --------------------------------------------------------------- TC-11 +def test_freeze_gates_even_without_unfinished_task(): + _make_task("ORCH-430", stage="done") # degraded task already done + b = _make_task("ORCH-431", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + # Without freeze B would be claimable (A done, no earlier unfinished). Freeze it. + serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-430") + assert claim_next_job() is None, "active freeze gates the next analyst-job (BR-7)" + assert job_b + + +# --------------------------------------------------------------- TC-12 +def test_manual_unfreeze_lets_next_start(): + _make_task("ORCH-440", stage="done") + b = _make_task("ORCH-441", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-440") + assert claim_next_job() is None + + cleared = serial_gate.clear_repo_freeze("orchestrator") + assert cleared >= 1 + assert serial_gate.is_repo_frozen("orchestrator") is False + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + # Idempotent: clearing again clears nothing. + assert serial_gate.clear_repo_freeze("orchestrator") == 0 + + +# --------------------------------------------------------------- TC-18 +def test_is_repo_frozen_fails_closed(monkeypatch): + def _boom(repo): + raise RuntimeError("freeze read down") + + monkeypatch.setattr(serial_gate, "_active_freeze_row", _boom, raising=True) + # Freeze layer enabled + cannot confirm absence -> fail CLOSED (True). + assert serial_gate.is_repo_frozen("orchestrator") is True + # Freeze layer OFF -> never frozen, even on a read error. + monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", False, raising=False) + assert serial_gate.is_repo_frozen("orchestrator") is False + + +# --------------------------------------------------------------- TC-22 +def test_repo_freeze_migration_idempotent(): + # Re-running init_db must not crash or duplicate the table/index. + init_db() + init_db() + conn = get_db() + cols = [r[1] for r in conn.execute("PRAGMA table_info(repo_freeze)").fetchall()] + conn.close() + assert {"repo", "frozen_at", "reason", "work_item_id", "cleared_at"}.issubset(set(cols)) + # A freeze still functions after repeated migration. + assert serial_gate.set_repo_freeze("orchestrator", "x", "ORCH-450") is True + assert serial_gate.is_repo_frozen("orchestrator") is True diff --git a/tests/test_status_trigger.py b/tests/test_status_trigger.py index 4be7fc8..99e452e 100644 --- a/tests/test_status_trigger.py +++ b/tests/test_status_trigger.py @@ -39,6 +39,11 @@ def setup(monkeypatch): monkeypatch.setattr(P.settings, "db_path", _test_db) import src.db as _db monkeypatch.setattr(_db.settings, "db_path", _test_db) + # ORCH-088: this suite asserts the branch is cut DURING start_pipeline. With the + # serial gate ON (default) the cut is deferred to the analyst-job claim, so pin + # to the kill-switch-off (legacy) path — branch timing is out of scope here + # (covered by test_serial_gate_branch). + monkeypatch.setattr(_db.settings, "serial_gate_enabled", False, raising=False) if os.path.exists(_test_db): os.unlink(_test_db) init_db()