"""ORCH-1 (F-2b): background job-queue worker with resilience layer.

A single background thread polls the `jobs` table and spawns agents::

    while running:
        if breaker.open and not cooled_down: sleep; continue   # don't touch CLI
        if not preflight.ok:                 sleep; continue   # CLI/net down -> wait
        while count_running_jobs() < max_concurrency:
            job = claim_next_job()    # atomic queued -> running (available_at-gated)
            if not job: break
            launcher.launch_job(job)  # spawns claude (Popen) + monitor thread
        sleep(poll_interval)

Resilience (addendum):
  A.   Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
  B/C. The launcher classifies failures (transient vs permanent) and applies
       backoff via available_at; the worker only needs to honour available_at
       (claim_next_job does) and react to transient outcomes via the breaker.
  D.   Circuit breaker — N consecutive transient failures -> open (pause M
       minutes, no CLI calls, Telegram alert) -> half-open (probe one job) ->
       closed.

Design: plain daemon thread + threading.Event (the launcher already manages
its own monitor/watchdog threads + blocking Popen).
"""
import time
import logging
import threading

from .config import settings
from .db import claim_next_job, count_running_jobs
from .agents.launcher import launcher
from . import preflight

logger = logging.getLogger("orchestrator.queue_worker")


class CircuitBreaker:
    """Trips after `threshold` consecutive transient failures.

    States:
        closed -> (threshold transient) -> open -> (after pause) half-open
        half-open -> (recovered) closed | (transient again) open

    In half-open at most one probe job is outstanding: after the worker calls
    `begin_probe`, `allow_claim` refuses further claims until an outcome is
    recorded, the probe is released with `abort_probe`, or the reservation is
    older than `pause_seconds` (so a probe that never reports back cannot wedge
    the breaker in half-open).

    Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
    guards the state/counters). Alerts are sent after the lock is released so a
    slow notifier never blocks `allow_claim`/`snapshot`.
    """

    def __init__(self, threshold: int | None = None, pause_seconds: int | None = None):
        self.threshold = threshold if threshold is not None else settings.breaker_threshold
        self.pause_seconds = (
            pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
        )
        self._lock = threading.Lock()
        self.state = "closed"  # closed | open | half-open
        self.consecutive_transient = 0
        self.opened_at = 0.0
        # time.time() when the current half-open probe was launched, else None.
        self._probe_started_at: float | None = None
        self._notify = None  # optional callable(message) for alerts

    def set_notifier(self, fn):
        """Register `fn(message)`, called (best-effort) whenever the circuit opens."""
        self._notify = fn

    def record_transient(self):
        """Count a transient failure; open (or re-open) the circuit when warranted."""
        alert = None
        with self._lock:
            self.consecutive_transient += 1
            if self.state == "half-open":
                # Probe failed -> re-open.
                alert = self._open("circuit re-opened: probe job hit transient again")
            elif self.consecutive_transient >= self.threshold and self.state == "closed":
                alert = self._open(
                    f"circuit OPEN: {self.consecutive_transient} consecutive "
                    f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
                )
        if alert:
            self._alert(alert)

    def record_recovered(self):
        """Reset the transient streak and close the circuit if it was open/half-open."""
        with self._lock:
            self.consecutive_transient = 0
            self._probe_started_at = None
            if self.state in ("half-open", "open"):
                self.state = "closed"
                logger.info("Circuit CLOSED: recovered")

    def record_permanent(self):
        """Record a permanent (code-fault) failure.

        A permanent failure breaks the transient streak. In half-open it also
        finishes the outstanding probe without a verdict, so another probe may
        be launched.
        """
        with self._lock:
            self.consecutive_transient = 0
            if self.state == "half-open":
                self._probe_started_at = None

    def begin_probe(self):
        """Mark that the worker is launching the half-open probe job."""
        with self._lock:
            if self.state == "half-open":
                self._probe_started_at = time.time()

    def abort_probe(self):
        """Release the probe reservation (e.g. the probe job failed to launch)."""
        with self._lock:
            if self.state == "half-open":
                self._probe_started_at = None

    def _open(self, msg: str) -> str:
        """Transition to open; caller must hold the lock. Returns the log message."""
        self.state = "open"
        self.opened_at = time.time()
        self._probe_started_at = None
        logger.warning(msg)
        return msg

    def _alert(self, msg: str):
        """Send an alert via the notifier, if any. Must be called WITHOUT the lock."""
        notify = self._notify
        if not notify:
            return
        try:
            notify(f"\U0001f534 {msg}")
        except Exception:
            # Best-effort: a broken notifier must not break outcome recording.
            logger.warning("Circuit breaker alert could not be sent", exc_info=True)

    def allow_claim(self) -> bool:
        """Return True if the worker may attempt to claim/launch a job now.

        - closed    -> yes.
        - open      -> no until the pause has elapsed; then transition to
                       half-open and fall through.
        - half-open -> yes only while no probe is outstanding (or the
                       outstanding probe's reservation has expired).
        """
        with self._lock:
            if self.state == "closed":
                return True
            if self.state == "open":
                if (time.time() - self.opened_at) < self.pause_seconds:
                    return False
                self.state = "half-open"
                self._probe_started_at = None
                logger.info("Circuit HALF-OPEN: probing one job")
            # half-open: allow a single probe at a time.
            if self._probe_started_at is None:
                return True
            return (time.time() - self._probe_started_at) >= self.pause_seconds

    def snapshot(self) -> dict:
        """Return a point-in-time view of the breaker for status reporting."""
        with self._lock:
            remaining = 0
            if self.state == "open":
                remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
            return {
                "state": self.state,
                "consecutive_transient": self.consecutive_transient,
                "pause_remaining_s": remaining,
                "probe_in_flight": (
                    self.state == "half-open" and self._probe_started_at is not None
                ),
            }


class QueueWorker:
    """Background worker that drains the persistent job queue (with resilience)."""

    def __init__(self, max_concurrency: int | None = None, poll_interval: float | None = None,
                 breaker: CircuitBreaker | None = None):
        self.max_concurrency = (
            max_concurrency if max_concurrency is not None else settings.max_concurrency
        )
        self.poll_interval = (
            poll_interval if poll_interval is not None else settings.queue_poll_interval
        )
        self.breaker = breaker or CircuitBreaker()
        self.last_preflight_ok = True
        self.last_preflight_reason = "not checked"
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    # --- circuit breaker outcome callback wired into the launcher ----------
    def _on_outcome(self, transient: bool, recovered: bool):
        """Forward a finished job's classification to the circuit breaker."""
        if recovered:
            self.breaker.record_recovered()
        elif transient:
            self.breaker.record_transient()
        else:
            self.breaker.record_permanent()

    def _drain_once(self):
        """Claim and launch jobs until concurrency is full or the queue is empty.

        Gated by the circuit breaker and preflight: if the breaker is open (and
        not yet cooled down) or preflight fails, we do NOT claim — jobs stay
        queued and no CLI/tokens are touched.

        In half-open only a single probe job is launched, regardless of
        max_concurrency; the breaker refuses further claims on later ticks
        until that probe reports back.

        A failed launch ends the tick, so a job the failure handler re-queued
        is not immediately re-claimed and re-launched in a tight loop.
        """
        if not self.breaker.allow_claim():
            return

        ok, reason = preflight.check()
        self.last_preflight_ok = ok
        self.last_preflight_reason = reason
        if not ok:
            logger.info("Preflight not ok (%s) -> not claiming jobs this tick", reason)
            return

        probing = self.breaker.snapshot()["state"] == "half-open"
        while not self._stop.is_set():
            if count_running_jobs() >= self.max_concurrency:
                return
            job = claim_next_job()
            if not job:
                return
            if probing:
                # Reserve the probe before launching: the monitor thread may
                # report the outcome before launch_job even returns.
                self.breaker.begin_probe()
            if not self._launch(job):
                if probing:
                    self.breaker.abort_probe()
                return
            if probing:
                return

    def _launch(self, job) -> bool:
        """Launch one claimed job; return True on success, False if launching raised."""
        try:
            run_id = launcher.launch_job(job)
        except Exception as e:
            # Launch itself failed (e.g. repo missing): release the job so it
            # does not wedge as 'running' forever.
            logger.error("Worker failed to launch job %s: %s", job["id"], e)
            self._release_failed_launch(job, e)
            return False
        logger.info(
            "Worker launched job %s (%s, repo %s) -> run_id=%s",
            job["id"], job["agent"], job["repo"], run_id,
        )
        return True

    @staticmethod
    def _release_failed_launch(job, exc: Exception):
        """Re-queue a job whose launch raised, or fail it once attempts are used up.

        Reads `attempts`/`max_attempts` from the job row (0/2 when the row or
        field is missing). Bookkeeping errors are logged, not raised.
        """
        try:
            from .db import get_job, mark_job

            row = get_job(job["id"])
            attempts = row.get("attempts", 0) if row else 0
            max_attempts = row.get("max_attempts", 2) if row else 2
            status = "queued" if attempts < max_attempts else "failed"
            mark_job(job["id"], status, error=f"launch error: {exc}")
        except Exception:
            # The job may now be left as 'running'; make that visible.
            logger.exception("Worker could not record launch failure for job %s", job["id"])

    def _run(self):
        """Thread body: drain the queue every `poll_interval` seconds until stopped."""
        logger.info(
            "Queue worker started (max_concurrency=%s, poll_interval=%ss, breaker_threshold=%s)",
            self.max_concurrency, self.poll_interval, self.breaker.threshold,
        )
        while not self._stop.is_set():
            try:
                self._drain_once()
            except Exception as e:
                logger.exception("Queue worker loop error: %s", e)
            self._stop.wait(self.poll_interval)
        logger.info("Queue worker stopped")

    def start(self):
        """Start the daemon worker thread (no-op if it is already running)."""
        if self._thread and self._thread.is_alive():
            return
        # Wire breaker alerting + launcher outcome callback.
        try:
            from .notifications import send_telegram
            self.breaker.set_notifier(send_telegram)
        except Exception:
            logger.warning("Telegram notifier unavailable; breaker alerts disabled", exc_info=True)
        launcher.on_outcome = self._on_outcome
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="queue-worker", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0):
        """Signal the worker to stop and wait up to `timeout` seconds for it."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def status(self) -> dict:
        """Resilience snapshot for /queue."""
        return {
            "breaker": self.breaker.snapshot(),
            "preflight_ok": self.last_preflight_ok,
            "preflight_reason": self.last_preflight_reason,
        }


# Module-level singleton used by the FastAPI lifespan.
worker = QueueWorker()