ORCH-7: cleanup + hardening (M-4 dead code + M-2 graceful timeout) #4

Merged
admin merged 3 commits from feature/ORCH-7-hardening into main 2026-06-03 08:31:26 +03:00
3 changed files with 244 additions and 56 deletions

View File

@@ -1,8 +1,10 @@
import subprocess
import os
import json
import logging
import threading
import signal
import time
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
@@ -53,7 +55,10 @@ class AgentLauncher:
}
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
AGENT_TIMEOUT = 1800 # 30 minutes
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
# backward-compatible alias for the default; the actual value (and per-agent
# overrides) live in settings and are resolved via _resolve_timeout().
AGENT_TIMEOUT = settings.agent_timeout_seconds
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
"""
@@ -190,7 +195,7 @@ class AgentLauncher:
t = threading.Thread(
target=self._watchdog,
args=(proc.pid, run_id),
kwargs={"job_id": job_id},
kwargs={"job_id": job_id, "agent": agent},
daemon=True,
)
t.start()
@@ -209,29 +214,100 @@ class AgentLauncher:
notify_agent_started(run_id, agent, task_id)
return run_id
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
"""Kill agent if it exceeds timeout.
@staticmethod
def _resolve_timeout(agent: str = None) -> int:
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
like {"reviewer": 3600}) wins; otherwise the global default
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
(falls back to the default) and only logged, so a bad env never bricks runs.
"""
default = settings.agent_timeout_seconds
raw = (settings.agent_timeout_overrides_json or "").strip()
if agent and raw:
try:
overrides = json.loads(raw)
if isinstance(overrides, dict) and agent in overrides:
return int(overrides[agent])
except (ValueError, TypeError) as e:
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
return default
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
job_id: int = None, agent: str = None):
"""Kill agent if it exceeds its timeout.
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
code and drives the job retry/fail logic, so the watchdog itself only needs
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
to terminate the process and record the agent_runs exit. job_id is accepted
for symmetry.
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
give the process up to settings.agent_kill_grace_seconds to flush and exit on
its own, and only SIGKILL if it is still alive after the grace window. If the
process exits during the grace window, SIGKILL is NOT sent.
ProcessLookupError is tolerated at every step (the process may already be
gone). The recorded exit_code stays -9 to match the existing retry/fail
contract regardless of which signal actually reaped it.
"""
import time
if timeout is None:
timeout = self.AGENT_TIMEOUT
timeout = self._resolve_timeout(agent)
time.sleep(timeout)
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
try:
os.kill(pid, signal.SIGTERM)
logger.warning(
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
)
except ProcessLookupError:
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
return # nothing to record: the monitor's proc.wait() owns the exit
# Phase 2: poll for graceful exit within the grace window.
grace = settings.agent_kill_grace_seconds
poll_interval = 0.5
waited = 0.0
while waited < grace:
time.sleep(poll_interval)
waited += poll_interval
try:
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
except ProcessLookupError:
logger.info(
f"Agent run_id={run_id} exited gracefully after SIGTERM "
f"({waited:.1f}s); no SIGKILL needed"
)
self._record_kill(run_id)
return
# Phase 3: still alive -> hard SIGKILL.
try:
os.kill(pid, signal.SIGKILL)
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
conn = get_db()
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
(run_id,),
logger.warning(
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
)
conn.commit()
conn.close()
except ProcessLookupError:
pass # Already finished
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
self._record_kill(run_id)
@staticmethod
def _record_kill(run_id: int):
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
"""
conn = get_db()
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
(run_id,),
)
conn.commit()
conn.close()
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
"""Wait for agent to finish, commit+push results, update DB.
@@ -703,47 +779,6 @@ class AgentLauncher:
logger.error(f"Failed to create PR for {branch}: {e}")
return None
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
base_url = f"{settings.gitea_url}/api/v1"
try:
resp = httpx.get(
f"{base_url}/repos/{owner}/{repo}/pulls",
params={"state": "open", "head": branch},
headers=headers, timeout=10
)
resp.raise_for_status()
prs = resp.json()
if not prs:
pr_number = self._ensure_pr(repo, branch, 0)
if not pr_number:
return False
else:
pr_number = prs[0]["number"]
resp = httpx.post(
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
json={"Do": "merge"},
headers=headers, timeout=30
)
if resp.status_code in (200, 204):
logger.info(f"PR #{pr_number} merged for {branch}")
update_task_stage(task_id, "done")
notify_stage_change(task_id, "deploy", "done")
plane_notify_stage(work_item_id, "deploy", "done")
from ..notifications import send_telegram
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
return True
else:
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
return False
except Exception as e:
logger.error(f"Auto-merge failed for {branch}: {e}")
return False
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
"""Write task file directly into the task's worktree.

View File

@@ -53,6 +53,19 @@ class Settings(BaseSettings):
breaker_threshold: int = 3
breaker_pause_seconds: int = 300
# ORCH-7 (M-2): agent timeout + graceful kill.
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
# flush artifacts before the hard kill
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
# agent_timeout_overrides_json -> optional per-agent override JSON object,
# e.g. {"reviewer": 3600, "architect": 2700}
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
agent_timeout_seconds: int = 1800
agent_kill_grace_seconds: int = 20
agent_timeout_overrides_json: str = ""
# Telegram notifications
telegram_bot_token: str = ""

View File

@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
the YAML frontmatter only (no fragile substring matching).
"""
import os
import signal
import tempfile
import pytest
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
from src.agents.launcher import AgentLauncher
from src.qg.checks import check_reviewer_verdict
from src.config import settings
# ---------------------------------------------------------------------------
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
assert passed is False
assert "not found" in reason.lower()
# ---------------------------------------------------------------------------
# ORCH-7 (M-4): dead code removed
# ---------------------------------------------------------------------------
class TestDeadCodeRemoved:
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
removed. _ensure_pr (used by the auto-advance path) must stay."""
def test_auto_merge_pr_is_gone(self):
assert not hasattr(AgentLauncher, "_auto_merge_pr")
def test_ensure_pr_still_present(self):
assert hasattr(AgentLauncher, "_ensure_pr")
# ---------------------------------------------------------------------------
# ORCH-7 (M-2): configurable timeout + per-agent override
# ---------------------------------------------------------------------------
class TestResolveTimeout:
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
def test_default_when_no_override(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
assert AgentLauncher._resolve_timeout("developer") == 1800
assert AgentLauncher._resolve_timeout(None) == 1800
def test_override_for_specific_agent(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
)
assert AgentLauncher._resolve_timeout("reviewer") == 3600
assert AgentLauncher._resolve_timeout("architect") == 2700
# an agent not in the override map falls back to the default
assert AgentLauncher._resolve_timeout("developer") == 1800
def test_malformed_override_falls_back_to_default(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
# must not raise, must return the default
assert AgentLauncher._resolve_timeout("reviewer") == 1800
class TestWatchdogGracefulKill:
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
and ProcessLookupError tolerance. The OS process is fully faked: we record the
signals sent and decide liveness from a script, so no real process is touched."""
def _patch_db(self, monkeypatch):
"""Stub get_db so _record_kill does not need a real DB."""
class _Conn:
def execute(self, *a, **k):
return self
def commit(self):
pass
def close(self):
pass
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
sent.append(sig)
# signal 0 (liveness probe) -> always alive; never raise
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL in sent
# SIGTERM must come before SIGKILL
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
"""Process dies during the grace window -> SIGKILL is NOT sent."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
state = {"alive": True, "probes": 0}
def fake_kill(pid, sig):
if sig == 0:
state["probes"] += 1
# die on the 2nd liveness probe (within grace)
if state["probes"] >= 2:
raise ProcessLookupError
return None
sent.append(sig)
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL not in sent
def test_already_dead_before_sigterm(self, monkeypatch):
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
self._patch_db(monkeypatch)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
if sig == signal.SIGTERM:
raise ProcessLookupError
sent.append(sig)
return None
recorded = {"called": False}
monkeypatch.setattr(
AgentLauncher, "_record_kill",
staticmethod(lambda rid: recorded.__setitem__("called", True)),
)
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
# must not raise
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
assert signal.SIGKILL not in sent
assert recorded["called"] is False