QueueWorker gates claims behind preflight and the CircuitBreaker (open -> pause, no CLI calls + Telegram alert; half-open probes one job; closed on recovery). Wires launcher.on_outcome. /queue exposes resilience snapshot.
247 lines
9.4 KiB
Python
247 lines
9.4 KiB
Python
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
|
|
|
|
A single background thread polls the `jobs` table and spawns agents:

    while running:
        if breaker.open and not cooled_down: sleep; continue   # don't touch CLI
        if not preflight.ok: sleep; continue                   # CLI/net down -> wait
        while count_running_jobs() < max_concurrency:
            job = claim_next_job()     # atomic queued -> running (available_at-gated)
            if not job: break
            launcher.launch_job(job)   # spawns claude (Popen) + monitor thread
        sleep(poll_interval)
|
|
|
|
Resilience (addendum):
  A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
  B/C. The launcher classifies failures (transient vs permanent) and applies
       backoff via available_at; the worker only needs to honour available_at
       (claim_next_job does) and react to transient outcomes via the breaker.
  D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
     no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
|
|
|
|
Design: plain daemon thread + threading.Event (the launcher already manages its
|
|
own monitor/watchdog threads + blocking Popen).
|
|
"""
|
|
import time
|
|
import logging
|
|
import threading
|
|
|
|
from .config import settings
|
|
from .db import claim_next_job, count_running_jobs
|
|
from .agents.launcher import launcher
|
|
from . import preflight
|
|
|
|
# Module-level logger shared by CircuitBreaker and QueueWorker below.
logger = logging.getLogger("orchestrator.queue_worker")
|
|
|
|
|
|
class CircuitBreaker:
    """Trip after `threshold` consecutive transient failures.

    States: closed -> (threshold transient) -> open -> (after pause) half-open
            -> (recovered) closed | (transient again) open.

    A lock guards the state and counters. Alert notifications are sent only
    after the lock has been released, so a slow or failing notifier cannot
    block `allow_claim()` / `snapshot()` callers on other threads.
    """

    def __init__(self, threshold: int = None, pause_seconds: int = None):
        """Create a closed breaker.

        Args:
            threshold: consecutive transient failures needed to open the
                circuit; falls back to `settings.breaker_threshold` when None.
            pause_seconds: how long the circuit stays open before a probe is
                allowed; falls back to `settings.breaker_pause_seconds` when None.
        """
        self.threshold = threshold if threshold is not None else settings.breaker_threshold
        self.pause_seconds = (
            pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
        )
        self._lock = threading.Lock()
        self.state = "closed"  # closed | open | half-open
        self.consecutive_transient = 0
        # Wall-clock timestamp (time.time()) of the most recent transition to "open".
        self.opened_at = 0.0
        self._notify = None  # optional callable(message) for alerts

    def set_notifier(self, fn):
        """Register `fn(message)` to be called whenever the circuit opens."""
        self._notify = fn

    def record_transient(self):
        """Count one transient failure and open the circuit if warranted.

        A transient failure while half-open (the probe failed) re-opens the
        circuit immediately; while closed, the circuit opens once the streak
        reaches `threshold`.
        """
        alert = None
        with self._lock:
            self.consecutive_transient += 1
            if self.state == "half-open":
                # Probe failed -> re-open.
                alert = "circuit re-opened: probe job hit transient again"
                self._open(alert)
            elif self.state == "closed" and self.consecutive_transient >= self.threshold:
                alert = (
                    f"circuit OPEN: {self.consecutive_transient} consecutive "
                    f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
                )
                self._open(alert)
        # Notify outside the lock: the notifier may do slow network I/O.
        if alert is not None:
            self._alert(alert)

    def record_recovered(self):
        """Reset the failure streak and close the circuit if it was open/half-open."""
        with self._lock:
            self.consecutive_transient = 0
            if self.state in ("half-open", "open"):
                self.state = "closed"
                logger.info("Circuit CLOSED: recovered")

    def record_permanent(self):
        """Reset the failure streak; the breaker state itself is left unchanged."""
        # A clean permanent (code-fault) failure breaks the transient streak.
        with self._lock:
            self.consecutive_transient = 0

    def _open(self, msg: str):
        """Switch to "open", stamp `opened_at` and log `msg`.

        Must be called with `self._lock` held. Does not send the alert; the
        caller does that via `_alert()` after releasing the lock.
        """
        self.state = "open"
        self.opened_at = time.time()
        logger.warning(msg)

    def _alert(self, msg: str):
        """Best-effort alert through the registered notifier (call without the lock)."""
        notify = self._notify
        if not notify:
            return
        try:
            notify(f"\U0001f534 {msg}")
        except Exception:
            # Alerting must never break the breaker, but the failure should be visible.
            logger.exception("Circuit breaker alert notification failed")

    def allow_claim(self) -> bool:
        """Return True if the worker may attempt to claim/launch a job now.

        - closed    -> yes.
        - open      -> no until pause elapsed; then transition to half-open (yes, one probe).
        - half-open -> yes (the single probe).
        """
        with self._lock:
            if self.state == "closed":
                return True
            if self.state == "open":
                if (time.time() - self.opened_at) >= self.pause_seconds:
                    self.state = "half-open"
                    logger.info("Circuit HALF-OPEN: probing one job")
                    return True
                return False
            # half-open: allow the probe.
            return True

    def snapshot(self) -> dict:
        """Return the state, the current transient streak and the seconds left in the pause.

        `pause_remaining_s` is 0 unless the circuit is open.
        """
        with self._lock:
            remaining = 0
            if self.state == "open":
                remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
            return {
                "state": self.state,
                "consecutive_transient": self.consecutive_transient,
                "pause_remaining_s": remaining,
            }
|
|
|
|
|
|
class QueueWorker:
    """Background worker that drains the persistent job queue (with resilience).

    A daemon thread repeatedly calls `_drain_once()` and then waits
    `poll_interval` seconds. Claiming is gated by the circuit breaker and by
    `preflight.check()`.
    """

    def __init__(self, max_concurrency: int = None, poll_interval: float = None,
                 breaker: "CircuitBreaker" = None):
        """Configure the worker; the thread is not started until `start()`.

        Args:
            max_concurrency: maximum number of running jobs; falls back to
                `settings.max_concurrency` when None.
            poll_interval: seconds to wait between drain passes; falls back to
                `settings.queue_poll_interval` when None.
            breaker: circuit breaker to use; a default `CircuitBreaker()` is
                created when omitted.
        """
        self.max_concurrency = (
            max_concurrency if max_concurrency is not None else settings.max_concurrency
        )
        self.poll_interval = (
            poll_interval if poll_interval is not None else settings.queue_poll_interval
        )
        self.breaker = breaker or CircuitBreaker()
        self.last_preflight_ok = True
        self.last_preflight_reason = "not checked"
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    # --- circuit breaker outcome callback wired into the launcher ----------
    def _on_outcome(self, transient: bool, recovered: bool):
        """Forward a job outcome to the breaker; `recovered` wins over `transient`."""
        if recovered:
            self.breaker.record_recovered()
        elif transient:
            self.breaker.record_transient()
        else:
            self.breaker.record_permanent()

    def _drain_once(self):
        """Claim and launch jobs until concurrency is full or the queue is empty.

        Gated by the circuit breaker and preflight: if the breaker is open (and
        not yet cooled down) or preflight fails, we do NOT claim — jobs stay
        queued and no CLI/tokens are touched.

        While the breaker is half-open at most one job may be in flight: no new
        job is claimed while another is still running, so successive polling
        ticks cannot stack additional "probes" on top of an unfinished one.
        """
        if not self.breaker.allow_claim():
            return
        ok, reason = preflight.check()
        self.last_preflight_ok = ok
        self.last_preflight_reason = reason
        if not ok:
            logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
            return

        # In half-open we only probe a single job, regardless of max_concurrency.
        half_open = self.breaker.snapshot()["state"] == "half-open"
        running_limit = min(1, self.max_concurrency) if half_open else self.max_concurrency
        launched = 0
        while not self._stop.is_set():
            if half_open and launched >= 1:
                return
            if count_running_jobs() >= running_limit:
                return
            job = claim_next_job()
            if not job:
                return
            launched += 1
            try:
                run_id = launcher.launch_job(job)
            except Exception as e:
                # Launch itself failed (e.g. repo missing): treat as a permanent
                # launch error so the job does not wedge as 'running' forever.
                logger.error(f"Worker failed to launch job {job['id']}: {e}")
                self._handle_launch_failure(job, e)
            else:
                # Logged outside the try so a logging problem is never mistaken
                # for a launch failure (which would re-queue a launched job).
                logger.info(
                    f"Worker launched job {job['id']} ({job['agent']}, "
                    f"repo {job['repo']}) -> run_id={run_id}"
                )

    def _handle_launch_failure(self, job, exc):
        """Re-queue the job if attempts remain, otherwise mark it failed."""
        try:
            from .db import get_job, mark_job

            j = get_job(job["id"])
            attempts = j.get("attempts", 0) if j else 0
            max_attempts = j.get("max_attempts", 2) if j else 2
            new_status = "queued" if attempts < max_attempts else "failed"
            mark_job(job["id"], new_status, error=f"launch error: {exc}")
        except Exception:
            # Still best-effort, but make it visible: the job may be left 'running'.
            logger.exception(
                "Could not update job %s after launch failure; it may remain 'running'",
                job["id"],
            )

    def _run(self):
        """Thread target: drain, wait `poll_interval`, repeat until stopped."""
        logger.info(
            f"Queue worker started (max_concurrency={self.max_concurrency}, "
            f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
        )
        while not self._stop.is_set():
            try:
                self._drain_once()
            except Exception as e:
                # Keep the loop alive, but record the traceback for diagnosis.
                logger.exception("Queue worker loop error: %s", e)
            # Event.wait doubles as an interruptible sleep so stop() returns promptly.
            self._stop.wait(self.poll_interval)
        logger.info("Queue worker stopped")

    def start(self):
        """Start the worker thread (no-op if it is already alive)."""
        if self._thread and self._thread.is_alive():
            return
        # Wire breaker alerting + launcher outcome callback.
        try:
            from .notifications import send_telegram

            self.breaker.set_notifier(send_telegram)
        except Exception as e:
            # Alerts are optional; run without them but say so.
            logger.warning(f"Circuit breaker alerts disabled (notifier unavailable): {e}")
        launcher.on_outcome = self._on_outcome
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="queue-worker", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0):
        """Signal the loop to stop and wait up to `timeout` seconds for the thread."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Resilience snapshot for /queue."""
        return {
            "breaker": self.breaker.snapshot(),
            "preflight_ok": self.last_preflight_ok,
            "preflight_reason": self.last_preflight_reason,
        }
|
|
|
|
|
|
# Module-level singleton used by the FastAPI lifespan.
|
|
# Built at import time with defaults taken from `settings`; the polling thread
# is only created when start() is called.
worker = QueueWorker()
|