From b6d4426a4846c465c9027ff1f1e353746f393f10 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH] feat(worker): background queue worker + lifespan + queue-recovery + /queue (ORCH-1) queue_worker.QueueWorker drains the queue respecting max_concurrency. main.py lifespan: queue-recovery (requeue running jobs) after M-1 orphan-recovery, starts worker and stops it on shutdown. New GET /queue endpoint (counts + recent jobs). --- src/main.py | 33 +++++++++++++- src/queue_worker.py | 107 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 src/queue_worker.py diff --git a/src/main.py b/src/main.py index f1952e8..62e5ef9 100644 --- a/src/main.py +++ b/src/main.py @@ -51,7 +51,25 @@ async def lifespan(app: FastAPI): except Exception: pass log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs") - yield + + # ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a + # worker that died on the previous restart -> put it back to 'queued' so the + # worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1. + from .db import requeue_running_jobs + requeued = requeue_running_jobs() + if requeued: + log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart") + + # Start the background job-queue worker (ORCH-1). + from .queue_worker import worker + worker.start() + + try: + yield + finally: + # Graceful shutdown of the worker (running agents keep going; their jobs + # are requeued on next start via queue-recovery if the process dies). 
+ worker.stop() app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan) @@ -73,3 +91,16 @@ async def status(): ).fetchall() conn.close() return {"active_tasks": [dict(t) for t in tasks]} + + +@app.get("/queue") +async def queue(): + """ORCH-1: job-queue observability — status counts + recent jobs.""" + from .db import job_status_counts, recent_jobs + from .queue_worker import worker + return { + "counts": job_status_counts(), + "max_concurrency": worker.max_concurrency, + "poll_interval": worker.poll_interval, + "recent": recent_jobs(10), + } diff --git a/src/queue_worker.py b/src/queue_worker.py new file mode 100644 index 0000000..20106e9 --- /dev/null +++ b/src/queue_worker.py @@ -0,0 +1,107 @@ +"""ORCH-1 (F-2b): background job-queue worker. + +A single background thread polls the `jobs` table and spawns agents: + + while running: + while count_running_jobs() < max_concurrency: + job = claim_next_job() # atomic queued -> running + if not job: break + launcher.launch_job(job) # spawns claude (Popen) + monitor thread + sleep(poll_interval) + +Design notes +------------ +- We use a plain daemon thread + threading.Event for shutdown rather than an + asyncio task: the launcher already manages its own monitor/watchdog threads and + blocking Popen, so a thread loop is the simplest, most robust fit. +- `launch_job()` is non-blocking: it spawns the process and returns immediately; + the monitor thread updates `jobs.status` to done/queued/failed on completion. + That status change frees a `count_running_jobs()` slot for the next claim. +- Restart-safe: queue-recovery (requeue_running_jobs) runs in main.py lifespan + BEFORE the worker starts, so jobs left 'running' by a dead worker get retried. 
import logging
import threading
from typing import Optional

from .config import settings
from .db import claim_next_job, count_running_jobs
from .agents.launcher import launcher

logger = logging.getLogger("orchestrator.queue_worker")


class QueueWorker:
    """Background worker that drains the persistent job queue.

    A single daemon thread repeatedly calls ``_drain_once()`` and then waits
    ``poll_interval`` seconds (or until ``stop()`` is requested).
    """

    def __init__(
        self,
        max_concurrency: Optional[int] = None,
        poll_interval: Optional[float] = None,
    ):
        """Create a worker; it does not start until ``start()`` is called.

        Args:
            max_concurrency: Upper bound compared against
                ``count_running_jobs()`` before each claim. Falls back to
                ``settings.max_concurrency`` when ``None``.
            poll_interval: Seconds to wait between drain passes. Falls back
                to ``settings.queue_poll_interval`` when ``None``.
        """
        self.max_concurrency = (
            max_concurrency if max_concurrency is not None else settings.max_concurrency
        )
        self.poll_interval = (
            poll_interval if poll_interval is not None else settings.queue_poll_interval
        )
        # Set by stop(); checked by both loops and used as an interruptible sleep.
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def _handle_launch_failure(self, job, exc: Exception) -> None:
        """Requeue or fail *job* after ``launcher.launch_job`` raised.

        The job is put back to 'queued' while its recorded ``attempts`` is
        below ``max_attempts`` (defaults 0 and 2 when the row cannot be read),
        otherwise it is marked 'failed'. This is best-effort: a failure of the
        bookkeeping itself is logged, never raised.
        """
        try:
            # Imported at call time, exactly as the original handler did.
            from .db import get_job, mark_job

            row = get_job(job["id"])
            attempts = row.get("attempts", 0) if row else 0
            max_attempts = row.get("max_attempts", 2) if row else 2
            status = "queued" if attempts < max_attempts else "failed"
            mark_job(job["id"], status, error=f"launch error: {exc}")
        except Exception:
            # Previously swallowed silently; the job would then sit in
            # 'running' with no trace of why.
            logger.exception(
                f"Worker could not update job {job['id']} after launch failure; "
                "it may stay in 'running' status"
            )

    def _drain_once(self):
        """Claim and launch jobs until concurrency is full or the queue is empty."""
        while not self._stop.is_set():
            if count_running_jobs() >= self.max_concurrency:
                return
            job = claim_next_job()
            if not job:
                return
            # Keep the try body to the launch call only: an error raised while
            # *reporting* a successful launch must not requeue a job whose
            # agent is already running.
            try:
                run_id = launcher.launch_job(job)
            except Exception as e:
                # Launch itself failed (e.g. repo missing): release the job so it
                # does not wedge as 'running' forever and block the slot.
                logger.exception(f"Worker failed to launch job {job['id']}: {e}")
                self._handle_launch_failure(job, e)
            else:
                logger.info(
                    f"Worker launched job {job['id']} ({job['agent']}, "
                    f"repo {job['repo']}) -> run_id={run_id}"
                )

    def _run(self):
        """Thread target: drain, then wait ``poll_interval``, until stopped."""
        logger.info(
            f"Queue worker started (max_concurrency={self.max_concurrency}, "
            f"poll_interval={self.poll_interval}s)"
        )
        while not self._stop.is_set():
            try:
                self._drain_once()
            except Exception as e:
                # Keep the worker alive across unexpected errors, but keep the
                # traceback so the failure can be diagnosed.
                logger.exception(f"Queue worker loop error: {e}")
            # Sleep is interruptible by stop event for fast shutdown.
            self._stop.wait(self.poll_interval)
        logger.info("Queue worker stopped")

    def start(self):
        """Start the background thread; no-op if it is already alive."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="queue-worker", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0):
        """Request shutdown and wait up to *timeout* seconds for the thread.

        Args:
            timeout: Maximum number of seconds to block in ``join``.
        """
        self._stop.set()
        thread = self._thread
        if thread is None:
            return
        thread.join(timeout=timeout)
        if thread.is_alive():
            # join() returns silently on timeout; surface it so a slow
            # shutdown is visible in the logs.
            logger.warning(
                f"Queue worker did not stop within {timeout}s; "
                "it will exit after its current step"
            )


# Module-level singleton used by the FastAPI lifespan.
worker = QueueWorker()