"""FastAPI entry point of the multi-agent orchestrator.

Wires the webhook routers, runs the startup recovery steps and background
daemons inside the application lifespan, and exposes small observability /
operator endpoints (``/health``, ``/status``, ``/queue``,
``/serial-gate/unfreeze``).
"""
from contextlib import asynccontextmanager
import logging

from fastapi import FastAPI

from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup recovery, start the background daemons, stop them on exit.

    Startup order (each step is described in detail by the comment above it):
    ``init_db()`` -> orphan-run recovery (M-1) -> queue-recovery (ORCH-1) ->
    stale merge-lease reclaim (ORCH-065) -> run-log rotation (L-2) -> worker ->
    reconciler -> job-reaper -> disk-watchdog.

    Shutdown stops the daemons in the reverse order of their start.

    Args:
        app: The FastAPI application this lifespan is attached to (unused in
            the body; required by the lifespan protocol).
    """
    init_db()

    # M-1: proper orphan-recovery.
    # An orphan = an agent_run with no finished_at that is older than the recovery
    # window. After a uvicorn restart the monitor thread is gone, so its child claude
    # process (if any) was reparented to init; we cannot kill it by pid (pid is not
    # persisted). Instead of silently writing exit=-1, we: enumerate each orphan,
    # mark it exit=-1, log a warning per run, and notify so a human can check/restart.
    log = logging.getLogger('orchestrator')
    from .db import get_db
    conn = get_db()
    try:
        orphan_rows = conn.execute(
            "SELECT id, task_id, agent FROM agent_runs "
            "WHERE finished_at IS NULL AND started_at < datetime('now', '-35 minutes')"
        ).fetchall()
        for row in orphan_rows:
            run_id, task_id, agent = row[0], row[1], row[2]
            conn.execute(
                "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-1 WHERE id=?",
                (run_id,),
            )
            log.warning(
                f"Orphan run {run_id} (task {task_id}, agent {agent}) recovered — "
                f"manual check needed (process may have been killed on restart)"
            )
        conn.commit()
    finally:
        # Always release the connection, even when a query or the commit fails;
        # the exception itself still propagates and aborts startup as before.
        conn.close()
    if orphan_rows:
        try:
            from .notifications import send_telegram
            ids = ", ".join(str(r[0]) for r in orphan_rows)
            send_telegram(
                f"\u26a0\ufe0f Orchestrator restart: {len(orphan_rows)} orphaned agent run(s) "
                f"(run_id: {ids}) marked exit=-1. Нужна ручная проверка/перезапуск."
            )
        except Exception as e:
            # Best-effort notification: never fatal, but no longer silent.
            log.warning(f"Orphan-recovery Telegram notification failed: {e}")
        log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")

    # ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
    # worker that died on the previous restart -> put it back to 'queued' so the
    # worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
    from .db import requeue_running_jobs
    requeued = requeue_running_jobs()
    if requeued:
        log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")

    # ORCH-065: proactive startup reclaim of dead/stale merge-leases, next to the
    # queue-recovery above. A lease held by the previous (now dead) process pid is
    # released at once instead of waiting for the TTL / a foreign acquire so the
    # next merge is not blocked. Conditional (merge_gate_repos / self-hosting) and
    # gated by ORCH_LEASE_RECLAIM_ENABLED; never raises.
    try:
        from .job_reaper import reclaim_all_stale_leases
        reclaimed = reclaim_all_stale_leases()
        if reclaimed:
            log.warning(f"Startup lease-reclaim: reclaimed {reclaimed} stale merge-lease(s)")
    except Exception as e:
        log.warning(f"Startup lease-reclaim skipped: {e}")

    # L-2: rotate old per-run logs at startup (best-effort; never fatal).
    try:
        import os as _os
        from .config import settings as _settings
        from .agents.launcher import prune_run_logs
        _runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
        _removed = prune_run_logs(
            _runs_dir,
            keep_days=_settings.log_keep_days,
            keep_max=_settings.log_keep_max,
        )
        if _removed:
            log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
    except Exception as e:
        log.warning(f"Log rotation skipped: {e}")

    # Start the background job-queue worker (ORCH-1).
    from .queue_worker import worker
    worker.start()

    # ORCH-053: start the stuck-task reconciler AFTER the worker so its active-job
    # guard sees a fully-initialised queue. Kill-switch: ORCH_RECONCILE_ENABLED.
    from .reconciler import reconciler
    reconciler.start()

    # ORCH-065: start the job-reaper LAST (after requeue_running_jobs + the worker
    # + the reconciler) so its atomic status='running' guard never races the
    # startup requeue. It reaps zombie jobs and periodically reclaims stale
    # merge-leases. Kill-switch: ORCH_REAPER_ENABLED.
    from .job_reaper import reaper
    reaper.start()

    # ORCH-063: start the disk-watchdog LAST (after the reaper). It is independent
    # of the queue/DB — it only reads host-FS fill and Telegram-alerts at >=
    # threshold — so the order is not critical, but we follow the daemon
    # convention. Honours the kill-switch ORCH_DISK_MONITOR_ENABLED (start() is a
    # no-op when disabled, so behaviour is 1:1 as before).
    from .disk_watchdog import disk_watchdog
    disk_watchdog.start()

    try:
        yield
    finally:
        # ORCH-063: stop the disk-watchdog first (reverse of startup).
        disk_watchdog.stop()
        # Graceful shutdown order mirrors startup in reverse: stop the reaper
        # first, then the reconciler (it must not enqueue new work while the
        # worker is winding down), then the worker. Running agents keep going;
        # their jobs are requeued on next start via queue-recovery if the
        # process dies.
        reaper.stop()
        reconciler.stop()
        worker.stop()


app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")


@app.get("/health")
async def health():
    """Return a static liveness payload (no DB or daemon access)."""
    return {"status": "ok", "service": "orchestrator"}


@app.get("/status")
async def status():
    """Return up to 10 tasks whose stage is not 'done', newest first.

    Rows are converted with ``dict()``, so this assumes ``get_db()`` yields
    mapping-style rows (e.g. ``sqlite3.Row``) — TODO confirm against ``.db``.
    """
    from .db import get_db
    conn = get_db()
    try:
        tasks = conn.execute(
            "SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10"
        ).fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    return {"active_tasks": [dict(t) for t in tasks]}


@app.get("/queue")
async def queue():
    """ORCH-1: job-queue observability — status counts + recent jobs."""
    from .db import job_status_counts, recent_jobs
    from .queue_worker import worker
    from .reconciler import reconciler
    from .job_reaper import reaper
    from . import post_deploy
    from . import merge_gate
    from . import task_deps
    from . import serial_gate
    from . import labels
    from .disk_watchdog import disk_watchdog
    return {
        "counts": job_status_counts(),
        "max_concurrency": worker.max_concurrency,
        "poll_interval": worker.poll_interval,
        "resilience": worker.status(),
        "reconcile": reconciler.status(),
        "reaper": reaper.status(),
        "post_deploy": post_deploy.status(),
        "merge_verify": merge_gate.merge_verify_status(),
        # ORCH-026 (G-2): declarative task-dependency observability (read-only,
        # NOT a source of truth) — declared edges, blocked tasks, detected cycle.
        "task_deps": task_deps.snapshot(),
        # ORCH-088 (D9 / AC-10): per-repo serial-gate observability (read-only) —
        # active task, queued/waiting analyst-jobs, freeze state. Additive block.
        "serial_gate": serial_gate.snapshot(),
        # ORCH-089 (D7): auto-mode-by-label observability (read-only) — kill-switch,
        # label names, scope. Additive block.
        "auto_labels": labels.snapshot(),
        # ORCH-063 (FR-6 / AC-7): disk-watchdog observability (read-only) —
        # enabled, threshold, interval, last measurement per host-path. Additive
        # block; never-raise (status() returns {"enabled": ...} minimum on error).
        "disk_monitor": disk_watchdog.status(),
        "recent": recent_jobs(10),
    }


@app.post("/serial-gate/unfreeze")
async def serial_gate_unfreeze(repo: str = ""):
    """ORCH-088 (FR-5, ADR-001 D4): manually clear a per-repo rollback-freeze.

    A freeze set by the post-deploy monitor (DEGRADED) keeps the serial gate
    CLOSED for the repo until an operator explicitly clears it here. Idempotent:
    clearing an already-clear repo reports ``cleared: 0``. The next queued
    analyst-job is then claimable on the next scheduler tick (no restart needed).

    Alternative manual path (documented in README):
    ``UPDATE repo_freeze SET cleared_at=datetime('now') WHERE repo=? AND cleared_at IS NULL``.
    """
    from . import serial_gate
    # Guard: an empty or whitespace-only repo is rejected without touching the gate.
    if not repo or not repo.strip():
        return {"ok": False, "error": "missing 'repo'", "repo": repo, "cleared": 0}
    repo = repo.strip()
    cleared = serial_gate.clear_repo_freeze(repo)
    frozen = serial_gate.is_repo_frozen(repo)
    if cleared:
        try:
            from .notifications import send_telegram
            send_telegram(
                f"🔥 {repo}: пакет РАЗМОРОЖЕН вручную ({cleared} запис(ь/и) снято). "
                f"Следующая задача репо стартует на ближайшем цикле."
            )
        except Exception as e:
            # Best-effort notification: the unfreeze already happened, so only
            # record the failure instead of failing the request.
            logging.getLogger('orchestrator').warning(
                f"Unfreeze Telegram notification failed for {repo}: {e}"
            )
    return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen}