fix(launcher): write task file to /repos without docker; stdout->file, no PIPE zombies (B-1, B-2)
- _write_task_file writes directly to mounted /repos/<repo>, raises on failure - Popen stdout=log_fh at OS level; _monitor_agent simplified to proc.wait()+close - remove PIPE reader thread and startup-timeout (watchdog by pid stays) - dispatch check_tests_local args (repo, branch)
This commit is contained in:
@@ -73,15 +73,13 @@ class AgentLauncher:
|
||||
|
||||
# Container-local path (repos mounted at /repos)
|
||||
local_repo_path = os.path.join(settings.repos_dir, repo)
|
||||
# Host path (for docker run write operations)
|
||||
host_repo_path = os.path.join(settings.host_repos_dir, repo)
|
||||
|
||||
if not os.path.isdir(local_repo_path):
|
||||
raise FileNotFoundError(f"Repo not found: {local_repo_path}")
|
||||
|
||||
# Write task file if content provided
|
||||
# Write task file if content provided (B-1: direct write to mounted /repos, no docker)
|
||||
if task_content:
|
||||
self._write_task_file(host_repo_path, config["task_file"], task_content)
|
||||
self._write_task_file(repo, config["task_file"], task_content)
|
||||
|
||||
# Record run in DB
|
||||
conn = get_db()
|
||||
@@ -119,10 +117,14 @@ class AgentLauncher:
|
||||
|
||||
logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}")
|
||||
|
||||
# Launch as background process
|
||||
# Launch as background process.
|
||||
# B-2 fix: redirect stdout/stderr straight to the log file at the OS level.
|
||||
# No PIPE in the orchestrator process -> no PIPE deadlock, no reader thread,
|
||||
# no zombies. log_fh is closed by _monitor_agent after proc.wait().
|
||||
log_fh = open(output_path, "w")
|
||||
proc = subprocess.Popen(
|
||||
["bash", "-c", cmd],
|
||||
stdout=subprocess.PIPE,
|
||||
stdout=log_fh,
|
||||
stderr=subprocess.STDOUT,
|
||||
env={
|
||||
**os.environ,
|
||||
@@ -154,7 +156,7 @@ class AgentLauncher:
|
||||
# agent_branch already computed above
|
||||
m = threading.Thread(
|
||||
target=self._monitor_agent,
|
||||
args=(proc, run_id, agent, repo, agent_branch, output_path),
|
||||
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
|
||||
daemon=True,
|
||||
)
|
||||
m.start()
|
||||
@@ -182,59 +184,23 @@ class AgentLauncher:
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB."""
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB.
|
||||
|
||||
B-2 fix: stdout already goes straight to the log file via Popen, so we just
|
||||
block on proc.wait() (guaranteed reap -> no zombie, real exit_code) and then
|
||||
close the log file handle. No PIPE, no select loop, no startup timeout here
|
||||
(the watchdog still enforces the overall AGENT_TIMEOUT by pid).
|
||||
"""
|
||||
import time as _time
|
||||
_start_ts = _time.time()
|
||||
|
||||
# Stream stdout PIPE to log file with startup timeout
|
||||
if output_path and proc.stdout:
|
||||
try:
|
||||
with open(output_path, "w") as log_file:
|
||||
_got_real_output = False
|
||||
_startup_timeout = 120 # seconds
|
||||
|
||||
import select
|
||||
while True:
|
||||
# Check if process has finished
|
||||
if proc.poll() is not None:
|
||||
# Read remaining output
|
||||
remaining = proc.stdout.read()
|
||||
if remaining:
|
||||
log_file.write(remaining.decode("utf-8", errors="replace"))
|
||||
log_file.flush()
|
||||
break
|
||||
|
||||
# Use select to wait for output with timeout
|
||||
ready, _, _ = select.select([proc.stdout], [], [], 10)
|
||||
if ready:
|
||||
line = proc.stdout.readline()
|
||||
if not line:
|
||||
break # EOF
|
||||
decoded = line.decode("utf-8", errors="replace")
|
||||
log_file.write(decoded)
|
||||
log_file.flush()
|
||||
|
||||
# Check if this is real output (not just git checkout noise)
|
||||
stripped = decoded.strip()
|
||||
if stripped and not stripped.startswith(("M\t", "Your branch", "Already on", "Switched to")):
|
||||
_got_real_output = True
|
||||
else:
|
||||
# Timeout on select - check startup timeout
|
||||
if not _got_real_output and (_time.time() - _start_ts) > _startup_timeout:
|
||||
logger.error(f"Agent run_id={run_id} ({agent}): no output after {_startup_timeout}s, killing")
|
||||
try:
|
||||
proc.kill()
|
||||
except Exception:
|
||||
pass
|
||||
log_file.write(f"\n[TIMEOUT] No output after {_startup_timeout}s - process killed\n")
|
||||
break
|
||||
|
||||
proc.stdout.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Agent run_id={run_id}: log streaming error: {e}")
|
||||
|
||||
exit_code = proc.wait()
|
||||
if log_fh is not None:
|
||||
try:
|
||||
log_fh.close()
|
||||
except Exception:
|
||||
pass
|
||||
_duration_s = int(_time.time() - _start_ts)
|
||||
logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}")
|
||||
|
||||
@@ -425,7 +391,7 @@ class AgentLauncher:
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
|
||||
)
|
||||
return
|
||||
elif qg_name == "check_ci_green":
|
||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
||||
passed, reason = check_fn(repo, branch)
|
||||
elif qg_name == "check_tests_passed":
|
||||
passed, reason = check_fn(repo, work_item_id or "")
|
||||
@@ -612,31 +578,22 @@ class AgentLauncher:
|
||||
logger.error(f"Auto-merge failed for {branch}: {e}")
|
||||
return False
|
||||
|
||||
def _write_task_file(self, host_repo_path: str, task_file: str, content: str):
|
||||
"""Write task file to host repo via docker run with stdin."""
|
||||
full_path = os.path.join(host_repo_path, task_file)
|
||||
# Use docker run with stdin to write content to the file
|
||||
cmd = [
|
||||
"docker", "run", "--rm", "-i",
|
||||
"-v", f"{host_repo_path}:{host_repo_path}",
|
||||
"-w", host_repo_path,
|
||||
"python:3.12-slim",
|
||||
"bash", "-c", f"cat > {full_path}",
|
||||
]
|
||||
def _write_task_file(self, repo: str, task_file: str, content: str):
|
||||
"""Write task file directly to the mounted repo volume (/repos).
|
||||
|
||||
B-1 fix: no docker. The repos directory is mounted RW at settings.repos_dir
|
||||
(/repos inside the container), so write straight to /repos/<repo>/<task_file>.
|
||||
Raise on failure instead of silently swallowing errors.
|
||||
"""
|
||||
container_repo_path = os.path.join(settings.repos_dir, repo) # /repos/<repo>
|
||||
full_path = os.path.join(container_repo_path, task_file)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
input=content,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.error(f"Failed to write task file: {result.stderr}")
|
||||
raise RuntimeError(f"Failed to write task file: {result.stderr}")
|
||||
logger.info(f"Task file written: {full_path}")
|
||||
except subprocess.TimeoutExpired:
|
||||
raise RuntimeError("Timeout writing task file")
|
||||
with open(full_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
logger.info(f"Task file written: {full_path} ({len(content)} bytes)")
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to write task file {full_path}: {e}")
|
||||
raise RuntimeError(f"Failed to write task file: {e}")
|
||||
|
||||
|
||||
launcher = AgentLauncher()
|
||||
|
||||
Reference in New Issue
Block a user