diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 3f9afcb..232cf4e 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -1,8 +1,10 @@ import subprocess import os +import json import logging import threading import signal +import time from ..config import settings from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage @@ -53,7 +55,10 @@ class AgentLauncher: } CLAUDE_BIN = "/opt/claude-code/bin/claude.exe" - AGENT_TIMEOUT = 1800 # 30 minutes + # ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a + # backward-compatible alias for the default; the actual value (and per-agent + # overrides) live in settings and are resolved via _resolve_timeout(). + AGENT_TIMEOUT = settings.agent_timeout_seconds def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int: """ @@ -190,7 +195,7 @@ class AgentLauncher: t = threading.Thread( target=self._watchdog, args=(proc.pid, run_id), - kwargs={"job_id": job_id}, + kwargs={"job_id": job_id, "agent": agent}, daemon=True, ) t.start() @@ -209,29 +214,100 @@ class AgentLauncher: notify_agent_started(run_id, agent, task_id) return run_id - def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None): - """Kill agent if it exceeds timeout. + @staticmethod + def _resolve_timeout(agent: str = None) -> int: + """ORCH-7 (M-2): resolve the wall-clock timeout for an agent. + + Per-agent override from settings.agent_timeout_overrides_json (a JSON object + like {"reviewer": 3600}) wins; otherwise the global default + settings.agent_timeout_seconds is used. A malformed override JSON is ignored + (falls back to the default) and only logged, so a bad env never bricks runs. + """ + default = settings.agent_timeout_seconds + raw = (settings.agent_timeout_overrides_json or "").strip() + if agent and raw: + try: + overrides = json.loads(raw) + if isinstance(overrides, dict) and agent in overrides: + return int(overrides[agent]) + except (ValueError, TypeError) as e: + logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}") + return default + + def _watchdog(self, pid: int, run_id: int, timeout: int = None, + job_id: int = None, agent: str = None): + """Kill agent if it exceeds its timeout. ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit code and drives the job retry/fail logic, so the watchdog itself only needs - to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry. + to terminate the process and record the agent_runs exit. job_id is accepted + for symmetry. + + ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts + claude off mid-write and leaves half-written artifacts), send SIGTERM first, + give the process up to settings.agent_kill_grace_seconds to flush and exit on + its own, and only SIGKILL if it is still alive after the grace window. If the + process exits during the grace window, SIGKILL is NOT sent. + ProcessLookupError is tolerated at every step (the process may already be + gone). The recorded exit_code stays -9 to match the existing retry/fail + contract regardless of which signal actually reaped it. """ - import time if timeout is None: - timeout = self.AGENT_TIMEOUT + timeout = self._resolve_timeout(agent) time.sleep(timeout) + + # Phase 1: SIGTERM (graceful). If the process is already gone, we're done. + try: + os.kill(pid, signal.SIGTERM) + logger.warning( + f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM " + f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s" + ) + except ProcessLookupError: + logger.info(f"Agent run_id={run_id} already exited before SIGTERM") + return # nothing to record: the monitor's proc.wait() owns the exit + + # Phase 2: poll for graceful exit within the grace window. + grace = settings.agent_kill_grace_seconds + poll_interval = 0.5 + waited = 0.0 + while waited < grace: + time.sleep(poll_interval) + waited += poll_interval + try: + os.kill(pid, 0) # signal 0 = liveness probe, does not kill + except ProcessLookupError: + logger.info( + f"Agent run_id={run_id} exited gracefully after SIGTERM " + f"({waited:.1f}s); no SIGKILL needed" + ) + self._record_kill(run_id) + return + + # Phase 3: still alive -> hard SIGKILL. try: os.kill(pid, signal.SIGKILL) - logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout") - conn = get_db() - conn.execute( - "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?", - (run_id,), + logger.warning( + f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL" ) - conn.commit() - conn.close() except ProcessLookupError: - pass # Already finished + logger.info(f"Agent run_id={run_id} exited just before SIGKILL") + self._record_kill(run_id) + + @staticmethod + def _record_kill(run_id: int): + """Stamp the agent_runs row as timeout-killed (exit_code=-9). + + ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys + off, so we keep it stable whether the reap came from SIGTERM or SIGKILL. + """ + conn = get_db() + conn.execute( + "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?", + (run_id,), + ) + conn.commit() + conn.close() def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None): """Wait for agent to finish, commit+push results, update DB. @@ -703,47 +779,6 @@ class AgentLauncher: logger.error(f"Failed to create PR for {branch}: {e}") return None - def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str): - import httpx - owner = settings.gitea_owner - headers = {"Authorization": f"token {settings.gitea_token}"} - base_url = f"{settings.gitea_url}/api/v1" - try: - resp = httpx.get( - f"{base_url}/repos/{owner}/{repo}/pulls", - params={"state": "open", "head": branch}, - headers=headers, timeout=10 - ) - resp.raise_for_status() - prs = resp.json() - if not prs: - pr_number = self._ensure_pr(repo, branch, 0) - if not pr_number: - return False - else: - pr_number = prs[0]["number"] - resp = httpx.post( - f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge", - json={"Do": "merge"}, - headers=headers, timeout=30 - ) - if resp.status_code in (200, 204): - logger.info(f"PR #{pr_number} merged for {branch}") - update_task_stage(task_id, "done") - notify_stage_change(task_id, "deploy", "done") - plane_notify_stage(work_item_id, "deploy", "done") - from ..notifications import send_telegram - send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.") - return True - else: - logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}") - from ..notifications import send_telegram - send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.") - return False - except Exception as e: - logger.error(f"Auto-merge failed for {branch}: {e}") - return False - def _write_task_file(self, repo: str, branch: str, task_file: str, content: str): """Write task file directly into the task's worktree. diff --git a/src/config.py b/src/config.py index e3d3003..6d454e0 100644 --- a/src/config.py +++ b/src/config.py @@ -53,6 +53,19 @@ class Settings(BaseSettings): breaker_threshold: int = 3 breaker_pause_seconds: int = 300 + # ORCH-7 (M-2): agent timeout + graceful kill. + # agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog + # kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS). + # agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can + # flush artifacts before the hard kill + # (env ORCH_AGENT_KILL_GRACE_SECONDS). + # agent_timeout_overrides_json -> optional per-agent override JSON object, + # e.g. {"reviewer": 3600, "architect": 2700} + # (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON). + agent_timeout_seconds: int = 1800 + agent_kill_grace_seconds: int = 20 + agent_timeout_overrides_json: str = "" + # Telegram notifications telegram_bot_token: str = "" diff --git a/tests/test_launcher.py b/tests/test_launcher.py index 8f05dda..e2ec215 100644 --- a/tests/test_launcher.py +++ b/tests/test_launcher.py @@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes: the YAML frontmatter only (no fragile substring matching). """ import os +import signal import tempfile import pytest @@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" from src.agents.launcher import AgentLauncher from src.qg.checks import check_reviewer_verdict +from src.config import settings # --------------------------------------------------------------------------- @@ -138,3 +140,141 @@ class TestCheckReviewerVerdict: passed, reason = check_reviewer_verdict("enduro-trails", "ET-999") assert passed is False assert "not found" in reason.lower() + + +# --------------------------------------------------------------------------- +# ORCH-7 (M-4): dead code removed +# --------------------------------------------------------------------------- +class TestDeadCodeRemoved: + """M-4: _auto_merge_pr was never called (merge is the deployer's job) and is + removed. _ensure_pr (used by the auto-advance path) must stay.""" + + def test_auto_merge_pr_is_gone(self): + assert not hasattr(AgentLauncher, "_auto_merge_pr") + + def test_ensure_pr_still_present(self): + assert hasattr(AgentLauncher, "_ensure_pr") + + +# --------------------------------------------------------------------------- +# ORCH-7 (M-2): configurable timeout + per-agent override +# --------------------------------------------------------------------------- +class TestResolveTimeout: + """M-2: _resolve_timeout honours a per-agent JSON override, else the default.""" + + def test_default_when_no_override(self, monkeypatch): + monkeypatch.setattr(settings, "agent_timeout_seconds", 1800) + monkeypatch.setattr(settings, "agent_timeout_overrides_json", "") + assert AgentLauncher._resolve_timeout("developer") == 1800 + assert AgentLauncher._resolve_timeout(None) == 1800 + + def test_override_for_specific_agent(self, monkeypatch): + monkeypatch.setattr(settings, "agent_timeout_seconds", 1800) + monkeypatch.setattr( + settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}' + ) + assert AgentLauncher._resolve_timeout("reviewer") == 3600 + assert AgentLauncher._resolve_timeout("architect") == 2700 + # an agent not in the override map falls back to the default + assert AgentLauncher._resolve_timeout("developer") == 1800 + + def test_malformed_override_falls_back_to_default(self, monkeypatch): + monkeypatch.setattr(settings, "agent_timeout_seconds", 1800) + monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json") + # must not raise, must return the default + assert AgentLauncher._resolve_timeout("reviewer") == 1800 + + +class TestWatchdogGracefulKill: + """M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit + and ProcessLookupError tolerance. The OS process is fully faked: we record the + signals sent and decide liveness from a script, so no real process is touched.""" + + def _patch_db(self, monkeypatch): + """Stub get_db so _record_kill does not need a real DB.""" + class _Conn: + def execute(self, *a, **k): + return self + def commit(self): + pass + def close(self): + pass + monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn()) + + def test_sigterm_then_sigkill_after_grace(self, monkeypatch): + """Process stays alive through the whole grace window -> SIGTERM then SIGKILL.""" + self._patch_db(monkeypatch) + monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1) + monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None) + + sent = [] + + def fake_kill(pid, sig): + sent.append(sig) + # signal 0 (liveness probe) -> always alive; never raise + return None + + monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill) + + launcher = AgentLauncher() + launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer") + + assert signal.SIGTERM in sent + assert signal.SIGKILL in sent + # SIGTERM must come before SIGKILL + assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL) + + def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch): + """Process dies during the grace window -> SIGKILL is NOT sent.""" + self._patch_db(monkeypatch) + monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5) + monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None) + + sent = [] + state = {"alive": True, "probes": 0} + + def fake_kill(pid, sig): + if sig == 0: + state["probes"] += 1 + # die on the 2nd liveness probe (within grace) + if state["probes"] >= 2: + raise ProcessLookupError + return None + sent.append(sig) + return None + + monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill) + + launcher = AgentLauncher() + launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer") + + assert signal.SIGTERM in sent + assert signal.SIGKILL not in sent + + def test_already_dead_before_sigterm(self, monkeypatch): + """Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL, + and _record_kill is NOT called (the monitor's proc.wait owns the exit).""" + self._patch_db(monkeypatch) + monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None) + + sent = [] + + def fake_kill(pid, sig): + if sig == signal.SIGTERM: + raise ProcessLookupError + sent.append(sig) + return None + + recorded = {"called": False} + monkeypatch.setattr( + AgentLauncher, "_record_kill", + staticmethod(lambda rid: recorded.__setitem__("called", True)), + ) + monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill) + + launcher = AgentLauncher() + # must not raise + launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer") + + assert signal.SIGKILL not in sent + assert recorded["called"] is False