import subprocess
import os
import json
import logging
import shlex
import threading
import signal
import time

from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from ..git_worktree import ensure_worktree, get_worktree_path
from ..qg.checks import QG_CHECKS
from ..notifications import (
    notify_stage_change,
    notify_qg_failure,
    notify_agent_started,
    notify_agent_finished,
    notify_approve_requested,
)
from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment as plane_add_comment

logger = logging.getLogger("orchestrator.launcher")


def prune_run_logs(runs_dir, keep_days=30, keep_max=500, active_paths=None):
    """L-2: best-effort rotation of per-run logs (``runs_dir/*.log``).

    A log file is removed if it is older than ``keep_days`` OR it is not
    within the ``keep_max`` most-recent logs (whichever condition is met
    first). Only ``*.log`` files directly inside ``runs_dir`` are considered;
    non-.log files and subdirectories are never touched. Files whose path is
    in ``active_paths`` (the currently running log) are always kept and do not
    count towards ``keep_max``.

    Args:
        runs_dir: Directory holding the per-run log files.
        keep_days: Maximum age in days (by mtime) before a log is removed.
        keep_max: Maximum number of non-active logs to keep (newest first).
        active_paths: Optional iterable of log paths that must never be removed.

    Returns:
        The number of files removed.

    Never raises: any error is logged and swallowed so log rotation can never
    bring the app down.
    """
    removed = 0
    try:
        # Compare by real path so symlinked / relative spellings still match.
        active = set()
        for ap in (active_paths or []):
            try:
                active.add(os.path.realpath(ap))
            except Exception:
                active.add(ap)

        if not os.path.isdir(runs_dir):
            return 0

        logs = []
        for name in os.listdir(runs_dir):
            if not name.endswith(".log"):
                continue
            path = os.path.join(runs_dir, name)
            if not os.path.isfile(path):
                continue
            if os.path.realpath(path) in active:
                continue
            try:
                mtime = os.path.getmtime(path)
            except OSError:
                # File vanished between listdir() and stat(): nothing to prune.
                continue
            logs.append((path, mtime))

        # Newest first, so the list index is the file's recency rank.
        logs.sort(key=lambda t: t[1], reverse=True)
        cutoff = time.time() - keep_days * 86400
        for idx, (path, mtime) in enumerate(logs):
            too_old = mtime < cutoff
            over_max = idx >= keep_max
            if too_old or over_max:
                try:
                    os.remove(path)
                    removed += 1
                except OSError as e:
                    logger.warning(f"prune_run_logs: failed to remove {path}: {e}")
    except Exception as e:
        logger.warning(f"prune_run_logs failed for {runs_dir}: {e}")
    return removed


class AgentLauncher:
    """Launch Claude CLI agents directly (binary mounted into container)."""

    # Per-agent CLI configuration. Paths are relative to the agent's worktree.
    # "model" is optional; when absent no --model flag is passed.
    AGENT_CONFIGS = {
        "analyst": {
            "system_prompt": ".openclaw/agents/analyst.md",
            "task_file": ".task.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "architect": {
            "system_prompt": ".openclaw/agents/architect.md",
            "task_file": ".task-arch.md",
            "allowed_tools": "Read,Write,Edit,Bash",
            "model": "opus",
        },
        "developer": {
            "system_prompt": ".openclaw/agents/developer.md",
            "task_file": ".task-dev.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "reviewer": {
            "system_prompt": ".openclaw/agents/reviewer.md",
            "task_file": ".task-review.md",
            "allowed_tools": "Read,Write,Edit,Bash",
            "model": "opus",
        },
        "tester": {
            "system_prompt": ".openclaw/agents/tester.md",
            "task_file": ".task-test.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "deployer": {
            "task_file": ".task-deploy.md",
            "system_prompt": ".openclaw/agents/deployer.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
    }

    CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"

    # ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
    # backward-compatible alias for the default; the actual value (and per-agent
    # overrides) live in settings and are resolved via _resolve_timeout().
    AGENT_TIMEOUT = settings.agent_timeout_seconds

    def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
        """
        Launch a Claude CLI agent directly (legacy synchronous path).

        Kept for backward compatibility (direct callers / existing tests). The
        ORCH-1 job queue uses launch_job() instead, but both share _spawn().

        Args:
            agent: Agent role (analyst, architect, developer, reviewer, tester)
            repo: Repository name
            task_content: Optional task content to write to task file
            task_id: Optional task ID to associate with this run

        Returns:
            agent_run_id from DB
        """
        return self._spawn(agent, repo, task_content, task_id, job_id=None)

    def launch_job(self, job: dict) -> int:
        """ORCH-1: launch an agent for a claimed queue job.

        Same spawn path as launch(), but threads job['id'] through so the
        monitor can update the job's status (done / requeue / failed) and link
        jobs.run_id to the agent_runs row.

        Args:
            job: Job row as a dict; reads "agent", "repo", "id" (required) and
                "task_content", "task_id" (optional).

        Returns:
            The agent_run_id.
        """
        return self._spawn(
            job["agent"],
            job["repo"],
            job.get("task_content"),
            job.get("task_id"),
            job_id=job["id"],
        )

    def _spawn(self, agent: str, repo: str, task_content: str = None,
               task_id: int = None, job_id: int = None) -> int:
        """Shared spawn implementation for launch() and launch_job().

        When job_id is set, the monitor/watchdog drive the jobs table status
        (ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic
        (B-1 / ORCH-2) are unchanged.

        Args:
            agent: Agent role; must be a key of AGENT_CONFIGS.
            repo: Repository name (a directory under settings.repos_dir).
            task_content: Optional text written to the agent's task file.
            task_id: Optional task id; its branch is looked up in the tasks table.
            job_id: Optional queue job id to link to the new agent_runs row.

        Returns:
            The id of the newly inserted agent_runs row.

        Raises:
            ValueError: Unknown agent role.
            FileNotFoundError: The repository directory does not exist.
        """
        config = self.AGENT_CONFIGS.get(agent)
        if not config:
            raise ValueError(f"Unknown agent: {agent}")

        # The main clone lives under settings.repos_dir; the agent works in an
        # isolated worktree (ORCH-2 / S-4) so concurrent tasks never fight over
        # a shared checkout.
        local_repo_path = os.path.join(settings.repos_dir, repo)
        if not os.path.isdir(local_repo_path):
            raise FileNotFoundError(f"Repo not found: {local_repo_path}")

        # Determine branch (needed before we touch the worktree / task file).
        # The lookup connection is closed explicitly instead of being dropped.
        _br_row = None
        if task_id:
            lookup_conn = get_db()
            try:
                _br_row = lookup_conn.execute(
                    "SELECT branch FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
            finally:
                lookup_conn.close()
        agent_branch = _br_row[0] if _br_row else "main"

        # Ensure the per-branch worktree exists and is on the right branch.
        work_path = ensure_worktree(repo, agent_branch)

        # Write task file if content provided (B-1: direct write; now into the worktree).
        if task_content:
            self._write_task_file(repo, agent_branch, config["task_file"], task_content)

        # Record run in DB
        conn = get_db()
        cursor = conn.execute(
            "INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)",
            (task_id, agent),
        )
        run_id = cursor.lastrowid
        conn.commit()

        # ORCH-1: link this job to the agent_runs row and stamp started_at.
        if job_id is not None:
            conn.execute(
                "UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?",
                (run_id, job_id),
            )
            conn.commit()

        # Prepare output log path
        output_path = f"/app/data/runs/{run_id}.log"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        # Build the claude command
        task_file = config["task_file"]
        system_prompt = config["system_prompt"]
        allowed_tools = config["allowed_tools"]
        model = config.get("model", "")

        # Every interpolated value is shell-quoted: work_path is derived from
        # the repo / branch name and must not be able to inject shell syntax.
        # shlex.quote() returns plain path-like strings unchanged, so the
        # command is byte-identical for the built-in configs.
        q = shlex.quote
        model_flag = f"--model {q(model)} " if model else ""

        # No git fetch/checkout here: ensure_worktree() already put the worktree on
        # the right branch. The agent simply runs inside its isolated work_path.
        # Feature 4 (token usage): --output-format json makes claude emit a single
        # result JSON (with usage + total_cost_usd) at the end of stdout. The log
        # still captures it; _monitor_agent parses the trailing JSON after the run
        # to record per-agent tokens/cost. _monitor_agent's failure handling keys
        # off the process exit_code (not stdout shape), so this is safe.
        cmd = (
            f'cd {q(str(work_path))} && '
            f'{q(self.CLAUDE_BIN)} --print '
            f'--output-format json '
            f'{model_flag}'
            f'"$(cat {q(task_file)})" '
            f'--system-prompt "$(cat {q(system_prompt)})" '
            f'--allowedTools {q(allowed_tools)}'
        )

        logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}")

        # Launch as background process.
        # B-2 fix: redirect stdout/stderr straight to the log file at the OS level.
        # No PIPE in the orchestrator process -> no PIPE deadlock, no reader thread,
        # no zombies. log_fh is closed by _monitor_agent after proc.wait().
        log_fh = open(output_path, "w")
        try:
            proc = subprocess.Popen(
                ["bash", "-c", cmd],
                stdout=log_fh,
                stderr=subprocess.STDOUT,
                env={
                    **os.environ,
                    "HOME": "/home/slin",
                    "GIT_AUTHOR_NAME": "claude-bot",
                    "GIT_AUTHOR_EMAIL": "claude-bot@mva154.local",
                    "GIT_COMMITTER_NAME": "claude-bot",
                    "GIT_COMMITTER_EMAIL": "claude-bot@mva154.local",
                },
            )
        except Exception:
            # No monitor thread will ever exist to close the handle: do it here.
            log_fh.close()
            raise

        # Update DB with output path
        conn.execute(
            "UPDATE agent_runs SET output_path = ? WHERE id = ?",
            (output_path, run_id),
        )
        conn.commit()
        conn.close()

        # Start timeout watchdog. It gets the Popen object as well as the pid so
        # it can tell "already reaped" apart from "pid recycled by the OS".
        t = threading.Thread(
            target=self._watchdog,
            args=(proc.pid, run_id),
            kwargs={"job_id": job_id, "agent": agent, "proc": proc},
            daemon=True,
        )
        t.start()

        # Start monitor thread (waits for completion, commits, pushes)
        # agent_branch already computed above
        m = threading.Thread(
            target=self._monitor_agent,
            args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
            kwargs={"job_id": job_id},
            daemon=True,
        )
        m.start()

        logger.info(f"Agent '{agent}' launched, pid={proc.pid}, run_id={run_id}")
        notify_agent_started(run_id, agent, task_id)
        return run_id

    @staticmethod
    def _resolve_timeout(agent: str = None) -> int:
        """ORCH-7 (M-2): resolve the wall-clock timeout for an agent.

        Per-agent override from settings.agent_timeout_overrides_json (a JSON
        object like {"reviewer": 3600}) wins; otherwise the global default
        settings.agent_timeout_seconds is used. A malformed override JSON is
        ignored (falls back to the default) and only logged, so a bad env never
        bricks runs.

        Args:
            agent: Agent role to look up; None means "use the default".

        Returns:
            Timeout in seconds.
        """
        default = settings.agent_timeout_seconds
        raw = (settings.agent_timeout_overrides_json or "").strip()
        if agent and raw:
            try:
                overrides = json.loads(raw)
                if isinstance(overrides, dict) and agent in overrides:
                    return int(overrides[agent])
            except (ValueError, TypeError) as e:
                # json.JSONDecodeError is a ValueError; int(None) is a TypeError.
                logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
        return default

    def _watchdog(self, pid: int, run_id: int, timeout: int = None,
                  job_id: int = None, agent: str = None, proc=None):
        """Kill agent if it exceeds its timeout.

        ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
        code and drives the job retry/fail logic, so the watchdog itself only
        needs to terminate the process and record the agent_runs exit. job_id is
        accepted for symmetry.

        ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which
        cuts claude off mid-write and leaves half-written artifacts), send
        SIGTERM first, give the process up to settings.agent_kill_grace_seconds
        to flush and exit on its own, and only SIGKILL if it is still alive after
        the grace window. If the process exits during the grace window, SIGKILL
        is NOT sent. ProcessLookupError is tolerated at every step (the process
        may already be gone). The exit_code written by this method is always -9
        to match the existing retry/fail contract regardless of which signal
        actually reaped it.

        Args:
            pid: Process id of the spawned agent shell.
            run_id: agent_runs row id (for logging and _record_kill).
            timeout: Seconds to wait before killing; None resolves it from
                settings via _resolve_timeout(agent).
            job_id: Unused; accepted for symmetry with the monitor.
            agent: Agent role, used only to resolve the timeout.
            proc: Optional subprocess.Popen of the agent. When given, no signal
                is sent once proc.returncode is set: the monitor has already
                reaped the child, so `pid` may since have been recycled by the
                OS for an unrelated process. With proc=None the behaviour is
                the original pid-only one.
        """
        if timeout is None:
            timeout = self._resolve_timeout(agent)
        time.sleep(timeout)

        def _reaped() -> bool:
            # returncode is set by the monitor thread's proc.wait().
            return proc is not None and proc.returncode is not None

        # The agent usually finishes long before the timeout; never signal a
        # pid that no longer belongs to our child.
        if _reaped():
            logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
            return  # nothing to record: the monitor's proc.wait() owns the exit

        # Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
        try:
            os.kill(pid, signal.SIGTERM)
            logger.warning(
                f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
                f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
            )
        except ProcessLookupError:
            logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
            return  # nothing to record: the monitor's proc.wait() owns the exit

        # Phase 2: poll for graceful exit within the grace window.
        grace = settings.agent_kill_grace_seconds
        poll_interval = 0.5
        waited = 0.0
        while waited < grace:
            time.sleep(poll_interval)
            waited += poll_interval
            exited = _reaped()
            if not exited:
                try:
                    os.kill(pid, 0)  # signal 0 = liveness probe, does not kill
                except ProcessLookupError:
                    exited = True
            if exited:
                logger.info(
                    f"Agent run_id={run_id} exited gracefully after SIGTERM "
                    f"({waited:.1f}s); no SIGKILL needed"
                )
                self._record_kill(run_id)
                return

        # Phase 3: still alive -> hard SIGKILL.
        if _reaped():
            logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
        else:
            try:
                os.kill(pid, signal.SIGKILL)
                logger.warning(
                    f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
                )
            except ProcessLookupError:
                logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
        self._record_kill(run_id)

    @staticmethod
    def _record_kill(run_id: int):
        """Stamp the agent_runs row as timeout-killed (exit_code=-9).

        ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic
        keys off, so we keep it stable whether the reap came from SIGTERM or
        SIGKILL.

        Args:
            run_id: agent_runs row id to update.
        """
        conn = get_db()
        conn.execute(
            "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
            (run_id,),
        )
        conn.commit()
        conn.close()

    def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
        """Wait for agent to finish, commit+push results, update DB.

        B-2 fix: stdout already goes straight to the log file via Popen, so we
        just block on proc.wait() (guaranteed reap -> no zombie, real exit_code)
        and then close the log file handle. No PIPE, no select loop, no startup
        timeout here (the watchdog still enforces the overall AGENT_TIMEOUT by pid).

        Args:
            proc: The agent's subprocess.Popen.
            run_id: agent_runs row id.
            agent: Agent role.
            repo: Repository name.
            branch: Branch the agent's worktree is on.
            output_path: Path of the run log (used for usage parsing and failure
                classification); may be None.
            log_fh: Open log file handle to close once the process has exited.
            job_id: Queue job id; when not None the jobs row is finalized.
        """
        _start_ts = time.time()
        exit_code = proc.wait()
        if log_fh is not None:
            try:
                log_fh.close()
            except Exception:
                # Best effort: a failed close must not abort the post-run steps.
                pass
        _duration_s = int(time.time() - _start_ts)

        logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}")

        # Update DB
        conn = get_db()
        conn.execute(
            "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=? WHERE id=?",
            (exit_code, run_id),
        )
        conn.commit()
        # Get task_id for notification
        _row = conn.execute("SELECT task_id FROM agent_runs WHERE id=?", (run_id,)).fetchone()
        _task_id = _row[0] if _row else None
        conn.close()
        notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s)

        # Feature 4: parse token usage / cost from the (json) run log and record
        # it on the agent_runs row. Never fatal — a garbled/missing JSON records
        # NULLs and logs a warning so a broken run can't crash the monitor.
        try:
            from ..usage import parse_usage_from_log, record_usage
            _usage = parse_usage_from_log(output_path) if output_path else None
            record_usage(run_id, _usage)
        except Exception as e:
            logger.warning(f"run_id={run_id}: usage accounting failed: {e}")
            _usage = None

        # Commit and push any changes — in the per-branch worktree (ORCH-2 / S-4),
        # NOT in the shared main clone. The worktree is already on `branch`
        # (ensure_worktree did the checkout), so no checkout is needed here.
        repo_path = get_worktree_path(repo, branch)
        try:
            git_env = {
                **os.environ,
                "HOME": "/home/slin",
                "GIT_AUTHOR_NAME": "claude-bot",
                "GIT_AUTHOR_EMAIL": "claude-bot@mva154.local",
                "GIT_COMMITTER_NAME": "claude-bot",
                "GIT_COMMITTER_EMAIL": "claude-bot@mva154.local",
            }
            result = subprocess.run(
                ["git", "-C", repo_path, "status", "--porcelain"],
                capture_output=True, text=True, timeout=10, env=git_env
            )
            if result.stdout.strip():
                # Add docs/ always
                subprocess.run(
                    ["git", "-C", repo_path, "add", "docs/"],
                    capture_output=True, text=True, timeout=10, env=git_env
                )
                # Add src/ and tests/ for developer
                if agent == "developer":
                    subprocess.run(
                        ["git", "-C", repo_path, "add", "src/", "tests/"],
                        capture_output=True, text=True, timeout=10, env=git_env
                    )
                # Commit
                commit_result = subprocess.run(
                    ["git", "-C", repo_path, "commit", "-m",
                     f"{agent}(ET): auto-commit from {agent} run_id={run_id}"],
                    capture_output=True, text=True, timeout=30, env=git_env
                )
                if commit_result.returncode == 0:
                    push_result = subprocess.run(
                        ["git", "-C", repo_path, "push", "origin", branch],
                        capture_output=True, text=True, timeout=60, env=git_env
                    )
                    if push_result.returncode == 0:
                        logger.info(f"Agent run_id={run_id}: committed and pushed to {branch}")
                        # Auto-create PR after developer pushes
                        if agent == "developer":
                            self._ensure_pr(repo, branch, run_id)
                    else:
                        logger.error(f"Agent run_id={run_id}: push failed: {push_result.stderr}")
                else:
                    logger.warning(f"Agent run_id={run_id}: commit failed: {commit_result.stderr}")
            else:
                logger.info(f"Agent run_id={run_id}: no changes to commit")
        except Exception as e:
            logger.error(f"Agent run_id={run_id}: post-run git failed: {e}")

        # Handle deployer failure (smoke/healthcheck failed) — Task 7
        if exit_code != 0 and agent == "deployer":
            conn = get_db()
            task_row = conn.execute(
                "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
                (repo, branch),
            ).fetchone()
            conn.close()
            if task_row:
                _tid, _wid = task_row
                update_task_stage(_tid, "development")
                notify_stage_change(_tid, "deploy", "development")
                plane_notify_stage(_wid, "deploy", "development")
                from ..plane_sync import set_issue_blocked
                set_issue_blocked(_wid)
                plane_add_comment(
                    _wid,
                    "\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
                    author="deployer",
                )
                from ..notifications import send_telegram
                send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.")

        # Notify on startup timeout (exit_code from kill = -9 or 137)
        if exit_code != 0 and exit_code is not None:
            conn = get_db()
            task_row = conn.execute(
                "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
                (repo, branch),
            ).fetchone()
            conn.close()
            if task_row and agent != "deployer":  # deployer handled above
                _tid, _wid = task_row
                from ..notifications import send_telegram
                send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log")

        # Feature 4: post the per-agent usage comment under that agent's bot, and
        # — for the deployer finishing the task — the per-task usage summary.
        if exit_code == 0:
            try:
                self._post_usage_comments(run_id, agent, repo, branch, _usage)
            except Exception as e:
                logger.warning(f"run_id={run_id}: usage comment failed: {e}")

        # Auto-advance stage if agent finished successfully and QG passes
        if exit_code == 0:
            self._try_advance_stage(run_id, agent, repo, branch)

        # ORCH-1: drive the job-queue status for queue-launched jobs only.
        # (Legacy direct launch() has job_id=None and is unaffected.)
        if job_id is not None:
            self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path)

    def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int:
        """Exponential backoff for transient failures, honouring Retry-After.

        backoff = min(2^transient_attempts * base, max). If the server sent a
        Retry-After, use the larger of the two (never poll sooner than asked);
        the Retry-After value itself is also capped at the configured maximum.

        Args:
            transient_attempts: Attempt counter; negative values count as 0.
            retry_after: Optional server-requested delay in seconds.

        Returns:
            Delay in whole seconds.
        """
        base = settings.backoff_base_seconds
        cap = settings.backoff_max_seconds
        backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
        if retry_after is not None and retry_after > 0:
            backoff = max(backoff, min(retry_after, cap))
        return int(backoff)

    def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None):
        """ORCH-1: update the jobs row after the agent process finished.

        exit_code == 0 -> done (and resets the breaker streak via on_outcome).
        exit_code != 0 -> classify the failure from the run log tail (token-free):
          - TRANSIENT (429/overload/network): backoff-requeue with available_at
            in the future + a SEPARATE transient_attempts budget
            (settings.transient_max_attempts), honouring Retry-After. Reported
            to the breaker so it opens after N consecutive transient failures.
          - PERMANENT (code fault): ordinary attempts < max_attempts requeue,
            otherwise 'failed' + Telegram.

        Never raises: any error is logged so the monitor thread cannot die here.

        Args:
            job_id: Queue job id.
            agent: Agent role.
            run_id: agent_runs row id.
            exit_code: Exit code of the agent process.
            output_path: Run log path; defaults to /app/data/runs/<run_id>.log.
        """
        from ..db import get_job, mark_job
        from ..error_classifier import classify_log_file
        try:
            job = get_job(job_id)
            if not job:
                return

            if exit_code == 0:
                mark_job(job_id, "done", run_id=run_id)
                logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
                self._record_outcome(transient=False, recovered=True)
                return

            # Classify the failure from the agent log tail (no token cost).
            kind, retry_after = "permanent", None
            log_path = output_path or f"/app/data/runs/{run_id}.log"
            try:
                kind, retry_after = classify_log_file(log_path)
            except Exception as e:
                # Keep the conservative 'permanent' default, but leave a trace
                # instead of silently swallowing the classifier error.
                logger.warning(
                    f"Job {job_id}: could not classify {log_path}, treating as permanent: {e}"
                )

            if kind == "transient":
                self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after)
            else:
                self._finalize_permanent(job_id, agent, run_id, exit_code, job)
        except Exception as e:
            logger.error(f"Job {job_id}: _finalize_job error: {e}")

    def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after):
        """Transient (429/overload/net) failure -> backoff requeue or fail when budget out.

        Args:
            job_id: Queue job id.
            agent: Agent role.
            run_id: agent_runs row id.
            exit_code: Exit code of the agent process.
            job: Job row as a dict; reads "transient_attempts".
            retry_after: Optional server-requested delay in seconds.
        """
        from ..db import mark_job, mark_job_transient
        # A NULL column arrives as None; treat it as "no transient attempts yet".
        tattempts = job.get("transient_attempts", 0) or 0
        tmax = settings.transient_max_attempts
        err = (f"transient (429/overload) agent {agent} exit={exit_code} "
               f"(run_id={run_id}); retry_after={retry_after}")
        self._record_outcome(transient=True, recovered=False)
        if tattempts < tmax:
            backoff = self._backoff_seconds(tattempts + 1, retry_after)
            mark_job_transient(job_id, backoff, error=err)
            logger.warning(
                f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), "
                f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}"
            )
        else:
            mark_job(job_id, "failed", run_id=run_id, error=err)
            logger.error(
                f"Job {job_id} ({agent}) failed after {tattempts} transient attempts"
            )
            self._notify_failed(job_id, agent, job, run_id,
                                f"transient (rate-limit) after {tattempts} attempts")

    # NOTE(review): the reviewed copy of this file is damaged from here on. The
    # text between "normal attempts/" in the docstring below and the tail of
    # the task-file writer's docstring is missing. That appears to cover the
    # body of _finalize_permanent, the header of the method that _spawn() calls
    # as self._write_task_file(repo, branch, task_file, content), and the
    # definitions of _record_outcome, _notify_failed, _ensure_pr,
    # _post_usage_comments and _try_advance_stage, all of which are called
    # above. The surviving text is kept exactly as found and NOT reconstructed;
    # restore this region from version control before using the file.
    def _finalize_permanent(self, job_id, agent, run_id, exit_code, job):
        """Permanent (code-fault) failure -> normal attempts/), not the shared /repos/, so the agent reads the task ZADANIE from its own isolated working copy. Raise on failure instead of silently swallowing errors. """
        work_path = get_worktree_path(repo, branch)  # /repos/_wt//
        full_path = os.path.join(work_path, task_file)
        try:
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(content)
            logger.info(f"Task file written: {full_path} ({len(content)} bytes)")
        except OSError as e:
            logger.error(f"Failed to write task file {full_path}: {e}")
            raise RuntimeError(f"Failed to write task file: {e}")


launcher = AgentLauncher()