From 1a2e881513b807005581db50434a677775899d11 Mon Sep 17 00:00:00 2001 From: claude-bot Date: Sun, 7 Jun 2026 15:31:37 +0000 Subject: [PATCH] feat(reaper): job-reaper + stale merge-lease reclaim + idempotent merge finalization Closes the "zombie jobs" incident class: job status was set only inside the live launcher process, so a process death left jobs.status='running' forever; at max_concurrency=1 one zombie blocked ALL projects' queue (self-hosting risk). Adds a background daemon (src/job_reaper.py) with three-tier liveness (dead-pid streak / known exit_code / max-running backstop) whose only mutating write is an atomic terminal flip guarded by WHERE status='running' (no double-process). For exit0 the canonical QG is the source of truth via gate-driven advance, not "exit0". Also proactively reclaims stale merge-lease (dead pid OR TTL) via file delete only (no git ops), and makes merge finalization idempotent (pr_already_merged guard + up-to-date short-circuit on re-drive). New jobs.pid column via idempotent _ensure_column (no migration); pid stamped in launcher._spawn after Popen. Reaper start/stop in lifespan; "reaper" snapshot in GET /queue. Kill-switches: ORCH_REAPER_ENABLED, ORCH_REAPER_INTERVAL_S, ORCH_REAPER_DEAD_TICKS, ORCH_REAPER_MAX_RUNNING_S, ORCH_LEASE_RECLAIM_ENABLED. Invariants unchanged (AC-13): STAGE_TRANSITIONS, QG_CHECKS registry, check_branch_mergeable signature/behaviour, BUG-8 rollback, hook exit codes. restart-safe, never-raise per unit of background work. Docs: docs/architecture/README.md, CHANGELOG.md, .env.example. Tests: tests/test_job_reaper.py, tests/test_merge_lease_reclaim.py, tests/test_merge_gate.py (TC-16), tests/test_merge_gate_race.py (TC-17), tests/test_queue.py, tests/test_config.py (TC-19/TC-20). 742 passed. 
Refs: ORCH-065 Co-Authored-By: Claude Opus 4.7 --- .env.example | 22 ++ CHANGELOG.md | 1 + docs/architecture/README.md | 2 +- src/agents/launcher.py | 8 + src/config.py | 27 +++ src/db.py | 75 ++++++ src/job_reaper.py | 370 ++++++++++++++++++++++++++++++ src/main.py | 32 ++- src/merge_gate.py | 138 +++++++++++ tests/test_config.py | 79 +++++++ tests/test_job_reaper.py | 285 +++++++++++++++++++++++ tests/test_merge_gate.py | 54 +++++ tests/test_merge_gate_race.py | 60 +++++ tests/test_merge_lease_reclaim.py | 138 +++++++++++ tests/test_queue.py | 55 +++++ 15 files changed, 1341 insertions(+), 5 deletions(-) create mode 100644 src/job_reaper.py create mode 100644 tests/test_job_reaper.py create mode 100644 tests/test_merge_lease_reclaim.py diff --git a/.env.example b/.env.example index 9a74109..dc9e36b 100644 --- a/.env.example +++ b/.env.example @@ -117,6 +117,28 @@ ORCH_RECONCILE_GRACE_OVERRIDES_JSON= ORCH_RECONCILE_NOTIFY_UNBLOCK=true ORCH_RECONCILE_SKIP_BLOCKED_ENABLED=true +# ORCH-065: job-reaper + proactive merge-lease reclaim. A background daemon thread +# (src/job_reaper.py, started LAST in main.lifespan after requeue_running_jobs) reaps +# zombie 'running' jobs whose monitor/process died before writing the terminal status +# (one zombie at max_concurrency=1 blocks the whole shared queue) and periodically +# reclaims dead/stale merge-leases. Liveness is three-tier: Tier-1 dead jobs.pid +# (os.kill(pid,0)) after REAPER_DEAD_TICKS consecutive dead ticks (anti-false-positive +# for a live agent); Tier-2 agent_runs.exit_code recorded but job still 'running'; +# Tier-3 backstop after REAPER_MAX_RUNNING_S. The terminal flip carries an atomic +# status='running' guard so it never double-processes a row racing requeue_running_jobs. +# REAPER_ENABLED -> global kill-switch (false -> strictly prior behaviour). +# REAPER_INTERVAL_S -> background scan period (seconds). +# REAPER_DEAD_TICKS -> consecutive dead-pid ticks before reaping (Tier-1, >=2). 
+# REAPER_MAX_RUNNING_S -> Tier-3 backstop ceiling; must exceed max agent_timeout+grace. +# LEASE_RECLAIM_ENABLED -> kill-switch for the proactive stale/dead lease reclaim +# (false -> only the legacy lazy TTL reclaim in acquire_merge_lease). +# (reuse) ORCH_MERGE_LOCK_TIMEOUT_S -> lease TTL; ORCH_MERGE_GATE_REPOS -> reclaim scope. +ORCH_REAPER_ENABLED=true +ORCH_REAPER_INTERVAL_S=60 +ORCH_REAPER_DEAD_TICKS=2 +ORCH_REAPER_MAX_RUNNING_S=3600 +ORCH_LEASE_RECLAIM_ENABLED=true + # ORCH-021: post-deploy production monitoring + degradation reaction. After the # terminal deploy->done transition for an applicable repo, a reserved-agent job # `post-deploy-monitor` (no LLM, modelled on deploy-finalizer) probes prod over a diff --git a/CHANGELOG.md b/CHANGELOG.md index e3f1d8b..e814af7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## [Unreleased] ### Added +- **Job-reaper + проактивный реклейм протухшего merge-lease + идемпотентная финализация merge** (ORCH-065): закрыт класс инцидентов «zombie jobs» — статус job выставлялся ТОЛЬКО в живом процессе launcher'а, поэтому гибель процесса (OOM/рестарт инстанса/segfault Claude-CLI) оставляла строку `jobs.status='running'` навсегда; при `max_concurrency=1` один такой зомби намертво блокировал очередь ВСЕХ проектов (self-hosting: enduro-trails встаёт из-за зомби ORCH-задачи). Плюс два смежных дефекта: застрявший merge-lease (`.merge-lease-<repo>.json` реклеймился лишь лениво по TTL при чужом acquire, живость pid-holder'а не проверялась) и неидемпотентная финализация merge (rebase+re-test зелёные, но процесс умер до самого merge → нет повторного проигрывания). 
Решение — новый фоновый daemon-поток **`src/job_reaper.py`** (контракт «never-raise на единицу работы», паттерн `reconciler`/`queue_worker`): периодический тик (`reaper_interval_s`) сканирует `running`-jobs трёхуровневой проверкой живости (ADR Р-1): **Tier-1** мёртвый pid (`os.kill(pid, 0)` → `ProcessLookupError`) с анти-false-positive порогом `reaper_dead_ticks` подряд-мёртвых тиков (стрик в памяти); **Tier-2** `agent_runs.exit_code` записан, но job всё ещё `running` (исход известен — процесс завершился, но не успел флипнуть статус); **Tier-3** backstop-потолок `reaper_max_running_s`. Единственная мутирующая запись reaper'а — атомарный терминальный флип через `db.reap_running_job(... WHERE status='running')` (rowcount==1 у победителя, проигравший в гонке с `requeue_running_jobs`/launcher видит rowcount==0 — без двойной обработки, TC-06). Для Tier-2 exit0 источник истины — канонический QG (не «exit0»): gate-driven advance (`_gate_driven_advance` → штатный `launcher._try_advance_stage`, кандидат-стадии агента из `STAGE_TRANSITIONS`) проигрывается ПЕРЕД флипом — зелёный гейт → `done`, красный → путь неуспеха (requeue в пределах `attempts post_deploy_5xx_threshold`; иначе `HEALTHY` — одиночный глюк не откатывает), `decide_action` (self-hosting → ВСЕГДА `ALERT_ONLY`; не-self + `post_deploy_auto_rollback=true` → `ROLLBACK`; иначе `ALERT_ONLY`), `map_rollback_exit_code` (`0→ROLLBACK_OK`, иначе `ROLLBACK_FAILED`), sentinel-state хелперы (`armed`/`series`/`done` под `/.post-deploy-state-//`, restart-safe счётчики), `build_rollback_command`/`run_rollback` (ssh-хук `--rollback` с прод-env, синхронно — только для не-self), `build/write_post_deploy_log` (артефакт `16-post-deploy-log.md`), `arm_monitor` (идемпотентный арм + первый отложенный job), `status` (снимок для `/queue`). 
**Механизм наблюдения — reserved-agent job `post-deploy-monitor`** (детерминированный, no-LLM, калька `deploy-finalizer`, НЕ стадия и НЕ daemon): арм в `stage_engine.advance_stage` в блоке `next_stage == "done"` ПОСЛЕ terminal-sync/release-lease (`post_deploy.arm_monitor`, sentinel `armed` = идемпотентность при двойном webhook/reconciler/finalizer); один тик = один job — перехват в `agents/launcher.launch_job` ДО `_spawn` → `stage_engine.run_post_deploy_monitor` (один опрос → append в `series` → `classify` → перепостановка с задержкой `available_at_delay_s` ИЛИ реакция+артефакт+`mark_done`); бюджет тиков `window_s/interval_s` (анти-livelock). **Self-hosting safety (BR-5):** для `orchestrator` тик НИКОГДА не откатывает/рестартит прод-контейнер — реакция всегда `ALERT_ONLY` (громкий Telegram + Plane-коммент с запросом ручного approve); авто-rollback хуком `--rollback` — только для не-self репо при `post_deploy_auto_rollback=true` (целевой контейнер ≠ orchestrator). Наблюдаемость — блок `post_deploy` в `GET /queue` (enabled/window/interval/активные наблюдения). Артефакт `16-post-deploy-log.md` (YAML-frontmatter `post_deploy_status`/`action_taken`/`window_s`/`checks_total`/`checks_failed`) — машиночитаемо для петли уроков ORCH-8; best-effort. Новые настройки: `ORCH_POST_DEPLOY_MONITOR_ENABLED` (true, kill-switch), `ORCH_POST_DEPLOY_REPOS` (CSV; пусто → только self-hosting), `ORCH_POST_DEPLOY_WINDOW_S` (900), `ORCH_POST_DEPLOY_INTERVAL_S` (30), `ORCH_POST_DEPLOY_FAIL_THRESHOLD` (3), `ORCH_POST_DEPLOY_5XX_THRESHOLD` (0.5), `ORCH_POST_DEPLOY_AUTO_ROLLBACK` (false), `ORCH_POST_DEPLOY_BASE_URL` (http://localhost:8500); параметры отката переиспользуют `deploy_prod_*`. Инварианты НЕ менялись: `STAGE_TRANSITIONS`, реестр `QG_CHECKS`, `check_deploy_status`/`_parse_deploy_status`, terminal-sync `deploy→done`, merge-gate, exit-код-контракт хука (0/1/2), схема БД (без миграций; состояние — sentinel-файлы). Условность как ORCH-35/36/43/58. 
ADR `docs/work-items/ORCH-021/06-adr/ADR-001-post-deploy-monitor.md`, глобальный `docs/architecture/adr/adr-0010-post-deploy-monitor.md`. Тесты: `tests/test_post_deploy.py`, `tests/test_post_deploy_integration.py`. - **Провенанс staging-образа перед BUILD-ONCE retag в прод (свежесть артефакта, INV-FRESH)** (ORCH-058): BUILD-ONCE retag (ORCH-036) промоутит staging-образ (`orchestrator-orchestrator-staging`) в прод **без rebuild**, полагаясь на «образ свеж и провалидирован» — гарантии не было: конвейер нигде не пересобирал staging-образ из провалидированного коммита, поэтому retag мог тихо промоутнуть УСТАРЕВШИЙ образ (инцидент LESSONS_ORCH-036 п.4 — зелёный деплой молча откатывал прод). Закрыто **двумя слоями (defense in depth), только для self-hosting**. Новый модуль `src/image_freshness.py` (контракт «never raise», по образцу `merge_gate`): `provenance_verdict` (чистая функция вердикта match/mismatch/fail-closed), `validated_revision` (`git rev-parse HEAD` в worktree валидированного коммита — единый якорь и для штампа A, и для `EXPECTED_REVISION` B), `image_revision` (OCI-лейбл `org.opencontainers.image.revision` через `docker image inspect`, ``/ошибка → пусто), `rebuild_staging_image` (ssh-хук `--build-staging`), `image_freshness_applies` (условность), `check_staging_image_fresh` (композитный QG). **Strategy A (liveness):** новый детерминированный QG-под-чек `check_staging_image_fresh` (зарегистрирован в `QG_CHECKS`, `src/qg/checks.py`) на ребре `deploy-staging → deploy` ПОСЛЕ merge-gate и ДО Phase A — пересобирает staging-образ из worktree валидированного коммита (хук `--build-staging`, `--build-arg GIT_SHA=`), пересоздаёт 8501 и прогоняет `staging_check.py --mode stub` против свежего 8501 (health + e2e, внутри staging-контейнера через `docker exec` — канон ORCH-048) → валидируем РОВНО тот артефакт (build + e2e), что промоутится в прод (AC-4); FAIL/не-ноль staging_check → откат на `development` (как merge-gate, кап `MAX_DEVELOPER_RETRIES`). 
`rebuild_staging_image` пробрасывает в хук **явный** staging-таргет (service/port/profile/container), исключая дрейф на прод 8500. Сборки/recreate/validate — **только staging (8501)**, прод (8500) не трогается. **Strategy B (safety):** `Dockerfile` штампует `LABEL org.opencontainers.image.revision=$GIT_SHA` (`ARG GIT_SHA`); `build_deploy_command` (`src/self_deploy.py`) пробрасывает `EXPECTED_REVISION`; хост-хук шагом 2b ПЕРЕД `docker tag` fail-closed сверяет лейбл `revision` у `SOURCE_IMAGE` с `EXPECTED_REVISION` — несовпадение / пустой лейбл / ошибка inspect → `exit 1` (FAILED → БАГ-8 откат), делает тихий промоут устаревшего образа структурно невозможным даже при проигравшей гонку/отключённой A. Хост-хук `scripts/orchestrator-deploy-hook.sh` расширен **обратно-совместимым** режимом `--build-staging` (пересборка+recreate staging, exit 0/1) и fail-closed guard'ом (активен только при заданном `EXPECTED_REVISION`). Единый kill-switch `ORCH_IMAGE_FRESHNESS_ENABLED` (true) включает A+B **как целое** (нет «B без A» = вечного fail-fast); область — `ORCH_IMAGE_FRESHNESS_REPOS` (CSV; пусто → только self-hosting `orchestrator`). Контракты НЕ менялись: `STAGE_TRANSITIONS` (под-гейт ребра, не стадия), exit-code-контракт хука (0/1/2), `map_exit_code_to_status`, `check_deploy_status`/`_parse_deploy_status`, БАГ-8, terminal-sync, merge-gate; схема БД — без миграций. ADR `docs/work-items/ORCH-058/06-adr/ADR-001-staging-image-provenance.md`, глобальный `docs/architecture/adr/adr-0008-staging-image-provenance.md`. Документация: `docs/architecture/README.md`, `docs/operations/DEPLOY_HOOK.md`, `docs/operations/STAGING.md`, `docs/operations/INFRA.md`, `.env.example`. Тесты: `tests/test_image_freshness.py`, `tests/test_deploy_hook_provenance.py`, `tests/test_deploy_build_once.py` (TC-06), `tests/test_deploy_hook_mapping.py` (TC-09), `tests/test_stage_engine.py::TestImageFreshnessGate`, `tests/test_qg_registry_snapshot.py`, `tests/test_config.py`. 
- **Исполняемый самодеплой стадии `deploy` (стадия дёргает хост-хук, manual-approve)** (ORCH-036): стадия `deploy` перестаёт быть «бумажной» — для self-hosting репозитория `orchestrator` `deploy_status: SUCCESS` означает ДОКАЗАННЫЙ health-ok реального рестарта прод-контейнера (8500), а не декларацию LLM. Критический путь self-restart детерминирован (без LLM), по образцу merge-gate ORCH-043, и разбит на три фазы (`src/stage_engine.py` + новый модуль `src/self_deploy.py`): **Фаза A** (вход в `deploy`) — вместо запуска прод-deployer'а при `deploy_require_manual_approve=true` задача переводится в approval-pending (`set_issue_in_review`) и ждёт ручного approve; restart-safe маркер `approve-requested`. **Фаза B** (человек ставит статус Plane → `Approved`; `advance_stage(deploy, finished_agent=None)`) — запускается **detached host-процесс** (`ssh + setsid` → `scripts/orchestrator-deploy-hook.sh`, чтобы рестарт 8500 пережил гибель контейнера; орк НЕ убивает себя из docker.sock) с build-once retag staging-образа (`SOURCE_IMAGE`), ставится детерминированный **finalizer-job**; маркер `initiated` — идемпотентность повторного Approved. **Фаза C** (`run_deploy_finalizer`, reserved-agent `deploy-finalizer`, claim'ится новым контейнером после рестарта) — читает sentinel `result` (exit-code хука, записан host-обёрткой), `not-ready` → defer (бюджет `deploy_finalize_max_attempts`, restart-safe по `task_content`), маппит `0→SUCCESS / 1|2|иное→FAILED` (чистая функция `map_exit_code_to_status`, unit-тест), пишет `14-deploy-log.md` и вызывает `advance_stage(deploy, finished_agent="deployer")` → существующие контракты: `SUCCESS → done` + release merge-lease, `FAILED → откат БАГ-8 на development` + `set_issue_blocked`. Уведомления Plane+Telegram на approve-request / initiate / success / rollback (BR-5, ни одного «молчаливого» деплоя). 
Хост-хук `scripts/orchestrator-deploy-hook.sh` расширен **обратно-совместимым** `SOURCE_IMAGE`: при заданном — `docker tag $SOURCE_IMAGE $TARGET_IMAGE` перед `up -d --no-build` (деплой РОВНО протестированного образа, без `docker build`); не задан → прежнее поведение; exit-code-контракт (0/1/2) и health-loop (10×6с, авто-rollback) не тронуты. Restart-safe состояние — sentinel-файлы (`/.deploy-state-//`), без миграции БД. Условность как ORCH-35: реальный самодеплой только для `is_self_hosting_repo("orchestrator")`; прочие репо (enduro-trails) — прежний синхронный ssh-путь агентом. Контракты НЕ менялись: `STAGE_TRANSITIONS`, реестр `QG_CHECKS`, `check_deploy_status`/`_parse_deploy_status` (frontmatter-only), terminal-sync `deploy→done`, merge-gate (ORCH-43), БАГ-8. Флаг `DEPLOY_REQUIRE_MANUAL_APPROVE` остаётся `true` (полный авто — отдельная задача ORCH-54). Новые настройки: `ORCH_DEPLOY_REQUIRE_MANUAL_APPROVE` (true), `ORCH_DEPLOY_SSH_USER`, `ORCH_DEPLOY_SSH_HOST`, `ORCH_DEPLOY_HOOK_SCRIPT`, `ORCH_DEPLOY_PROD_SOURCE_IMAGE`, `ORCH_DEPLOY_PROD_TARGET_SERVICE/PORT/IMAGE`, `ORCH_DEPLOY_FINALIZE_DELAY_S`, `ORCH_DEPLOY_FINALIZE_MAX_ATTEMPTS`. ADR `docs/work-items/ORCH-036/06-adr/ADR-001-executable-self-deploy.md`, глобальный `docs/architecture/adr/adr-0007-executable-self-deploy.md`. Документация: `.openclaw/agents/deployer.md` (стадия `deploy` = вызов хука, запрет self-restart), `docs/operations/INFRA.md`, `docs/operations/DEPLOY_HOOK.md`. Тесты: `tests/test_deploy_hook_mapping.py`, `tests/test_deploy_approve.py`, `tests/test_deploy_routing.py`, `tests/test_deploy_rollback.py`, `tests/test_deploy_notifications.py`, `tests/test_deploy_build_once.py`, `tests/test_deploy_terminal_sync.py`, `tests/test_staging_precondition.py`, `tests/test_deploy_hook_rollback_sim.py`. 
diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 559e7dc..a84c865 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -295,4 +295,4 @@ ORCH-065 вводит фоновый watchdog, чтобы смерть проц Схема БД, потоки данных, resilience-слой, детали Dockerfile — [internals.md](internals.md). --- -*Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting 
всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — design, ветка feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py; флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест).* +*Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке 
feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py; флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест).* diff --git a/src/agents/launcher.py b/src/agents/launcher.py index ec957d8..b356eb1 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -417,6 +417,14 @@ class AgentLauncher: "UPDATE agent_runs SET output_path = ? WHERE id = ?", (output_path, run_id), ) + # ORCH-065: stamp the agent process pid onto the job row so the job-reaper + # can probe liveness (os.kill(pid, 0)). proc.pid only exists after Popen, + # so this is a second UPDATE next to run_id/started_at (set above in _spawn). + if job_id is not None: + conn.execute( + "UPDATE jobs SET pid = ? 
WHERE id = ?", + (proc.pid, job_id), + ) conn.commit() conn.close() diff --git a/src/config.py b/src/config.py index 1161959..fc00219 100644 --- a/src/config.py +++ b/src/config.py @@ -296,6 +296,33 @@ class Settings(BaseSettings): post_deploy_auto_rollback: bool = False post_deploy_base_url: str = "http://localhost:8500" + # ORCH-065: job-reaper + proactive merge-lease reclaim. A background daemon + # thread (modelled on the reconciler) makes "the monitor thread / process died + # while a job/lease was held" self-heal WITHOUT a restart. Status (done/queued/ + # failed) is otherwise only ever set by launcher._monitor_agent -> _finalize_job + # inside the live process; a death there left the jobs row 'running' forever and + # (at max_concurrency=1) wedged the queue of EVERY project (incidents 07.06: jobs + # 236/239/242/254). The same thread proactively reclaims a stale/dead merge-lease + # (ORCH-043) instead of waiting for the lazy TTL on the next foreign acquire. See + # docs/architecture/adr/adr-0011-job-reaper-lease-reclaim.md. + # reaper_enabled -> global kill-switch (false -> strictly prior behaviour; + # only the startup requeue_running_jobs remains). + # reaper_interval_s -> background scan period (seconds). + # reaper_dead_ticks -> Tier-1: consecutive ticks a job's pid must be dead + # before it is reaped (>=2 anti-false-positive; a live + # long-running agent is NEVER reaped). + # reaper_max_running_s -> Tier-3 backstop ceiling: a job 'running' longer than + # this is reaped even when liveness is unknowable. MUST be + # > max agent_timeout + grace so a legit agent is safe. + # lease_reclaim_enabled -> kill-switch for the proactive stale/dead lease reclaim + # (false -> only the legacy lazy TTL reclaim in acquire). + # (reuse) merge_lock_timeout_s -> lease TTL; merge_gate_repos -> reclaim scope. 
+ reaper_enabled: bool = True + reaper_interval_s: int = 60 + reaper_dead_ticks: int = 2 + reaper_max_running_s: int = 3600 + lease_reclaim_enabled: bool = True + # Telegram notifications telegram_bot_token: str = "" telegram_chat_id: str = "" diff --git a/src/db.py b/src/db.py index 0e0358a..04c67d9 100644 --- a/src/db.py +++ b/src/db.py @@ -76,6 +76,11 @@ def init_db(): # (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table). _ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0") _ensure_column(conn, "jobs", "available_at", "TEXT") + # ORCH-065: pid of the spawned agent process, stamped in launcher._spawn next to + # run_id/started_at. The job-reaper uses it for Tier-1 liveness (os.kill(pid, 0)) + # to detect a 'running' job whose process died before _finalize_job. Idempotent + # ALTER (no-op once present) -> safe on the live prod DB. + _ensure_column(conn, "jobs", "pid", "INTEGER") # ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL # unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows # (which have NULL delivery_id) never collide with each other. Restart-safe: @@ -593,6 +598,76 @@ def requeue_running_jobs() -> int: return int(n) +def get_running_jobs() -> list[dict]: + """ORCH-065: snapshot of every 'running' job for the job-reaper scan. + + Each row carries the job columns plus three reaper inputs: + * ``running_age_s`` — seconds since ``started_at`` (Tier-3 backstop); + * ``exit_code`` — the linked ``agent_runs.exit_code`` (Tier-2: process + finished but the job is still 'running' -> monitor died mid-finalize); + * ``finished_at_run`` — the linked ``agent_runs.finished_at`` (debug only). + + A LEFT JOIN on ``run_id`` keeps jobs with no agent_runs row (exit_code NULL). + Read-only; never mutates. The reaper applies liveness/streak/backstop on top. 
+ """ + conn = get_db() + try: + rows = conn.execute( + "SELECT j.*, " + "CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) " + " AS running_age_s, " + "r.exit_code AS exit_code, r.finished_at AS finished_at_run " + "FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id " + "WHERE j.status='running'" + ).fetchall() + finally: + conn.close() + return [dict(r) for r in rows] + + +def reap_running_job( + job_id: int, + status: str, + run_id: int | None = None, + error: str | None = None, +) -> bool: + """ORCH-065: atomic terminal flip of a RUNNING job by the job-reaper. + + Mirrors ``mark_job`` but carries the ``status='running'`` guard in the WHERE + clause and reports ``rowcount`` so a late-arriving monitor / the startup + ``requeue_running_jobs`` / a second reaper tick can never double-process the + same row (AC-5, restart-safe). Returns True iff THIS call won the flip + (rowcount == 1); False -> someone else already moved the row. + + Status semantics match ``mark_job``: done/failed stamp ``finished_at``; queued + clears ``started_at``/``finished_at`` so the next claim treats it as fresh. + """ + conn = get_db() + try: + sets = ["status = ?"] + params: list = [status] + if run_id is not None: + sets.append("run_id = ?") + params.append(run_id) + if error is not None: + sets.append("error = ?") + params.append(error) + if status in ("done", "failed"): + sets.append("finished_at = datetime('now')") + elif status == "queued": + sets.append("started_at = NULL") + sets.append("finished_at = NULL") + params.append(job_id) + cur = conn.execute( + f"UPDATE jobs SET {', '.join(sets)} WHERE id = ? 
AND status='running'", + params, + ) + conn.commit() + return cur.rowcount == 1 + finally: + conn.close() + + def get_job(job_id: int) -> dict | None: """Fetch a single job by id.""" conn = get_db() diff --git a/src/job_reaper.py b/src/job_reaper.py new file mode 100644 index 0000000..fbbc9a3 --- /dev/null +++ b/src/job_reaper.py @@ -0,0 +1,370 @@ +"""ORCH-065: job-reaper + proactive merge-lease reclaim background daemon. + +Three failure classes share one root cause — "the thread/process died while it +still held captured state" — and one inert recovery layer +(``requeue_running_jobs``) that only fires on a process restart: + + * **A — zombie jobs.** A job's terminal status (``done``/``queued``/``failed``) + is written ONLY inside ``launcher._monitor_agent -> _finalize_job`` in the + live process. If that thread/process dies between ``proc.wait()`` and the + status write (crash, OOM, self-restart mid-deploy) the ``jobs`` row stays + ``running`` forever. At ``max_concurrency=1`` one zombie blocks the claim of + EVERY project's jobs -> the whole shared pipeline stalls. + * **B — stuck merge-lease.** The file lease ``.merge-lease-.json`` + (ORCH-043) is reclaimed only lazily, by TTL, and only when ANOTHER task tries + to acquire it. Holder liveness (pid) is never probed, so a death with the + lease held blocks foreign merges until the TTL expires. + +This module is a background daemon thread modelled on ``reconciler`` +(``threading.Thread(daemon=True)`` + ``threading.Event``, start/stop in +``main.lifespan``, ``/queue`` snapshot, per-unit never-raise, kill-switch). Each +tick: (1) scans ``running`` jobs and reaps the dead ones via three-tier liveness +detection; (2) proactively reclaims dead/stale merge-leases (mechanism B) for the +in-scope repos. + +Liveness (defense in depth, ADR-001 Р-1): + * **Tier-1 (primary): dead pid.** ``jobs.pid`` (stamped by ``launcher._spawn``) + probed with ``merge_gate.pid_alive``. 
A job is reaped only after + ``reaper_dead_ticks`` (>=2) CONSECUTIVE dead-pid ticks — an in-memory streak + counter kills false positives (AC-3); a live agent within its timeout is + never reaped. + * **Tier-2 (completion race): exit_code recorded but job still running.** The + monitor died between writing ``agent_runs.exit_code`` and ``_finalize_job``. + The outcome is KNOWN -> gate-driven advance on exit0, else the standard + transient/permanent contract. + * **Tier-3 (backstop): age ceiling.** A job ``running`` longer than + ``reaper_max_running_s`` (deliberately > max ``agent_timeout`` + grace) is + reaped even when liveness cannot be determined (pid reused / unknown). + +Action on confirmed death reuses existing contracts (no new merge/stage logic): + * The reaper's ONLY mutating write to a job row is the atomic terminal flip + ``db.reap_running_job(... WHERE status='running')`` — so a late-arriving + monitor / the startup ``requeue_running_jobs`` / a second tick can never + double-process a row (AC-5; the loser sees ``rowcount==0``). + * **exit0 (Tier-2):** gate-driven idempotent advance — the source of truth is + the canonical quality gate, NOT "exit0". If the stage already advanced -> + just mark ``done`` (idempotent cleanup). Else run ``launcher._try_advance_stage`` + (it runs the canonical QG: artifact/PR present -> green -> advance; absent -> + red -> no advance) and re-check: advanced -> ``done``; still red (e.g. the + monitor died before git-push, so no artifact) -> fall through to the failure + path. This makes a false ``done`` without real work impossible. + * **exit!=0 (Tier-2) / unknown outcome (Tier-1 dead pid, Tier-3 backstop):** + ``attempts < max_attempts`` -> ``queued`` (mirrors ``requeue_running_jobs``); + budget exhausted -> ``failed`` + Telegram. We never fabricate exit0. 
+ +Invariants (ТЗ §8 / ADR-001): never-raise per unit of work; idempotency (atomic +guard + gate-driven advance); restart-safe (the reaper starts AFTER the startup +``requeue_running_jobs``); silence when nothing is anomalous; the reaper NEVER +restarts/kills the prod container and NEVER pushes ``main``. ``STAGE_TRANSITIONS`` +/ ``QG_CHECKS`` and every ``check_*`` signature are unchanged. + +See docs/work-items/ORCH-065/06-adr/ADR-001-job-reaper-and-lease-reclaim.md and +the cross-cutting docs/architecture/adr/adr-0011-job-reaper-lease-reclaim.md. +""" + +import logging +import threading +from datetime import datetime, timezone + +from .config import settings +from .db import ( + get_db, + get_running_jobs, + reap_running_job, +) +from .stages import STAGE_TRANSITIONS, get_agent_for_stage + +logger = logging.getLogger("orchestrator.job_reaper") + + +def reclaim_all_stale_leases() -> int: + """Proactively reclaim dead/stale merge-leases for every in-scope repo. + + Used both at startup (``main.lifespan``, next to ``requeue_running_jobs``) and + on every reaper tick (mechanism B). Iterates the merge-gate scope + (``merge_gate_repos`` CSV, else self-hosting ``orchestrator``) and calls the + never-raise ``merge_gate.reclaim_stale_lease`` per repo. Returns the number of + leases actually reclaimed. Never raises (per-repo isolation). + """ + if not settings.lease_reclaim_enabled: + return 0 + reclaimed = 0 + try: + from . 
import merge_gate + raw = (settings.merge_gate_repos or "").strip() + if raw: + repos = [r.strip() for r in raw.split(",") if r.strip()] + else: + from .qg.checks import SELF_HOSTING_REPO + repos = [SELF_HOSTING_REPO] + for repo in repos: + try: + if merge_gate.reclaim_stale_lease(repo): + reclaimed += 1 + except Exception as e: # noqa: BLE001 - isolate one repo's failure + logger.error("lease-reclaim failed for repo %s: %s", repo, e) + except Exception as e: # noqa: BLE001 - never-raise contract + logger.error("reclaim_all_stale_leases error: %s", e) + return reclaimed + + +class JobReaper: + """Background daemon that reaps zombie jobs and reclaims stale merge-leases. + + Modelled on ``Reconciler``: a ``threading.Thread(daemon=True)`` + a + ``threading.Event`` for a clean stop. The only in-memory state is the + best-effort Tier-1 dead-pid streak counter (``_streak``) and the + observability counters (``reaped_total`` / ``last_reaped`` / + ``lease_reclaimed_total`` / ``last_run_ts``); all reset on restart, which is + safe because the startup ``requeue_running_jobs`` covers the restart path. + """ + + def __init__(self, interval_s: float | None = None): + self.interval_s = ( + interval_s if interval_s is not None else settings.reaper_interval_s + ) + self._stop = threading.Event() + self._thread: threading.Thread | None = None + # Tier-1 anti-false-positive: {job_id: consecutive dead-pid ticks}. + self._streak: dict[int, int] = {} + # Best-effort observability (Р-6). 
+ self.last_run_ts: float | None = None + self.reaped_total: int = 0 + self.last_reaped: dict | None = None + self.lease_reclaimed_total: int = 0 + + # -- A: zombie-job reaping -------------------------------------------- + def reap_once(self) -> None: + """One scan over all ``running`` jobs (per-job never-raise) + lease reclaim.""" + if settings.reaper_enabled: + try: + running = get_running_jobs() + except Exception as e: # noqa: BLE001 - never break the tick + logger.error("reaper: get_running_jobs failed: %s", e) + running = [] + seen: set[int] = set() + for job in running: + jid = job.get("id") + if jid is not None: + seen.add(jid) + try: + self._reap_job(job) + except Exception as e: # noqa: BLE001 - isolate one job's failure + logger.error( + "reaper: job %s (agent=%s) failed: %s", + job.get("id"), job.get("agent"), e, + ) + # Forget streaks for rows that are no longer running (reaped / requeued + # / finished by the monitor) so the dict cannot grow unbounded. + self._streak = {k: v for k, v in self._streak.items() if k in seen} + # Mechanism B: proactive stale/dead lease reclaim (own kill-switch). + try: + self.lease_reclaimed_total += reclaim_all_stale_leases() + except Exception as e: # noqa: BLE001 - never break the tick + logger.error("reaper: lease reclaim sweep failed: %s", e) + + def _reap_job(self, job: dict) -> None: + """Apply the three-tier liveness policy to a single running job.""" + from . import merge_gate + + job_id = job["id"] + pid = job.get("pid") + age = int(job.get("running_age_s") or 0) + exit_code = job.get("exit_code") # from the LEFT JOIN on agent_runs + + # Tier-2: the process finished (exit_code recorded) but the job is still + # 'running' -> the monitor died mid-finalize. Outcome is KNOWN. + if exit_code is not None: + self._streak.pop(job_id, None) + self._reap_known_outcome(job, int(exit_code)) + return + + # Tier-1: dead pid, only after `reaper_dead_ticks` consecutive dead ticks. 
+ if pid is not None and not merge_gate.pid_alive(pid): + n = self._streak.get(job_id, 0) + 1 + self._streak[job_id] = n + if n >= max(int(settings.reaper_dead_ticks), 1): + self._streak.pop(job_id, None) + self._reap_unknown_outcome(job, reason=f"dead pid={pid}") + return + logger.info( + "reaper: job %s pid=%s dead (streak %d/%d) — deferring", + job_id, pid, n, settings.reaper_dead_ticks, + ) + else: + # Alive / no pid -> reset the streak (must be CONSECUTIVE). + self._streak.pop(job_id, None) + + # Tier-3: backstop ceiling (one-shot; reaps even when liveness is unknown). + if age >= int(settings.reaper_max_running_s): + self._streak.pop(job_id, None) + self._reap_unknown_outcome( + job, reason=f"backstop age={age}s>={settings.reaper_max_running_s}s" + ) + + # -- reap actions ------------------------------------------------------ + def _reap_known_outcome(self, job: dict, exit_code: int) -> None: + """Tier-2: the agent's exit_code is known; drive the job's terminal status.""" + if exit_code == 0: + if self._gate_driven_advance(job): + if reap_running_job(job["id"], "done", run_id=job.get("run_id")): + self._note_reap(job, "done", reason="exit0, gate green") + return + # exit0 but the gate is red (e.g. monitor died before git-push -> no + # artifact). Do NOT fabricate 'done'; treat as a failed outcome. + self._reap_unknown_outcome(job, reason="exit0 but gate red") + else: + self._reap_unknown_outcome(job, reason=f"exit={exit_code}") + + def _reap_unknown_outcome(self, job: dict, reason: str) -> None: + """Tier-1/Tier-3 (or exit!=0): outcome not a clean success. + + Mirrors ``requeue_running_jobs`` / the permanent-failure contract: + ``attempts < max_attempts`` -> ``queued`` (a retry); budget exhausted -> + ``failed`` + Telegram. The terminal flip is the atomic ``reap_running_job`` + guard, so a racing requeue/monitor never double-processes the row. 
+ """ + job_id = job["id"] + run_id = job.get("run_id") + attempts = int(job.get("attempts") or 0) + max_attempts = int(job.get("max_attempts") or 2) + err = f"reaped: {reason} (run_id={run_id})" + if attempts < max_attempts: + if reap_running_job(job_id, "queued", run_id=run_id, error=err): + self._note_reap(job, "queued", reason=reason) + else: + if reap_running_job(job_id, "failed", run_id=run_id, error=err): + self._note_reap(job, "failed", reason=reason) + self._notify_failed(job, reason) + + def _gate_driven_advance(self, job: dict) -> bool: + """Idempotent, gate-driven stage advance for a reaped exit0 job. + + Returns True iff the stage is (or has become) advanced past this agent's + stage — i.e. the canonical quality gate is satisfied and a clean ``done`` + is correct. Returns False when the gate is still red (the caller then + routes the job to the failure path instead of a false ``done``). + + The advance itself reuses the UNCHANGED ``launcher._try_advance_stage`` + (which runs the canonical QG and the unified ``advance_stage``); the + reaper never duplicates ``update_task_stage`` / ``enqueue_job``. + """ + agent = job.get("agent") + repo = job.get("repo") + run_id = job.get("run_id") + branch, stage = self._task_branch_stage(job) + # Candidate stages whose finishing agent is THIS agent (deployer maps to + # both 'testing' and 'deploy-staging', hence a set). + candidates = {s for s in STAGE_TRANSITIONS if get_agent_for_stage(s) == agent} + if stage is None or stage not in candidates: + # Stage already advanced past this agent (or unknown) -> idempotent + # cleanup: a clean 'done' is correct without re-advancing. 
+ return True + if not branch: + return False + try: + from .agents.launcher import launcher + launcher._try_advance_stage(run_id, agent, repo, branch) + except Exception as e: # noqa: BLE001 - never break the reap + logger.error("reaper: gate-driven advance failed for job %s: %s", + job.get("id"), e) + return False + # Re-read the stage: advanced out of the candidate set -> gate was green. + _branch, new_stage = self._task_branch_stage(job) + return new_stage is None or new_stage not in candidates + + @staticmethod + def _task_branch_stage(job: dict) -> tuple[str | None, str | None]: + """Resolve (branch, stage) for the job's task. Never raises.""" + task_id = job.get("task_id") + if not task_id: + return None, None + try: + conn = get_db() + row = conn.execute( + "SELECT branch, stage FROM tasks WHERE id = ?", (task_id,) + ).fetchone() + conn.close() + if not row: + return None, None + return row["branch"], row["stage"] + except Exception as e: # noqa: BLE001 - never-raise contract + logger.warning("reaper: task lookup failed for job %s: %s", + job.get("id"), e) + return None, None + + def _notify_failed(self, job: dict, reason: str) -> None: + try: + from .notifications import send_telegram + send_telegram( + f"\U0001f6a8 reaper: job {job.get('id')} ({job.get('agent')}, " + f"repo {job.get('repo')}) reaped as FAILED: {reason}" + ) + except Exception as e: # noqa: BLE001 - telegram best-effort + logger.warning("reaper: failed-notify telegram error: %s", e) + + def _note_reap(self, job: dict, outcome: str, reason: str) -> None: + """Record + log one successful reap (Р-6 observability).""" + self.reaped_total += 1 + self.last_reaped = { + "job_id": job.get("id"), + "agent": job.get("agent"), + "outcome": outcome, + } + logger.warning( + "reaper: job %s (agent=%s, repo=%s, run_id=%s, pid=%s) reaped -> %s (%s)", + job.get("id"), job.get("agent"), job.get("repo"), + job.get("run_id"), job.get("pid"), outcome, reason, + ) + + # -- loop / lifecycle 
-------------------------------------------------- + def _tick(self) -> None: + try: + self.reap_once() + finally: + self.last_run_ts = datetime.now(timezone.utc).timestamp() + + def _run(self) -> None: + logger.info( + "JobReaper started (interval=%ss, enabled=%s, dead_ticks=%s, " + "max_running_s=%s, lease_reclaim=%s)", + self.interval_s, settings.reaper_enabled, settings.reaper_dead_ticks, + settings.reaper_max_running_s, settings.lease_reclaim_enabled, + ) + while not self._stop.is_set(): + try: + self._tick() + except Exception as e: # noqa: BLE001 - outer never-raise + logger.error("JobReaper loop error: %s", e) + self._stop.wait(self.interval_s) + logger.info("JobReaper stopped") + + def start(self) -> None: + """Start the daemon thread (idempotent: a live thread is a no-op).""" + if self._thread and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread( + target=self._run, name="job-reaper", daemon=True + ) + self._thread.start() + + def stop(self, timeout: float = 5.0) -> None: + self._stop.set() + if self._thread: + self._thread.join(timeout=timeout) + + def status(self) -> dict: + """Reaper snapshot for /queue observability (Р-6).""" + return { + "enabled": settings.reaper_enabled, + "interval": self.interval_s, + "last_run_ts": self.last_run_ts, + "reaped_total": self.reaped_total, + "last_reaped": self.last_reaped, + "lease_reclaimed_total": self.lease_reclaimed_total, + } + + +# Module-level singleton used by the FastAPI lifespan. +reaper = JobReaper() diff --git a/src/main.py b/src/main.py index c21e5b2..b610cb3 100644 --- a/src/main.py +++ b/src/main.py @@ -60,6 +60,19 @@ async def lifespan(app: FastAPI): if requeued: log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart") + # ORCH-065: proactive startup reclaim of dead/stale merge-leases, next to the + # queue-recovery above. 
A lease held by the previous (now dead) process pid is + # released at once instead of waiting for the TTL / a foreign acquire so the + # next merge is not blocked. Conditional (merge_gate_repos / self-hosting) and + # gated by ORCH_LEASE_RECLAIM_ENABLED; never raises. + try: + from .job_reaper import reclaim_all_stale_leases + reclaimed = reclaim_all_stale_leases() + if reclaimed: + log.warning(f"Startup lease-reclaim: reclaimed {reclaimed} stale merge-lease(s)") + except Exception as e: + log.warning(f"Startup lease-reclaim skipped: {e}") + # L-2: rotate old per-run logs at startup (best-effort; never fatal). try: import os as _os @@ -85,13 +98,22 @@ async def lifespan(app: FastAPI): from .reconciler import reconciler reconciler.start() + # ORCH-065: start the job-reaper LAST (after requeue_running_jobs + the worker + # + the reconciler) so its atomic status='running' guard never races the + # startup requeue. It reaps zombie jobs and periodically reclaims stale + # merge-leases. Kill-switch: ORCH_REAPER_ENABLED. + from .job_reaper import reaper + reaper.start() + try: yield finally: - # Graceful shutdown order mirrors startup in reverse: stop the reconciler - # first (it must not enqueue new work while the worker is winding down), - # then the worker. Running agents keep going; their jobs are requeued on - # next start via queue-recovery if the process dies. + # Graceful shutdown order mirrors startup in reverse: stop the reaper + # first, then the reconciler (it must not enqueue new work while the + # worker is winding down), then the worker. Running agents keep going; + # their jobs are requeued on next start via queue-recovery if the + # process dies. + reaper.stop() reconciler.stop() worker.stop() @@ -123,6 +145,7 @@ async def queue(): from .db import job_status_counts, recent_jobs from .queue_worker import worker from .reconciler import reconciler + from .job_reaper import reaper from . 
def pid_alive(pid) -> bool:
    """Return True iff process ``pid`` is alive (``os.kill(pid, 0)`` probe).

    The answer is deliberately conservative: only a definite "no such
    process" yields ``False``.

    * ``ProcessLookupError`` -> the process is gone -> ``False``.
    * ``PermissionError`` -> the pid exists but belongs to another user ->
      ``True``.
    * missing, zero, negative or otherwise invalid pid -> ``True``. Zero and
      negative values address process groups in ``kill(2)`` rather than one
      process, so they are never probed.

    Never raises: any OS, type, value or overflow error -> ``True``.

    Args:
        pid: Process id as an int or anything ``int()`` accepts; may be None.

    Returns:
        ``False`` only when the OS reports that no such process exists.
    """
    if not pid:
        return True
    try:
        target = int(pid)
        if target <= 0:
            # Not a single-process id; probing it would test a process group.
            return True
        os.kill(target, 0)
        return True
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    except (OSError, ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")) or a pid too large for the C pid type.
        return True
+ + Reuses ``qg.checks._merge_gate_applies`` (``merge_gate_repos`` CSV, else the + self-hosting ``orchestrator``) so reclaim and the gate share one predicate + (ADR-001 Р-2 / FR-2.4). Imported lazily to avoid an import cycle (qg.checks + imports merge_gate lazily inside ``check_branch_mergeable``). Never raises: + any error -> ``False`` (no-op, the safe default). + """ + try: + from .qg.checks import _merge_gate_applies + return _merge_gate_applies(repo) + except Exception as e: # noqa: BLE001 - never-raise contract + logger.warning("lease-reclaim applicability check failed for %s: %s", repo, e) + return False + + +def reclaim_stale_lease(repo: str) -> bool: + """Proactively reclaim a dead/stale merge-lease for ``repo`` (ADR-001 Р-2). + + Unlike the lazy TTL reclaim inside ``acquire_merge_lease`` (which only fires + when ANOTHER task tries to acquire), this releases the lease as soon as the + holder is provably gone — without waiting for the TTL or a foreign acquire: + + * holder pid is dead (``pid_alive`` is False) -> reclaim, OR + * lease age >= ``merge_lock_timeout_s`` (TTL) -> reclaim (AC-7). + + A LIVE holder within its TTL is never touched (AC-8 — protects a legitimate + in-flight merge). Reclaim is holder-aware (``release_merge_lease(repo, + branch=holder)``) so it can never delete a lease a different task acquired in + the meantime. Conditional (FR-2.4): real only for ``merge_gate_repos`` / + self-hosting; other repos -> no-op. Kill-switch ``lease_reclaim_enabled``. + + Returns True iff a lease was reclaimed. Never raises (AC-9): any read/remove + error is logged and swallowed so a single bad lease never kills the reaper + thread. Does NOT run any git operation — only the lease file is removed. 
def pr_already_merged(repo: str, branch: str) -> bool:
    """Return True iff a PR whose head is ``branch`` is ALREADY merged.

    Read-only guard: it issues one ``GET .../pulls?state=all&head=<branch>``
    request and never merges anything.

    The ``head`` query parameter is not relied on for correctness.
    NOTE(review): Gitea's list-pulls endpoint does not appear to document a
    ``head`` filter - confirm against the deployed version. If the server
    ignores it, the response holds PRs for other branches too, so each PR
    that carries ``head.ref`` must match ``branch`` before its ``merged``
    flag counts. PRs without head metadata are accepted as-is (assumed to be
    server-filtered).

    Args:
        repo: Repository name under ``settings.gitea_owner``.
        branch: Head branch of the pull request to look for.

    Returns:
        True when a matching PR reports ``merged is True``. False otherwise,
        including on any HTTP, parse or import error (logged, never raised).
    """
    try:
        import httpx
        owner = settings.gitea_owner
        headers = {"Authorization": f"token {settings.gitea_token}"}
        resp = httpx.get(
            f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
            params={"state": "all", "head": branch},
            headers=headers, timeout=_SHORT_TIMEOUT,
        )
        if resp.status_code != 200:
            return False
        for pr in resp.json() or []:
            if not isinstance(pr, dict):
                continue
            head = pr.get("head")
            head_ref = head.get("ref") if isinstance(head, dict) else None
            # Skip PRs that provably belong to a different branch.
            if head_ref is not None and head_ref != branch:
                continue
            if pr.get("merged") is True:
                return True
        return False
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("pr_already_merged check failed for %s/%s: %s", repo, branch, e)
        return False
def test_reaper_settings_env_override(monkeypatch):
    """TC-20: every reaper / lease-reclaim field follows its ORCH_* env var."""
    env_overrides = {
        "ORCH_REAPER_ENABLED": "false",
        "ORCH_REAPER_INTERVAL_S": "30",
        "ORCH_REAPER_DEAD_TICKS": "5",
        "ORCH_REAPER_MAX_RUNNING_S": "1200",
        "ORCH_LEASE_RECLAIM_ENABLED": "false",
    }
    for env_name, raw_value in env_overrides.items():
        monkeypatch.setenv(env_name, raw_value)

    loaded = Settings()

    assert loaded.reaper_enabled is False
    assert loaded.reaper_interval_s == 30
    assert loaded.reaper_dead_ticks == 5
    assert loaded.reaper_max_running_s == 1200
    assert loaded.lease_reclaim_enabled is False
def test_tc19_check_branch_mergeable_signature_intact():
    """check_branch_mergeable must keep exactly (repo, work_item_id, branch)."""
    from inspect import signature
    from src.qg.checks import check_branch_mergeable

    expected = ["repo", "work_item_id", "branch"]
    actual = [name for name in signature(check_branch_mergeable).parameters]
    assert actual == expected
def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, age_s=0, attempts=0, max_attempts=2,
                      run_id=None, exit_code=None):
    """Insert a jobs row that is already 'running' and return its id.

    ``started_at`` is back-dated by ``age_s`` seconds. When ``exit_code`` is
    given and no ``run_id`` is supplied, a finished agent_runs row is created
    first and linked to the job.
    """
    insert_run_sql = (
        "INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
        "VALUES (?, ?, datetime('now'), ?)"
    )
    insert_job_sql = (
        "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
        "run_id, pid, started_at) "
        "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))"
    )
    started_offset = f"-{int(age_s)} seconds"

    conn = get_db()
    needs_run_row = run_id is None and exit_code is not None
    if needs_run_row:
        run_cursor = conn.execute(insert_run_sql, (task_id, agent, exit_code))
        run_id = run_cursor.lastrowid
    job_cursor = conn.execute(
        insert_job_sql,
        (agent, repo, task_id, attempts, max_attempts, run_id, pid,
         started_offset),
    )
    new_job_id = job_cursor.lastrowid
    conn.commit()
    conn.close()
    return new_job_id
def test_tc03_requires_consecutive_dead_ticks(monkeypatch):
    """TC-03: the streak must be consecutive; one alive tick resets it."""
    import src.merge_gate as mg

    monkeypatch.setattr(db.settings, "reaper_dead_ticks", 3)
    # One liveness answer per tick: dead, dead, ALIVE (reset), dead, dead, dead.
    answers = iter([False, False, True, False, False, False])
    monkeypatch.setattr(mg, "pid_alive", lambda pid: next(answers))

    jid = _make_running_job(pid=999998)
    reaper = JobReaper()

    ticks_before_reap = 5
    for _tick_no in range(ticks_before_reap):
        reaper.reap_once()
        assert get_job(jid)["status"] == "running"

    # 6th tick is the third CONSECUTIVE dead tick -> reaped.
    reaper.reap_once()
    assert get_job(jid)["status"] == "queued"
def test_tc05_nonzero_exit_requeue_then_failed(monkeypatch):
    """TC-05: a non-zero exit requeues within budget, then fails with an alert.

    The Telegram stub is installed before any reap runs. Only messages
    containing "reaped as FAILED" are counted, so an unrelated notification
    (e.g. a lease-reclaim notice from the same tick) cannot satisfy or break
    the assertions.
    """
    import src.notifications as notif

    sent = []
    monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: sent.append(a))

    def failed_alerts():
        # Each recorded entry is the positional-args tuple of one call.
        return [a[0] for a in sent if a and "reaped as FAILED" in str(a[0])]

    tid = _make_task(stage="development")
    jid = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                            attempts=1, max_attempts=2)
    r = JobReaper()
    r.reap_once()  # attempts(1) < max(2) -> queued
    assert get_job(jid)["status"] == "queued"
    assert not failed_alerts(), "a requeue must not raise a FAILED alert"

    # Now exhaust the budget.
    jid2 = _make_running_job(agent="developer", task_id=tid, exit_code=1,
                             attempts=2, max_attempts=2)
    r.reap_once()
    assert get_job(jid2)["status"] == "failed"
    assert failed_alerts(), "failed reap must send a Telegram alert"
def test_tc08_reap_once_outer_never_raises(monkeypatch):
    """TC-08: a failing ``get_running_jobs`` never escapes the reaper.

    ``reap_once`` wraps the ``get_running_jobs`` call in its own try/except,
    so it is exercised directly here; ``_tick`` is then checked to still
    stamp ``last_run_ts``.
    """
    def _db_down():
        raise RuntimeError("db down")

    monkeypatch.setattr(jr, "get_running_jobs", _db_down)
    r = JobReaper()

    r.reap_once()  # must not raise
    assert r.reaped_total == 0

    r._tick()
    assert r.last_run_ts is not None
merge_gate.release_merge_lease("orchestrator", "feature/none") merge_gate.release_merge_lease("orchestrator") # force form + + +# --------------------------------------------------------------------------- +# ORCH-065 / TC-16: idempotent merge finalization — pr_already_merged guard. +# --------------------------------------------------------------------------- +class _FakeResp: + def __init__(self, status_code, payload): + self.status_code = status_code + self._payload = payload + + def json(self): + return self._payload + + +def test_tc16_pr_already_merged_true(monkeypatch): + """A merged PR -> True so a re-driven/reaped task is a no-op (no second merge).""" + monkeypatch.setattr( + httpx, "get", + lambda *a, **k: _FakeResp(200, [{"number": 7, "merged": True}]), + ) + assert merge_gate.pr_already_merged("orchestrator", "feature/x") is True + + +def test_tc16_pr_open_not_merged_false(monkeypatch): + """An open / not-yet-merged PR -> False (the normal merge path proceeds).""" + monkeypatch.setattr( + httpx, "get", + lambda *a, **k: _FakeResp(200, [{"number": 7, "merged": False}]), + ) + assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False + + +def test_tc16_pr_no_pr_false(monkeypatch): + monkeypatch.setattr( + httpx, "get", lambda *a, **k: _FakeResp(200, []), + ) + assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False + + +def test_tc16_pr_already_merged_never_raises(monkeypatch): + """Any HTTP/parse error -> False (conservative), never an exception (AC-9).""" + def boom(*a, **k): + raise RuntimeError("gitea down") + + monkeypatch.setattr(httpx, "get", boom) + assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False + + +def test_tc16_pr_non_200_false(monkeypatch): + monkeypatch.setattr( + httpx, "get", lambda *a, **k: _FakeResp(500, None), + ) + assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False diff --git a/tests/test_merge_gate_race.py b/tests/test_merge_gate_race.py index 
f9c5ea5..0e65fc6 100644 --- a/tests/test_merge_gate_race.py +++ b/tests/test_merge_gate_race.py @@ -148,3 +148,63 @@ def test_tc24_red_catch_up_fails_and_releases_main_stays_green(race_repo, monkey assert _origin_main_sha(origin) == main_before # The lease was released on failure (a later task can proceed). assert merge_gate._read_lease(merge_gate._lease_path(repo)) is None + + +# --------------------------------------------------------------------------- +# ORCH-065 / TC-17: recovery — "rebase+re-test green, merge not done, process +# died" -> reaper requeues -> the merge re-drives the STANDARD path WITHOUT a +# second expensive re-test when safe (the branch is already up-to-date). AC-10. +# --------------------------------------------------------------------------- +def test_tc17_redrive_skips_expensive_retest_when_already_caught_up( + race_repo, monkeypatch +): + repo, origin = race_repo + main_before = _origin_main_sha(origin) + + # First pass: B catches up (real rebase onto C1) with a GREEN re-test. This is + # the work that completed before the process died — the lease is held, the + # branch is now caught up on origin. + retest_calls = [] + + def _retest(r, b): + retest_calls.append((r, b)) + return True, "re-test green" + + monkeypatch.setattr(merge_gate, "retest_branch", _retest) + passed, reason = check_branch_mergeable(repo, "ORCH-B", "feature/B") + assert passed is True + assert reason == "rebased onto main, re-test green" + assert len(retest_calls) == 1 # the expensive re-test ran ONCE + + # The process "died" before the merge: release the lease the way the reaper / + # reconciler recovery path would (the row is requeued; the branch stays caught + # up because the rebase was already pushed). 
+ merge_gate.release_merge_lease(repo, "feature/B") + + # Re-drive (standard path) after recovery: the branch already contains + # origin/main, so branch_is_behind_main is False and the gate short-circuits to + # the up-to-date pass WITHOUT re-running the expensive rebase+re-test. + assert merge_gate.branch_is_behind_main(repo, "feature/B") is False + passed2, reason2 = check_branch_mergeable(repo, "ORCH-B", "feature/B") + assert passed2 is True + assert reason2 == "branch up-to-date with main" + assert len(retest_calls) == 1 # NOT re-run on the re-drive (no double cost) + # origin/main was never pushed by the gate across the whole recovery. + assert _origin_main_sha(origin) == main_before + + +def test_tc17_pr_already_merged_makes_redrive_a_noop(race_repo, monkeypatch): + """If the PR actually merged before the process died, the idempotency guard + reports it so the re-drive is a no-op (no second merge).""" + import httpx + repo, _ = race_repo + + class _R: + status_code = 200 + + @staticmethod + def json(): + return [{"merged": True}] + + monkeypatch.setattr(httpx, "get", lambda *a, **k: _R()) + assert merge_gate.pr_already_merged(repo, "feature/B") is True diff --git a/tests/test_merge_lease_reclaim.py b/tests/test_merge_lease_reclaim.py new file mode 100644 index 0000000..f9d421d --- /dev/null +++ b/tests/test_merge_lease_reclaim.py @@ -0,0 +1,138 @@ +"""ORCH-065: proactive stale/dead merge-lease reclaim (TC-10..TC-15). + +Exercises merge_gate.reclaim_stale_lease / pid_alive directly with lease files +written into a tmp repos_dir. No git ops run (reclaim only removes the lease +file). pid liveness is monkeypatched so 'dead'/'alive' are deterministic. 
+""" +import json +import os +import tempfile +import time + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch_lease.db") +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" + +from src import merge_gate + + +@pytest.fixture +def repos_dir(tmp_path, monkeypatch): + d = tmp_path / "repos" + d.mkdir() + monkeypatch.setattr(merge_gate.settings, "repos_dir", str(d)) + monkeypatch.setattr(merge_gate.settings, "lease_reclaim_enabled", True) + monkeypatch.setattr(merge_gate.settings, "merge_gate_repos", "") # self-hosting only + monkeypatch.setattr(merge_gate.settings, "merge_lock_timeout_s", 300) + return d + + +def _write_lease(repos_dir, repo, branch="feature/x", pid=1234, age_s=0): + path = os.path.join(str(repos_dir), f".merge-lease-{repo}.json") + holder = { + "branch": branch, + "work_item_id": "ORCH-1", + "task_id": 1, + "acquired_at": time.time() - age_s, + "pid": pid, + } + with open(path, "w", encoding="utf-8") as f: + f.write(json.dumps(holder)) + return path + + +def _no_telegram(monkeypatch): + import src.notifications as notif + monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: None) + + +# --- TC-10: reclaim a lease with a DEAD pid, proactively -------------------- +def test_tc10_reclaim_dead_pid(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + path = _write_lease(repos_dir, "orchestrator", pid=999999, age_s=0) + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False) + assert merge_gate.reclaim_stale_lease("orchestrator") is True + assert not os.path.exists(path) # lease removed + + +# --- TC-11: reclaim by TTL is preserved ------------------------------------- +def test_tc11_reclaim_by_ttl(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + # pid alive, but the lease is older than the TTL -> still reclaimed. 
+ path = _write_lease(repos_dir, "orchestrator", pid=4321, age_s=999) + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: True) + assert merge_gate.reclaim_stale_lease("orchestrator") is True + assert not os.path.exists(path) + + +# --- TC-12: a LIVE lease within TTL is NOT released ------------------------- +def test_tc12_live_lease_protected(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + path = _write_lease(repos_dir, "orchestrator", pid=4321, age_s=10) + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: True) + assert merge_gate.reclaim_stale_lease("orchestrator") is False + assert os.path.exists(path) # untouched + + +# --- TC-13: conditional — non-self-hosting repos are a no-op ---------------- +def test_tc13_non_scope_repo_noop(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + path = _write_lease(repos_dir, "enduro-trails", pid=999999, age_s=999) + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False) + assert merge_gate.reclaim_stale_lease("enduro-trails") is False + assert os.path.exists(path) # out of scope -> untouched + + +def test_tc13_merge_gate_repos_csv_scope(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + monkeypatch.setattr(merge_gate.settings, "merge_gate_repos", "enduro-trails") + path = _write_lease(repos_dir, "enduro-trails", pid=999999, age_s=0) + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False) + assert merge_gate.reclaim_stale_lease("enduro-trails") is True + assert not os.path.exists(path) + + +# --- TC-14: never-raise on a read/remove error ------------------------------ +def test_tc14_never_raise_on_read_error(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + _write_lease(repos_dir, "orchestrator", pid=1, age_s=999) + + def boom(path): + raise OSError("simulated read failure") + + monkeypatch.setattr(merge_gate, "_read_lease", boom) + # Must not raise; returns False (could not reclaim). 
+ assert merge_gate.reclaim_stale_lease("orchestrator") is False + + +def test_tc14_no_lease_file_is_noop(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + assert merge_gate.reclaim_stale_lease("orchestrator") is False + + +# --- TC-15: kill-switch lease_reclaim_enabled=False ------------------------- +def test_tc15_kill_switch(repos_dir, monkeypatch): + _no_telegram(monkeypatch) + monkeypatch.setattr(merge_gate.settings, "lease_reclaim_enabled", False) + path = _write_lease(repos_dir, "orchestrator", pid=999999, age_s=999) + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False) + assert merge_gate.reclaim_stale_lease("orchestrator") is False + assert os.path.exists(path) # proactive reclaim off -> untouched + + +# --- pid_alive semantics ---------------------------------------------------- +def test_pid_alive_dead_process(): + # PID 999999999 almost certainly does not exist. + assert merge_gate.pid_alive(999999999) is False + + +def test_pid_alive_self(): + assert merge_gate.pid_alive(os.getpid()) is True + + +def test_pid_alive_missing_pid_conservative(): + assert merge_gate.pid_alive(None) is True + assert merge_gate.pid_alive(0) is True diff --git a/tests/test_queue.py b/tests/test_queue.py index f6342e8..ce2d831 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -302,3 +302,58 @@ class TestWorkerConcurrency: assert count_running_jobs() == 0 counts = job_status_counts() assert counts["failed"] == 1 + + +# --------------------------------------------------------------------------- +# ORCH-065: job-reaper unblocks the shared queue (TC-09) + /queue block (TC-18) +# --------------------------------------------------------------------------- +class TestReaperUnblocksQueue: + def test_tc09_reap_unblocks_claim_at_concurrency_1(self, monkeypatch): + """A zombie 'running' row at max_concurrency=1 blocks every claim; once the + reaper reaps it the next queued job can be claimed (AC-2).""" + import src.merge_gate as mg + from src.job_reaper 
import JobReaper + + monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1) + monkeypatch.setattr(mg, "pid_alive", lambda pid: False) # zombie pid dead + + # A zombie row stuck 'running' with a dead pid. + conn = db.get_db() + cur = conn.execute( + "INSERT INTO jobs (agent, repo, status, attempts, max_attempts, pid, " + "started_at) VALUES ('developer','r','running',2,2,999999,datetime('now'))" + ) + zombie = cur.lastrowid + conn.commit() + conn.close() + + # A second job waits in the queue behind it. + nxt = enqueue_job("analyst", "r") + + # At concurrency 1 the slot is fully occupied -> nothing else can run. + assert count_running_jobs() == 1 + + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + JobReaper().reap_once() # dead pid, attempts>=max -> failed + + assert get_job(zombie)["status"] == "failed" + assert count_running_jobs() == 0 + # Queue is unblocked: the next job claims successfully. + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == nxt + + def test_tc18_queue_endpoint_has_reaper_block(self): + """GET /queue exposes the reaper observability block (AC-15). + + Calls the endpoint coroutine directly (no lifespan / no background + threads / no network) so the test stays hermetic. + """ + import asyncio + import src.main as main + + body = asyncio.run(main.queue()) + assert "reaper" in body + reaper = body["reaper"] + for key in ("enabled", "interval", "last_run_ts", "reaped_total", + "last_reaped", "lease_reclaimed_total"): + assert key in reaper