From f314ae09e597e7d7b6e8f105b33b77045f55e875 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH] feat(worker): preflight gate + circuit breaker + /queue resilience (ORCH-1) QueueWorker gates claims behind preflight and the CircuitBreaker (open -> pause, no CLI calls + Telegram alert; half-open probes one job; closed on recovery). Wires launcher.on_outcome. /queue exposes resilience snapshot. --- src/main.py | 1 + src/queue_worker.py | 181 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 161 insertions(+), 21 deletions(-) diff --git a/src/main.py b/src/main.py index 62e5ef9..1cebb7a 100644 --- a/src/main.py +++ b/src/main.py @@ -102,5 +102,6 @@ async def queue(): "counts": job_status_counts(), "max_concurrency": worker.max_concurrency, "poll_interval": worker.poll_interval, + "resilience": worker.status(), "recent": recent_jobs(10), } diff --git a/src/queue_worker.py b/src/queue_worker.py index 20106e9..ab3984e 100644 --- a/src/queue_worker.py +++ b/src/queue_worker.py @@ -1,56 +1,181 @@ -"""ORCH-1 (F-2b): background job-queue worker. +"""ORCH-1 (F-2b): background job-queue worker with resilience layer. A single background thread polls the `jobs` table and spawns agents: while running: + if breaker.open and not cooled_down: sleep; continue # don't touch CLI + if not preflight.ok: sleep; continue # CLI/net down -> wait while count_running_jobs() < max_concurrency: - job = claim_next_job() # atomic queued -> running + job = claim_next_job() # atomic queued -> running (available_at-gated) if not job: break - launcher.launch_job(job) # spawns claude (Popen) + monitor thread + launcher.launch_job(job) # spawns claude (Popen) + monitor thread sleep(poll_interval) -Design notes ------------- -- We use a plain daemon thread + threading.Event for shutdown rather than an - asyncio task: the launcher already manages its own monitor/watchdog threads and - blocking Popen, so a thread loop is the simplest, most robust fit. -- `launch_job()` is non-blocking: it spawns the process and returns immediately; - the monitor thread updates `jobs.status` to done/queued/failed on completion. - That status change frees a `count_running_jobs()` slot for the next claim. -- Restart-safe: queue-recovery (requeue_running_jobs) runs in main.py lifespan - BEFORE the worker starts, so jobs left 'running' by a dead worker get retried. +Resilience (ДОПОЛНЕНИЕ): + A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming. + B/C. The launcher classifies failures (transient vs permanent) and applies + backoff via available_at; the worker only needs to honour available_at + (claim_next_job does) and react to transient outcomes via the breaker. + D. Circuit breaker — N consecutive transient failures -> open (pause M minutes, + no CLI calls, Telegram alert) -> half-open (probe one job) -> closed. + +Design: plain daemon thread + threading.Event (the launcher already manages its +own monitor/watchdog threads + blocking Popen). """ +import time import logging import threading from .config import settings from .db import claim_next_job, count_running_jobs from .agents.launcher import launcher +from . import preflight logger = logging.getLogger("orchestrator.queue_worker") -class QueueWorker: - """Background worker that drains the persistent job queue.""" +class CircuitBreaker: + """Trips after `threshold` consecutive transient failures. - def __init__(self, max_concurrency: int = None, poll_interval: float = None): + States: closed -> (threshold transient) -> open -> (after pause) half-open + -> (recovered) closed | (transient again) open. + Thread-safe enough for our single-worker + monitor-thread callbacks (a lock + guards the counters). + """ + + def __init__(self, threshold: int = None, pause_seconds: int = None): + self.threshold = threshold if threshold is not None else settings.breaker_threshold + self.pause_seconds = ( + pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds + ) + self._lock = threading.Lock() + self.state = "closed" # closed | open | half-open + self.consecutive_transient = 0 + self.opened_at = 0.0 + self._notify = None # optional callable(message) for alerts + + def set_notifier(self, fn): + self._notify = fn + + def record_transient(self): + with self._lock: + self.consecutive_transient += 1 + if self.state == "half-open": + # Probe failed -> re-open. + self._open("circuit re-opened: probe job hit transient again") + elif self.consecutive_transient >= self.threshold and self.state == "closed": + self._open( + f"circuit OPEN: {self.consecutive_transient} consecutive " + f"transient failures; pausing {self.pause_seconds}s (no CLI calls)" + ) + + def record_recovered(self): + with self._lock: + self.consecutive_transient = 0 + if self.state in ("half-open", "open"): + self.state = "closed" + logger.info("Circuit CLOSED: recovered") + + def record_permanent(self): + # A clean permanent (code-fault) failure breaks the transient streak. + with self._lock: + self.consecutive_transient = 0 + + def _open(self, msg: str): + self.state = "open" + self.opened_at = time.time() + logger.warning(msg) + if self._notify: + try: + self._notify(f"\U0001f534 {msg}") + except Exception: + pass + + def allow_claim(self) -> bool: + """Return True if the worker may attempt to claim/launch a job now. + + - closed -> yes. + - open -> no until pause elapsed; then transition to half-open (yes, one probe). + - half-open -> yes (the single probe). + """ + with self._lock: + if self.state == "closed": + return True + if self.state == "open": + if (time.time() - self.opened_at) >= self.pause_seconds: + self.state = "half-open" + logger.info("Circuit HALF-OPEN: probing one job") + return True + return False + # half-open: allow the probe. + return True + + def snapshot(self) -> dict: + with self._lock: + remaining = 0 + if self.state == "open": + remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at))) + return { + "state": self.state, + "consecutive_transient": self.consecutive_transient, + "pause_remaining_s": remaining, + } + + +class QueueWorker: + """Background worker that drains the persistent job queue (with resilience).""" + + def __init__(self, max_concurrency: int = None, poll_interval: float = None, + breaker: CircuitBreaker = None): self.max_concurrency = ( max_concurrency if max_concurrency is not None else settings.max_concurrency ) self.poll_interval = ( poll_interval if poll_interval is not None else settings.queue_poll_interval ) + self.breaker = breaker or CircuitBreaker() + self.last_preflight_ok = True + self.last_preflight_reason = "not checked" self._stop = threading.Event() self._thread: threading.Thread | None = None + # --- circuit breaker outcome callback wired into the launcher ---------- + def _on_outcome(self, transient: bool, recovered: bool): + if recovered: + self.breaker.record_recovered() + elif transient: + self.breaker.record_transient() + else: + self.breaker.record_permanent() + def _drain_once(self): - """Claim and launch jobs until concurrency is full or the queue is empty.""" + """Claim and launch jobs until concurrency is full or the queue is empty. + + Gated by the circuit breaker and preflight: if the breaker is open (and + not yet cooled down) or preflight fails, we do NOT claim — jobs stay + queued and no CLI/tokens are touched. + """ + if not self.breaker.allow_claim(): + return + ok, reason = preflight.check() + self.last_preflight_ok = ok + self.last_preflight_reason = reason + if not ok: + logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick") + return + + # In half-open we only probe a single job, regardless of max_concurrency. + half_open = self.breaker.snapshot()["state"] == "half-open" + launched = 0 while not self._stop.is_set(): + if half_open and launched >= 1: + return if count_running_jobs() >= self.max_concurrency: return job = claim_next_job() if not job: return + launched += 1 try: run_id = launcher.launch_job(job) logger.info( @@ -58,8 +183,8 @@ class QueueWorker: f"repo {job['repo']}) -> run_id={run_id}" ) except Exception as e: - # Launch itself failed (e.g. repo missing): mark the job failed so it - # does not wedge as 'running' forever and block the slot. + # Launch itself failed (e.g. repo missing): treat as a permanent + # launch error so the job does not wedge as 'running' forever. logger.error(f"Worker failed to launch job {job['id']}: {e}") try: from .db import get_job, mark_job @@ -77,20 +202,26 @@ class QueueWorker: def _run(self): logger.info( f"Queue worker started (max_concurrency={self.max_concurrency}, " - f"poll_interval={self.poll_interval}s)" + f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})" ) while not self._stop.is_set(): try: self._drain_once() except Exception as e: logger.error(f"Queue worker loop error: {e}") - # Sleep is interruptible by stop event for fast shutdown. self._stop.wait(self.poll_interval) logger.info("Queue worker stopped") def start(self): if self._thread and self._thread.is_alive(): return + # Wire breaker alerting + launcher outcome callback. + try: + from .notifications import send_telegram + self.breaker.set_notifier(send_telegram) + except Exception: + pass + launcher.on_outcome = self._on_outcome self._stop.clear() self._thread = threading.Thread( target=self._run, name="queue-worker", daemon=True @@ -102,6 +233,14 @@ class QueueWorker: if self._thread: self._thread.join(timeout=timeout) + def status(self) -> dict: + """Resilience snapshot for /queue.""" + return { + "breaker": self.breaker.snapshot(), + "preflight_ok": self.last_preflight_ok, + "preflight_reason": self.last_preflight_reason, + } + # Module-level singleton used by the FastAPI lifespan. worker = QueueWorker()