Files
orchestrator/src/main.py
claude-bot 4bebb921ff feat(reaper): job-reaper + stale merge-lease reclaim + idempotent merge finalization
Closes the "zombie jobs" incident class: job status was set only inside
the live launcher process, so a process death left jobs.status='running'
forever; at max_concurrency=1 one zombie blocked ALL projects' queue
(self-hosting risk). Adds a background daemon (src/job_reaper.py) with
three-tier liveness (dead-pid streak / known exit_code / max-running
backstop) whose only mutating write is an atomic terminal flip guarded by
WHERE status='running' (no double-process). For exit0 the canonical QG is
the source of truth via gate-driven advance, not "exit0".

Also proactively reclaims stale merge-lease (dead pid OR TTL) via file
delete only (no git ops), and makes merge finalization idempotent
(pr_already_merged guard + up-to-date short-circuit on re-drive).

New jobs.pid column via idempotent _ensure_column (no migration); pid
stamped in launcher._spawn after Popen. Reaper start/stop in lifespan;
"reaper" snapshot in GET /queue. Kill-switches: ORCH_REAPER_ENABLED,
ORCH_REAPER_INTERVAL_S, ORCH_REAPER_DEAD_TICKS, ORCH_REAPER_MAX_RUNNING_S,
ORCH_LEASE_RECLAIM_ENABLED.

Invariants unchanged (AC-13): STAGE_TRANSITIONS, QG_CHECKS registry,
check_branch_mergeable signature/behaviour, BUG-8 rollback, hook exit
codes. restart-safe, never-raise per unit of background work.

Docs: docs/architecture/README.md, CHANGELOG.md, .env.example.
Tests: tests/test_job_reaper.py, tests/test_merge_lease_reclaim.py,
tests/test_merge_gate.py (TC-16), tests/test_merge_gate_race.py (TC-17),
tests/test_queue.py, tests/test_config.py (TC-19/TC-20). 742 passed.

Refs: ORCH-065

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-07 16:14:45 +00:00

160 lines
6.1 KiB
Python

from fastapi import FastAPI
from contextlib import asynccontextmanager
import logging
from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
# M-1: proper orphan-recovery.
# An orphan = an agent_run with no finished_at that is older than the recovery
# window. After a uvicorn restart the monitor thread is gone, so its child claude
# process (if any) was reparented to init; we cannot kill it by pid (pid is not
# persisted). Instead of silently writing exit=-1, we: enumerate each orphan,
# mark it exit=-1, log a warning per run, and notify so a human can check/restart.
log = logging.getLogger('orchestrator')
from .db import get_db
conn = get_db()
orphan_rows = conn.execute(
"SELECT id, task_id, agent FROM agent_runs "
"WHERE finished_at IS NULL AND started_at < datetime('now', '-35 minutes')"
).fetchall()
for row in orphan_rows:
run_id, task_id, agent = row[0], row[1], row[2]
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-1 WHERE id=?",
(run_id,),
)
log.warning(
f"Orphan run {run_id} (task {task_id}, agent {agent}) recovered — "
f"manual check needed (process may have been killed on restart)"
)
conn.commit()
conn.close()
if orphan_rows:
try:
from .notifications import send_telegram
ids = ", ".join(str(r[0]) for r in orphan_rows)
send_telegram(
f"\u26a0\ufe0f Orchestrator restart: {len(orphan_rows)} orphaned agent run(s) "
f"(run_id: {ids}) marked exit=-1. Нужна ручная проверка/перезапуск."
)
except Exception:
pass
log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
# ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
# worker that died on the previous restart -> put it back to 'queued' so the
# worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
from .db import requeue_running_jobs
requeued = requeue_running_jobs()
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# ORCH-065: proactive startup reclaim of dead/stale merge-leases, next to the
# queue-recovery above. A lease held by the previous (now dead) process pid is
# released at once instead of waiting for the TTL / a foreign acquire so the
# next merge is not blocked. Conditional (merge_gate_repos / self-hosting) and
# gated by ORCH_LEASE_RECLAIM_ENABLED; never raises.
try:
from .job_reaper import reclaim_all_stale_leases
reclaimed = reclaim_all_stale_leases()
if reclaimed:
log.warning(f"Startup lease-reclaim: reclaimed {reclaimed} stale merge-lease(s)")
except Exception as e:
log.warning(f"Startup lease-reclaim skipped: {e}")
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
try:
import os as _os
from .config import settings as _settings
from .agents.launcher import prune_run_logs
_runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
_removed = prune_run_logs(
_runs_dir,
keep_days=_settings.log_keep_days,
keep_max=_settings.log_keep_max,
)
if _removed:
log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
except Exception as e:
log.warning(f"Log rotation skipped: {e}")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()
# ORCH-053: start the stuck-task reconciler AFTER the worker so its active-job
# guard sees a fully-initialised queue. Kill-switch: ORCH_RECONCILE_ENABLED.
from .reconciler import reconciler
reconciler.start()
# ORCH-065: start the job-reaper LAST (after requeue_running_jobs + the worker
# + the reconciler) so its atomic status='running' guard never races the
# startup requeue. It reaps zombie jobs and periodically reclaims stale
# merge-leases. Kill-switch: ORCH_REAPER_ENABLED.
from .job_reaper import reaper
reaper.start()
try:
yield
finally:
# Graceful shutdown order mirrors startup in reverse: stop the reaper
# first, then the reconciler (it must not enqueue new work while the
# worker is winding down), then the worker. Running agents keep going;
# their jobs are requeued on next start via queue-recovery if the
# process dies.
reaper.stop()
reconciler.stop()
worker.stop()
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")
@app.get("/health")
async def health():
return {"status": "ok", "service": "orchestrator"}
@app.get("/status")
async def status():
from .db import get_db
conn = get_db()
tasks = conn.execute(
"SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10"
).fetchall()
conn.close()
return {"active_tasks": [dict(t) for t in tasks]}
@app.get("/queue")
async def queue():
"""ORCH-1: job-queue observability — status counts + recent jobs."""
from .db import job_status_counts, recent_jobs
from .queue_worker import worker
from .reconciler import reconciler
from .job_reaper import reaper
from . import post_deploy
return {
"counts": job_status_counts(),
"max_concurrency": worker.max_concurrency,
"poll_interval": worker.poll_interval,
"resilience": worker.status(),
"reconcile": reconciler.status(),
"reaper": reaper.status(),
"post_deploy": post_deploy.status(),
"recent": recent_jobs(10),
}