diff --git a/src/agents/launcher.py b/src/agents/launcher.py index dafedac..3f9afcb 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -364,18 +364,35 @@ class AgentLauncher: # ORCH-1: drive the job-queue status for queue-launched jobs only. # (Legacy direct launch() has job_id=None and is unaffected.) if job_id is not None: - self._finalize_job(job_id, agent, run_id, exit_code) + self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path) - def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code): + def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int: + """Exponential backoff for transient failures, honouring Retry-After. + + backoff = min(2^transient_attempts * base, max). If the server sent a + Retry-After, use the larger of the two (never poll sooner than asked). + """ + base = settings.backoff_base_seconds + cap = settings.backoff_max_seconds + backoff = min((2 ** max(transient_attempts, 0)) * base, cap) + if retry_after is not None and retry_after > 0: + backoff = max(backoff, min(retry_after, cap)) + return int(backoff) + + def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None): """ORCH-1: update the jobs row after the agent process finished. - exit_code == 0 -> done. - exit_code != 0 -> retry while attempts < max_attempts (back to 'queued'), - otherwise 'failed' + Telegram notification. - attempts were already incremented at claim time, so `attempts` reflects how - many times this job has been picked up. + exit_code == 0 -> done (and resets the breaker streak via on_outcome). + exit_code != 0 -> classify the failure from the run log tail (token-free): + - TRANSIENT (429/overload/network): backoff-requeue with available_at in + the future + a SEPARATE transient_attempts budget + (settings.transient_max_attempts), honouring Retry-After. Reported to + the breaker so it opens after N consecutive transient failures. + - PERMANENT (code fault): ordinary attempts < max_attempts requeue, + otherwise 'failed' + Telegram. """ from ..db import get_job, mark_job + from ..error_classifier import classify_log_file try: job = get_job(job_id) if not job: @@ -383,35 +400,93 @@ class AgentLauncher: if exit_code == 0: mark_job(job_id, "done", run_id=run_id) logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})") + self._record_outcome(transient=False, recovered=True) return - attempts = job.get("attempts", 0) - max_attempts = job.get("max_attempts", 2) - err = f"agent {agent} exit_code={exit_code} (run_id={run_id})" - if attempts < max_attempts: - mark_job(job_id, "queued", run_id=run_id, error=err) - logger.warning( - f"Job {job_id} ({agent}) failed (exit={exit_code}), " - f"requeued (attempt {attempts}/{max_attempts})" - ) + # Classify the failure from the agent log tail (no token cost). + kind, retry_after = "permanent", None + log_path = output_path or f"/app/data/runs/{run_id}.log" + try: + kind, retry_after = classify_log_file(log_path) + except Exception: + pass + + if kind == "transient": + self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after) else: - mark_job(job_id, "failed", run_id=run_id, error=err) - logger.error( - f"Job {job_id} ({agent}) failed permanently after " - f"{attempts} attempts (exit={exit_code})" - ) - try: - from ..notifications import send_telegram - send_telegram( - f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) " - f"failed after {attempts} attempts (exit={exit_code}). " - f"Logs: /app/data/runs/{run_id}.log" - ) - except Exception: - pass + self._finalize_permanent(job_id, agent, run_id, exit_code, job) except Exception as e: logger.error(f"Job {job_id}: _finalize_job error: {e}") + def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after): + """Transient (429/overload/net) failure -> backoff requeue or fail when budget out.""" + from ..db import mark_job, mark_job_transient + tattempts = job.get("transient_attempts", 0) + tmax = settings.transient_max_attempts + err = (f"transient (429/overload) agent {agent} exit={exit_code} " + f"(run_id={run_id}); retry_after={retry_after}") + self._record_outcome(transient=True, recovered=False) + if tattempts < tmax: + backoff = self._backoff_seconds(tattempts + 1, retry_after) + mark_job_transient(job_id, backoff, error=err) + logger.warning( + f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), " + f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}" + ) + else: + mark_job(job_id, "failed", run_id=run_id, error=err) + logger.error( + f"Job {job_id} ({agent}) failed after {tattempts} transient attempts" + ) + self._notify_failed(job_id, agent, job, run_id, + f"transient (rate-limit) after {tattempts} attempts") + + def _finalize_permanent(self, job_id, agent, run_id, exit_code, job): + """Permanent (code-fault) failure -> normal attempts