"""FastAPI entry point for the Multi-Agent Orchestrator.

Wires up the webhook routers, runs the startup recovery steps (orphaned agent
runs, interrupted queue jobs, run-log rotation), starts the background queue
worker and reconciler, and exposes the health/status/queue endpoints.
"""
from fastapi import FastAPI
from contextlib import asynccontextmanager, closing
import logging

from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

_log = logging.getLogger('orchestrator')


def _recover_orphan_runs():
    """Mark orphaned agent runs as failed and notify a human (M-1).

    An orphan is an agent_run with no finished_at that started more than
    35 minutes ago. After a uvicorn restart the monitor thread is gone, so its
    child claude process (if any) was reparented to init; it cannot be killed
    by pid because the pid is not persisted. Each orphan is therefore marked
    exit=-1, logged with a warning, and reported via Telegram so someone can
    check or restart it.

    Returns:
        The rows (id, task_id, agent) of the runs that were marked.
    """
    from .db import get_db

    # closing() guarantees the connection is released even if a statement or
    # the commit raises.
    with closing(get_db()) as conn:
        orphan_rows = conn.execute(
            "SELECT id, task_id, agent FROM agent_runs "
            "WHERE finished_at IS NULL AND started_at < datetime('now', '-35 minutes')"
        ).fetchall()
        for row in orphan_rows:
            run_id, task_id, agent = row[0], row[1], row[2]
            conn.execute(
                "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-1 WHERE id=?",
                (run_id,),
            )
            _log.warning(
                "Orphan run %s (task %s, agent %s) recovered — "
                "manual check needed (process may have been killed on restart)",
                run_id, task_id, agent,
            )
        conn.commit()

    if orphan_rows:
        # The notification is best-effort: a failure (including a failed
        # import) must not block startup, but it is logged instead of being
        # silently dropped.
        try:
            from .notifications import send_telegram

            ids = ", ".join(str(r[0]) for r in orphan_rows)
            send_telegram(
                f"\u26a0\ufe0f Orchestrator restart: {len(orphan_rows)} orphaned agent run(s) "
                f"(run_id: {ids}) marked exit=-1. Нужна ручная проверка/перезапуск."
            )
        except Exception as exc:
            _log.warning("Orphan-run Telegram notification failed: %s", exc)
        _log.warning("Recovered %s orphaned agent runs", len(orphan_rows))

    return orphan_rows


def _requeue_interrupted_jobs():
    """Put jobs left in 'running' status back to 'queued' (ORCH-1 / F-2b).

    Any job still 'running' belongs to a worker that died on the previous
    restart; requeueing lets the worker pick it up again so no work is lost.
    Must run AFTER the M-1 orphan recovery.

    Returns:
        Whatever requeue_running_jobs() reports (logged as a job count).
    """
    from .db import requeue_running_jobs

    requeued = requeue_running_jobs()
    if requeued:
        _log.warning("Queue-recovery: requeued %s running job(s) after restart", requeued)
    return requeued


def _rotate_run_logs():
    """Prune old per-run logs at startup (L-2); best-effort, never fatal."""
    # The imports live inside the try on purpose: an import failure must only
    # skip rotation, not abort startup.
    try:
        import os as _os
        from .config import settings as _settings
        from .agents.launcher import prune_run_logs

        _runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
        _removed = prune_run_logs(
            _runs_dir,
            keep_days=_settings.log_keep_days,
            keep_max=_settings.log_keep_max,
        )
        if _removed:
            _log.info("Log rotation: pruned %s old run log(s) from %s", _removed, _runs_dir)
    except Exception as e:
        _log.warning("Log rotation skipped: %s", e)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: run startup recovery, then manage background services.

    Startup order: init DB -> orphan-run recovery (M-1) -> queue recovery
    (ORCH-1) -> log rotation (L-2) -> queue worker -> reconciler.
    Shutdown stops the reconciler first, then the worker.
    """
    init_db()
    _recover_orphan_runs()
    _requeue_interrupted_jobs()
    _rotate_run_logs()

    # Start the background job-queue worker (ORCH-1).
    from .queue_worker import worker

    worker.start()
    try:
        # ORCH-053: start the stuck-task reconciler AFTER the worker so its
        # active-job guard sees a fully-initialised queue.
        # Kill-switch: ORCH_RECONCILE_ENABLED.
        from .reconciler import reconciler

        reconciler.start()
        try:
            yield
        finally:
            # Graceful shutdown order mirrors startup in reverse: stop the
            # reconciler first (it must not enqueue new work while the worker
            # is winding down), then the worker. Running agents keep going;
            # their jobs are requeued on next start via queue-recovery if the
            # process dies.
            reconciler.stop()
    finally:
        # Nested finally: the worker is stopped even when the reconciler
        # fails to start or fails to stop.
        worker.stop()


app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")


@app.get("/health")
async def health():
    """Liveness probe: return a static OK payload."""
    return {"status": "ok", "service": "orchestrator"}


@app.get("/status")
async def status():
    """Return up to 10 of the most recently created tasks whose stage is not 'done'."""
    from .db import get_db

    # closing() releases the connection even if the query raises.
    with closing(get_db()) as conn:
        tasks = conn.execute(
            "SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10"
        ).fetchall()
    return {"active_tasks": [dict(t) for t in tasks]}


@app.get("/queue")
async def queue():
    """ORCH-1: job-queue observability — status counts + recent jobs."""
    from .db import job_status_counts, recent_jobs
    from .queue_worker import worker
    from .reconciler import reconciler
    from . import post_deploy

    return {
        "counts": job_status_counts(),
        "max_concurrency": worker.max_concurrency,
        "poll_interval": worker.poll_interval,
        "resilience": worker.status(),
        "reconcile": reconciler.status(),
        "post_deploy": post_deploy.status(),
        "recent": recent_jobs(10),
    }