feat(worker): background queue worker + lifespan + queue-recovery + /queue (ORCH-1)
queue_worker.QueueWorker drains the queue respecting max_concurrency. main.py lifespan: queue-recovery (requeue running jobs) after M-1 orphan-recovery, starts worker and stops it on shutdown. New GET /queue endpoint (counts + recent jobs).
This commit is contained in:
33
src/main.py
33
src/main.py
@@ -51,7 +51,25 @@ async def lifespan(app: FastAPI):
|
||||
except Exception:
|
||||
pass
|
||||
log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
|
||||
yield
|
||||
|
||||
# ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
|
||||
# worker that died on the previous restart -> put it back to 'queued' so the
|
||||
# worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
|
||||
from .db import requeue_running_jobs
|
||||
requeued = requeue_running_jobs()
|
||||
if requeued:
|
||||
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
|
||||
|
||||
# Start the background job-queue worker (ORCH-1).
|
||||
from .queue_worker import worker
|
||||
worker.start()
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# Graceful shutdown of the worker (running agents keep going; their jobs
|
||||
# are requeued on next start via queue-recovery if the process dies).
|
||||
worker.stop()
|
||||
|
||||
|
||||
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
|
||||
@@ -73,3 +91,16 @@ async def status():
|
||||
).fetchall()
|
||||
conn.close()
|
||||
return {"active_tasks": [dict(t) for t in tasks]}
|
||||
|
||||
|
||||
@app.get("/queue")
|
||||
async def queue():
|
||||
"""ORCH-1: job-queue observability — status counts + recent jobs."""
|
||||
from .db import job_status_counts, recent_jobs
|
||||
from .queue_worker import worker
|
||||
return {
|
||||
"counts": job_status_counts(),
|
||||
"max_concurrency": worker.max_concurrency,
|
||||
"poll_interval": worker.poll_interval,
|
||||
"recent": recent_jobs(10),
|
||||
}
|
||||
|
||||
107
src/queue_worker.py
Normal file
107
src/queue_worker.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""ORCH-1 (F-2b): background job-queue worker.
|
||||
|
||||
A single background thread polls the `jobs` table and spawns agents:
|
||||
|
||||
while running:
|
||||
while count_running_jobs() < max_concurrency:
|
||||
job = claim_next_job() # atomic queued -> running
|
||||
if not job: break
|
||||
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
|
||||
sleep(poll_interval)
|
||||
|
||||
Design notes
|
||||
------------
|
||||
- We use a plain daemon thread + threading.Event for shutdown rather than an
|
||||
asyncio task: the launcher already manages its own monitor/watchdog threads and
|
||||
blocking Popen, so a thread loop is the simplest, most robust fit.
|
||||
- `launch_job()` is non-blocking: it spawns the process and returns immediately;
|
||||
the monitor thread updates `jobs.status` to done/queued/failed on completion.
|
||||
That status change frees a `count_running_jobs()` slot for the next claim.
|
||||
- Restart-safe: queue-recovery (requeue_running_jobs) runs in main.py lifespan
|
||||
BEFORE the worker starts, so jobs left 'running' by a dead worker get retried.
|
||||
"""
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from .config import settings
|
||||
from .db import claim_next_job, count_running_jobs
|
||||
from .agents.launcher import launcher
|
||||
|
||||
logger = logging.getLogger("orchestrator.queue_worker")
|
||||
|
||||
|
||||
class QueueWorker:
|
||||
"""Background worker that drains the persistent job queue."""
|
||||
|
||||
def __init__(self, max_concurrency: int = None, poll_interval: float = None):
|
||||
self.max_concurrency = (
|
||||
max_concurrency if max_concurrency is not None else settings.max_concurrency
|
||||
)
|
||||
self.poll_interval = (
|
||||
poll_interval if poll_interval is not None else settings.queue_poll_interval
|
||||
)
|
||||
self._stop = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
|
||||
def _drain_once(self):
|
||||
"""Claim and launch jobs until concurrency is full or the queue is empty."""
|
||||
while not self._stop.is_set():
|
||||
if count_running_jobs() >= self.max_concurrency:
|
||||
return
|
||||
job = claim_next_job()
|
||||
if not job:
|
||||
return
|
||||
try:
|
||||
run_id = launcher.launch_job(job)
|
||||
logger.info(
|
||||
f"Worker launched job {job['id']} ({job['agent']}, "
|
||||
f"repo {job['repo']}) -> run_id={run_id}"
|
||||
)
|
||||
except Exception as e:
|
||||
# Launch itself failed (e.g. repo missing): mark the job failed so it
|
||||
# does not wedge as 'running' forever and block the slot.
|
||||
logger.error(f"Worker failed to launch job {job['id']}: {e}")
|
||||
try:
|
||||
from .db import get_job, mark_job
|
||||
|
||||
j = get_job(job["id"])
|
||||
attempts = j.get("attempts", 0) if j else 0
|
||||
max_attempts = j.get("max_attempts", 2) if j else 2
|
||||
if attempts < max_attempts:
|
||||
mark_job(job["id"], "queued", error=f"launch error: {e}")
|
||||
else:
|
||||
mark_job(job["id"], "failed", error=f"launch error: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _run(self):
|
||||
logger.info(
|
||||
f"Queue worker started (max_concurrency={self.max_concurrency}, "
|
||||
f"poll_interval={self.poll_interval}s)"
|
||||
)
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
self._drain_once()
|
||||
except Exception as e:
|
||||
logger.error(f"Queue worker loop error: {e}")
|
||||
# Sleep is interruptible by stop event for fast shutdown.
|
||||
self._stop.wait(self.poll_interval)
|
||||
logger.info("Queue worker stopped")
|
||||
|
||||
def start(self):
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._stop.clear()
|
||||
self._thread = threading.Thread(
|
||||
target=self._run, name="queue-worker", daemon=True
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self, timeout: float = 5.0):
|
||||
self._stop.set()
|
||||
if self._thread:
|
||||
self._thread.join(timeout=timeout)
|
||||
|
||||
|
||||
# Module-level singleton used by the FastAPI lifespan.
|
||||
worker = QueueWorker()
|
||||
Reference in New Issue
Block a user