feat(reaper): job-reaper + stale merge-lease reclaim + idempotent merge finalization

Closes the "zombie jobs" incident class: job status was set only inside
the live launcher process, so a process death left jobs.status='running'
forever; at max_concurrency=1 one zombie blocked ALL projects' queue
(self-hosting risk). Adds a background daemon (src/job_reaper.py) with
three-tier liveness (dead-pid streak / known exit_code / max-running
backstop) whose only mutating write is an atomic terminal flip guarded by
WHERE status='running' (no double-process). For exit0 the canonical QG is
the source of truth via gate-driven advance, not "exit0".

Also proactively reclaims stale merge-lease (dead pid OR TTL) via file
delete only (no git ops), and makes merge finalization idempotent
(pr_already_merged guard + up-to-date short-circuit on re-drive).

New jobs.pid column via idempotent _ensure_column (no migration); pid
stamped in launcher._spawn after Popen. Reaper start/stop in lifespan;
"reaper" snapshot in GET /queue. Kill-switches: ORCH_REAPER_ENABLED,
ORCH_REAPER_INTERVAL_S, ORCH_REAPER_DEAD_TICKS, ORCH_REAPER_MAX_RUNNING_S,
ORCH_LEASE_RECLAIM_ENABLED.

Invariants unchanged (AC-13): STAGE_TRANSITIONS, QG_CHECKS registry,
check_branch_mergeable signature/behaviour, BUG-8 rollback, hook exit
codes. restart-safe, never-raise per unit of background work.

Docs: docs/architecture/README.md, CHANGELOG.md, .env.example.
Tests: tests/test_job_reaper.py, tests/test_merge_lease_reclaim.py,
tests/test_merge_gate.py (TC-16), tests/test_merge_gate_race.py (TC-17),
tests/test_queue.py, tests/test_config.py (TC-19/TC-20). 742 passed.

Refs: ORCH-065

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-07 15:31:37 +00:00
committed by Dev Agent
parent 9f846b5a50
commit 4bebb921ff
15 changed files with 1341 additions and 5 deletions

View File

@@ -117,6 +117,28 @@ ORCH_RECONCILE_GRACE_OVERRIDES_JSON=
ORCH_RECONCILE_NOTIFY_UNBLOCK=true
ORCH_RECONCILE_SKIP_BLOCKED_ENABLED=true
# ORCH-065: job-reaper + proactive merge-lease reclaim. A background daemon thread
# (src/job_reaper.py, started LAST in main.lifespan after requeue_running_jobs) reaps
# zombie 'running' jobs whose monitor/process died before writing the terminal status
# (one zombie at max_concurrency=1 blocks the whole shared queue) and periodically
# reclaims dead/stale merge-leases. Liveness is three-tier: Tier-1 dead jobs.pid
# (os.kill(pid,0)) after REAPER_DEAD_TICKS consecutive dead ticks (anti-false-positive
# for a live agent); Tier-2 agent_runs.exit_code recorded but job still 'running';
# Tier-3 backstop after REAPER_MAX_RUNNING_S. The terminal flip carries an atomic
# status='running' guard so it never double-processes a row racing requeue_running_jobs.
# REAPER_ENABLED -> global kill-switch (false -> strictly prior behaviour).
# REAPER_INTERVAL_S -> background scan period (seconds).
# REAPER_DEAD_TICKS -> consecutive dead-pid ticks before reaping (Tier-1, >=2).
# REAPER_MAX_RUNNING_S -> Tier-3 backstop ceiling; must exceed max agent_timeout+grace.
# LEASE_RECLAIM_ENABLED -> kill-switch for the proactive stale/dead lease reclaim
# (false -> only the legacy lazy TTL reclaim in acquire_merge_lease).
# (reuse) ORCH_MERGE_LOCK_TIMEOUT_S -> lease TTL; ORCH_MERGE_GATE_REPOS -> reclaim scope.
ORCH_REAPER_ENABLED=true
ORCH_REAPER_INTERVAL_S=60
ORCH_REAPER_DEAD_TICKS=2
ORCH_REAPER_MAX_RUNNING_S=3600
ORCH_LEASE_RECLAIM_ENABLED=true
# ORCH-021: post-deploy production monitoring + degradation reaction. After the
# terminal deploy->done transition for an applicable repo, a reserved-agent job
# `post-deploy-monitor` (no LLM, modelled on deploy-finalizer) probes prod over a

File diff suppressed because one or more lines are too long

View File

@@ -295,4 +295,4 @@ ORCH-065 вводит фоновый watchdog, чтобы смерть проц
Схема БД, потоки данных, resilience-слой, детали Dockerfile — [internals.md](internals.md).
---
*Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — design, ветка feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py; флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест).*
*Актуально на 2026-06-07. Обновлять при изменении src/stages.py, src/qg/checks.py, src/main.py. Статусы доработок: ORCH-036 (исполняемый самодеплой `deploy`, adr-0007) — реализовано; ORCH-043 (merge-gate, adr-0006) — design, ветка feature/ORCH-043; ORCH-053 (reconciler, adr-0007, src/reconciler.py) — реализовано; ORCH-060 (F-1 skip escalated/Blocked/Needs-Input, `docs/work-items/ORCH-060/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-060 (Guard 1 `developer_retry_count>=MAX_DEVELOPER_RETRIES` + Guard 2 `plane_sync.fetch_issue_state` Blocked/Needs-Input, флаг `ORCH_RECONCILE_SKIP_BLOCKED_ENABLED`); ORCH-058 (провенанс staging-образа: check_staging_image_fresh + staging_check свежего образа + хук-guard, adr-0008) — реализовано в ветке feature/ORCH-058 (обновлять также при изменении src/image_freshness.py, scripts/orchestrator-deploy-hook.sh, Dockerfile); ORCH-061 (толерантность staging-вердикта к инфра-FAIL C9a/C9b, adr-0009, `docs/work-items/ORCH-061/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-061 (обновлять также при изменении src/staging_verdict.py, scripts/staging_check.py, флаг staging_infra_tolerance_enabled); ORCH-021 (post-deploy наблюдение прода + реакция на деградацию, adr-0010, `docs/work-items/ORCH-021/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-021-post-deploy-rollback (reserved-agent job `post-deploy-monitor`: арм в src/stage_engine.py блок `next_stage == "done"`, тик `run_post_deploy_monitor` + перехват в src/agents/launcher.py ДО _spawn; чистая логика src/post_deploy.py never-raise; флаги `post_deploy_*` в src/config.py; блок `post_deploy` в `/queue`; артефакт 16-post-deploy-log.md; self-hosting всегда ALERT_ONLY — тик не рестартит прод; обновлять также при изменении src/post_deploy.py / арм-блока / launcher-перехвата); ORCH-065 (job-reaper + проактивный реклейм merge-lease + идемпотентная финализация merge, adr-0011, `docs/work-items/ORCH-065/06-adr/ADR-001`) — реализовано в ветке feature/ORCH-065 (новый daemon-поток src/job_reaper.py + старт/стоп в src/main.py lifespan; колонка `jobs.pid` через _ensure_column + проставление в src/agents/launcher.py `_spawn`; функции реклейма lease `pid_alive`/`reclaim_stale_lease` + guard `pr_already_merged` в src/merge_gate.py; флаги `reaper_*`/`lease_reclaim_*` в src/config.py; блок `reaper` в `/queue`; обновлять также при изменении этих мест).*

View File

@@ -417,6 +417,14 @@ class AgentLauncher:
"UPDATE agent_runs SET output_path = ? WHERE id = ?",
(output_path, run_id),
)
# ORCH-065: stamp the agent process pid onto the job row so the job-reaper
# can probe liveness (os.kill(pid, 0)). proc.pid only exists after Popen,
# so this is a second UPDATE next to run_id/started_at (set above in _spawn).
if job_id is not None:
conn.execute(
"UPDATE jobs SET pid = ? WHERE id = ?",
(proc.pid, job_id),
)
conn.commit()
conn.close()

View File

@@ -296,6 +296,33 @@ class Settings(BaseSettings):
post_deploy_auto_rollback: bool = False
post_deploy_base_url: str = "http://localhost:8500"
# ORCH-065: job-reaper + proactive merge-lease reclaim. A background daemon
# thread (modelled on the reconciler) makes "the monitor thread / process died
# while a job/lease was held" self-heal WITHOUT a restart. Status (done/queued/
# failed) is otherwise only ever set by launcher._monitor_agent -> _finalize_job
# inside the live process; a death there left the jobs row 'running' forever and
# (at max_concurrency=1) wedged the queue of EVERY project (incidents 07.06: jobs
# 236/239/242/254). The same thread proactively reclaims a stale/dead merge-lease
# (ORCH-043) instead of waiting for the lazy TTL on the next foreign acquire. See
# docs/architecture/adr/adr-0011-job-reaper-lease-reclaim.md.
# reaper_enabled -> global kill-switch (false -> strictly prior behaviour;
# only the startup requeue_running_jobs remains).
# reaper_interval_s -> background scan period (seconds).
# reaper_dead_ticks -> Tier-1: consecutive ticks a job's pid must be dead
# before it is reaped (>=2 anti-false-positive; a live
# long-running agent is NEVER reaped).
# reaper_max_running_s -> Tier-3 backstop ceiling: a job 'running' longer than
# this is reaped even when liveness is unknowable. MUST be
# > max agent_timeout + grace so a legit agent is safe.
# lease_reclaim_enabled -> kill-switch for the proactive stale/dead lease reclaim
# (false -> only the legacy lazy TTL reclaim in acquire).
# (reuse) merge_lock_timeout_s -> lease TTL; merge_gate_repos -> reclaim scope.
reaper_enabled: bool = True
reaper_interval_s: int = 60
reaper_dead_ticks: int = 2
reaper_max_running_s: int = 3600
lease_reclaim_enabled: bool = True
# Telegram notifications
telegram_bot_token: str = ""
telegram_chat_id: str = ""

View File

@@ -76,6 +76,11 @@ def init_db():
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
# ORCH-065: pid of the spawned agent process, stamped in launcher._spawn next to
# run_id/started_at. The job-reaper uses it for Tier-1 liveness (os.kill(pid, 0))
# to detect a 'running' job whose process died before _finalize_job. Idempotent
# ALTER (no-op once present) -> safe on the live prod DB.
_ensure_column(conn, "jobs", "pid", "INTEGER")
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
# (which have NULL delivery_id) never collide with each other. Restart-safe:
@@ -593,6 +598,76 @@ def requeue_running_jobs() -> int:
return int(n)
def get_running_jobs() -> list[dict]:
"""ORCH-065: snapshot of every 'running' job for the job-reaper scan.
Each row carries the job columns plus three reaper inputs:
* ``running_age_s`` — seconds since ``started_at`` (Tier-3 backstop);
* ``exit_code`` — the linked ``agent_runs.exit_code`` (Tier-2: process
finished but the job is still 'running' -> monitor died mid-finalize);
* ``finished_at_run`` — the linked ``agent_runs.finished_at`` (debug only).
A LEFT JOIN on ``run_id`` keeps jobs with no agent_runs row (exit_code NULL).
Read-only; never mutates. The reaper applies liveness/streak/backstop on top.
"""
conn = get_db()
try:
rows = conn.execute(
"SELECT j.*, "
"CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) "
" AS running_age_s, "
"r.exit_code AS exit_code, r.finished_at AS finished_at_run "
"FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id "
"WHERE j.status='running'"
).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
def reap_running_job(
job_id: int,
status: str,
run_id: int | None = None,
error: str | None = None,
) -> bool:
"""ORCH-065: atomic terminal flip of a RUNNING job by the job-reaper.
Mirrors ``mark_job`` but carries the ``status='running'`` guard in the WHERE
clause and reports ``rowcount`` so a late-arriving monitor / the startup
``requeue_running_jobs`` / a second reaper tick can never double-process the
same row (AC-5, restart-safe). Returns True iff THIS call won the flip
(rowcount == 1); False -> someone else already moved the row.
Status semantics match ``mark_job``: done/failed stamp ``finished_at``; queued
clears ``started_at``/``finished_at`` so the next claim treats it as fresh.
"""
conn = get_db()
try:
sets = ["status = ?"]
params: list = [status]
if run_id is not None:
sets.append("run_id = ?")
params.append(run_id)
if error is not None:
sets.append("error = ?")
params.append(error)
if status in ("done", "failed"):
sets.append("finished_at = datetime('now')")
elif status == "queued":
sets.append("started_at = NULL")
sets.append("finished_at = NULL")
params.append(job_id)
cur = conn.execute(
f"UPDATE jobs SET {', '.join(sets)} WHERE id = ? AND status='running'",
params,
)
conn.commit()
return cur.rowcount == 1
finally:
conn.close()
def get_job(job_id: int) -> dict | None:
"""Fetch a single job by id."""
conn = get_db()

370
src/job_reaper.py Normal file
View File

@@ -0,0 +1,370 @@
"""ORCH-065: job-reaper + proactive merge-lease reclaim background daemon.
Three failure classes share one root cause — "the thread/process died while it
still held captured state" — and one inert recovery layer
(``requeue_running_jobs``) that only fires on a process restart:
* **A — zombie jobs.** A job's terminal status (``done``/``queued``/``failed``)
is written ONLY inside ``launcher._monitor_agent -> _finalize_job`` in the
live process. If that thread/process dies between ``proc.wait()`` and the
status write (crash, OOM, self-restart mid-deploy) the ``jobs`` row stays
``running`` forever. At ``max_concurrency=1`` one zombie blocks the claim of
EVERY project's jobs -> the whole shared pipeline stalls.
* **B — stuck merge-lease.** The file lease ``.merge-lease-<repo>.json``
(ORCH-043) is reclaimed only lazily, by TTL, and only when ANOTHER task tries
to acquire it. Holder liveness (pid) is never probed, so a death with the
lease held blocks foreign merges until the TTL expires.
This module is a background daemon thread modelled on ``reconciler``
(``threading.Thread(daemon=True)`` + ``threading.Event``, start/stop in
``main.lifespan``, ``/queue`` snapshot, per-unit never-raise, kill-switch). Each
tick: (1) scans ``running`` jobs and reaps the dead ones via three-tier liveness
detection; (2) proactively reclaims dead/stale merge-leases (mechanism B) for the
in-scope repos.
Liveness (defense in depth, ADR-001 Р-1):
* **Tier-1 (primary): dead pid.** ``jobs.pid`` (stamped by ``launcher._spawn``)
probed with ``merge_gate.pid_alive``. A job is reaped only after
``reaper_dead_ticks`` (>=2) CONSECUTIVE dead-pid ticks — an in-memory streak
counter kills false positives (AC-3); a live agent within its timeout is
never reaped.
* **Tier-2 (completion race): exit_code recorded but job still running.** The
monitor died between writing ``agent_runs.exit_code`` and ``_finalize_job``.
The outcome is KNOWN -> gate-driven advance on exit0, else the standard
transient/permanent contract.
* **Tier-3 (backstop): age ceiling.** A job ``running`` longer than
``reaper_max_running_s`` (deliberately > max ``agent_timeout`` + grace) is
reaped even when liveness cannot be determined (pid reused / unknown).
Action on confirmed death reuses existing contracts (no new merge/stage logic):
* The reaper's ONLY mutating write to a job row is the atomic terminal flip
``db.reap_running_job(... WHERE status='running')`` — so a late-arriving
monitor / the startup ``requeue_running_jobs`` / a second tick can never
double-process a row (AC-5; the loser sees ``rowcount==0``).
* **exit0 (Tier-2):** gate-driven idempotent advance — the source of truth is
the canonical quality gate, NOT "exit0". If the stage already advanced ->
just mark ``done`` (idempotent cleanup). Else run ``launcher._try_advance_stage``
(it runs the canonical QG: artifact/PR present -> green -> advance; absent ->
red -> no advance) and re-check: advanced -> ``done``; still red (e.g. the
monitor died before git-push, so no artifact) -> fall through to the failure
path. This makes a false ``done`` without real work impossible.
* **exit!=0 (Tier-2) / unknown outcome (Tier-1 dead pid, Tier-3 backstop):**
``attempts < max_attempts`` -> ``queued`` (mirrors ``requeue_running_jobs``);
budget exhausted -> ``failed`` + Telegram. We never fabricate exit0.
Invariants (ТЗ §8 / ADR-001): never-raise per unit of work; idempotency (atomic
guard + gate-driven advance); restart-safe (the reaper starts AFTER the startup
``requeue_running_jobs``); silence when nothing is anomalous; the reaper NEVER
restarts/kills the prod container and NEVER pushes ``main``. ``STAGE_TRANSITIONS``
/ ``QG_CHECKS`` and every ``check_*`` signature are unchanged.
See docs/work-items/ORCH-065/06-adr/ADR-001-job-reaper-and-lease-reclaim.md and
the cross-cutting docs/architecture/adr/adr-0011-job-reaper-lease-reclaim.md.
"""
import logging
import threading
from datetime import datetime, timezone
from .config import settings
from .db import (
get_db,
get_running_jobs,
reap_running_job,
)
from .stages import STAGE_TRANSITIONS, get_agent_for_stage
logger = logging.getLogger("orchestrator.job_reaper")
def reclaim_all_stale_leases() -> int:
"""Proactively reclaim dead/stale merge-leases for every in-scope repo.
Used both at startup (``main.lifespan``, next to ``requeue_running_jobs``) and
on every reaper tick (mechanism B). Iterates the merge-gate scope
(``merge_gate_repos`` CSV, else self-hosting ``orchestrator``) and calls the
never-raise ``merge_gate.reclaim_stale_lease`` per repo. Returns the number of
leases actually reclaimed. Never raises (per-repo isolation).
"""
if not settings.lease_reclaim_enabled:
return 0
reclaimed = 0
try:
from . import merge_gate
raw = (settings.merge_gate_repos or "").strip()
if raw:
repos = [r.strip() for r in raw.split(",") if r.strip()]
else:
from .qg.checks import SELF_HOSTING_REPO
repos = [SELF_HOSTING_REPO]
for repo in repos:
try:
if merge_gate.reclaim_stale_lease(repo):
reclaimed += 1
except Exception as e: # noqa: BLE001 - isolate one repo's failure
logger.error("lease-reclaim failed for repo %s: %s", repo, e)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.error("reclaim_all_stale_leases error: %s", e)
return reclaimed
class JobReaper:
"""Background daemon that reaps zombie jobs and reclaims stale merge-leases.
Modelled on ``Reconciler``: a ``threading.Thread(daemon=True)`` + a
``threading.Event`` for a clean stop. The only in-memory state is the
best-effort Tier-1 dead-pid streak counter (``_streak``) and the
observability counters (``reaped_total`` / ``last_reaped`` /
``lease_reclaimed_total`` / ``last_run_ts``); all reset on restart, which is
safe because the startup ``requeue_running_jobs`` covers the restart path.
"""
def __init__(self, interval_s: float | None = None):
self.interval_s = (
interval_s if interval_s is not None else settings.reaper_interval_s
)
self._stop = threading.Event()
self._thread: threading.Thread | None = None
# Tier-1 anti-false-positive: {job_id: consecutive dead-pid ticks}.
self._streak: dict[int, int] = {}
# Best-effort observability (Р-6).
self.last_run_ts: float | None = None
self.reaped_total: int = 0
self.last_reaped: dict | None = None
self.lease_reclaimed_total: int = 0
# -- A: zombie-job reaping --------------------------------------------
def reap_once(self) -> None:
"""One scan over all ``running`` jobs (per-job never-raise) + lease reclaim."""
if settings.reaper_enabled:
try:
running = get_running_jobs()
except Exception as e: # noqa: BLE001 - never break the tick
logger.error("reaper: get_running_jobs failed: %s", e)
running = []
seen: set[int] = set()
for job in running:
jid = job.get("id")
if jid is not None:
seen.add(jid)
try:
self._reap_job(job)
except Exception as e: # noqa: BLE001 - isolate one job's failure
logger.error(
"reaper: job %s (agent=%s) failed: %s",
job.get("id"), job.get("agent"), e,
)
# Forget streaks for rows that are no longer running (reaped / requeued
# / finished by the monitor) so the dict cannot grow unbounded.
self._streak = {k: v for k, v in self._streak.items() if k in seen}
# Mechanism B: proactive stale/dead lease reclaim (own kill-switch).
try:
self.lease_reclaimed_total += reclaim_all_stale_leases()
except Exception as e: # noqa: BLE001 - never break the tick
logger.error("reaper: lease reclaim sweep failed: %s", e)
def _reap_job(self, job: dict) -> None:
"""Apply the three-tier liveness policy to a single running job."""
from . import merge_gate
job_id = job["id"]
pid = job.get("pid")
age = int(job.get("running_age_s") or 0)
exit_code = job.get("exit_code") # from the LEFT JOIN on agent_runs
# Tier-2: the process finished (exit_code recorded) but the job is still
# 'running' -> the monitor died mid-finalize. Outcome is KNOWN.
if exit_code is not None:
self._streak.pop(job_id, None)
self._reap_known_outcome(job, int(exit_code))
return
# Tier-1: dead pid, only after `reaper_dead_ticks` consecutive dead ticks.
if pid is not None and not merge_gate.pid_alive(pid):
n = self._streak.get(job_id, 0) + 1
self._streak[job_id] = n
if n >= max(int(settings.reaper_dead_ticks), 1):
self._streak.pop(job_id, None)
self._reap_unknown_outcome(job, reason=f"dead pid={pid}")
return
logger.info(
"reaper: job %s pid=%s dead (streak %d/%d) — deferring",
job_id, pid, n, settings.reaper_dead_ticks,
)
else:
# Alive / no pid -> reset the streak (must be CONSECUTIVE).
self._streak.pop(job_id, None)
# Tier-3: backstop ceiling (one-shot; reaps even when liveness is unknown).
if age >= int(settings.reaper_max_running_s):
self._streak.pop(job_id, None)
self._reap_unknown_outcome(
job, reason=f"backstop age={age}s>={settings.reaper_max_running_s}s"
)
# -- reap actions ------------------------------------------------------
def _reap_known_outcome(self, job: dict, exit_code: int) -> None:
"""Tier-2: the agent's exit_code is known; drive the job's terminal status."""
if exit_code == 0:
if self._gate_driven_advance(job):
if reap_running_job(job["id"], "done", run_id=job.get("run_id")):
self._note_reap(job, "done", reason="exit0, gate green")
return
# exit0 but the gate is red (e.g. monitor died before git-push -> no
# artifact). Do NOT fabricate 'done'; treat as a failed outcome.
self._reap_unknown_outcome(job, reason="exit0 but gate red")
else:
self._reap_unknown_outcome(job, reason=f"exit={exit_code}")
def _reap_unknown_outcome(self, job: dict, reason: str) -> None:
"""Tier-1/Tier-3 (or exit!=0): outcome not a clean success.
Mirrors ``requeue_running_jobs`` / the permanent-failure contract:
``attempts < max_attempts`` -> ``queued`` (a retry); budget exhausted ->
``failed`` + Telegram. The terminal flip is the atomic ``reap_running_job``
guard, so a racing requeue/monitor never double-processes the row.
"""
job_id = job["id"]
run_id = job.get("run_id")
attempts = int(job.get("attempts") or 0)
max_attempts = int(job.get("max_attempts") or 2)
err = f"reaped: {reason} (run_id={run_id})"
if attempts < max_attempts:
if reap_running_job(job_id, "queued", run_id=run_id, error=err):
self._note_reap(job, "queued", reason=reason)
else:
if reap_running_job(job_id, "failed", run_id=run_id, error=err):
self._note_reap(job, "failed", reason=reason)
self._notify_failed(job, reason)
def _gate_driven_advance(self, job: dict) -> bool:
"""Idempotent, gate-driven stage advance for a reaped exit0 job.
Returns True iff the stage is (or has become) advanced past this agent's
stage — i.e. the canonical quality gate is satisfied and a clean ``done``
is correct. Returns False when the gate is still red (the caller then
routes the job to the failure path instead of a false ``done``).
The advance itself reuses the UNCHANGED ``launcher._try_advance_stage``
(which runs the canonical QG and the unified ``advance_stage``); the
reaper never duplicates ``update_task_stage`` / ``enqueue_job``.
"""
agent = job.get("agent")
repo = job.get("repo")
run_id = job.get("run_id")
branch, stage = self._task_branch_stage(job)
# Candidate stages whose finishing agent is THIS agent (deployer maps to
# both 'testing' and 'deploy-staging', hence a set).
candidates = {s for s in STAGE_TRANSITIONS if get_agent_for_stage(s) == agent}
if stage is None or stage not in candidates:
# Stage already advanced past this agent (or unknown) -> idempotent
# cleanup: a clean 'done' is correct without re-advancing.
return True
if not branch:
return False
try:
from .agents.launcher import launcher
launcher._try_advance_stage(run_id, agent, repo, branch)
except Exception as e: # noqa: BLE001 - never break the reap
logger.error("reaper: gate-driven advance failed for job %s: %s",
job.get("id"), e)
return False
# Re-read the stage: advanced out of the candidate set -> gate was green.
_branch, new_stage = self._task_branch_stage(job)
return new_stage is None or new_stage not in candidates
@staticmethod
def _task_branch_stage(job: dict) -> tuple[str | None, str | None]:
"""Resolve (branch, stage) for the job's task. Never raises."""
task_id = job.get("task_id")
if not task_id:
return None, None
try:
conn = get_db()
row = conn.execute(
"SELECT branch, stage FROM tasks WHERE id = ?", (task_id,)
).fetchone()
conn.close()
if not row:
return None, None
return row["branch"], row["stage"]
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("reaper: task lookup failed for job %s: %s",
job.get("id"), e)
return None, None
def _notify_failed(self, job: dict, reason: str) -> None:
try:
from .notifications import send_telegram
send_telegram(
f"\U0001f6a8 reaper: job {job.get('id')} ({job.get('agent')}, "
f"repo {job.get('repo')}) reaped as FAILED: {reason}"
)
except Exception as e: # noqa: BLE001 - telegram best-effort
logger.warning("reaper: failed-notify telegram error: %s", e)
def _note_reap(self, job: dict, outcome: str, reason: str) -> None:
"""Record + log one successful reap (Р-6 observability)."""
self.reaped_total += 1
self.last_reaped = {
"job_id": job.get("id"),
"agent": job.get("agent"),
"outcome": outcome,
}
logger.warning(
"reaper: job %s (agent=%s, repo=%s, run_id=%s, pid=%s) reaped -> %s (%s)",
job.get("id"), job.get("agent"), job.get("repo"),
job.get("run_id"), job.get("pid"), outcome, reason,
)
# -- loop / lifecycle --------------------------------------------------
def _tick(self) -> None:
try:
self.reap_once()
finally:
self.last_run_ts = datetime.now(timezone.utc).timestamp()
def _run(self) -> None:
logger.info(
"JobReaper started (interval=%ss, enabled=%s, dead_ticks=%s, "
"max_running_s=%s, lease_reclaim=%s)",
self.interval_s, settings.reaper_enabled, settings.reaper_dead_ticks,
settings.reaper_max_running_s, settings.lease_reclaim_enabled,
)
while not self._stop.is_set():
try:
self._tick()
except Exception as e: # noqa: BLE001 - outer never-raise
logger.error("JobReaper loop error: %s", e)
self._stop.wait(self.interval_s)
logger.info("JobReaper stopped")
def start(self) -> None:
"""Start the daemon thread (idempotent: a live thread is a no-op)."""
if self._thread and self._thread.is_alive():
return
self._stop.clear()
self._thread = threading.Thread(
target=self._run, name="job-reaper", daemon=True
)
self._thread.start()
def stop(self, timeout: float = 5.0) -> None:
self._stop.set()
if self._thread:
self._thread.join(timeout=timeout)
def status(self) -> dict:
"""Reaper snapshot for /queue observability (Р-6)."""
return {
"enabled": settings.reaper_enabled,
"interval": self.interval_s,
"last_run_ts": self.last_run_ts,
"reaped_total": self.reaped_total,
"last_reaped": self.last_reaped,
"lease_reclaimed_total": self.lease_reclaimed_total,
}
# Module-level singleton used by the FastAPI lifespan.
reaper = JobReaper()

View File

@@ -60,6 +60,19 @@ async def lifespan(app: FastAPI):
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# ORCH-065: proactive startup reclaim of dead/stale merge-leases, next to the
# queue-recovery above. A lease held by the previous (now dead) process pid is
# released at once instead of waiting for the TTL / a foreign acquire so the
# next merge is not blocked. Conditional (merge_gate_repos / self-hosting) and
# gated by ORCH_LEASE_RECLAIM_ENABLED; never raises.
try:
from .job_reaper import reclaim_all_stale_leases
reclaimed = reclaim_all_stale_leases()
if reclaimed:
log.warning(f"Startup lease-reclaim: reclaimed {reclaimed} stale merge-lease(s)")
except Exception as e:
log.warning(f"Startup lease-reclaim skipped: {e}")
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
try:
import os as _os
@@ -85,13 +98,22 @@ async def lifespan(app: FastAPI):
from .reconciler import reconciler
reconciler.start()
# ORCH-065: start the job-reaper LAST (after requeue_running_jobs + the worker
# + the reconciler) so its atomic status='running' guard never races the
# startup requeue. It reaps zombie jobs and periodically reclaims stale
# merge-leases. Kill-switch: ORCH_REAPER_ENABLED.
from .job_reaper import reaper
reaper.start()
try:
yield
finally:
# Graceful shutdown order mirrors startup in reverse: stop the reconciler
# first (it must not enqueue new work while the worker is winding down),
# then the worker. Running agents keep going; their jobs are requeued on
# next start via queue-recovery if the process dies.
# Graceful shutdown order mirrors startup in reverse: stop the reaper
# first, then the reconciler (it must not enqueue new work while the
# worker is winding down), then the worker. Running agents keep going;
# their jobs are requeued on next start via queue-recovery if the
# process dies.
reaper.stop()
reconciler.stop()
worker.stop()
@@ -123,6 +145,7 @@ async def queue():
from .db import job_status_counts, recent_jobs
from .queue_worker import worker
from .reconciler import reconciler
from .job_reaper import reaper
from . import post_deploy
return {
"counts": job_status_counts(),
@@ -130,6 +153,7 @@ async def queue():
"poll_interval": worker.poll_interval,
"resilience": worker.status(),
"reconcile": reconciler.status(),
"reaper": reaper.status(),
"post_deploy": post_deploy.status(),
"recent": recent_jobs(10),
}

View File

@@ -338,3 +338,141 @@ def release_merge_lease(repo: str, branch: str | None = None) -> None:
return
except OSError as e:
logger.warning("merge-lease release error for %s: %s", repo, e)
# ---------------------------------------------------------------------------
# ORCH-065: proactive stale/dead merge-lease reclaim (Problem B)
# ---------------------------------------------------------------------------
def pid_alive(pid) -> bool:
"""Return True iff process ``pid`` is alive (``os.kill(pid, 0)`` liveness probe).
Semantics (ADR-001 Р-2, never-raise):
* ``ProcessLookupError`` -> the process is gone -> ``False`` (reclaimable).
* ``PermissionError`` -> the pid exists but is owned by another user ->
``True`` (alive; conservatively do NOT reclaim).
* missing / invalid pid -> ``True`` (conservative: a lease that predates the
pid field, or a malformed pid, is NOT reclaimed on the liveness signal —
the TTL backstop still catches it).
Never raises; any unexpected OS/type error -> conservative ``True``.
"""
if not pid:
return True
try:
os.kill(int(pid), 0)
return True
except ProcessLookupError:
return False
except PermissionError:
return True
except (OSError, ValueError, TypeError):
return True
def _lease_reclaim_applies(repo: str) -> bool:
"""Whether proactive lease-reclaim is REAL for ``repo`` (same scope as merge-gate).
Reuses ``qg.checks._merge_gate_applies`` (``merge_gate_repos`` CSV, else the
self-hosting ``orchestrator``) so reclaim and the gate share one predicate
(ADR-001 Р-2 / FR-2.4). Imported lazily to avoid an import cycle (qg.checks
imports merge_gate lazily inside ``check_branch_mergeable``). Never raises:
any error -> ``False`` (no-op, the safe default).
"""
try:
from .qg.checks import _merge_gate_applies
return _merge_gate_applies(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("lease-reclaim applicability check failed for %s: %s", repo, e)
return False
def reclaim_stale_lease(repo: str) -> bool:
"""Proactively reclaim a dead/stale merge-lease for ``repo`` (ADR-001 Р-2).
Unlike the lazy TTL reclaim inside ``acquire_merge_lease`` (which only fires
when ANOTHER task tries to acquire), this releases the lease as soon as the
holder is provably gone — without waiting for the TTL or a foreign acquire:
* holder pid is dead (``pid_alive`` is False) -> reclaim, OR
* lease age >= ``merge_lock_timeout_s`` (TTL) -> reclaim (AC-7).
A LIVE holder within its TTL is never touched (AC-8 — protects a legitimate
in-flight merge). Reclaim is holder-aware (``release_merge_lease(repo,
branch=holder)``) so it can never delete a lease a different task acquired in
the meantime. Conditional (FR-2.4): real only for ``merge_gate_repos`` /
self-hosting; other repos -> no-op. Kill-switch ``lease_reclaim_enabled``.
Returns True iff a lease was reclaimed. Never raises (AC-9): any read/remove
error is logged and swallowed so a single bad lease never kills the reaper
thread. Does NOT run any git operation — only the lease file is removed.
"""
try:
if not settings.lease_reclaim_enabled:
return False
if not _lease_reclaim_applies(repo):
return False
path = _lease_path(repo)
existing = _read_lease(path)
if existing is None:
return False # no lease (or unreadable -> _read_lease already logged)
holder = existing.get("branch")
pid = existing.get("pid")
age = time.time() - float(existing.get("acquired_at") or 0)
dead = not pid_alive(pid)
expired = age >= settings.merge_lock_timeout_s
if not (dead or expired):
return False # live holder within TTL -> protect legitimate merge
why = f"dead pid={pid}" if dead else f"stale age={age:.0f}s>=TTL"
release_merge_lease(repo, branch=holder)
logger.warning(
"merge-lease for %s reclaimed proactively (%s, holder=%s)",
repo, why, holder,
)
try:
from .notifications import send_telegram
send_telegram(
f"\U0001f527 merge-lease для {repo} освобождён проактивно "
f"({why}, holder={holder})"
)
except Exception as e: # noqa: BLE001 - telegram best-effort, never fatal
logger.warning("lease-reclaim telegram failed for %s: %s", repo, e)
return True
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("reclaim_stale_lease unexpected error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# ORCH-065: idempotent merge finalization guard (Problem C)
# ---------------------------------------------------------------------------
def pr_already_merged(repo: str, branch: str) -> bool:
"""Return True iff the PR for ``branch`` is ALREADY merged (ADR-001 Р-3, FR-3.2).
A deterministic, read-only guard the merge path consults BEFORE attempting a
(second) merge so a re-driven / reaped task is idempotent: an already-merged
PR -> no-op, never a duplicate merge and never an error. This is the ONLY new
merge-related helper and it does NOT merge — it only READS the PR state via
the existing Gitea client, so it does not introduce duplicate merge logic.
Queries Gitea ``GET /repos/{owner}/{repo}/pulls?state=all&head=<branch>`` and
reports True when any matching PR has ``merged == True``. Never raises (AC-9):
any HTTP/parse error -> ``False`` (conservative: "not known-merged" lets the
normal gate re-evaluate rather than silently skipping a real merge).
"""
try:
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
resp = httpx.get(
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
params={"state": "all", "head": branch},
headers=headers, timeout=_SHORT_TIMEOUT,
)
if resp.status_code != 200:
return False
for pr in resp.json() or []:
if pr.get("merged") is True:
return True
return False
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("pr_already_merged check failed for %s/%s: %s", repo, branch, e)
return False

View File

@@ -165,3 +165,82 @@ def test_staging_infra_tolerance_env_override_true(monkeypatch):
"""The field is read verbatim from its ORCH_* env var."""
monkeypatch.setenv("ORCH_STAGING_INFRA_TOLERANCE_ENABLED", "true")
assert Settings().staging_infra_tolerance_enabled is True
# ---------------------------------------------------------------------------
# ORCH-065 / TC-20: reaper_* + lease_reclaim_* settings defaults + env override.
# ---------------------------------------------------------------------------
_REAPER_ENV = (
"ORCH_REAPER_ENABLED",
"ORCH_REAPER_INTERVAL_S",
"ORCH_REAPER_DEAD_TICKS",
"ORCH_REAPER_MAX_RUNNING_S",
"ORCH_LEASE_RECLAIM_ENABLED",
)
def test_reaper_settings_defaults(monkeypatch):
"""TC-20 / §5: documented defaults when no env is set."""
for name in _REAPER_ENV:
monkeypatch.delenv(name, raising=False)
s = Settings()
assert s.reaper_enabled is True
assert s.reaper_interval_s == 60
assert s.reaper_dead_ticks == 2
assert s.reaper_max_running_s == 3600
assert s.lease_reclaim_enabled is True
def test_reaper_settings_env_override(monkeypatch):
"""TC-20 / §5 / AC-14: each field is read from its ORCH_* env var."""
monkeypatch.setenv("ORCH_REAPER_ENABLED", "false")
monkeypatch.setenv("ORCH_REAPER_INTERVAL_S", "30")
monkeypatch.setenv("ORCH_REAPER_DEAD_TICKS", "5")
monkeypatch.setenv("ORCH_REAPER_MAX_RUNNING_S", "1200")
monkeypatch.setenv("ORCH_LEASE_RECLAIM_ENABLED", "false")
s = Settings()
assert s.reaper_enabled is False
assert s.reaper_interval_s == 30
assert s.reaper_dead_ticks == 5
assert s.reaper_max_running_s == 1200
assert s.lease_reclaim_enabled is False
# ---------------------------------------------------------------------------
# ORCH-065 / TC-19: contracts unchanged — no new stages / QG checks; the
# check_branch_mergeable signature is intact (AC-13).
# ---------------------------------------------------------------------------
def test_tc19_stage_transitions_unchanged():
"""No new pipeline stage was introduced by ORCH-065."""
from src.stages import STAGE_TRANSITIONS
assert set(STAGE_TRANSITIONS) == {
"created", "analysis", "architecture", "development", "review",
"testing", "deploy-staging", "deploy", "done",
}
def test_tc19_qg_checks_registry_unchanged():
"""No new quality-gate check was added to the registry by ORCH-065."""
from src.qg.checks import QG_CHECKS
assert set(QG_CHECKS) == {
"check_analysis_approved",
"check_analysis_complete",
"check_architecture_done",
"check_ci_green",
"check_review_approved",
"check_tests_passed",
"check_reviewer_verdict",
"check_tests_local",
"check_deploy_status",
"check_staging_status",
"check_branch_mergeable",
"check_staging_image_fresh",
}
def test_tc19_check_branch_mergeable_signature_intact():
"""check_branch_mergeable still takes exactly (repo, work_item_id, branch)."""
import inspect
from src.qg.checks import check_branch_mergeable
params = list(inspect.signature(check_branch_mergeable).parameters)
assert params == ["repo", "work_item_id", "branch"]

285
tests/test_job_reaper.py Normal file
View File

@@ -0,0 +1,285 @@
"""ORCH-065: job-reaper unit tests (TC-01..TC-08, TC-21).
The reaper never spawns claude; we drive the DB directly (a 'running' jobs row +
optional agent_runs exit_code/pid) and assert the terminal flip + side-effects.
``os.kill`` liveness is monkeypatched so a 'dead'/'alive' pid is deterministic.
"""
import os
import tempfile
import pytest
# Override env before importing app modules (same convention as test_queue.py).
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch_reaper.db")
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import init_db, get_db, enqueue_job, get_job
import src.job_reaper as jr
from src.job_reaper import JobReaper
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "reaper.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
yield
# --- helpers ----------------------------------------------------------------
def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
pid=None, age_s=0, attempts=0, max_attempts=2,
run_id=None, exit_code=None):
"""Insert a job already in 'running' with the given pid/age/attempts.
started_at is back-dated by ``age_s`` seconds so running_age_s reflects it.
When ``exit_code`` is given an agent_runs row is created and linked (Tier-2).
"""
conn = get_db()
if run_id is None and exit_code is not None:
cur = conn.execute(
"INSERT INTO agent_runs (task_id, agent, finished_at, exit_code) "
"VALUES (?, ?, datetime('now'), ?)",
(task_id, agent, exit_code),
)
run_id = cur.lastrowid
cur = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
"run_id, pid, started_at) "
"VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
(agent, repo, task_id, attempts, max_attempts, run_id, pid,
f"-{int(age_s)} seconds"),
)
job_id = cur.lastrowid
conn.commit()
conn.close()
return job_id
def _make_task(repo="orchestrator", branch="feature/x", stage="development",
work_item_id="ORCH-1"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, branch, stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _dead_pid(monkeypatch):
"""Force merge_gate.pid_alive -> False (process gone) for the reaper."""
import src.merge_gate as mg
monkeypatch.setattr(mg, "pid_alive", lambda pid: False)
def _alive_pid(monkeypatch):
import src.merge_gate as mg
monkeypatch.setattr(mg, "pid_alive", lambda pid: True)
# --- TC-01: dead executor -> reaped without process restart -----------------
def test_tc01_dead_pid_reaped_to_queued(monkeypatch):
_dead_pid(monkeypatch)
jid = _make_running_job(pid=999999, attempts=0, max_attempts=2)
r = JobReaper()
r.reap_once() # tick 1 (streak=1, dead_ticks default 2 -> not yet)
assert get_job(jid)["status"] == "running"
r.reap_once() # tick 2 -> reaped
assert get_job(jid)["status"] == "queued"
assert r.reaped_total == 1
assert r.last_reaped["job_id"] == jid
# --- TC-02: live agent within timeout is NEVER reaped -----------------------
def test_tc02_alive_pid_never_reaped(monkeypatch):
_alive_pid(monkeypatch)
jid = _make_running_job(pid=4321, age_s=10)
r = JobReaper()
for _ in range(5):
r.reap_once()
assert get_job(jid)["status"] == "running"
assert r.reaped_total == 0
def test_tc02_alive_within_max_running_not_reaped(monkeypatch):
_alive_pid(monkeypatch)
monkeypatch.setattr(db.settings, "reaper_max_running_s", 3600)
jid = _make_running_job(pid=4321, age_s=1800) # < ceiling, alive
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "running"
# --- TC-03: zombie only after reaper_dead_ticks consecutive ticks -----------
def test_tc03_requires_consecutive_dead_ticks(monkeypatch):
monkeypatch.setattr(db.settings, "reaper_dead_ticks", 3)
import src.merge_gate as mg
# Dead, dead, ALIVE (resets), dead, dead, dead -> reaped only on the 6th tick.
seq = iter([False, False, True, False, False, False])
monkeypatch.setattr(mg, "pid_alive", lambda pid: next(seq))
jid = _make_running_job(pid=999998)
r = JobReaper()
for _ in range(5):
r.reap_once()
assert get_job(jid)["status"] == "running"
r.reap_once() # 6th tick: third CONSECUTIVE dead -> reaped
assert get_job(jid)["status"] == "queued"
# --- TC-04: backstop ceiling reaps even when liveness is unknown ------------
def test_tc04_backstop_ceiling(monkeypatch):
_alive_pid(monkeypatch) # liveness says "alive", but age exceeds the ceiling
monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
jid = _make_running_job(pid=4321, age_s=500)
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "queued"
assert r.reaped_total == 1
def test_tc04_backstop_no_pid(monkeypatch):
monkeypatch.setattr(db.settings, "reaper_max_running_s", 100)
jid = _make_running_job(pid=None, age_s=500)
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "queued"
# --- TC-05: correct outcome by exit_code (Tier-2) ---------------------------
def test_tc05_exit0_gate_green_done(monkeypatch):
# A developer job runs to LEAVE the 'architecture' stage (-> 'development').
tid = _make_task(stage="architecture")
jid = _make_running_job(agent="developer", task_id=tid, exit_code=0)
# gate green -> advance succeeds (stage leaves the developer candidate set).
import src.agents.launcher as L
monkeypatch.setattr(
L.launcher, "_try_advance_stage",
lambda run_id, agent, repo, branch: db.update_task_stage(tid, "development"),
)
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "done"
def test_tc05_exit0_gate_red_requeues(monkeypatch):
tid = _make_task(stage="architecture")
jid = _make_running_job(agent="developer", task_id=tid, exit_code=0,
attempts=0, max_attempts=2)
# gate red -> _try_advance_stage is a no-op (stage stays 'architecture').
import src.agents.launcher as L
monkeypatch.setattr(L.launcher, "_try_advance_stage",
lambda run_id, agent, repo, branch: None)
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "queued" # exit0 but gate red -> not 'done'
def test_tc05_nonzero_exit_requeue_then_failed(monkeypatch):
sent = []
monkeypatch.setattr(jr, "JobReaper", JobReaper)
tid = _make_task(stage="development")
jid = _make_running_job(agent="developer", task_id=tid, exit_code=1,
attempts=1, max_attempts=2)
r = JobReaper()
import src.notifications as notif
monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: sent.append(a))
r.reap_once() # attempts(1) < max(2) -> queued
assert get_job(jid)["status"] == "queued"
# Now exhaust the budget.
jid2 = _make_running_job(agent="developer", task_id=tid, exit_code=1,
attempts=2, max_attempts=2)
r.reap_once()
assert get_job(jid2)["status"] == "failed"
assert sent, "failed reap must send a Telegram alert"
# --- TC-06: atomicity — reaper vs requeue_running_jobs (status guard) --------
def test_tc06_atomic_no_double_reap(monkeypatch):
_dead_pid(monkeypatch)
monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
jid = _make_running_job(pid=999997, attempts=0, max_attempts=2)
# Simulate the startup requeue winning the row first.
n = db.requeue_running_jobs()
assert n == 1
assert get_job(jid)["status"] == "queued"
# The reaper now scans: the row is no longer 'running' -> reap_running_job's
# WHERE status='running' guard yields rowcount 0 -> no second processing.
r = JobReaper()
r.reap_once()
assert get_job(jid)["status"] == "queued"
assert r.reaped_total == 0
def test_tc06_reap_running_job_guard_returns_false_when_not_running():
jid = enqueue_job("developer", "orchestrator") # status 'queued', not running
assert db.reap_running_job(jid, "done") is False
assert get_job(jid)["status"] == "queued"
# --- TC-07: kill-switch reaper_enabled=False -> no-op -----------------------
def test_tc07_kill_switch(monkeypatch):
_dead_pid(monkeypatch)
monkeypatch.setattr(db.settings, "reaper_enabled", False)
monkeypatch.setattr(db.settings, "lease_reclaim_enabled", False)
jid = _make_running_job(pid=999996, age_s=99999)
r = JobReaper()
for _ in range(3):
r.reap_once()
assert get_job(jid)["status"] == "running"
assert r.reaped_total == 0
# --- TC-08: never-raise — a DB/OS error in one tick does not propagate -------
def test_tc08_never_raise_isolates_per_job(monkeypatch):
_dead_pid(monkeypatch)
monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
good = _make_running_job(pid=111, attempts=0, max_attempts=2)
bad = _make_running_job(pid=222, attempts=0, max_attempts=2)
r = JobReaper()
orig = r._reap_job
def boom(job):
if job["id"] == bad:
raise RuntimeError("simulated per-job failure")
return orig(job)
monkeypatch.setattr(r, "_reap_job", boom)
# Must not raise despite the bad job blowing up.
r.reap_once()
# The good job is still reaped; the bad one is isolated (stays running).
assert get_job(good)["status"] == "queued"
assert get_job(bad)["status"] == "running"
def test_tc08_reap_once_outer_never_raises(monkeypatch):
monkeypatch.setattr(jr, "get_running_jobs",
lambda: (_ for _ in ()).throw(RuntimeError("db down")))
r = JobReaper()
# reap_once swallows... actually get_running_jobs is iterated in the for; the
# _tick wrapper guarantees the loop never dies. Assert _tick is safe.
r._tick()
assert r.last_run_ts is not None
# --- TC-21: startup lease-reclaim + reaper start/stop smoke -----------------
def test_tc21_reaper_start_stop_smoke():
r = JobReaper(interval_s=0.05)
r.start()
assert r._thread is not None and r._thread.is_alive()
r.stop(timeout=2)
assert not r._thread.is_alive()
def test_tc21_reclaim_all_stale_leases_callable(monkeypatch):
# No lease files present -> 0 reclaimed, never raises (registration smoke).
monkeypatch.setattr(db.settings, "lease_reclaim_enabled", True)
assert jr.reclaim_all_stale_leases() == 0

View File

@@ -11,6 +11,7 @@ import subprocess
import tempfile
import time
import httpx
import pytest
# Env before importing app modules (same convention as the other suites).
@@ -299,3 +300,56 @@ def test_tc11_release_missing_is_noop(lease_dir):
# Releasing a non-existent lease never raises.
merge_gate.release_merge_lease("orchestrator", "feature/none")
merge_gate.release_merge_lease("orchestrator") # force form
# ---------------------------------------------------------------------------
# ORCH-065 / TC-16: idempotent merge finalization — pr_already_merged guard.
# ---------------------------------------------------------------------------
class _FakeResp:
def __init__(self, status_code, payload):
self.status_code = status_code
self._payload = payload
def json(self):
return self._payload
def test_tc16_pr_already_merged_true(monkeypatch):
"""A merged PR -> True so a re-driven/reaped task is a no-op (no second merge)."""
monkeypatch.setattr(
httpx, "get",
lambda *a, **k: _FakeResp(200, [{"number": 7, "merged": True}]),
)
assert merge_gate.pr_already_merged("orchestrator", "feature/x") is True
def test_tc16_pr_open_not_merged_false(monkeypatch):
"""An open / not-yet-merged PR -> False (the normal merge path proceeds)."""
monkeypatch.setattr(
httpx, "get",
lambda *a, **k: _FakeResp(200, [{"number": 7, "merged": False}]),
)
assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False
def test_tc16_pr_no_pr_false(monkeypatch):
monkeypatch.setattr(
httpx, "get", lambda *a, **k: _FakeResp(200, []),
)
assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False
def test_tc16_pr_already_merged_never_raises(monkeypatch):
"""Any HTTP/parse error -> False (conservative), never an exception (AC-9)."""
def boom(*a, **k):
raise RuntimeError("gitea down")
monkeypatch.setattr(httpx, "get", boom)
assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False
def test_tc16_pr_non_200_false(monkeypatch):
monkeypatch.setattr(
httpx, "get", lambda *a, **k: _FakeResp(500, None),
)
assert merge_gate.pr_already_merged("orchestrator", "feature/x") is False

View File

@@ -148,3 +148,63 @@ def test_tc24_red_catch_up_fails_and_releases_main_stays_green(race_repo, monkey
assert _origin_main_sha(origin) == main_before
# The lease was released on failure (a later task can proceed).
assert merge_gate._read_lease(merge_gate._lease_path(repo)) is None
# ---------------------------------------------------------------------------
# ORCH-065 / TC-17: recovery — "rebase+re-test green, merge not done, process
# died" -> reaper requeues -> the merge re-drives the STANDARD path WITHOUT a
# second expensive re-test when safe (the branch is already up-to-date). AC-10.
# ---------------------------------------------------------------------------
def test_tc17_redrive_skips_expensive_retest_when_already_caught_up(
race_repo, monkeypatch
):
repo, origin = race_repo
main_before = _origin_main_sha(origin)
# First pass: B catches up (real rebase onto C1) with a GREEN re-test. This is
# the work that completed before the process died — the lease is held, the
# branch is now caught up on origin.
retest_calls = []
def _retest(r, b):
retest_calls.append((r, b))
return True, "re-test green"
monkeypatch.setattr(merge_gate, "retest_branch", _retest)
passed, reason = check_branch_mergeable(repo, "ORCH-B", "feature/B")
assert passed is True
assert reason == "rebased onto main, re-test green"
assert len(retest_calls) == 1 # the expensive re-test ran ONCE
# The process "died" before the merge: release the lease the way the reaper /
# reconciler recovery path would (the row is requeued; the branch stays caught
# up because the rebase was already pushed).
merge_gate.release_merge_lease(repo, "feature/B")
# Re-drive (standard path) after recovery: the branch already contains
# origin/main, so branch_is_behind_main is False and the gate short-circuits to
# the up-to-date pass WITHOUT re-running the expensive rebase+re-test.
assert merge_gate.branch_is_behind_main(repo, "feature/B") is False
passed2, reason2 = check_branch_mergeable(repo, "ORCH-B", "feature/B")
assert passed2 is True
assert reason2 == "branch up-to-date with main"
assert len(retest_calls) == 1 # NOT re-run on the re-drive (no double cost)
# origin/main was never pushed by the gate across the whole recovery.
assert _origin_main_sha(origin) == main_before
def test_tc17_pr_already_merged_makes_redrive_a_noop(race_repo, monkeypatch):
"""If the PR actually merged before the process died, the idempotency guard
reports it so the re-drive is a no-op (no second merge)."""
import httpx
repo, _ = race_repo
class _R:
status_code = 200
@staticmethod
def json():
return [{"merged": True}]
monkeypatch.setattr(httpx, "get", lambda *a, **k: _R())
assert merge_gate.pr_already_merged(repo, "feature/B") is True

View File

@@ -0,0 +1,138 @@
"""ORCH-065: proactive stale/dead merge-lease reclaim (TC-10..TC-15).
Exercises merge_gate.reclaim_stale_lease / pid_alive directly with lease files
written into a tmp repos_dir. No git ops run (reclaim only removes the lease
file). pid liveness is monkeypatched so 'dead'/'alive' are deterministic.
"""
import json
import os
import tempfile
import time
import pytest
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch_lease.db")
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
from src import merge_gate
@pytest.fixture
def repos_dir(tmp_path, monkeypatch):
d = tmp_path / "repos"
d.mkdir()
monkeypatch.setattr(merge_gate.settings, "repos_dir", str(d))
monkeypatch.setattr(merge_gate.settings, "lease_reclaim_enabled", True)
monkeypatch.setattr(merge_gate.settings, "merge_gate_repos", "") # self-hosting only
monkeypatch.setattr(merge_gate.settings, "merge_lock_timeout_s", 300)
return d
def _write_lease(repos_dir, repo, branch="feature/x", pid=1234, age_s=0):
path = os.path.join(str(repos_dir), f".merge-lease-{repo}.json")
holder = {
"branch": branch,
"work_item_id": "ORCH-1",
"task_id": 1,
"acquired_at": time.time() - age_s,
"pid": pid,
}
with open(path, "w", encoding="utf-8") as f:
f.write(json.dumps(holder))
return path
def _no_telegram(monkeypatch):
import src.notifications as notif
monkeypatch.setattr(notif, "send_telegram", lambda *a, **k: None)
# --- TC-10: reclaim a lease with a DEAD pid, proactively --------------------
def test_tc10_reclaim_dead_pid(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
path = _write_lease(repos_dir, "orchestrator", pid=999999, age_s=0)
monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False)
assert merge_gate.reclaim_stale_lease("orchestrator") is True
assert not os.path.exists(path) # lease removed
# --- TC-11: reclaim by TTL is preserved -------------------------------------
def test_tc11_reclaim_by_ttl(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
# pid alive, but the lease is older than the TTL -> still reclaimed.
path = _write_lease(repos_dir, "orchestrator", pid=4321, age_s=999)
monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: True)
assert merge_gate.reclaim_stale_lease("orchestrator") is True
assert not os.path.exists(path)
# --- TC-12: a LIVE lease within TTL is NOT released -------------------------
def test_tc12_live_lease_protected(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
path = _write_lease(repos_dir, "orchestrator", pid=4321, age_s=10)
monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: True)
assert merge_gate.reclaim_stale_lease("orchestrator") is False
assert os.path.exists(path) # untouched
# --- TC-13: conditional — non-self-hosting repos are a no-op ----------------
def test_tc13_non_scope_repo_noop(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
path = _write_lease(repos_dir, "enduro-trails", pid=999999, age_s=999)
monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False)
assert merge_gate.reclaim_stale_lease("enduro-trails") is False
assert os.path.exists(path) # out of scope -> untouched
def test_tc13_merge_gate_repos_csv_scope(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
monkeypatch.setattr(merge_gate.settings, "merge_gate_repos", "enduro-trails")
path = _write_lease(repos_dir, "enduro-trails", pid=999999, age_s=0)
monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False)
assert merge_gate.reclaim_stale_lease("enduro-trails") is True
assert not os.path.exists(path)
# --- TC-14: never-raise on a read/remove error ------------------------------
def test_tc14_never_raise_on_read_error(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
_write_lease(repos_dir, "orchestrator", pid=1, age_s=999)
def boom(path):
raise OSError("simulated read failure")
monkeypatch.setattr(merge_gate, "_read_lease", boom)
# Must not raise; returns False (could not reclaim).
assert merge_gate.reclaim_stale_lease("orchestrator") is False
def test_tc14_no_lease_file_is_noop(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
assert merge_gate.reclaim_stale_lease("orchestrator") is False
# --- TC-15: kill-switch lease_reclaim_enabled=False -------------------------
def test_tc15_kill_switch(repos_dir, monkeypatch):
_no_telegram(monkeypatch)
monkeypatch.setattr(merge_gate.settings, "lease_reclaim_enabled", False)
path = _write_lease(repos_dir, "orchestrator", pid=999999, age_s=999)
monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False)
assert merge_gate.reclaim_stale_lease("orchestrator") is False
assert os.path.exists(path) # proactive reclaim off -> untouched
# --- pid_alive semantics ----------------------------------------------------
def test_pid_alive_dead_process():
# PID 999999999 almost certainly does not exist.
assert merge_gate.pid_alive(999999999) is False
def test_pid_alive_self():
assert merge_gate.pid_alive(os.getpid()) is True
def test_pid_alive_missing_pid_conservative():
assert merge_gate.pid_alive(None) is True
assert merge_gate.pid_alive(0) is True

View File

@@ -302,3 +302,58 @@ class TestWorkerConcurrency:
assert count_running_jobs() == 0
counts = job_status_counts()
assert counts["failed"] == 1
# ---------------------------------------------------------------------------
# ORCH-065: job-reaper unblocks the shared queue (TC-09) + /queue block (TC-18)
# ---------------------------------------------------------------------------
class TestReaperUnblocksQueue:
def test_tc09_reap_unblocks_claim_at_concurrency_1(self, monkeypatch):
"""A zombie 'running' row at max_concurrency=1 blocks every claim; once the
reaper reaps it the next queued job can be claimed (AC-2)."""
import src.merge_gate as mg
from src.job_reaper import JobReaper
monkeypatch.setattr(db.settings, "reaper_dead_ticks", 1)
monkeypatch.setattr(mg, "pid_alive", lambda pid: False) # zombie pid dead
# A zombie row stuck 'running' with a dead pid.
conn = db.get_db()
cur = conn.execute(
"INSERT INTO jobs (agent, repo, status, attempts, max_attempts, pid, "
"started_at) VALUES ('developer','r','running',2,2,999999,datetime('now'))"
)
zombie = cur.lastrowid
conn.commit()
conn.close()
# A second job waits in the queue behind it.
nxt = enqueue_job("analyst", "r")
# At concurrency 1 the slot is fully occupied -> nothing else can run.
assert count_running_jobs() == 1
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
JobReaper().reap_once() # dead pid, attempts>=max -> failed
assert get_job(zombie)["status"] == "failed"
assert count_running_jobs() == 0
# Queue is unblocked: the next job claims successfully.
claimed = claim_next_job()
assert claimed is not None and claimed["id"] == nxt
def test_tc18_queue_endpoint_has_reaper_block(self):
"""GET /queue exposes the reaper observability block (AC-15).
Calls the endpoint coroutine directly (no lifespan / no background
threads / no network) so the test stays hermetic.
"""
import asyncio
import src.main as main
body = asyncio.run(main.queue())
assert "reaper" in body
reaper = body["reaper"]
for key in ("enabled", "interval", "last_run_ts", "reaped_total",
"last_reaped", "lease_reclaimed_total"):
assert key in reaper