orchestrator/tests/test_orch126_queued_stale_run.py
claude-bot d7e7a4d817
fix(queue): enforce queued ⇒ no run-ownership invariant (ORCH-126)
Queued analyst-jobs hung forever even with ORCH_SERIAL_GATE_ENABLED=false
(incident ORCH-124/125, job 2286: queued + run_id=759/760 + pid=35/42 +
started_at=NULL — physically impossible). No path returning a job to
'queued' reset its run-ownership (run_id/pid); after a container restart a
reused pid made pid_alive(stale)=True, so the job-reaper Tier-1 saw a phantom
'running' and at max_concurrency=1 wedged the claim of the whole shared queue.
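
Why a reused pid looks alive can be shown with a minimal sketch. The body of pid_alive below is an assumption (a common POSIX idiom using signal 0), not necessarily how the orchestrator implements it:

```python
import os


def pid_alive(pid: int) -> bool:
    """Assumed implementation: True if ANY process currently holds this pid (POSIX)."""
    try:
        os.kill(pid, 0)  # signal 0 delivers nothing; it only checks that the pid exists
    except ProcessLookupError:
        return False  # no process with this pid
    except PermissionError:
        return True  # the pid exists but belongs to another user
    return True


# The check cannot tell WHICH process holds the pid. After a container restart
# a low pid such as 35 is quickly reassigned to an unrelated process, so a pid
# stored by an earlier run can report "alive" even though that run is gone.
print(pid_alive(os.getpid()))
```

Because the liveness check is only as good as the stored pid, the fix resets the pid whenever a job returns to 'queued' rather than changing the check itself.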

Enforce the invariant `status='queued' ⇒ run_id IS NULL AND pid IS NULL AND
started_at IS NULL` on existing columns (no schema change):

- D1 forward-cleanup: requeue_running_jobs / mark_job('queued') /
  mark_job_transient / reap_running_job('queued') reset run_id=NULL, pid=NULL
  in the same UPDATE that clears started_at; atomic status-guards preserved.
- D2 clean claim: claim_next_job resets pid/run_id on the queued->running flip
  (defense-in-depth) so the row carries pid IS NULL until _spawn stamps it.
- D4 self-heal + observability: db.find_impossible_queued_jobs /
  sanitize_impossible_queued run at startup (main.lifespan) and on each reaper
  tick (JobReaper.sanitize_impossible_queued_once, never-raise); counter
  impossible_queued_total in the GET /queue reaper block. Kill-switch
  ORCH_IMPOSSIBLE_QUEUED_SANITIZE_ENABLED (default on; gates only the D4 sweep).
- D5: reaper Tier-1 unchanged — the fix restores its precondition (pid reflects
  THIS run). Marked invariants ORCH-065/113/114/099 preserved.
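
The D1 and D4 steps above can be sketched against a throwaway SQLite table. The schema and the *_sketch function names are hypothetical and only mirror the columns named in this message; the real src.db functions may differ:

```python
import sqlite3

# Predicate for a row that violates the invariant while still being 'queued'.
_IMPOSSIBLE = (
    "status='queued' AND "
    "(run_id IS NOT NULL OR pid IS NOT NULL OR started_at IS NOT NULL)"
)


def make_db():
    """Create an in-memory jobs table (hypothetical, reduced schema)."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute(
        "CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT, "
        "run_id INTEGER, pid INTEGER, started_at TEXT)"
    )
    return conn


def requeue_running_sketch(conn):
    """D1: reset run-ownership in the same UPDATE that flips the status."""
    cur = conn.execute(
        "UPDATE jobs SET status='queued', run_id=NULL, pid=NULL, started_at=NULL "
        "WHERE status='running'"  # status guard keeps the update atomic
    )
    conn.commit()
    return cur.rowcount


def find_impossible_queued_sketch(conn):
    """D4 detection: queued rows that still carry run-ownership."""
    return conn.execute(
        f"SELECT * FROM jobs WHERE {_IMPOSSIBLE} ORDER BY id"
    ).fetchall()


def sanitize_impossible_queued_sketch(conn):
    """D4 self-heal: clear ownership on impossible rows and return what was healed."""
    bad = find_impossible_queued_sketch(conn)
    conn.execute(
        f"UPDATE jobs SET run_id=NULL, pid=NULL, started_at=NULL WHERE {_IMPOSSIBLE}"
    )
    conn.commit()
    return bad
```

Running the sanitize sketch a second time returns an empty list, which is the idempotence that the startup and reaper-tick sweeps rely on.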

Tests: tests/test_orch126_queued_stale_run.py (TC-01 mandatory regression
red->green; TC-02..TC-10). Full pytest tests/ -q green (2189 passed).
Docs: internals.md (run-ownership invariant section), .env.example, CHANGELOG;
cross-cutting adr-0052.

Refs: ORCH-126
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 11:39:26 +03:00


"""ORCH-126: run-ownership hygiene of the `jobs` row — invariant
``status='queued' ⇒ run_id IS NULL AND pid IS NULL AND started_at IS NULL``.
Covers FR-1…FR-5 / AC-1…AC-8 (TC-01..TC-10, see 04-test-plan.yaml). The defect:
no path that returns a job to ``queued`` reset its run-ownership (``run_id`` /
``pid``), so a requeued/restart-recovered job carried a stale (and possibly
OS-reused) pid. The job-reaper (ORCH-065) judges Tier-1 liveness by ``jobs.pid``,
so a reused pid made it guard a phantom ``running`` forever — at ``max_concurrency=1``
this wedged the claim of EVERY project's queue (incident: job 2286 ``queued +
run_id=759/760 + pid=35/42 + started_at=NULL``).
No network / no Claude CLI / no real Popen: jobs are seeded directly in an
isolated temp SQLite DB (the convention of test_orch114_transition_ownership.py).
TC-01 is the MANDATORY regression (red before the fix, green after).
"""
import os
import tempfile

import pytest

os.environ.setdefault("ORCH_REPOS_DIR", tempfile.gettempdir())
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

import src.db as db
from src.db import (
    init_db,
    get_db,
    get_job,
    claim_next_job,
    mark_job,
    mark_job_transient,
    reap_running_job,
    requeue_running_jobs,
    find_impossible_queued_jobs,
    sanitize_impossible_queued,
)
from src.job_reaper import JobReaper

_REPO = "orchestrator"


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    dbfile = tmp_path / "orch126.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    init_db()
    # Keep the scheduler gates out of the way of the claim tests (the bug is
    # independent of serial-gate / dep-gate semantics; BRD §2 out-of-scope).
    monkeypatch.setattr(db.settings, "serial_gate_enabled", False, raising=False)
    monkeypatch.setattr(db.settings, "task_deps_enabled", False, raising=False)
    yield


def _seed_job(
    *,
    status="queued",
    run_id=None,
    pid=None,
    started_at=None,
    finished_at=None,
    agent="developer",
    repo=_REPO,
    attempts=0,
    max_attempts=2,
    transient_attempts=0,
):
    """Insert a job row with arbitrary (possibly invalid) run-ownership columns."""
    conn = get_db()
    cur = conn.execute(
        "INSERT INTO jobs (agent, repo, status, run_id, pid, started_at, "
        "finished_at, attempts, max_attempts, transient_attempts) "
        "VALUES (?,?,?,?,?,?,?,?,?,?)",
        (agent, repo, status, run_id, pid, started_at, finished_at,
         attempts, max_attempts, transient_attempts),
    )
    jid = cur.lastrowid
    conn.commit()
    conn.close()
    return jid


def _row(job_id):
    return get_job(job_id)


# --------------------------------------------------------------------------- #
# TC-01 - MANDATORY regression (red -> green): restart-recovery clears ownership
# --------------------------------------------------------------------------- #
def test_tc01_requeue_running_clears_stale_ownership():
    """A job left 'running' with stale run_id+pid+started_at (the incident state)
    is restored to a CLEAN queued by requeue_running_jobs(): run_id/pid/started_at
    all NULL. Red on the code BEFORE the fix (run_id/pid kept), green after (AC-1/AC-8).
    """
    jid = _seed_job(
        status="running", run_id=759, pid=35, started_at="2026-06-17 10:00:00",
    )
    n = requeue_running_jobs()
    assert n == 1
    r = _row(jid)
    assert r["status"] == "queued"
    assert r["run_id"] is None
    assert r["pid"] is None
    assert r["started_at"] is None


# --------------------------------------------------------------------------- #
# TC-02 - mark_job(..., 'queued') resets run_id and pid
# --------------------------------------------------------------------------- #
def test_tc02_mark_job_queued_resets_ownership():
    jid = _seed_job(
        status="running", run_id=760, pid=42, started_at="2026-06-17 10:00:00",
    )
    # A real caller (launcher._finalize_permanent) requeues WITH the old run_id;
    # the invariant must win and drop it regardless.
    mark_job(jid, "queued", run_id=760, error="exit_code=1")
    r = _row(jid)
    assert r["status"] == "queued"
    assert r["run_id"] is None
    assert r["pid"] is None
    assert r["started_at"] is None
    assert r["finished_at"] is None
    assert r["error"] == "exit_code=1"


# --------------------------------------------------------------------------- #
# TC-03 - mark_job_transient resets ownership but keeps transient bookkeeping
# --------------------------------------------------------------------------- #
def test_tc03_mark_job_transient_resets_ownership_keeps_backoff():
    jid = _seed_job(
        status="running", run_id=761, pid=50, started_at="2026-06-17 10:00:00",
        transient_attempts=1,
    )
    mark_job_transient(jid, 30, error="overloaded")
    r = _row(jid)
    assert r["status"] == "queued"
    assert r["run_id"] is None
    assert r["pid"] is None
    assert r["started_at"] is None
    # Transient bookkeeping preserved.
    assert r["transient_attempts"] == 2
    assert r["available_at"] is not None
    assert r["error"] == "overloaded"


# --------------------------------------------------------------------------- #
# TC-04 - reap_running_job('queued') resets ownership; atomic guard preserved
# --------------------------------------------------------------------------- #
def test_tc04_reap_running_job_queued_resets_and_guard_holds():
    jid = _seed_job(
        status="running", run_id=762, pid=60, started_at="2026-06-17 10:00:00",
    )
    won = reap_running_job(jid, "queued", run_id=762, error="dead pid")
    assert won is True
    r = _row(jid)
    assert r["status"] == "queued"
    assert r["run_id"] is None
    assert r["pid"] is None
    assert r["started_at"] is None
    # Atomic WHERE status='running' guard: a second call on the now-queued row
    # must lose (rowcount 0), which keeps it restart-safe / race-safe (TR-4).
    won2 = reap_running_job(jid, "queued", error="again")
    assert won2 is False


# --------------------------------------------------------------------------- #
# TC-05 - claim_next_job leaves no stale pid (defense-in-depth, AC-3)
# --------------------------------------------------------------------------- #
def test_tc05_claim_clears_stale_pid_before_spawn():
    """A queued job carrying a stale pid/run_id (impossible state) is claimed into
    'running' with pid/run_id reset to NULL, BEFORE the launcher stamps the real
    pid in _spawn. Red before the fix (claim left the stale pid), green after."""
    jid = _seed_job(status="queued", run_id=900, pid=999999)
    claimed = claim_next_job()
    assert claimed is not None
    assert claimed["id"] == jid
    assert claimed["status"] == "running"
    assert claimed["pid"] is None
    assert claimed["run_id"] is None
    r = _row(jid)
    assert r["status"] == "running"
    assert r["pid"] is None
    assert r["run_id"] is None
    assert r["started_at"] is not None  # claim still stamps started_at


# --------------------------------------------------------------------------- #
# TC-06 - claim works (no starvation) with serial-gate disabled (AC-2)
# --------------------------------------------------------------------------- #
def test_tc06_claim_starts_with_serial_gate_off(monkeypatch):
    monkeypatch.setattr(db.settings, "serial_gate_enabled", False, raising=False)
    jid = _seed_job(status="queued", agent="analyst")
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == jid
    assert claimed["status"] == "running"
    # Queue did not hang: nothing left queued, exactly one running.
    counts = db.job_status_counts()
    assert counts["queued"] == 0
    assert counts["running"] == 1


# --------------------------------------------------------------------------- #
# TC-07 - reaper does not reap a freshly-claimed running job with pid IS NULL (AC-4)
# --------------------------------------------------------------------------- #
def test_tc07_reaper_skips_pid_null_fresh_running(monkeypatch):
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", False, raising=False)
    monkeypatch.setattr(db.settings, "reaper_enabled", True, raising=False)
    # Fresh running job: pid NULL (claim reset it, _spawn not yet stamped),
    # started_at = now -> small age, no agent_runs row -> exit_code NULL.
    jid = _seed_job(status="running", pid=None, run_id=None,
                    started_at=None)
    conn = get_db()
    conn.execute("UPDATE jobs SET started_at = datetime('now') WHERE id=?", (jid,))
    conn.commit()
    conn.close()
    reaper = JobReaper()
    reaper.reap_once()
    r = _row(jid)
    # Tier-1 skips pid IS NULL; Tier-3 backstop not reached -> still running.
    assert r["status"] == "running"


# --------------------------------------------------------------------------- #
# TC-08 - detect + self-heal the impossible queued state (idempotent, AC-5)
# --------------------------------------------------------------------------- #
def test_tc08_sanitize_impossible_queued_idempotent():
    jid = _seed_job(
        status="queued", run_id=759, pid=35, started_at="2026-06-17 10:00:00",
    )
    # Detection sees the anomaly.
    found = find_impossible_queued_jobs()
    assert any(f["id"] == jid for f in found)
    # First sanitize heals it and reports it.
    healed = sanitize_impossible_queued()
    assert len(healed) == 1 and healed[0]["id"] == jid
    r = _row(jid)
    assert r["status"] == "queued"
    assert r["run_id"] is None and r["pid"] is None and r["started_at"] is None
    # Idempotent: nothing left to heal on a clean DB.
    assert find_impossible_queued_jobs() == []
    assert sanitize_impossible_queued() == []


def test_tc08b_reaper_sanitize_counter_and_status(monkeypatch):
    """The reaper's sanitize pass bumps an observability counter exposed in /queue
    and never raises; gated by the kill-switch."""
    monkeypatch.setattr(db.settings, "lease_reclaim_enabled", False, raising=False)
    monkeypatch.setattr(
        db.settings, "impossible_queued_sanitize_enabled", True, raising=False
    )
    _seed_job(status="queued", run_id=10, pid=11, started_at="2026-06-17 10:00:00")
    reaper = JobReaper()
    n = reaper.sanitize_impossible_queued_once()
    assert n == 1
    assert reaper.impossible_queued_total == 1
    st = reaper.status()
    assert st["impossible_queued_total"] == 1
    assert st["last_impossible_queued"]["count"] == 1
    # Kill-switch off -> no-op even with an anomaly present.
    _seed_job(status="queued", run_id=12, pid=13, started_at="2026-06-17 10:00:00")
    monkeypatch.setattr(
        db.settings, "impossible_queued_sanitize_enabled", False, raising=False
    )
    assert reaper.sanitize_impossible_queued_once() == 0


# --------------------------------------------------------------------------- #
# TC-09 - _spawn window: a launch failure before the pid stamp requeues cleanly
# --------------------------------------------------------------------------- #
def test_tc09_spawn_failure_requeues_clean_and_reclaimable(monkeypatch):
    """Simulate the queue worker's launch-failure handler: claim flips queued->running
    (pid reset to NULL by D2), launch raises before stamping pid, the handler requeues
    via mark_job('queued') (D1) -> the row is clean and immediately re-claimable (AC-6).
    """
    jid = _seed_job(status="queued", run_id=900, pid=999999, attempts=0, max_attempts=2)
    claimed = claim_next_job()
    assert claimed["id"] == jid and claimed["pid"] is None  # D2 already cleaned
    # _spawn fails before stamping pid -> the queue worker marks it back to queued.
    mark_job(jid, "queued", error="launch error: ensure_worktree failed")
    r = _row(jid)
    assert r["status"] == "queued"
    assert r["pid"] is None and r["run_id"] is None and r["started_at"] is None
    # Re-claim starts normally (no "partially started" wedge).
    again = claim_next_job()
    assert again is not None and again["id"] == jid
    assert again["status"] == "running" and again["pid"] is None


# --------------------------------------------------------------------------- #
# TC-10 - anti-regression: a healthy job's terminal outcomes are untouched
# --------------------------------------------------------------------------- #
def test_tc10_healthy_job_terminal_outcomes_unchanged():
    # done: run_id link kept, finished_at stamped, NO ownership reset.
    jid = _seed_job(status="running", run_id=500, pid=12345,
                    started_at="2026-06-17 10:00:00")
    mark_job(jid, "done", run_id=500)
    r = _row(jid)
    assert r["status"] == "done"
    assert r["run_id"] == 500  # link preserved for done
    assert r["finished_at"] is not None
    assert r["started_at"] == "2026-06-17 10:00:00"  # not cleared for terminal
    # failed: same, run_id kept, finished_at stamped.
    jid2 = _seed_job(status="running", run_id=501, pid=222,
                     started_at="2026-06-17 10:00:00")
    mark_job(jid2, "failed", run_id=501, error="boom")
    r2 = _row(jid2)
    assert r2["status"] == "failed"
    assert r2["run_id"] == 501
    assert r2["finished_at"] is not None
    assert r2["error"] == "boom"
    # A never-started healthy queued job is NOT flagged as impossible.
    healthy = _seed_job(status="queued")
    assert all(f["id"] != healthy for f in find_impossible_queued_jobs())