Files
orchestrator/src/main.py

76 lines
2.6 KiB
Python

from fastapi import FastAPI
from contextlib import asynccontextmanager
import logging
from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router
# Root logger setup: INFO threshold; each record shows timestamp, level,
# logger name and message.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the database and recover orphaned agent runs before serving.

    M-1: proper orphan-recovery.
    An orphan is an ``agent_runs`` row with no ``finished_at`` whose
    ``started_at`` is more than 35 minutes in the past (the recovery window).
    After a uvicorn restart the monitor thread is gone, so its child claude
    process (if any) was reparented to init; it cannot be killed by pid
    because the pid is not persisted. Instead of silently writing exit=-1,
    every orphan is marked ``exit_code=-1`` with ``finished_at`` set to now,
    a warning is logged per run, and a Telegram notification is sent so a
    human can check or restart the work.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        Nothing; control is handed to the application once recovery is done.
    """
    # Function-scope import kept as in the original module layout.
    from .db import get_db

    init_db()
    log = logging.getLogger('orchestrator')

    conn = get_db()
    try:
        orphan_rows = conn.execute(
            "SELECT id, task_id, agent FROM agent_runs "
            "WHERE finished_at IS NULL AND started_at < datetime('now', '-35 minutes')"
        ).fetchall()
        for row in orphan_rows:
            run_id, task_id, agent = row[0], row[1], row[2]
            conn.execute(
                "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-1 WHERE id=?",
                (run_id,),
            )
            log.warning(
                "Orphan run %s (task %s, agent %s) recovered — "
                "manual check needed (process may have been killed on restart)",
                run_id,
                task_id,
                agent,
            )
        conn.commit()
    finally:
        # Always release the connection, even if a query or the commit fails.
        conn.close()

    if orphan_rows:
        try:
            from .notifications import send_telegram

            ids = ", ".join(str(r[0]) for r in orphan_rows)
            send_telegram(
                f"\u26a0\ufe0f Orchestrator restart: {len(orphan_rows)} orphaned agent run(s) "
                f"(run_id: {ids}) marked exit=-1. Нужна ручная проверка/перезапуск."
            )
        except Exception:
            # Notification is best-effort: startup must not fail because of
            # it, but the failure is recorded instead of being swallowed.
            log.exception("Failed to send orphan-recovery Telegram notification")
        log.warning("Recovered %d orphaned agent runs", len(orphan_rows))

    yield
# Application instance; `lifespan` is registered as its lifespan handler.
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
# Both webhook routers (Plane and Gitea) are mounted under the shared
# "/webhook" prefix.
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")
@app.get("/health")
async def health():
    """Return a static payload reporting the orchestrator service as ok."""
    payload = dict(status="ok", service="orchestrator")
    return payload
@app.get("/status")
async def status():
    """Return the ten most recently created tasks whose stage is not 'done'.

    Returns:
        A dict with one key, ``active_tasks``, holding a list of task rows
        converted to plain dicts.
    """
    from .db import get_db

    conn = get_db()
    try:
        tasks = conn.execute(
            "SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10"
        ).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    # NOTE(review): dict(t) presumably relies on get_db() returning
    # mapping-like rows (e.g. sqlite3.Row) — confirm in .db.
    return {"active_tasks": [dict(t) for t in tasks]}