diff --git a/.env.example b/.env.example index 9e560f2..c1fe3f3 100644 --- a/.env.example +++ b/.env.example @@ -453,6 +453,18 @@ ORCH_REAPER_MAX_RUNNING_S=5400 ORCH_REAPER_FINALIZE_GRACE_S=300 ORCH_LEASE_RECLAIM_ENABLED=true +# ORCH-126 (adr-0052): run-ownership hygiene of the `jobs` row — invariant +# `status='queued' => run_id IS NULL AND pid IS NULL AND started_at IS NULL`. The BASE +# reset on every requeue/claim path (requeue_running_jobs / mark_job('queued') / +# mark_job_transient / reap_running_job('queued') / claim_next_job) is UNCONDITIONAL +# (no flag — it fixes a data invariant). This kill-switch gates ONLY the optional +# detect/self-heal sweep of "impossible" queued rows (a queued job still carrying +# run_id/pid/started_at — the incident state of job 2286) run at startup + on each +# reaper tick, plus its read-only /queue counter (reaper.impossible_queued_total). +# IMPOSSIBLE_QUEUED_SANITIZE_ENABLED -> default true; false -> the sweep is a no-op +# (D1-D3 still enforce the invariant going forward). +ORCH_IMPOSSIBLE_QUEUED_SANITIZE_ENABLED=true + # ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS for # side-effectful stage transitions. Generalises the process-local ORCH-113 finalizer- # liveness into a DURABLE, cross-path owner-exclusion (additive table `transition_lease`) diff --git a/.task-dev.md b/.task-dev.md index 327ea1c..549f85f 100644 --- a/.task-dev.md +++ b/.task-dev.md @@ -1,4 +1,4 @@ -Work item: ORCH-124 +Work item: ORCH-126 Repo: orchestrator -Branch: feature/ORCH-124-bug-serial-gate-treats-backlog +Branch: feature/ORCH-126-bug-queued-job-can-keep-stale- Stage: development \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 17cd6e1..29e3529 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,14 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Гигиена run-ownership строки `jobs` — инвариант «queued ⇒ run_id/pid/started_at IS NULL»** (ORCH-126, `fix`, трек Bug): багфикс контрол-плейна (инцидент ORCH-124/125) — при `ORCH_SERIAL_GATE_ENABLED=false` queued analyst-job'ы зависали навсегда (job 2286: `status=queued + run_id=759/760 + pid=35/42 + started_at=NULL` — физически невозможное состояние). **Причина:** ни один путь возврата job в `queued` (restart `requeue_running_jobs` / retry `mark_job('queued')` / transient `mark_job_transient` / reap `reap_running_job('queued')`) **не сбрасывал run-ownership** (`run_id`/`pid`); после рестарта контейнера pid мог быть **переиспользован** ОС → `pid_alive(stale)=True` → job-reaper (ORCH-065) Tier-1 «видел живой» фантомный `running` и при `max_concurrency=1` клинил клейм **всей** общей очереди всех проектов. **Инвариант (adr-0052):** `status='queued' ⇒ run_id IS NULL AND pid IS NULL AND started_at IS NULL` — queued-job никогда не несёт run-ownership (история run'а — в `agent_runs`, не в `jobs.run_id`). Фикс на **существующих колонках**: `STAGE_TRANSITIONS` / реестр `QG_CHECKS` / `check_*` / machine-verdict-ключи / **схема БД** — байт-в-байт не тронуты; для здоровых job'ов и enduro поведение байт-в-байт; миграция не требуется. ADR: `docs/work-items/ORCH-126/06-adr/ADR-001-queued-job-run-ownership-hygiene.md`, сквозной `docs/architecture/adr/adr-0052-queued-job-run-ownership-invariant.md`. + - **D1 — Forward-cleanup на всех путях возврата в `queued` (FR-1/AC-1):** `requeue_running_jobs` / `mark_job('queued')` / `mark_job_transient` / `reap_running_job('queued')` выставляют `run_id=NULL, pid=NULL` той же UPDATE-транзакцией, что чистит `started_at`/`finished_at`. Атомарные `status`-guard'ы (`reap_running_job … WHERE status='running'`, rowcount) — **сохранены байт-в-байт** (restart-safe, гонка worker↔reaper↔monitor — TR-4). Каллер-переданный `run_id` для `queued` **игнорируется** (инвариант важнее: `launcher._finalize_permanent`/reaper по-прежнему передают старый `run_id`, но для `queued` он сбрасывается). Безусловно — исправление инварианта данных, без флага (D6). + - **D2 — Чистый claim (FR-2/AC-3):** `claim_next_job` при флипе `queued→running` сбрасывает `pid=NULL, run_id=NULL` тем же существующим UPDATE (defense-in-depth поверх D1) → между claim и стампом `pid` в `_spawn` строка несёт `pid IS NULL`, не чужой pid. SELECT-гейт (`status='queued' AND available_at<=now` + dep/serial-gate) — **не тронут** (offline hot-path, NFR-2; без нового SELECT/сети). + - **D3 — Окно `_spawn` (FR-3/AC-6):** провал `_spawn` до стампа `pid` (`ensure_worktree`/материализация ветки/запись task-файла) → `queue_worker._drain_once` возвращает job через `mark_job('queued')` → по D1 строка чистая; повторный claim стартует штатно (без «частично стартовавшего» зависания). Нового кода в launcher не потребовалось. + - **D4 — Детект + self-heal невозможного состояния (FR-4/AC-5):** `db.find_impossible_queued_jobs()`/`db.sanitize_impossible_queued()` идемпотентно приводят «невозможные» queued-строки (`queued` с непустым `run_id`/`pid`/`started_at`) к чистому `queued`; вызывается при старте (`main.lifespan` после `requeue_running_jobs`) и на каждом реап-тике (`JobReaper.sanitize_impossible_queued_once`, never-raise) — закрывает уже-существующие аномалии на проблемной БД **без миграции** (TR-7) и забытый будущий 6-й путь возврата (TR-2). Наблюдаемость: структурный WARNING (`job_id`/`run_id`/`pid`) + read-only счётчик `impossible_queued_total`/`last_impossible_queued` в блоке `reaper` снимка `GET /queue`. Kill-switch `impossible_queued_sanitize_enabled` (env `ORCH_IMPOSSIBLE_QUEUED_SANITIZE_ENABLED`, дефолт on; гейтит **только** D4-sweep, базовый сброс D1-D3 безусловен). + - **D5 — Корректность reaper-liveness (FR-5/AC-4) — валидация, не правка:** после D1-D3 reaper на свежеклеймленном `running` видит `pid IS NULL` → Tier-1 (`job_reaper.py:245`: `if pid is not None and not pid_alive(pid)`) пропускает, сбрасывает streak; Tier-3 backstop (`reaper_max_running_s`) — без изменений. **Правка reaper не требуется** — фикс восстанавливает предусловие «`pid` отражает процесс ЭТОГО run'а». Маркированные инварианты ORCH-065/113/114/099 — сохранены (трассировка CLAUDE.md §9). + - **Покрытие:** `tests/test_orch126_queued_stale_run.py` (TC-01 — обязательный регресс, КРАСНЫЙ до фикса / ЗЕЛЁНЫЙ после: stale `running` → `requeue_running_jobs` → чистый `queued`; TC-02…TC-10: сброс на каждом пути, чистый claim, claim без старвейшна при serial-gate off, reaper не реапит `pid IS NULL`, self-heal идемпотентность + счётчик + kill-switch, окно `_spawn`, анти-регресс здорового job'а — терминальные исходы/`run_id`-линк не затронуты). Полный `pytest tests/ -q` — зелёный. + - **Доки:** `docs/architecture/internals.md` (раздел «Инвариант run-ownership строки `jobs`» + аннотации `jobs.run_id`/`pid` + queue-recovery), `.env.example` (флаг `ORCH_IMPOSSIBLE_QUEUED_SANITIZE_ENABLED` в блоке reaper); сквозной ADR `adr-0052` (уже заведён архитектором). - **Serial-gate «пауза без блокировки» — явный per-task park-сигнал** (ORCH-124, `fix`): багфикс (метка `Bug`, эскалирован в full-cycle) инцидента **ORCH-116/ORCH-123**. `serial_gate` определял «активную задачу репо» **исключительно по машинной стадии** `tasks.stage NOT IN ('done','cancelled')`, а Plane-статусы Backlog/Blocked/Needs-Input (слой B индикации, ORCH-066) **не меняют `tasks.stage`** (слой A) ⇒ приостановленный предшественник был неотличим от активного и держал FIFO-гейт закрытым против срочного успешника (ORCH-116 поставлен на паузу, чтобы пропустить фикс ORCH-123 — фикс не стартовал, пока ORCH-116 формально не `done`). У оператора не было чистого механизма «пауза без блокировки», отдельного от cancel (терминал) и от глобального выключения гейта. **Инвариант:** правка **планировщика очереди** (claim) и наблюдаемости, **не** Quality Gate — `STAGE_TRANSITIONS` / состав `QG_CHECKS` / семантика и имена `check_*` / machine-verdict ключи (`verdict:`/`result:`/`deploy_status:`/`staging_status:`/`security_status:`) / схемы существующих таблиц — **байт-в-байт не тронуты**. Аддитивно, под независимым под-флагом, never-raise, restart-safe, fail-OPEN на hot-claim сохранён. ADR: `docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md`, сквозной `docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md`. - **Механизм (D1):** явный durable DB-сигнал «park» на уровне задачи, инициируемый оператором через API — **не** маппинг Plane-статуса (перегружал бы слой A/B ORCH-066 / анти-паттерн ORCH-059) и **не** `task_deps` (моделирует обратное направление «B ждёт A»). Чистое намерение, отличное от cancel и от kill-switch; DB-резолвимо, offline, webhook-независимо (потерянный webhook не рассинхронит сигнал). - **Хранилище (D2):** аддитивная нуллабельная колонка `tasks.paused_at TEXT` через `_ensure_column` (паттерн `tasks.cancelled_at`/`cancel_requested_at`/`track`; `src/db.py`) — NULL = не на паузе; ISO-таймстамп = поставлена оператором на паузу. На уже-мигрированной БД — no-op; все существующие строки дефолтят в NULL ⇒ поведение до ORCH-124 до первой явной паузы (enduro не затронут на общей прод-БД). Хелперы `db.set_task_paused`/`clear_task_paused`/`is_task_paused` (never-raise; `is_task_paused` на ошибке → «не на паузе» = задача активна = гейт скорее закрыт = анти-stale-base-safe). diff --git a/docs/architecture/internals.md b/docs/architecture/internals.md index 82c958a..42063c3 100644 --- a/docs/architecture/internals.md +++ b/docs/architecture/internals.md @@ -402,8 +402,8 @@ webhook (plane/gitea) background thread (queue_worker) |--------|------------| | `status` | `queued` → `running` → `done` \| `failed` \| `cancelled` (ORCH-090: терминальный исход STOP-отмены, не реквью'ится) | | `attempts` / `max_attempts` | счётчик попыток (инкремент при claim) / лимит ретраев (default 2) | -| `run_id` | FK на `agent_runs.id` после старта | -| `pid` | (ORCH-065) pid агентского процесса (`proc.pid` из `_spawn`); liveness-сигнал для job-reaper. Добавляется `_ensure_column` (idempotent) | +| `run_id` | FK на `agent_runs.id` после старта. **ORCH-126 (adr-0052):** run-ownership; `queued ⇒ run_id IS NULL` (история run'а живёт в `agent_runs`, не в `jobs.run_id`) | +| `pid` | (ORCH-065) pid агентского процесса (`proc.pid` из `_spawn`); liveness-сигнал для job-reaper. Добавляется `_ensure_column` (idempotent). **ORCH-126 (adr-0052):** `queued ⇒ pid IS NULL` — иначе протухший (возможно переиспользованный ОС) pid ложно «оживает» в Tier-1 reaper и клинит очередь | | `task_content` | ТЗ, которое пишется в task-файл агента | | `error` | последняя ошибка | @@ -419,6 +419,10 @@ status='queued'` и проверяет `rowcount`. При гонке двух т В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`: jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`. +**ORCH-126 (adr-0052):** возврат в `queued` сбрасывает run-ownership (`run_id=NULL, pid=NULL` +вместе с `started_at`) — мёртвый воркер оставил их протухшими, и фантомный pid заклинил бы +Tier-1 reaper. Сразу следом `reaper.sanitize_impossible_queued_once()` идемпотентно санирует +любые «невозможные» queued-строки (`queued` с непустым `run_id`/`pid`/`started_at`). **ORCH-114 (adr-0045):** сразу следом вызывается `transition_lease.recover_on_startup()` — новый процесс имеет свежий `boot_id`, поэтому ВСЕ записанные ранее `transition_lease` устарели (boot-id mismatch) → реклеймятся, и только что requeued-jobs переисполняют свои @@ -475,6 +479,35 @@ claim делает `_try_advance_stage` (advance+enqueue) — проигравш / `ORCH_LEASE_RECLAIM_ENABLED`; снимок в `GET /queue` (блок `reaper`). Подробнее — adr-0011. +### Инвариант run-ownership строки `jobs` (ORCH-126, adr-0052) + +Колонки `jobs.run_id`/`jobs.pid` — **общий контракт liveness/идентичности run'а** (читают +job-reaper Tier-1 по `pid`, `/metrics` `get_running_agents`). Системный инвариант данных: + +> **`status='queued' ⇒ run_id IS NULL AND pid IS NULL AND started_at IS NULL`.** + +То есть **queued-job никогда не несёт run-ownership** — оно принадлежит ровно одной активной +попытке (`running` после стампа в `_spawn`). Корень дефекта (инцидент ORCH-124/125, job 2286 +`queued + run_id=759 + pid=35 + started_at=NULL`): ни один путь возврата в `queued` не сбрасывал +run-ownership, а после рестарта контейнера pid мог быть **переиспользован** ОС → `pid_alive(stale)` +ложно `True` → reaper «видел живой» фантомный `running` и при `max_concurrency=1` клинил клейм +**всей** общей очереди. Соблюдение (без смены схемы БД): +- **Forward-cleanup** — каждый путь перехода в `queued` (`requeue_running_jobs`, + `mark_job('queued')`, `mark_job_transient`, `reap_running_job('queued')`) выставляет + `run_id=NULL, pid=NULL` той же UPDATE-транзакцией, что чистит `started_at` (атомарные + `status`-guard'ы сохранены). Безусловно (исправление инварианта данных, без флага). +- **Clean claim (defense-in-depth)** — `claim_next_job` при флипе `queued→running` сбрасывает + stale `pid`/`run_id` тем же UPDATE → между claim и стампом `pid` в `_spawn` строка несёт + `pid IS NULL`. SELECT-гейт не тронут (offline hot-path). +- **Self-heal + наблюдаемость** — `db.sanitize_impossible_queued()` идемпотентно санирует + «невозможные» queued-строки при старте (`main.lifespan`) и на каждом реап-тике (never-raise, + kill-switch `ORCH_IMPOSSIBLE_QUEUED_SANITIZE_ENABLED`, дефолт on); счётчик + `impossible_queued_total` в блоке `reaper` снимка `GET /queue`. + +**Норматив:** любой новый путь возврата job в `queued` ОБЯЗАН соблюсти инвариант (сбросить +`run_id`/`pid`); reviewer ловит нарушение как ≥P1. Подробнее — adr-0052, +`docs/work-items/ORCH-126/06-adr/ADR-001-queued-job-run-ownership-hygiene.md`. + ### Конфиг - `ORCH_MAX_CONCURRENCY` (default 1) — лимит параллельных jobs. diff --git a/src/config.py b/src/config.py index 2d637d4..2cc2e00 100644 --- a/src/config.py +++ b/src/config.py @@ -722,6 +722,17 @@ class Settings(BaseSettings): lease_reclaim_enabled: bool = True reaper_finalizer_liveness_enabled: bool = True + # ORCH-126 (D4/FR-4): detect + self-heal "impossible" queued rows — a job that + # is `status='queued'` while still carrying run-ownership (run_id/pid/started_at), + # which is physically impossible (the incident state of job 2286: `queued + + # run_id=759 + pid=35 + started_at=NULL`). The BASE run-ownership reset on every + # requeue/claim path (D1-D3 in src/db.py) is UNCONDITIONAL — this kill-switch + # gates ONLY the optional detect/sanitize sweep (run at startup in main.lifespan + # and on each job-reaper tick) plus its read-only /queue counter. Default on; + # False -> the sweep is a no-op (D1-D3 still enforce the invariant going forward). + # never-raise: a sweep error is isolated and never wedges startup / the reaper. + impossible_queued_sanitize_enabled: bool = True + # ORCH-114 (adr-0045): durable transition-ownership lease + expected-stage CAS for # side-effectful stage transitions. Generalises the process-local ORCH-113 # finalizer-liveness to a DURABLE, cross-path owner-exclusion (additive table diff --git a/src/db.py b/src/db.py index 8089400..3567eef 100644 --- a/src/db.py +++ b/src/db.py @@ -1194,8 +1194,16 @@ def claim_next_job() -> dict | None: return None job_id = row["id"] cur = conn.execute( + # ORCH-126 (D2/FR-2): reset run-ownership on the queued->running flip + # (defense-in-depth over D1). Between this claim and the launcher + # stamping the real pid in _spawn, the row MUST carry pid IS NULL — + # not a stale (possibly OS-reused) pid that the job-reaper's Tier-1 + # liveness probe (ORCH-065) would mistake for this run's process. The + # SELECT gate above is untouched (offline hot path, NFR-2); this is + # part of the existing single UPDATE (no new SELECT / network). "UPDATE jobs SET status='running', " - "attempts = attempts + 1, started_at = datetime('now') " + "attempts = attempts + 1, started_at = datetime('now'), " + "pid = NULL, run_id = NULL " "WHERE id = ? AND status='queued'", (job_id,), ) @@ -1217,12 +1225,19 @@ def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int, Increments `transient_attempts` (separate from the code-fault `attempts`), sets status back to 'queued', and gates re-pickup via `available_at` = now + backoff seconds. started_at/finished_at are cleared. + + ORCH-126 (D1/FR-1): also resets run-ownership (run_id/pid) so the requeued job + obeys the invariant `status='queued' ⇒ run_id IS NULL AND pid IS NULL AND + started_at IS NULL` (see mark_job). The transient bookkeeping + (transient_attempts / available_at backoff) is preserved. """ conn = get_db() sets = [ "status='queued'", "transient_attempts = transient_attempts + 1", "available_at = datetime('now', ?)", + "run_id = NULL", + "pid = NULL", "started_at = NULL", "finished_at = NULL", ] @@ -1249,11 +1264,23 @@ def mark_job( - 'done'/'failed'/'cancelled' (ORCH-090) also stamp finished_at. - 'queued' (requeue for retry) clears started_at/finished_at so the next claim treats it as fresh. + + ORCH-126 (D1/FR-1): a 'queued' requeue ALSO resets run-ownership + (run_id/pid), enforcing the lifecycle invariant + ``status='queued' ⇒ run_id IS NULL AND pid IS NULL AND started_at IS NULL``. + A requeued job must carry NO run-ownership: a stale (and possibly OS-reused) + pid would fool the job-reaper's Tier-1 liveness probe (ORCH-065) into either + guarding a phantom 'running' forever (wedging the shared queue at + max_concurrency=1) or reaping a fresh start. The run history lives in + agent_runs, not jobs.run_id, so dropping the link is safe. Any caller-supplied + run_id is therefore IGNORED for 'queued' (the link is cleared) — callers such + as launcher._finalize_permanent / reaper still pass the old run_id for the + non-queued paths, where it is preserved as before. """ conn = get_db() sets = ["status = ?"] params: list = [status] - if run_id is not None: + if run_id is not None and status != "queued": sets.append("run_id = ?") params.append(run_id) if error is not None: @@ -1262,6 +1289,8 @@ def mark_job( if status in ("done", "failed", "cancelled"): sets.append("finished_at = datetime('now')") elif status == "queued": + sets.append("run_id = NULL") + sets.append("pid = NULL") sets.append("started_at = NULL") sets.append("finished_at = NULL") params.append(job_id) @@ -1477,10 +1506,17 @@ def requeue_running_jobs() -> int: died on restart -> put it back to 'queued'. attempts are kept as-is (the next claim does NOT re-increment beyond what is needed; claim_next_job increments on pickup). Returns the number of requeued jobs. + + ORCH-126 (D1/FR-1): also resets run-ownership (run_id/pid). The dead worker's + run_id/pid are stale by construction after a restart; leaving them would make a + just-requeued job carry a phantom pid that the job-reaper's Tier-1 probe could + mistake for a live process (incident: job 2286 ``queued + run_id=759 + pid=35 + + started_at=NULL``). Enforces ``status='queued' ⇒ run_id/pid/started_at IS NULL``. """ conn = get_db() cur = conn.execute( - "UPDATE jobs SET status='queued', started_at = NULL " + "UPDATE jobs SET status='queued', started_at = NULL, " + "run_id = NULL, pid = NULL " "WHERE status='running'" ) conn.commit() @@ -1632,12 +1668,17 @@ def reap_running_job( Status semantics match ``mark_job``: done/failed stamp ``finished_at``; queued clears ``started_at``/``finished_at`` so the next claim treats it as fresh. + + ORCH-126 (D1/FR-1): a 'queued' reap ALSO resets run-ownership (run_id/pid), + mirroring ``mark_job`` — the invariant ``status='queued' ⇒ run_id/pid/started_at + IS NULL`` (a reaper-requeued job must not carry the dead run's pid). Any + caller-supplied run_id is ignored for 'queued'. """ conn = get_db() try: sets = ["status = ?"] params: list = [status] - if run_id is not None: + if run_id is not None and status != "queued": sets.append("run_id = ?") params.append(run_id) if error is not None: @@ -1646,6 +1687,8 @@ def reap_running_job( if status in ("done", "failed", "cancelled"): # ORCH-090: cancelled is terminal sets.append("finished_at = datetime('now')") elif status == "queued": + sets.append("run_id = NULL") + sets.append("pid = NULL") sets.append("started_at = NULL") sets.append("finished_at = NULL") params.append(job_id) @@ -1659,6 +1702,54 @@ def reap_running_job( conn.close() +def find_impossible_queued_jobs() -> list[dict]: + """ORCH-126 (D4/FR-4): rows that violate the queued lifecycle invariant. + + An "impossible" queued state is ``status='queued' AND (run_id IS NOT NULL OR + pid IS NOT NULL OR started_at IS NOT NULL)`` — run-ownership is set but the run + never actually started (the incident state of job 2286). Read-only; returns the + offending rows' identity columns (id/run_id/pid/started_at + agent/repo) for + diagnostics. Empty list when the invariant holds everywhere. + """ + conn = get_db() + try: + rows = conn.execute( + "SELECT id, run_id, pid, started_at, agent, repo FROM jobs " + "WHERE status='queued' AND (run_id IS NOT NULL OR pid IS NOT NULL " + "OR started_at IS NOT NULL)" + ).fetchall() + finally: + conn.close() + return [dict(r) for r in rows] + + +def sanitize_impossible_queued() -> list[dict]: + """ORCH-126 (D4/FR-4): idempotently restore the queued invariant for any + "impossible" queued row (run-ownership set while queued). + + Returns the rows it healed (pre-heal id/run_id/pid/started_at) so the caller can + log them; empty list when nothing was anomalous. The UPDATE carries the same + ``status='queued'`` guard, so it can never touch a live ``running`` row (TR-1). + Used at startup (``main.lifespan``) and on each reaper tick to self-heal both a + forgotten future requeue path (TR-2) and pre-existing rows on a problematic DB + (TR-7) without a schema migration. + """ + anomalies = find_impossible_queued_jobs() + if not anomalies: + return [] + conn = get_db() + try: + conn.execute( + "UPDATE jobs SET run_id = NULL, pid = NULL, started_at = NULL " + "WHERE status='queued' AND (run_id IS NOT NULL OR pid IS NOT NULL " + "OR started_at IS NOT NULL)" + ) + conn.commit() + finally: + conn.close() + return anomalies + + def get_job(job_id: int) -> dict | None: """Fetch a single job by id.""" conn = get_db() diff --git a/src/job_reaper.py b/src/job_reaper.py index 3d790cb..569b7ba 100644 --- a/src/job_reaper.py +++ b/src/job_reaper.py @@ -154,10 +154,55 @@ class JobReaper: # monitor still owns a deploy-staging finalization. Reset on restart (safe: # startup requeue_running_jobs covers the restart path). self.finalizer_defers_total: int = 0 + # ORCH-126 (D4/FR-4): count of "impossible" queued rows self-healed (queued + # while carrying stale run-ownership) + the last healed batch. Reset on + # restart (safe: the startup sweep + every tick re-detect any residue). + self.impossible_queued_total: int = 0 + self.last_impossible_queued: dict | None = None + + # -- ORCH-126 (D4/FR-4): impossible-queued self-heal ------------------- + def sanitize_impossible_queued_once(self) -> int: + """Detect + self-heal "impossible" queued rows (queued while carrying stale + run-ownership run_id/pid/started_at — the incident state of job 2286). + + Idempotent, never-raise, gated by ``impossible_queued_sanitize_enabled`` + (default on; D1-D3 enforce the invariant going forward regardless). Bumps the + observability counters + logs one WARNING per healed row. Used both at startup + (``main.lifespan``) and on every reaper tick. Returns the count healed. + """ + if not getattr(settings, "impossible_queued_sanitize_enabled", True): + return 0 + try: + from .db import sanitize_impossible_queued + healed = sanitize_impossible_queued() + except Exception as e: # noqa: BLE001 - never break the tick / startup + logger.error("reaper: impossible-queued sanitize failed: %s", e) + return 0 + if healed: + self.impossible_queued_total += len(healed) + self.last_impossible_queued = { + "count": len(healed), + "jobs": [ + {"job_id": r.get("id"), "run_id": r.get("run_id"), + "pid": r.get("pid")} + for r in healed + ], + } + for r in healed: + logger.warning( + "reaper: sanitized impossible queued job %s (run_id=%s, pid=%s, " + "started_at=%s) -> clean queued (ORCH-126 invariant)", + r.get("id"), r.get("run_id"), r.get("pid"), r.get("started_at"), + ) + return len(healed) # -- A: zombie-job reaping -------------------------------------------- def reap_once(self) -> None: """One scan over all ``running`` jobs (per-job never-raise) + lease reclaim.""" + # ORCH-126 (D4/FR-4): self-heal impossible queued rows first (independent of + # reaper_enabled — own kill-switch, never-raise) so a stale run-ownership row + # is normalised before the running-scan reasons about liveness. + self.sanitize_impossible_queued_once() if settings.reaper_enabled: try: running = get_running_jobs() @@ -570,6 +615,13 @@ class JobReaper: "finalizer_liveness_enabled": settings.reaper_finalizer_liveness_enabled, "finalizer_defers_total": self.finalizer_defers_total, "finalizer_owned": _owned, + # ORCH-126 (D4/FR-4): impossible-queued self-heal observability + # (read-only) — kill-switch + cumulative healed count + last batch. + "impossible_queued_sanitize_enabled": getattr( + settings, "impossible_queued_sanitize_enabled", True + ), + "impossible_queued_total": self.impossible_queued_total, + "last_impossible_queued": self.last_impossible_queued, } diff --git a/src/main.py b/src/main.py index 5b0fb11..dd92d21 100644 --- a/src/main.py +++ b/src/main.py @@ -60,6 +60,23 @@ async def lifespan(app: FastAPI): if requeued: log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart") + # ORCH-126 (D4/FR-4): self-heal any "impossible" queued rows — a job left + # `status='queued'` while still carrying run-ownership (run_id/pid/started_at), + # which is physically impossible (the incident state of job 2286). Runs right + # after requeue_running_jobs so a row the dead worker left half-claimed is + # normalised before the worker/reaper start. Routed through the reaper so the + # cumulative /queue counter has a single owner. Idempotent + never-raise; the + # recurring reaper tick keeps it clean thereafter. + try: + from .job_reaper import reaper as _reaper + healed = _reaper.sanitize_impossible_queued_once() + if healed: + log.warning( + f"Queued-hygiene: sanitized {healed} impossible queued job(s) at startup" + ) + except Exception as e: + log.warning(f"Queued-hygiene sanitize skipped: {e}") + # ORCH-114 (adr-0045 / D7 / FR-4): clear durable transition-leases left by the # PREVIOUS process boot. This process has a fresh boot_id, so every prior lease is # stale by construction -> reclaim it so the just-requeued jobs can re-drive their diff --git a/tests/test_orch126_queued_stale_run.py b/tests/test_orch126_queued_stale_run.py new file mode 100644 index 0000000..5541a29 --- /dev/null +++ b/tests/test_orch126_queued_stale_run.py @@ -0,0 +1,317 @@ +"""ORCH-126: run-ownership hygiene of the `jobs` row — invariant +``status='queued' ⇒ run_id IS NULL AND pid IS NULL AND started_at IS NULL``. + +Covers FR-1…FR-5 / AC-1…AC-8 (TC-01..TC-10, see 04-test-plan.yaml). The defect: +no path that returns a job to ``queued`` reset its run-ownership (``run_id`` / +``pid``), so a requeued/restart-recovered job carried a stale (and possibly +OS-reused) pid. The job-reaper (ORCH-065) judges Tier-1 liveness by ``jobs.pid``, +so a reused pid made it guard a phantom ``running`` forever — at ``max_concurrency=1`` +this wedged the claim of EVERY project's queue (incident: job 2286 ``queued + +run_id=759/760 + pid=35/42 + started_at=NULL``). + +No network / no Claude CLI / no real Popen: jobs are seeded directly in an +isolated temp SQLite DB (the convention of test_orch114_transition_ownership.py). +TC-01 is the MANDATORY regression (red before the fix, green after). +""" +import os +import tempfile + +import pytest + +os.environ.setdefault("ORCH_REPOS_DIR", tempfile.gettempdir()) +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db +from src.db import ( + init_db, + get_db, + get_job, + claim_next_job, + mark_job, + mark_job_transient, + reap_running_job, + requeue_running_jobs, + find_impossible_queued_jobs, + sanitize_impossible_queued, +) +from src.job_reaper import JobReaper + +_REPO = "orchestrator" + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "orch126.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + init_db() + # Keep the scheduler gates out of the way of the claim tests (the bug is + # independent of serial-gate / dep-gate semantics — BRD §2 out-of-scope). + monkeypatch.setattr(db.settings, "serial_gate_enabled", False, raising=False) + monkeypatch.setattr(db.settings, "task_deps_enabled", False, raising=False) + yield + + +def _seed_job( + *, + status="queued", + run_id=None, + pid=None, + started_at=None, + finished_at=None, + agent="developer", + repo=_REPO, + attempts=0, + max_attempts=2, + transient_attempts=0, +): + """Insert a job row with arbitrary (possibly invalid) run-ownership columns.""" + conn = get_db() + cur = conn.execute( + "INSERT INTO jobs (agent, repo, status, run_id, pid, started_at, " + "finished_at, attempts, max_attempts, transient_attempts) " + "VALUES (?,?,?,?,?,?,?,?,?,?)", + (agent, repo, status, run_id, pid, started_at, finished_at, + attempts, max_attempts, transient_attempts), + ) + jid = cur.lastrowid + conn.commit() + conn.close() + return jid + + +def _row(job_id): + return get_job(job_id) + + +# --------------------------------------------------------------------------- # +# TC-01 — MANDATORY regression (red -> green): restart-recovery clears ownership +# --------------------------------------------------------------------------- # +def test_tc01_requeue_running_clears_stale_ownership(): + """A job left 'running' with stale run_id+pid+started_at (the incident state) + is restored to a CLEAN queued by requeue_running_jobs() — run_id/pid/started_at + all NULL. Red on the code BEFORE the fix (run_id/pid kept), green after (AC-1/AC-8). + """ + jid = _seed_job( + status="running", run_id=759, pid=35, started_at="2026-06-17 10:00:00", + ) + n = requeue_running_jobs() + assert n == 1 + r = _row(jid) + assert r["status"] == "queued" + assert r["run_id"] is None + assert r["pid"] is None + assert r["started_at"] is None + + +# --------------------------------------------------------------------------- # +# TC-02 — mark_job(..., 'queued') resets run_id and pid +# --------------------------------------------------------------------------- # +def test_tc02_mark_job_queued_resets_ownership(): + jid = _seed_job( + status="running", run_id=760, pid=42, started_at="2026-06-17 10:00:00", + ) + # A real caller (launcher._finalize_permanent) requeues WITH the old run_id — + # the invariant must win and drop it regardless. + mark_job(jid, "queued", run_id=760, error="exit_code=1") + r = _row(jid) + assert r["status"] == "queued" + assert r["run_id"] is None + assert r["pid"] is None + assert r["started_at"] is None + assert r["finished_at"] is None + assert r["error"] == "exit_code=1" + + +# --------------------------------------------------------------------------- # +# TC-03 — mark_job_transient resets ownership but keeps transient bookkeeping +# --------------------------------------------------------------------------- # +def test_tc03_mark_job_transient_resets_ownership_keeps_backoff(): + jid = _seed_job( + status="running", run_id=761, pid=50, started_at="2026-06-17 10:00:00", + transient_attempts=1, + ) + mark_job_transient(jid, 30, error="overloaded") + r = _row(jid) + assert r["status"] == "queued" + assert r["run_id"] is None + assert r["pid"] is None + assert r["started_at"] is None + # Transient bookkeeping preserved. + assert r["transient_attempts"] == 2 + assert r["available_at"] is not None + assert r["error"] == "overloaded" + + +# --------------------------------------------------------------------------- # +# TC-04 — reap_running_job('queued') resets ownership; atomic guard preserved +# --------------------------------------------------------------------------- # +def test_tc04_reap_running_job_queued_resets_and_guard_holds(): + jid = _seed_job( + status="running", run_id=762, pid=60, started_at="2026-06-17 10:00:00", + ) + won = reap_running_job(jid, "queued", run_id=762, error="dead pid") + assert won is True + r = _row(jid) + assert r["status"] == "queued" + assert r["run_id"] is None + assert r["pid"] is None + assert r["started_at"] is None + # Atomic WHERE status='running' guard: a second call on the now-queued row + # must lose (rowcount 0) — restart-safe / race-safe (TR-4). + won2 = reap_running_job(jid, "queued", error="again") + assert won2 is False + + +# --------------------------------------------------------------------------- # +# TC-05 — claim_next_job leaves no stale pid (defense-in-depth, AC-3) +# --------------------------------------------------------------------------- # +def test_tc05_claim_clears_stale_pid_before_spawn(): + """A queued job carrying a stale pid/run_id (impossible state) is claimed into + 'running' with pid/run_id reset to NULL — BEFORE the launcher stamps the real + pid in _spawn. Red before the fix (claim left the stale pid), green after.""" + jid = _seed_job(status="queued", run_id=900, pid=999999) + claimed = claim_next_job() + assert claimed is not None + assert claimed["id"] == jid + assert claimed["status"] == "running" + assert claimed["pid"] is None + assert claimed["run_id"] is None + r = _row(jid) + assert r["status"] == "running" + assert r["pid"] is None + assert r["run_id"] is None + assert r["started_at"] is not None # claim still stamps started_at + + +# --------------------------------------------------------------------------- # +# TC-06 — claim works (no starvation) with serial-gate disabled (AC-2) +# --------------------------------------------------------------------------- # +def test_tc06_claim_starts_with_serial_gate_off(monkeypatch): + monkeypatch.setattr(db.settings, "serial_gate_enabled", False, raising=False) + jid = _seed_job(status="queued", agent="analyst") + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == jid + assert claimed["status"] == "running" + # Queue did not hang: nothing left queued, exactly one running. + counts = db.job_status_counts() + assert counts["queued"] == 0 + assert counts["running"] == 1 + + +# --------------------------------------------------------------------------- # +# TC-07 — reaper does not reap a freshly-claimed running job with pid IS NULL (AC-4) +# --------------------------------------------------------------------------- # +def test_tc07_reaper_skips_pid_null_fresh_running(monkeypatch): + monkeypatch.setattr(db.settings, "lease_reclaim_enabled", False, raising=False) + monkeypatch.setattr(db.settings, "reaper_enabled", True, raising=False) + # Fresh running job: pid NULL (claim reset it, _spawn not yet stamped), + # started_at = now -> small age, no agent_runs row -> exit_code NULL. + jid = _seed_job(status="running", pid=None, run_id=None, + started_at=None) + conn = get_db() + conn.execute("UPDATE jobs SET started_at = datetime('now') WHERE id=?", (jid,)) + conn.commit() + conn.close() + reaper = JobReaper() + reaper.reap_once() + r = _row(jid) + # Tier-1 skips pid IS NULL; Tier-3 backstop not reached -> still running. + assert r["status"] == "running" + + +# --------------------------------------------------------------------------- # +# TC-08 — detect + self-heal the impossible queued state (idempotent, AC-5) +# --------------------------------------------------------------------------- # +def test_tc08_sanitize_impossible_queued_idempotent(): + jid = _seed_job( + status="queued", run_id=759, pid=35, started_at="2026-06-17 10:00:00", + ) + # Detection sees the anomaly. + found = find_impossible_queued_jobs() + assert any(f["id"] == jid for f in found) + # First sanitize heals it and reports it. + healed = sanitize_impossible_queued() + assert len(healed) == 1 and healed[0]["id"] == jid + r = _row(jid) + assert r["status"] == "queued" + assert r["run_id"] is None and r["pid"] is None and r["started_at"] is None + # Idempotent: nothing left to heal on a clean DB. + assert find_impossible_queued_jobs() == [] + assert sanitize_impossible_queued() == [] + + +def test_tc08b_reaper_sanitize_counter_and_status(monkeypatch): + """The reaper's sanitize pass bumps an observability counter exposed in /queue + and never raises; gated by the kill-switch.""" + monkeypatch.setattr(db.settings, "lease_reclaim_enabled", False, raising=False) + monkeypatch.setattr( + db.settings, "impossible_queued_sanitize_enabled", True, raising=False + ) + _seed_job(status="queued", run_id=10, pid=11, started_at="2026-06-17 10:00:00") + reaper = JobReaper() + n = reaper.sanitize_impossible_queued_once() + assert n == 1 + assert reaper.impossible_queued_total == 1 + st = reaper.status() + assert st["impossible_queued_total"] == 1 + assert st["last_impossible_queued"]["count"] == 1 + # Kill-switch off -> no-op even with an anomaly present. + _seed_job(status="queued", run_id=12, pid=13, started_at="2026-06-17 10:00:00") + monkeypatch.setattr( + db.settings, "impossible_queued_sanitize_enabled", False, raising=False + ) + assert reaper.sanitize_impossible_queued_once() == 0 + + +# --------------------------------------------------------------------------- # +# TC-09 — _spawn window: a launch failure before the pid stamp requeues cleanly +# --------------------------------------------------------------------------- # +def test_tc09_spawn_failure_requeues_clean_and_reclaimable(monkeypatch): + """Simulate the queue worker's launch-failure handler: claim flips queued->running + (pid reset to NULL by D2), launch raises before stamping pid, the handler requeues + via mark_job('queued') (D1) -> the row is clean and immediately re-claimable (AC-6). + """ + jid = _seed_job(status="queued", run_id=900, pid=999999, attempts=0, max_attempts=2) + claimed = claim_next_job() + assert claimed["id"] == jid and claimed["pid"] is None # D2 already cleaned + + # _spawn fails before stamping pid -> the queue worker marks it back to queued. + mark_job(jid, "queued", error="launch error: ensure_worktree failed") + r = _row(jid) + assert r["status"] == "queued" + assert r["pid"] is None and r["run_id"] is None and r["started_at"] is None + + # Re-claim starts normally (no "partially started" wedge). + again = claim_next_job() + assert again is not None and again["id"] == jid + assert again["status"] == "running" and again["pid"] is None + + +# --------------------------------------------------------------------------- # +# TC-10 — anti-regression: a healthy job's terminal outcomes are untouched +# --------------------------------------------------------------------------- # +def test_tc10_healthy_job_terminal_outcomes_unchanged(): + # done: run_id link kept, finished_at stamped, NO ownership reset. + jid = _seed_job(status="running", run_id=500, pid=12345, + started_at="2026-06-17 10:00:00") + mark_job(jid, "done", run_id=500) + r = _row(jid) + assert r["status"] == "done" + assert r["run_id"] == 500 # link preserved for done + assert r["finished_at"] is not None + assert r["started_at"] == "2026-06-17 10:00:00" # not cleared for terminal + + # failed: same — run_id kept, finished_at stamped. + jid2 = _seed_job(status="running", run_id=501, pid=222, + started_at="2026-06-17 10:00:00") + mark_job(jid2, "failed", run_id=501, error="boom") + r2 = _row(jid2) + assert r2["status"] == "failed" + assert r2["run_id"] == 501 + assert r2["finished_at"] is not None + assert r2["error"] == "boom" + + # A never-started healthy queued job is NOT flagged as impossible. + healthy = _seed_job(status="queued") + assert all(f["id"] != healthy for f in find_impossible_queued_jobs())