feat(launcher): classify failures, backoff transient retry, breaker outcome (ORCH-1)
_finalize_job classifies the run log: transient (429/overload) -> backoff requeue via mark_job_transient with separate transient_attempts budget honouring Retry-After; permanent -> normal attempts<max. on_outcome callback feeds the circuit breaker. _backoff_seconds = min(2^n*base, max) | Retry-After.
This commit is contained in:
@@ -364,18 +364,35 @@ class AgentLauncher:
|
||||
# ORCH-1: drive the job-queue status for queue-launched jobs only.
|
||||
# (Legacy direct launch() has job_id=None and is unaffected.)
|
||||
if job_id is not None:
|
||||
self._finalize_job(job_id, agent, run_id, exit_code)
|
||||
self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path)
|
||||
|
||||
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code):
|
||||
def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int:
|
||||
"""Exponential backoff for transient failures, honouring Retry-After.
|
||||
|
||||
backoff = min(2^transient_attempts * base, max). If the server sent a
|
||||
Retry-After, use the larger of the two (never poll sooner than asked).
|
||||
"""
|
||||
base = settings.backoff_base_seconds
|
||||
cap = settings.backoff_max_seconds
|
||||
backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
|
||||
if retry_after is not None and retry_after > 0:
|
||||
backoff = max(backoff, min(retry_after, cap))
|
||||
return int(backoff)
|
||||
|
||||
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None):
|
||||
"""ORCH-1: update the jobs row after the agent process finished.
|
||||
|
||||
exit_code == 0 -> done.
|
||||
exit_code != 0 -> retry while attempts < max_attempts (back to 'queued'),
|
||||
otherwise 'failed' + Telegram notification.
|
||||
attempts were already incremented at claim time, so `attempts` reflects how
|
||||
many times this job has been picked up.
|
||||
exit_code == 0 -> done (and resets the breaker streak via on_outcome).
|
||||
exit_code != 0 -> classify the failure from the run log tail (token-free):
|
||||
- TRANSIENT (429/overload/network): backoff-requeue with available_at in
|
||||
the future + a SEPARATE transient_attempts budget
|
||||
(settings.transient_max_attempts), honouring Retry-After. Reported to
|
||||
the breaker so it opens after N consecutive transient failures.
|
||||
- PERMANENT (code fault): ordinary attempts < max_attempts requeue,
|
||||
otherwise 'failed' + Telegram.
|
||||
"""
|
||||
from ..db import get_job, mark_job
|
||||
from ..error_classifier import classify_log_file
|
||||
try:
|
||||
job = get_job(job_id)
|
||||
if not job:
|
||||
@@ -383,35 +400,93 @@ class AgentLauncher:
|
||||
if exit_code == 0:
|
||||
mark_job(job_id, "done", run_id=run_id)
|
||||
logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
|
||||
self._record_outcome(transient=False, recovered=True)
|
||||
return
|
||||
|
||||
attempts = job.get("attempts", 0)
|
||||
max_attempts = job.get("max_attempts", 2)
|
||||
err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"
|
||||
if attempts < max_attempts:
|
||||
mark_job(job_id, "queued", run_id=run_id, error=err)
|
||||
logger.warning(
|
||||
f"Job {job_id} ({agent}) failed (exit={exit_code}), "
|
||||
f"requeued (attempt {attempts}/{max_attempts})"
|
||||
)
|
||||
# Classify the failure from the agent log tail (no token cost).
|
||||
kind, retry_after = "permanent", None
|
||||
log_path = output_path or f"/app/data/runs/{run_id}.log"
|
||||
try:
|
||||
kind, retry_after = classify_log_file(log_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if kind == "transient":
|
||||
self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after)
|
||||
else:
|
||||
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||
logger.error(
|
||||
f"Job {job_id} ({agent}) failed permanently after "
|
||||
f"{attempts} attempts (exit={exit_code})"
|
||||
)
|
||||
try:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(
|
||||
f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
|
||||
f"failed after {attempts} attempts (exit={exit_code}). "
|
||||
f"Logs: /app/data/runs/{run_id}.log"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
self._finalize_permanent(job_id, agent, run_id, exit_code, job)
|
||||
except Exception as e:
|
||||
logger.error(f"Job {job_id}: _finalize_job error: {e}")
|
||||
|
||||
def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after):
|
||||
"""Transient (429/overload/net) failure -> backoff requeue or fail when budget out."""
|
||||
from ..db import mark_job, mark_job_transient
|
||||
tattempts = job.get("transient_attempts", 0)
|
||||
tmax = settings.transient_max_attempts
|
||||
err = (f"transient (429/overload) agent {agent} exit={exit_code} "
|
||||
f"(run_id={run_id}); retry_after={retry_after}")
|
||||
self._record_outcome(transient=True, recovered=False)
|
||||
if tattempts < tmax:
|
||||
backoff = self._backoff_seconds(tattempts + 1, retry_after)
|
||||
mark_job_transient(job_id, backoff, error=err)
|
||||
logger.warning(
|
||||
f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), "
|
||||
f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}"
|
||||
)
|
||||
else:
|
||||
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||
logger.error(
|
||||
f"Job {job_id} ({agent}) failed after {tattempts} transient attempts"
|
||||
)
|
||||
self._notify_failed(job_id, agent, job, run_id,
|
||||
f"transient (rate-limit) after {tattempts} attempts")
|
||||
|
||||
def _finalize_permanent(self, job_id, agent, run_id, exit_code, job):
|
||||
"""Permanent (code-fault) failure -> normal attempts<max requeue, then fail."""
|
||||
from ..db import mark_job
|
||||
attempts = job.get("attempts", 0)
|
||||
max_attempts = job.get("max_attempts", 2)
|
||||
err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"
|
||||
self._record_outcome(transient=False, recovered=False)
|
||||
if attempts < max_attempts:
|
||||
mark_job(job_id, "queued", run_id=run_id, error=err)
|
||||
logger.warning(
|
||||
f"Job {job_id} ({agent}) failed (exit={exit_code}), "
|
||||
f"requeued (attempt {attempts}/{max_attempts})"
|
||||
)
|
||||
else:
|
||||
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||
logger.error(
|
||||
f"Job {job_id} ({agent}) failed permanently after "
|
||||
f"{attempts} attempts (exit={exit_code})"
|
||||
)
|
||||
self._notify_failed(job_id, agent, job, run_id,
|
||||
f"{attempts} attempts (exit={exit_code})")
|
||||
|
||||
def _notify_failed(self, job_id, agent, job, run_id, why):
|
||||
try:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(
|
||||
f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
|
||||
f"failed: {why}. Logs: /app/data/runs/{run_id}.log"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _record_outcome(self, transient: bool, recovered: bool):
|
||||
"""Forward the run outcome to the circuit breaker (if a worker is wired).
|
||||
|
||||
Decoupled via a settable callback (set by QueueWorker.start) so the launcher
|
||||
does not hard-import the worker (avoids a cycle) and tests can run the
|
||||
launcher standalone.
|
||||
"""
|
||||
cb = getattr(self, "on_outcome", None)
|
||||
if cb:
|
||||
try:
|
||||
cb(transient=transient, recovered=recovered)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user