Merge pull request 'ORCH-7: cleanup + hardening (M-4 dead code + M-2 graceful timeout)' (#4) from feature/ORCH-7-hardening into main
This commit was merged in pull request #4.
This commit is contained in:
@@ -1,8 +1,10 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
import os
|
import os
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import signal
|
import signal
|
||||||
|
import time
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||||
@@ -53,7 +55,10 @@ class AgentLauncher:
|
|||||||
}
|
}
|
||||||
|
|
||||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||||
AGENT_TIMEOUT = 1800 # 30 minutes
|
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
|
||||||
|
# backward-compatible alias for the default; the actual value (and per-agent
|
||||||
|
# overrides) live in settings and are resolved via _resolve_timeout().
|
||||||
|
AGENT_TIMEOUT = settings.agent_timeout_seconds
|
||||||
|
|
||||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||||
"""
|
"""
|
||||||
@@ -190,7 +195,7 @@ class AgentLauncher:
|
|||||||
t = threading.Thread(
|
t = threading.Thread(
|
||||||
target=self._watchdog,
|
target=self._watchdog,
|
||||||
args=(proc.pid, run_id),
|
args=(proc.pid, run_id),
|
||||||
kwargs={"job_id": job_id},
|
kwargs={"job_id": job_id, "agent": agent},
|
||||||
daemon=True,
|
daemon=True,
|
||||||
)
|
)
|
||||||
t.start()
|
t.start()
|
||||||
@@ -209,29 +214,100 @@ class AgentLauncher:
|
|||||||
notify_agent_started(run_id, agent, task_id)
|
notify_agent_started(run_id, agent, task_id)
|
||||||
return run_id
|
return run_id
|
||||||
|
|
||||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
|
@staticmethod
|
||||||
"""Kill agent if it exceeds timeout.
|
def _resolve_timeout(agent: str = None) -> int:
    """ORCH-7 (M-2): resolve the wall-clock timeout (in seconds) for an agent.

    Lookup order:
      1. settings.agent_timeout_overrides_json, when it is a JSON object that
         contains ``agent`` with a usable value, e.g. {"reviewer": 3600};
      2. settings.agent_timeout_seconds (the global default).

    A bad override must never brick a run, so every one of the following is
    logged and ignored in favour of the default:
      * malformed JSON;
      * a value int() cannot convert -- including Infinity/NaN, which
        json.loads accepts and which make int() raise OverflowError/ValueError;
      * a zero or negative value, which would either make time.sleep() raise
        or terminate the agent the moment it starts.
    A JSON document that is not an object, or that does not mention the
    agent, silently falls through to the default.

    Args:
        agent: agent name to look up; None (or "") skips the override lookup.

    Returns:
        The timeout in seconds as a positive int override, or the default.
    """
    default = settings.agent_timeout_seconds
    raw = (settings.agent_timeout_overrides_json or "").strip()
    if not agent or not raw:
        return default

    try:
        overrides = json.loads(raw)
        if not isinstance(overrides, dict) or agent not in overrides:
            return default
        value = int(overrides[agent])
    except (ValueError, TypeError, OverflowError) as e:
        # OverflowError: int(float("inf")) for an override written as Infinity.
        logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
        return default

    if value <= 0:
        logger.warning(
            f"Non-positive timeout override for agent {agent!r} ({value}), using default"
        )
        return default
    return value
|
||||||
|
|
||||||
|
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
              job_id: int = None, agent: str = None):
    """Kill agent if it exceeds its timeout.

    ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
    code and drives the job retry/fail logic, so the watchdog itself only needs
    to terminate the process and record the agent_runs exit. job_id is accepted
    for symmetry.

    ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
    claude off mid-write and leaves half-written artifacts), send SIGTERM first,
    give the process up to settings.agent_kill_grace_seconds to flush and exit on
    its own, and only SIGKILL if it is still alive after the grace window. If the
    process exits during the grace window, SIGKILL is NOT sent.
    The recorded exit_code stays -9 to match the existing retry/fail contract
    regardless of which signal actually reaped it.

    ProcessLookupError and PermissionError are tolerated at every os.kill()
    call and both are treated as "the agent is gone". Without the
    PermissionError case the exception would escape and end the watchdog
    thread before the run is stamped.

    Args:
        pid: OS process id of the agent to terminate.
        run_id: agent_runs row id handed to _record_kill().
        timeout: seconds to sleep before terminating; when None it is resolved
            through _resolve_timeout(agent).
        job_id: not used in this method.
        agent: agent name, used only to resolve the timeout.
    """
    # NOTE(review): EPERM presumably means the pid was recycled by a process
    # we do not own after the agent exited -- confirm against the deployment.
    gone = (ProcessLookupError, PermissionError)

    if timeout is None:
        timeout = self._resolve_timeout(agent)
    time.sleep(timeout)

    # Read the grace window once so the log line and the poll loop agree.
    grace = settings.agent_kill_grace_seconds

    # Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
    try:
        os.kill(pid, signal.SIGTERM)
    except gone:
        logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
        return  # nothing to record: the monitor's proc.wait() owns the exit
    logger.warning(
        f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
        f"(pid={pid}), grace={grace}s"
    )

    # Phase 2: poll for graceful exit within the grace window. The elapsed
    # time is a counter of requested sleeps, not a clock reading.
    poll_interval = 0.5
    waited = 0.0
    while waited < grace:
        time.sleep(poll_interval)
        waited += poll_interval
        try:
            os.kill(pid, 0)  # signal 0 = liveness probe, does not kill
        except gone:
            logger.info(
                f"Agent run_id={run_id} exited gracefully after SIGTERM "
                f"({waited:.1f}s); no SIGKILL needed"
            )
            self._record_kill(run_id)
            return

    # Phase 3: still alive -> hard SIGKILL.
    try:
        os.kill(pid, signal.SIGKILL)
    except gone:
        logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
    else:
        logger.warning(
            f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
        )
    # Stamp the run in both outcomes: the timeout fired and SIGTERM was sent.
    self._record_kill(run_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _record_kill(run_id: int):
    """Stamp the agent_runs row as timeout-killed (exit_code=-9).

    ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
    off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.

    Sets finished_at to the database's current time and exit_code to -9 on the
    row whose id equals ``run_id``, then commits. The connection obtained from
    get_db() is closed even when the UPDATE or the commit raises, so a failed
    write does not leak it; the exception itself still propagates.

    Args:
        run_id: id of the agent_runs row to update.
    """
    conn = get_db()
    try:
        conn.execute(
            "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
            (run_id,),
        )
        conn.commit()
    finally:
        conn.close()
|
||||||
|
|
||||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||||
"""Wait for agent to finish, commit+push results, update DB.
|
"""Wait for agent to finish, commit+push results, update DB.
|
||||||
@@ -703,47 +779,6 @@ class AgentLauncher:
|
|||||||
logger.error(f"Failed to create PR for {branch}: {e}")
|
logger.error(f"Failed to create PR for {branch}: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
|
|
||||||
import httpx
|
|
||||||
owner = settings.gitea_owner
|
|
||||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
|
||||||
base_url = f"{settings.gitea_url}/api/v1"
|
|
||||||
try:
|
|
||||||
resp = httpx.get(
|
|
||||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
|
||||||
params={"state": "open", "head": branch},
|
|
||||||
headers=headers, timeout=10
|
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
prs = resp.json()
|
|
||||||
if not prs:
|
|
||||||
pr_number = self._ensure_pr(repo, branch, 0)
|
|
||||||
if not pr_number:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
pr_number = prs[0]["number"]
|
|
||||||
resp = httpx.post(
|
|
||||||
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
|
|
||||||
json={"Do": "merge"},
|
|
||||||
headers=headers, timeout=30
|
|
||||||
)
|
|
||||||
if resp.status_code in (200, 204):
|
|
||||||
logger.info(f"PR #{pr_number} merged for {branch}")
|
|
||||||
update_task_stage(task_id, "done")
|
|
||||||
notify_stage_change(task_id, "deploy", "done")
|
|
||||||
plane_notify_stage(work_item_id, "deploy", "done")
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Auto-merge failed for {branch}: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
|
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
|
||||||
"""Write task file directly into the task's worktree.
|
"""Write task file directly into the task's worktree.
|
||||||
|
|
||||||
|
|||||||
@@ -53,6 +53,19 @@ class Settings(BaseSettings):
|
|||||||
breaker_threshold: int = 3
|
breaker_threshold: int = 3
|
||||||
breaker_pause_seconds: int = 300
|
breaker_pause_seconds: int = 300
|
||||||
|
|
||||||
|
# ORCH-7 (M-2): agent timeout + graceful kill.
|
||||||
|
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
|
||||||
|
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
|
||||||
|
# agent_kill_grace_seconds -> pause between SIGTERM and SIGKILL so claude can
|
||||||
|
# flush artifacts before the hard kill
|
||||||
|
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
|
||||||
|
# agent_timeout_overrides_json -> optional per-agent override JSON object,
|
||||||
|
# e.g. {"reviewer": 3600, "architect": 2700}
|
||||||
|
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
|
||||||
|
agent_timeout_seconds: int = 1800
|
||||||
|
agent_kill_grace_seconds: int = 20
|
||||||
|
agent_timeout_overrides_json: str = ""
|
||||||
|
|
||||||
|
|
||||||
# Telegram notifications
|
# Telegram notifications
|
||||||
telegram_bot_token: str = ""
|
telegram_bot_token: str = ""
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
|
|||||||
the YAML frontmatter only (no fragile substring matching).
|
the YAML frontmatter only (no fragile substring matching).
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
|||||||
|
|
||||||
from src.agents.launcher import AgentLauncher
|
from src.agents.launcher import AgentLauncher
|
||||||
from src.qg.checks import check_reviewer_verdict
|
from src.qg.checks import check_reviewer_verdict
|
||||||
|
from src.config import settings
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
|
|||||||
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
|
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
|
||||||
assert passed is False
|
assert passed is False
|
||||||
assert "not found" in reason.lower()
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-7 (M-4): dead code removed
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestDeadCodeRemoved:
    """M-4: pins the launcher's method surface after the dead-code cleanup.

    _auto_merge_pr must no longer exist on AgentLauncher, while _ensure_pr
    must still be there.
    """

    def test_auto_merge_pr_is_gone(self):
        missing = object()
        found = getattr(AgentLauncher, "_auto_merge_pr", missing)
        assert found is missing

    def test_ensure_pr_still_present(self):
        missing = object()
        found = getattr(AgentLauncher, "_ensure_pr", missing)
        assert found is not missing
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-7 (M-2): configurable timeout + per-agent override
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestResolveTimeout:
    """M-2: _resolve_timeout prefers a per-agent JSON override over the default."""

    def test_default_when_no_override(self, monkeypatch):
        # Empty override string: every lookup yields the global default.
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
        for who in ("developer", None):
            assert AgentLauncher._resolve_timeout(who) == 1800

    def test_override_for_specific_agent(self, monkeypatch):
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(
            settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
        )
        # "developer" is absent from the override map, so it gets the default.
        expected = {"reviewer": 3600, "architect": 2700, "developer": 1800}
        for who, seconds in expected.items():
            assert AgentLauncher._resolve_timeout(who) == seconds

    def test_malformed_override_falls_back_to_default(self, monkeypatch):
        monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
        monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
        # Unparseable JSON must be swallowed, not raised.
        resolved = AgentLauncher._resolve_timeout("reviewer")
        assert resolved == 1800
|
||||||
|
|
||||||
|
|
||||||
|
class TestWatchdogGracefulKill:
    """M-2: exercises the watchdog's SIGTERM -> grace -> SIGKILL sequence.

    No real process is signalled: os.kill is replaced by a fake that logs the
    signals it receives and scripts liveness, and time.sleep is a no-op.
    """

    def _patch_db(self, monkeypatch):
        """Replace get_db with a connection stub so _record_kill needs no DB."""

        class _StubConnection:
            def execute(self, *args, **kwargs):
                return self

            def commit(self):
                return None

            def close(self):
                return None

        monkeypatch.setattr("src.agents.launcher.get_db", lambda: _StubConnection())

    def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
        """The process outlives the grace window, so SIGTERM is followed by SIGKILL."""
        self._patch_db(monkeypatch)
        monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
        monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)

        delivered = []
        # Every call (liveness probes included) is logged and never raises,
        # i.e. the process always looks alive.
        monkeypatch.setattr(
            "src.agents.launcher.os.kill", lambda pid, sig: delivered.append(sig)
        )

        AgentLauncher()._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")

        assert signal.SIGTERM in delivered
        assert signal.SIGKILL in delivered
        # Ordering: the graceful signal precedes the hard kill.
        term_at = delivered.index(signal.SIGTERM)
        kill_at = delivered.index(signal.SIGKILL)
        assert term_at < kill_at

    def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
        """The process disappears during the grace window, so no SIGKILL is sent."""
        self._patch_db(monkeypatch)
        monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
        monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)

        delivered = []
        probes_seen = 0

        def fake_kill(pid, sig):
            nonlocal probes_seen
            if sig != 0:
                delivered.append(sig)
                return None
            # Liveness probe: report "gone" from the second probe onwards.
            probes_seen += 1
            if probes_seen >= 2:
                raise ProcessLookupError
            return None

        monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)

        AgentLauncher()._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")

        assert signal.SIGTERM in delivered
        assert signal.SIGKILL not in delivered

    def test_already_dead_before_sigterm(self, monkeypatch):
        """SIGTERM hits a vanished process: the error is tolerated, no SIGKILL
        follows, and _record_kill is never invoked."""
        self._patch_db(monkeypatch)
        monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)

        delivered = []
        recorded_runs = []

        def fake_kill(pid, sig):
            if sig == signal.SIGTERM:
                raise ProcessLookupError
            delivered.append(sig)
            return None

        monkeypatch.setattr(
            AgentLauncher, "_record_kill",
            staticmethod(lambda rid: recorded_runs.append(rid)),
        )
        monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)

        # The call itself must complete without raising.
        AgentLauncher()._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")

        assert signal.SIGKILL not in delivered
        assert not recorded_runs
|
||||||
|
|||||||
Reference in New Issue
Block a user