diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 3a60484..47d6574 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -73,15 +73,13 @@ class AgentLauncher: # Container-local path (repos mounted at /repos) local_repo_path = os.path.join(settings.repos_dir, repo) - # Host path (for docker run write operations) - host_repo_path = os.path.join(settings.host_repos_dir, repo) if not os.path.isdir(local_repo_path): raise FileNotFoundError(f"Repo not found: {local_repo_path}") - # Write task file if content provided + # Write task file if content provided (B-1: direct write to mounted /repos, no docker) if task_content: - self._write_task_file(host_repo_path, config["task_file"], task_content) + self._write_task_file(repo, config["task_file"], task_content) # Record run in DB conn = get_db() @@ -119,10 +117,14 @@ class AgentLauncher: logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}") - # Launch as background process + # Launch as background process. + # B-2 fix: redirect stdout/stderr straight to the log file at the OS level. + # No PIPE in the orchestrator process -> no PIPE deadlock, no reader thread, + # no zombies. log_fh is closed by _monitor_agent after proc.wait(). 
+ log_fh = open(output_path, "w") proc = subprocess.Popen( ["bash", "-c", cmd], - stdout=subprocess.PIPE, + stdout=log_fh, stderr=subprocess.STDOUT, env={ **os.environ, @@ -154,7 +156,7 @@ class AgentLauncher: # agent_branch already computed above m = threading.Thread( target=self._monitor_agent, - args=(proc, run_id, agent, repo, agent_branch, output_path), + args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh), daemon=True, ) m.start() @@ -182,59 +184,23 @@ class AgentLauncher: except ProcessLookupError: pass # Already finished - def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None): - """Wait for agent to finish, commit+push results, update DB.""" + def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None): + """Wait for agent to finish, commit+push results, update DB. + + B-2 fix: stdout already goes straight to the log file via Popen, so we just + block on proc.wait() (guaranteed reap -> no zombie, real exit_code) and then + close the log file handle. No PIPE, no select loop, no startup timeout here + (the watchdog still enforces the overall AGENT_TIMEOUT by pid). 
+ """ import time as _time _start_ts = _time.time() - # Stream stdout PIPE to log file with startup timeout - if output_path and proc.stdout: - try: - with open(output_path, "w") as log_file: - _got_real_output = False - _startup_timeout = 120 # seconds - - import select - while True: - # Check if process has finished - if proc.poll() is not None: - # Read remaining output - remaining = proc.stdout.read() - if remaining: - log_file.write(remaining.decode("utf-8", errors="replace")) - log_file.flush() - break - - # Use select to wait for output with timeout - ready, _, _ = select.select([proc.stdout], [], [], 10) - if ready: - line = proc.stdout.readline() - if not line: - break # EOF - decoded = line.decode("utf-8", errors="replace") - log_file.write(decoded) - log_file.flush() - - # Check if this is real output (not just git checkout noise) - stripped = decoded.strip() - if stripped and not stripped.startswith(("M\t", "Your branch", "Already on", "Switched to")): - _got_real_output = True - else: - # Timeout on select - check startup timeout - if not _got_real_output and (_time.time() - _start_ts) > _startup_timeout: - logger.error(f"Agent run_id={run_id} ({agent}): no output after {_startup_timeout}s, killing") - try: - proc.kill() - except Exception: - pass - log_file.write(f"\n[TIMEOUT] No output after {_startup_timeout}s - process killed\n") - break - - proc.stdout.close() - except Exception as e: - logger.error(f"Agent run_id={run_id}: log streaming error: {e}") - exit_code = proc.wait() + if log_fh is not None: + try: + log_fh.close() + except Exception: + pass _duration_s = int(_time.time() - _start_ts) logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}") @@ -425,7 +391,7 @@ class AgentLauncher: "\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 
def _write_task_file(self, repo: str, task_file: str, content: str):
    """Write the task file directly into the mounted repo volume.

    B-1 fix: no docker. The repos directory is mounted RW at
    ``settings.repos_dir`` (/repos inside the container), so the content is
    written straight to ``/repos/<repo>/<task_file>``.

    Args:
        repo: Repository directory name under ``settings.repos_dir``.
        task_file: Path of the task file, relative to the repository root.
        content: Text to write; encoded as UTF-8.

    Raises:
        RuntimeError: If the resolved target falls outside the repository
            directory, or if the file cannot be written (the underlying
            ``OSError`` is chained as ``__cause__``).
    """
    container_repo_path = os.path.join(settings.repos_dir, repo)  # /repos/<repo>
    full_path = os.path.join(container_repo_path, task_file)

    # os.path.join drops the repo prefix when task_file is absolute and does
    # not collapse "..", so containment is checked on the resolved paths.
    repo_root = os.path.realpath(container_repo_path)
    target = os.path.realpath(full_path)
    # join(repo_root, "") appends exactly one trailing separator, so a sibling
    # such as /repos/foo-bar is not mistaken for a child of /repos/foo.
    if not target.startswith(os.path.join(repo_root, "")):
        logger.error(
            "Refusing to write task file outside repo: %s (repo root %s)",
            full_path,
            repo_root,
        )
        raise RuntimeError(
            f"Failed to write task file: {task_file!r} resolves outside {repo_root}"
        )

    try:
        # A nested task_file (e.g. "docs/TASK.md") needs its parent directory.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "w", encoding="utf-8") as f:
            f.write(content)
    except OSError as e:
        logger.error("Failed to write task file %s: %s", full_path, e)
        raise RuntimeError(f"Failed to write task file: {e}") from e

    # len(content) counts characters, not bytes (they differ for non-ASCII text).
    logger.info("Task file written: %s (%d chars)", full_path, len(content))