feat(launcher): graceful SIGTERM->SIGKILL + configurable agent timeout (M-2)
The watchdog used to time.sleep(timeout) then immediately SIGKILL, which cut
claude off mid-write and left half-written artifacts. It now sends SIGTERM,
polls os.kill(pid, 0) for up to agent_kill_grace_seconds, and only SIGKILL if
the process is still alive; ProcessLookupError is tolerated at every step.
Timeout is now configurable via config.py: agent_timeout_seconds (default 1800),
agent_kill_grace_seconds (default 20), and agent_timeout_overrides_json for
per-agent overrides (e.g. {"reviewer": 3600}). AGENT_TIMEOUT is kept as a
backward-compatible alias. The recorded exit_code stays -9 so the ORCH-1
monitor retry/fail logic is unchanged (timeout-kills classify as permanent and
requeue within max_attempts, no retry loop).
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import subprocess
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import signal
|
||||
import time
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
@@ -53,7 +55,10 @@ class AgentLauncher:
|
||||
}
|
||||
|
||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||
AGENT_TIMEOUT = 1800 # 30 minutes
|
||||
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
|
||||
# backward-compatible alias for the default; the actual value (and per-agent
|
||||
# overrides) live in settings and are resolved via _resolve_timeout().
|
||||
AGENT_TIMEOUT = settings.agent_timeout_seconds
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||
"""
|
||||
@@ -190,7 +195,7 @@ class AgentLauncher:
|
||||
t = threading.Thread(
|
||||
target=self._watchdog,
|
||||
args=(proc.pid, run_id),
|
||||
kwargs={"job_id": job_id},
|
||||
kwargs={"job_id": job_id, "agent": agent},
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
@@ -209,29 +214,100 @@ class AgentLauncher:
|
||||
notify_agent_started(run_id, agent, task_id)
|
||||
return run_id
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
|
||||
"""Kill agent if it exceeds timeout.
|
||||
@staticmethod
|
||||
def _resolve_timeout(agent: str = None) -> int:
|
||||
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
|
||||
|
||||
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
|
||||
like {"reviewer": 3600}) wins; otherwise the global default
|
||||
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
|
||||
(falls back to the default) and only logged, so a bad env never bricks runs.
|
||||
"""
|
||||
default = settings.agent_timeout_seconds
|
||||
raw = (settings.agent_timeout_overrides_json or "").strip()
|
||||
if agent and raw:
|
||||
try:
|
||||
overrides = json.loads(raw)
|
||||
if isinstance(overrides, dict) and agent in overrides:
|
||||
return int(overrides[agent])
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
|
||||
return default
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
|
||||
job_id: int = None, agent: str = None):
|
||||
"""Kill agent if it exceeds its timeout.
|
||||
|
||||
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
||||
code and drives the job retry/fail logic, so the watchdog itself only needs
|
||||
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
|
||||
to terminate the process and record the agent_runs exit. job_id is accepted
|
||||
for symmetry.
|
||||
|
||||
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
|
||||
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
|
||||
give the process up to settings.agent_kill_grace_seconds to flush and exit on
|
||||
its own, and only SIGKILL if it is still alive after the grace window. If the
|
||||
process exits during the grace window, SIGKILL is NOT sent.
|
||||
ProcessLookupError is tolerated at every step (the process may already be
|
||||
gone). The recorded exit_code stays -9 to match the existing retry/fail
|
||||
contract regardless of which signal actually reaped it.
|
||||
"""
|
||||
import time
|
||||
if timeout is None:
|
||||
timeout = self.AGENT_TIMEOUT
|
||||
timeout = self._resolve_timeout(agent)
|
||||
time.sleep(timeout)
|
||||
|
||||
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
|
||||
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
|
||||
)
|
||||
except ProcessLookupError:
|
||||
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
|
||||
return # nothing to record: the monitor's proc.wait() owns the exit
|
||||
|
||||
# Phase 2: poll for graceful exit within the grace window.
|
||||
grace = settings.agent_kill_grace_seconds
|
||||
poll_interval = 0.5
|
||||
waited = 0.0
|
||||
while waited < grace:
|
||||
time.sleep(poll_interval)
|
||||
waited += poll_interval
|
||||
try:
|
||||
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
|
||||
except ProcessLookupError:
|
||||
logger.info(
|
||||
f"Agent run_id={run_id} exited gracefully after SIGTERM "
|
||||
f"({waited:.1f}s); no SIGKILL needed"
|
||||
)
|
||||
self._record_kill(run_id)
|
||||
return
|
||||
|
||||
# Phase 3: still alive -> hard SIGKILL.
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
|
||||
self._record_kill(run_id)
|
||||
|
||||
@staticmethod
|
||||
def _record_kill(run_id: int):
|
||||
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
|
||||
|
||||
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
|
||||
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
|
||||
"""
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB.
|
||||
|
||||
@@ -53,6 +53,19 @@ class Settings(BaseSettings):
|
||||
breaker_threshold: int = 3
|
||||
breaker_pause_seconds: int = 300
|
||||
|
||||
# ORCH-7 (M-2): agent timeout + graceful kill.
|
||||
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
|
||||
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
|
||||
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
|
||||
# flush artifacts before the hard kill
|
||||
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
|
||||
# agent_timeout_overrides_json -> optional per-agent override JSON object,
|
||||
# e.g. {"reviewer": 3600, "architect": 2700}
|
||||
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
|
||||
agent_timeout_seconds: int = 1800
|
||||
agent_kill_grace_seconds: int = 20
|
||||
agent_timeout_overrides_json: str = ""
|
||||
|
||||
|
||||
# Telegram notifications
|
||||
telegram_bot_token: str = ""
|
||||
|
||||
Reference in New Issue
Block a user