Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step has actually
finished. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".
Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).
New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.
Refs: ORCH-090
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
253 lines
9.9 KiB
Python
253 lines
9.9 KiB
Python
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
|
|
|
|
A single background thread polls the `jobs` table and spawns agents:
|
|
|
|
while running:
|
|
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
|
|
if not preflight.ok: sleep; continue # CLI/net down -> wait
|
|
while count_running_jobs() < max_concurrency:
|
|
job = claim_next_job() # atomic queued -> running (available_at-gated)
|
|
if not job: break
|
|
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
|
|
sleep(poll_interval)
|
|
|
|
Resilience (addendum):
|
|
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
|
|
B/C. The launcher classifies failures (transient vs permanent) and applies
|
|
backoff via available_at; the worker only needs to honour available_at
|
|
(claim_next_job does) and react to transient outcomes via the breaker.
|
|
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
|
|
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
|
|
|
|
Design: plain daemon thread + threading.Event (the launcher already manages its
|
|
own monitor/watchdog threads + blocking Popen).
|
|
"""
|
|
import time
|
|
import logging
|
|
import threading
|
|
|
|
from .config import settings
|
|
from .db import claim_next_job, count_running_jobs
|
|
from .agents.launcher import launcher
|
|
from . import preflight
|
|
|
|
logger = logging.getLogger("orchestrator.queue_worker")
|
|
|
|
|
|
class CircuitBreaker:
    """Trip after `threshold` consecutive transient failures.

    States: closed -> (threshold transient) -> open -> (after pause) half-open
            -> (recovered) closed | (transient again) open.

    A lock guards the state and the counters so that the worker thread and the
    monitor-thread callbacks can share one instance. Alerts (log line plus the
    optional notifier) are emitted only AFTER the lock has been released: the
    notifier may block on I/O or read the breaker back (e.g. `snapshot()`), and
    `threading.Lock` is not re-entrant.
    """

    def __init__(self, threshold: int = None, pause_seconds: int = None):
        """Create a closed breaker.

        Args:
            threshold: consecutive transient failures that trip the breaker;
                falls back to `settings.breaker_threshold` when None.
            pause_seconds: how long the breaker stays open before a probe is
                allowed; falls back to `settings.breaker_pause_seconds` when None.
        """
        self.threshold = threshold if threshold is not None else settings.breaker_threshold
        self.pause_seconds = (
            pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
        )
        self._lock = threading.Lock()
        self.state = "closed"  # closed | open | half-open
        self.consecutive_transient = 0
        self.opened_at = 0.0  # time.time() of the most recent trip
        self._notify = None  # optional callable(message) for alerts

    def set_notifier(self, fn):
        """Register `fn(message)` to be called whenever the breaker opens."""
        self._notify = fn

    def record_transient(self):
        """Count one transient failure and open the breaker when warranted.

        A transient failure during half-open re-opens immediately (the probe
        failed); in closed state the breaker opens once the streak reaches
        `threshold`. While already open the streak is only counted.
        """
        alert = None
        with self._lock:
            self.consecutive_transient += 1
            if self.state == "half-open":
                # Probe failed -> re-open.
                alert = "circuit re-opened: probe job hit transient again"
            elif self.state == "closed" and self.consecutive_transient >= self.threshold:
                alert = (
                    f"circuit OPEN: {self.consecutive_transient} consecutive "
                    f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
                )
            if alert is not None:
                self._trip()
        # Deliberately outside the lock: see the class docstring.
        if alert is not None:
            self._alert(alert)

    def record_recovered(self):
        """Reset the streak and close the breaker if it was open or half-open."""
        with self._lock:
            self.consecutive_transient = 0
            if self.state in ("half-open", "open"):
                self.state = "closed"
                logger.info("Circuit CLOSED: recovered")

    def record_permanent(self):
        """Reset the streak; the breaker state itself is left untouched."""
        # A clean permanent (code-fault) failure breaks the transient streak.
        with self._lock:
            self.consecutive_transient = 0

    def _trip(self):
        """Switch to `open` and stamp the trip time. Does not take the lock."""
        self.state = "open"
        self.opened_at = time.time()

    def _alert(self, msg: str):
        """Log `msg` as a warning and forward it to the notifier, if any.

        Never raises: a failing notifier is logged and otherwise ignored so an
        alerting outage cannot break failure accounting.
        """
        logger.warning(msg)
        notify = self._notify
        if not notify:
            return
        try:
            notify(f"\U0001f534 {msg}")
        except Exception:
            logger.warning("Circuit breaker notifier failed", exc_info=True)

    def _open(self, msg: str):
        """Trip the breaker and alert in one step.

        Kept for backward compatibility. It does not acquire the lock itself;
        `record_transient` no longer routes through it so that the alert is
        sent without the lock held.
        """
        self._trip()
        self._alert(msg)

    def allow_claim(self) -> bool:
        """Return True if the worker may attempt to claim/launch a job now.

        - closed    -> yes.
        - open      -> no until the pause has elapsed; then transition to
                       half-open (yes, one probe).
        - half-open -> yes (the probe).
        """
        with self._lock:
            if self.state == "closed":
                return True
            if self.state == "open":
                if (time.time() - self.opened_at) >= self.pause_seconds:
                    self.state = "half-open"
                    logger.info("Circuit HALF-OPEN: probing one job")
                    return True
                return False
            # half-open: allow the probe.
            return True

    def snapshot(self) -> dict:
        """Return state, current streak and whole seconds of pause remaining.

        `pause_remaining_s` is 0 unless the breaker is open, and never negative.
        """
        with self._lock:
            remaining = 0
            if self.state == "open":
                remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
            return {
                "state": self.state,
                "consecutive_transient": self.consecutive_transient,
                "pause_remaining_s": remaining,
            }
|
|
|
|
|
|
class QueueWorker:
    """Background worker that drains the persistent job queue (with resilience).

    One daemon thread repeatedly calls `_drain_once()` and then waits
    `poll_interval` seconds (or until `stop()` is requested). Claiming is gated
    by the circuit breaker and by `preflight.check()`.
    """

    def __init__(self, max_concurrency: int = None, poll_interval: float = None,
                 breaker: CircuitBreaker = None):
        """Configure the worker; no thread is started until `start()`.

        Args:
            max_concurrency: upper bound on concurrently running jobs; falls
                back to `settings.max_concurrency` when None.
            poll_interval: seconds to wait between drain ticks; falls back to
                `settings.queue_poll_interval` when None.
            breaker: circuit breaker to use; a fresh `CircuitBreaker()` when None.
        """
        self.max_concurrency = (
            max_concurrency if max_concurrency is not None else settings.max_concurrency
        )
        self.poll_interval = (
            poll_interval if poll_interval is not None else settings.queue_poll_interval
        )
        self.breaker = breaker if breaker is not None else CircuitBreaker()
        # Result of the most recent preflight check, exposed through status().
        self.last_preflight_ok = True
        self.last_preflight_reason = "not checked"
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    # --- circuit breaker outcome callback wired into the launcher ----------
    def _on_outcome(self, transient: bool, recovered: bool):
        """Feed one job outcome into the breaker.

        `recovered` takes precedence over `transient`; anything else is
        recorded as a permanent failure.
        """
        if recovered:
            self.breaker.record_recovered()
        elif transient:
            self.breaker.record_transient()
        else:
            self.breaker.record_permanent()

    def _drain_once(self):
        """Claim and launch jobs until concurrency is full or the queue is empty.

        Gated by the circuit breaker and preflight: if the breaker is open (and
        not yet cooled down) or preflight fails, we do NOT claim — jobs stay
        queued and no CLI/tokens are touched.

        In half-open at most ONE job may be in flight: nothing is claimed while
        any job is still running, so a later poll tick cannot launch a second
        "probe" before the first outcome has reached the breaker.
        """
        if not self.breaker.allow_claim():
            return
        ok, reason = preflight.check()
        self.last_preflight_ok = ok
        self.last_preflight_reason = reason
        if not ok:
            logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
            return

        # In half-open we only probe a single job, regardless of max_concurrency.
        half_open = self.breaker.snapshot()["state"] == "half-open"
        launched = 0
        while not self._stop.is_set():
            if half_open and launched >= 1:
                return
            running = count_running_jobs()
            if running >= self.max_concurrency:
                return
            if half_open and running > 0:
                # A job is already in flight; wait for its outcome instead of
                # stacking further probes on a possibly still-broken backend.
                return
            job = claim_next_job()
            if not job:
                return
            launched += 1
            try:
                run_id = launcher.launch_job(job)
            except Exception as e:
                # Launch itself failed (e.g. repo missing): treat as a permanent
                # launch error so the job does not wedge as 'running' forever.
                self._handle_launch_failure(job, e)
            else:
                # Logged outside the try so a logging problem can never be
                # mistaken for a launch failure and requeue a running job.
                logger.info(
                    f"Worker launched job {job['id']} ({job['agent']}, "
                    f"repo {job['repo']}) -> run_id={run_id}"
                )

    def _handle_launch_failure(self, job, exc):
        """Move a claimed job whose launch raised out of the `running` state.

        The job is marked `cancelled` when its task is already terminal,
        `queued` while attempts remain, and `failed` otherwise. Bookkeeping
        errors are logged, never raised.
        """
        logger.error(f"Worker failed to launch job {job['id']}: {exc}")
        try:
            # Kept function-local as in the original handler.
            # NOTE(review): presumably to limit import-time coupling — confirm
            # before hoisting to module level.
            from .db import get_job, mark_job, get_task

            row = get_job(job["id"])
            attempts = row.get("attempts", 0) if row else 0
            max_attempts = row.get("max_attempts", 2) if row else 2
            # ORCH-090 (adr-0026 / TR-2): never requeue a job whose task is
            # already terminal ({done,cancelled}) — a STOP that landed between
            # claim and launch must win over the retry budget.
            task = get_task(job.get("task_id")) if job.get("task_id") else None
            if task and task.get("stage") in ("done", "cancelled"):
                mark_job(job["id"], "cancelled", error=f"launch error (task terminal): {exc}")
            elif attempts < max_attempts:
                mark_job(job["id"], "queued", error=f"launch error: {exc}")
            else:
                mark_job(job["id"], "failed", error=f"launch error: {exc}")
        except Exception:
            # Still best-effort, but leave a trace: the job may remain 'running'.
            logger.exception(
                f"Could not record launch failure for job {job.get('id')}; "
                "it may remain 'running'"
            )

    def _run(self):
        """Thread body: drain, wait `poll_interval`, repeat until stopped."""
        logger.info(
            f"Queue worker started (max_concurrency={self.max_concurrency}, "
            f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
        )
        while not self._stop.is_set():
            try:
                self._drain_once()
            except Exception as e:
                # Keep the loop alive on any error; the traceback is attached
                # so the failure can be diagnosed from the log.
                logger.error(f"Queue worker loop error: {e}", exc_info=True)
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop.wait(self.poll_interval)
        logger.info("Queue worker stopped")

    def start(self):
        """Start the worker thread; a no-op if it is already alive.

        Also wires the Telegram notifier into the breaker (best-effort) and
        installs `_on_outcome` as the launcher's outcome callback.
        """
        if self._thread and self._thread.is_alive():
            return
        # Wire breaker alerting + launcher outcome callback.
        try:
            from .notifications import send_telegram
            self.breaker.set_notifier(send_telegram)
        except Exception as e:
            # Alerts are optional; the worker must start regardless.
            logger.warning(f"Breaker alerts disabled: could not wire notifier: {e}")
        launcher.on_outcome = self._on_outcome
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="queue-worker", daemon=True
        )
        self._thread.start()

    def stop(self, timeout: float = 5.0):
        """Request shutdown and wait up to `timeout` seconds for the thread."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=timeout)
            if self._thread.is_alive():
                logger.warning(f"Queue worker did not stop within {timeout}s")

    def status(self) -> dict:
        """Resilience snapshot for /queue."""
        return {
            "breaker": self.breaker.snapshot(),
            "preflight_ok": self.last_preflight_ok,
            "preflight_reason": self.last_preflight_reason,
        }
|
|
|
|
|
|
# Module-level singleton used by the FastAPI lifespan.
|
|
# Built at import time with settings-derived defaults; no thread runs until
# worker.start() is called.
worker = QueueWorker()
|