Files
orchestrator/src/queue_worker.py
claude-bot ebbf2e7a2d feat(cancel): STOP-status task cancellation + relaunch-hole close (ORCH-090)
Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step finishes
honestly. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".

Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).

New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.

Refs: ORCH-090

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 21:31:56 +03:00

253 lines
9.9 KiB
Python

"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
A single background thread polls the `jobs` table and spawns agents:
while running:
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
if not preflight.ok: sleep; continue # CLI/net down -> wait
while count_running_jobs() < max_concurrency:
job = claim_next_job() # atomic queued -> running (available_at-gated)
if not job: break
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
sleep(poll_interval)
Resilience (ДОПОЛНЕНИЕ):
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
B/C. The launcher classifies failures (transient vs permanent) and applies
backoff via available_at; the worker only needs to honour available_at
(claim_next_job does) and react to transient outcomes via the breaker.
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
Design: plain daemon thread + threading.Event (the launcher already manages its
own monitor/watchdog threads + blocking Popen).
"""
import time
import logging
import threading
from .config import settings
from .db import claim_next_job, count_running_jobs
from .agents.launcher import launcher
from . import preflight
logger = logging.getLogger("orchestrator.queue_worker")
class CircuitBreaker:
"""Trips after `threshold` consecutive transient failures.
States: closed -> (threshold transient) -> open -> (after pause) half-open
-> (recovered) closed | (transient again) open.
Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
guards the counters).
"""
def __init__(self, threshold: int = None, pause_seconds: int = None):
self.threshold = threshold if threshold is not None else settings.breaker_threshold
self.pause_seconds = (
pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
)
self._lock = threading.Lock()
self.state = "closed" # closed | open | half-open
self.consecutive_transient = 0
self.opened_at = 0.0
self._notify = None # optional callable(message) for alerts
def set_notifier(self, fn):
self._notify = fn
def record_transient(self):
with self._lock:
self.consecutive_transient += 1
if self.state == "half-open":
# Probe failed -> re-open.
self._open("circuit re-opened: probe job hit transient again")
elif self.consecutive_transient >= self.threshold and self.state == "closed":
self._open(
f"circuit OPEN: {self.consecutive_transient} consecutive "
f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
)
def record_recovered(self):
with self._lock:
self.consecutive_transient = 0
if self.state in ("half-open", "open"):
self.state = "closed"
logger.info("Circuit CLOSED: recovered")
def record_permanent(self):
# A clean permanent (code-fault) failure breaks the transient streak.
with self._lock:
self.consecutive_transient = 0
def _open(self, msg: str):
self.state = "open"
self.opened_at = time.time()
logger.warning(msg)
if self._notify:
try:
self._notify(f"\U0001f534 {msg}")
except Exception:
pass
def allow_claim(self) -> bool:
"""Return True if the worker may attempt to claim/launch a job now.
- closed -> yes.
- open -> no until pause elapsed; then transition to half-open (yes, one probe).
- half-open -> yes (the single probe).
"""
with self._lock:
if self.state == "closed":
return True
if self.state == "open":
if (time.time() - self.opened_at) >= self.pause_seconds:
self.state = "half-open"
logger.info("Circuit HALF-OPEN: probing one job")
return True
return False
# half-open: allow the probe.
return True
def snapshot(self) -> dict:
with self._lock:
remaining = 0
if self.state == "open":
remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
return {
"state": self.state,
"consecutive_transient": self.consecutive_transient,
"pause_remaining_s": remaining,
}
class QueueWorker:
"""Background worker that drains the persistent job queue (with resilience)."""
def __init__(self, max_concurrency: int = None, poll_interval: float = None,
breaker: CircuitBreaker = None):
self.max_concurrency = (
max_concurrency if max_concurrency is not None else settings.max_concurrency
)
self.poll_interval = (
poll_interval if poll_interval is not None else settings.queue_poll_interval
)
self.breaker = breaker or CircuitBreaker()
self.last_preflight_ok = True
self.last_preflight_reason = "not checked"
self._stop = threading.Event()
self._thread: threading.Thread | None = None
# --- circuit breaker outcome callback wired into the launcher ----------
def _on_outcome(self, transient: bool, recovered: bool):
if recovered:
self.breaker.record_recovered()
elif transient:
self.breaker.record_transient()
else:
self.breaker.record_permanent()
def _drain_once(self):
"""Claim and launch jobs until concurrency is full or the queue is empty.
Gated by the circuit breaker and preflight: if the breaker is open (and
not yet cooled down) or preflight fails, we do NOT claim — jobs stay
queued and no CLI/tokens are touched.
"""
if not self.breaker.allow_claim():
return
ok, reason = preflight.check()
self.last_preflight_ok = ok
self.last_preflight_reason = reason
if not ok:
logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
return
# In half-open we only probe a single job, regardless of max_concurrency.
half_open = self.breaker.snapshot()["state"] == "half-open"
launched = 0
while not self._stop.is_set():
if half_open and launched >= 1:
return
if count_running_jobs() >= self.max_concurrency:
return
job = claim_next_job()
if not job:
return
launched += 1
try:
run_id = launcher.launch_job(job)
logger.info(
f"Worker launched job {job['id']} ({job['agent']}, "
f"repo {job['repo']}) -> run_id={run_id}"
)
except Exception as e:
# Launch itself failed (e.g. repo missing): treat as a permanent
# launch error so the job does not wedge as 'running' forever.
logger.error(f"Worker failed to launch job {job['id']}: {e}")
try:
from .db import get_job, mark_job, get_task
j = get_job(job["id"])
attempts = j.get("attempts", 0) if j else 0
max_attempts = j.get("max_attempts", 2) if j else 2
# ORCH-090 (adr-0026 / TR-2): never requeue a job whose task is
# already terminal ({done,cancelled}) — a STOP that landed between
# claim and launch must win over the retry budget.
task = get_task(job.get("task_id")) if job.get("task_id") else None
if task and task.get("stage") in ("done", "cancelled"):
mark_job(job["id"], "cancelled", error=f"launch error (task terminal): {e}")
elif attempts < max_attempts:
mark_job(job["id"], "queued", error=f"launch error: {e}")
else:
mark_job(job["id"], "failed", error=f"launch error: {e}")
except Exception:
pass
def _run(self):
logger.info(
f"Queue worker started (max_concurrency={self.max_concurrency}, "
f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
)
while not self._stop.is_set():
try:
self._drain_once()
except Exception as e:
logger.error(f"Queue worker loop error: {e}")
self._stop.wait(self.poll_interval)
logger.info("Queue worker stopped")
def start(self):
if self._thread and self._thread.is_alive():
return
# Wire breaker alerting + launcher outcome callback.
try:
from .notifications import send_telegram
self.breaker.set_notifier(send_telegram)
except Exception:
pass
launcher.on_outcome = self._on_outcome
self._stop.clear()
self._thread = threading.Thread(
target=self._run, name="queue-worker", daemon=True
)
self._thread.start()
def stop(self, timeout: float = 5.0):
self._stop.set()
if self._thread:
self._thread.join(timeout=timeout)
def status(self) -> dict:
"""Resilience snapshot for /queue."""
return {
"breaker": self.breaker.snapshot(),
"preflight_ok": self.last_preflight_ok,
"preflight_reason": self.last_preflight_reason,
}
# Module-level singleton used by the FastAPI lifespan.
worker = QueueWorker()