feat(worker): preflight gate + circuit breaker + /queue resilience (ORCH-1)
QueueWorker gates claims behind preflight and the CircuitBreaker (open -> pause, no CLI calls + Telegram alert; half-open probes one job; closed on recovery). Wires launcher.on_outcome. /queue exposes resilience snapshot.
This commit is contained in:
@@ -102,5 +102,6 @@ async def queue():
|
||||
"counts": job_status_counts(),
|
||||
"max_concurrency": worker.max_concurrency,
|
||||
"poll_interval": worker.poll_interval,
|
||||
"resilience": worker.status(),
|
||||
"recent": recent_jobs(10),
|
||||
}
|
||||
|
||||
@@ -1,56 +1,181 @@
|
||||
"""ORCH-1 (F-2b): background job-queue worker.
|
||||
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
|
||||
|
||||
A single background thread polls the `jobs` table and spawns agents:
|
||||
|
||||
while running:
|
||||
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
|
||||
if not preflight.ok: sleep; continue # CLI/net down -> wait
|
||||
while count_running_jobs() < max_concurrency:
|
||||
job = claim_next_job() # atomic queued -> running
|
||||
job = claim_next_job() # atomic queued -> running (available_at-gated)
|
||||
if not job: break
|
||||
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
|
||||
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
|
||||
sleep(poll_interval)
|
||||
|
||||
Design notes
|
||||
------------
|
||||
- We use a plain daemon thread + threading.Event for shutdown rather than an
|
||||
asyncio task: the launcher already manages its own monitor/watchdog threads and
|
||||
blocking Popen, so a thread loop is the simplest, most robust fit.
|
||||
- `launch_job()` is non-blocking: it spawns the process and returns immediately;
|
||||
the monitor thread updates `jobs.status` to done/queued/failed on completion.
|
||||
That status change frees a `count_running_jobs()` slot for the next claim.
|
||||
- Restart-safe: queue-recovery (requeue_running_jobs) runs in main.py lifespan
|
||||
BEFORE the worker starts, so jobs left 'running' by a dead worker get retried.
|
||||
Resilience (ДОПОЛНЕНИЕ):
|
||||
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
|
||||
B/C. The launcher classifies failures (transient vs permanent) and applies
|
||||
backoff via available_at; the worker only needs to honour available_at
|
||||
(claim_next_job does) and react to transient outcomes via the breaker.
|
||||
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
|
||||
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
|
||||
|
||||
Design: plain daemon thread + threading.Event (the launcher already manages its
|
||||
own monitor/watchdog threads + blocking Popen).
|
||||
"""
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from .config import settings
|
||||
from .db import claim_next_job, count_running_jobs
|
||||
from .agents.launcher import launcher
|
||||
from . import preflight
|
||||
|
||||
logger = logging.getLogger("orchestrator.queue_worker")
|
||||
|
||||
|
||||
class QueueWorker:
|
||||
"""Background worker that drains the persistent job queue."""
|
||||
class CircuitBreaker:
|
||||
"""Trips after `threshold` consecutive transient failures.
|
||||
|
||||
def __init__(self, max_concurrency: int = None, poll_interval: float = None):
|
||||
States: closed -> (threshold transient) -> open -> (after pause) half-open
|
||||
-> (recovered) closed | (transient again) open.
|
||||
Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
|
||||
guards the counters).
|
||||
"""
|
||||
|
||||
def __init__(self, threshold: int = None, pause_seconds: int = None):
|
||||
self.threshold = threshold if threshold is not None else settings.breaker_threshold
|
||||
self.pause_seconds = (
|
||||
pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
|
||||
)
|
||||
self._lock = threading.Lock()
|
||||
self.state = "closed" # closed | open | half-open
|
||||
self.consecutive_transient = 0
|
||||
self.opened_at = 0.0
|
||||
self._notify = None # optional callable(message) for alerts
|
||||
|
||||
def set_notifier(self, fn):
|
||||
self._notify = fn
|
||||
|
||||
def record_transient(self):
|
||||
with self._lock:
|
||||
self.consecutive_transient += 1
|
||||
if self.state == "half-open":
|
||||
# Probe failed -> re-open.
|
||||
self._open("circuit re-opened: probe job hit transient again")
|
||||
elif self.consecutive_transient >= self.threshold and self.state == "closed":
|
||||
self._open(
|
||||
f"circuit OPEN: {self.consecutive_transient} consecutive "
|
||||
f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
|
||||
)
|
||||
|
||||
def record_recovered(self):
|
||||
with self._lock:
|
||||
self.consecutive_transient = 0
|
||||
if self.state in ("half-open", "open"):
|
||||
self.state = "closed"
|
||||
logger.info("Circuit CLOSED: recovered")
|
||||
|
||||
def record_permanent(self):
|
||||
# A clean permanent (code-fault) failure breaks the transient streak.
|
||||
with self._lock:
|
||||
self.consecutive_transient = 0
|
||||
|
||||
def _open(self, msg: str):
|
||||
self.state = "open"
|
||||
self.opened_at = time.time()
|
||||
logger.warning(msg)
|
||||
if self._notify:
|
||||
try:
|
||||
self._notify(f"\U0001f534 {msg}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def allow_claim(self) -> bool:
|
||||
"""Return True if the worker may attempt to claim/launch a job now.
|
||||
|
||||
- closed -> yes.
|
||||
- open -> no until pause elapsed; then transition to half-open (yes, one probe).
|
||||
- half-open -> yes (the single probe).
|
||||
"""
|
||||
with self._lock:
|
||||
if self.state == "closed":
|
||||
return True
|
||||
if self.state == "open":
|
||||
if (time.time() - self.opened_at) >= self.pause_seconds:
|
||||
self.state = "half-open"
|
||||
logger.info("Circuit HALF-OPEN: probing one job")
|
||||
return True
|
||||
return False
|
||||
# half-open: allow the probe.
|
||||
return True
|
||||
|
||||
def snapshot(self) -> dict:
|
||||
with self._lock:
|
||||
remaining = 0
|
||||
if self.state == "open":
|
||||
remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
|
||||
return {
|
||||
"state": self.state,
|
||||
"consecutive_transient": self.consecutive_transient,
|
||||
"pause_remaining_s": remaining,
|
||||
}
|
||||
|
||||
|
||||
class QueueWorker:
|
||||
"""Background worker that drains the persistent job queue (with resilience)."""
|
||||
|
||||
def __init__(self, max_concurrency: int = None, poll_interval: float = None,
|
||||
breaker: CircuitBreaker = None):
|
||||
self.max_concurrency = (
|
||||
max_concurrency if max_concurrency is not None else settings.max_concurrency
|
||||
)
|
||||
self.poll_interval = (
|
||||
poll_interval if poll_interval is not None else settings.queue_poll_interval
|
||||
)
|
||||
self.breaker = breaker or CircuitBreaker()
|
||||
self.last_preflight_ok = True
|
||||
self.last_preflight_reason = "not checked"
|
||||
self._stop = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
|
||||
# --- circuit breaker outcome callback wired into the launcher ----------
|
||||
def _on_outcome(self, transient: bool, recovered: bool):
|
||||
if recovered:
|
||||
self.breaker.record_recovered()
|
||||
elif transient:
|
||||
self.breaker.record_transient()
|
||||
else:
|
||||
self.breaker.record_permanent()
|
||||
|
||||
def _drain_once(self):
|
||||
"""Claim and launch jobs until concurrency is full or the queue is empty."""
|
||||
"""Claim and launch jobs until concurrency is full or the queue is empty.
|
||||
|
||||
Gated by the circuit breaker and preflight: if the breaker is open (and
|
||||
not yet cooled down) or preflight fails, we do NOT claim — jobs stay
|
||||
queued and no CLI/tokens are touched.
|
||||
"""
|
||||
if not self.breaker.allow_claim():
|
||||
return
|
||||
ok, reason = preflight.check()
|
||||
self.last_preflight_ok = ok
|
||||
self.last_preflight_reason = reason
|
||||
if not ok:
|
||||
logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
|
||||
return
|
||||
|
||||
# In half-open we only probe a single job, regardless of max_concurrency.
|
||||
half_open = self.breaker.snapshot()["state"] == "half-open"
|
||||
launched = 0
|
||||
while not self._stop.is_set():
|
||||
if half_open and launched >= 1:
|
||||
return
|
||||
if count_running_jobs() >= self.max_concurrency:
|
||||
return
|
||||
job = claim_next_job()
|
||||
if not job:
|
||||
return
|
||||
launched += 1
|
||||
try:
|
||||
run_id = launcher.launch_job(job)
|
||||
logger.info(
|
||||
@@ -58,8 +183,8 @@ class QueueWorker:
|
||||
f"repo {job['repo']}) -> run_id={run_id}"
|
||||
)
|
||||
except Exception as e:
|
||||
# Launch itself failed (e.g. repo missing): mark the job failed so it
|
||||
# does not wedge as 'running' forever and block the slot.
|
||||
# Launch itself failed (e.g. repo missing): treat as a permanent
|
||||
# launch error so the job does not wedge as 'running' forever.
|
||||
logger.error(f"Worker failed to launch job {job['id']}: {e}")
|
||||
try:
|
||||
from .db import get_job, mark_job
|
||||
@@ -77,20 +202,26 @@ class QueueWorker:
|
||||
def _run(self):
|
||||
logger.info(
|
||||
f"Queue worker started (max_concurrency={self.max_concurrency}, "
|
||||
f"poll_interval={self.poll_interval}s)"
|
||||
f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
|
||||
)
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
self._drain_once()
|
||||
except Exception as e:
|
||||
logger.error(f"Queue worker loop error: {e}")
|
||||
# Sleep is interruptible by stop event for fast shutdown.
|
||||
self._stop.wait(self.poll_interval)
|
||||
logger.info("Queue worker stopped")
|
||||
|
||||
def start(self):
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
# Wire breaker alerting + launcher outcome callback.
|
||||
try:
|
||||
from .notifications import send_telegram
|
||||
self.breaker.set_notifier(send_telegram)
|
||||
except Exception:
|
||||
pass
|
||||
launcher.on_outcome = self._on_outcome
|
||||
self._stop.clear()
|
||||
self._thread = threading.Thread(
|
||||
target=self._run, name="queue-worker", daemon=True
|
||||
@@ -102,6 +233,14 @@ class QueueWorker:
|
||||
if self._thread:
|
||||
self._thread.join(timeout=timeout)
|
||||
|
||||
def status(self) -> dict:
|
||||
"""Resilience snapshot for /queue."""
|
||||
return {
|
||||
"breaker": self.breaker.snapshot(),
|
||||
"preflight_ok": self.last_preflight_ok,
|
||||
"preflight_reason": self.last_preflight_reason,
|
||||
}
|
||||
|
||||
|
||||
# Module-level singleton used by the FastAPI lifespan.
|
||||
worker = QueueWorker()
|
||||
|
||||
Reference in New Issue
Block a user