From 3345c2fa0aecf6c455ce737e6549a4d777b45673 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH] feat(launcher): launch_job + job-status finalize with retries (ORCH-1) Refactor launch() into shared _spawn(); add launch_job(job) that threads job_id through monitor/watchdog. _finalize_job marks done / requeue (attempts enqueue_job. B-1/B-2/M-1/ORCH-2 spawn logic unchanged. --- src/agents/launcher.py | 120 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 107 insertions(+), 13 deletions(-) diff --git a/src/agents/launcher.py b/src/agents/launcher.py index b129412..dafedac 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -4,7 +4,7 @@ import logging import threading import signal from ..config import settings -from ..db import get_db, get_task_by_repo_branch, update_task_stage +from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage from ..git_worktree import ensure_worktree, get_worktree_path from ..qg.checks import QG_CHECKS @@ -57,7 +57,10 @@ class AgentLauncher: def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int: """ - Launch a Claude CLI agent. + Launch a Claude CLI agent directly (legacy synchronous path). + + Kept for backward compatibility (direct callers / existing tests). The + ORCH-1 job queue uses launch_job() instead, but both share _spawn(). Args: agent: Agent role (analyst, architect, developer, reviewer, tester) @@ -68,6 +71,31 @@ class AgentLauncher: Returns: agent_run_id from DB """ + return self._spawn(agent, repo, task_content, task_id, job_id=None) + + def launch_job(self, job: dict) -> int: + """ORCH-1: launch an agent for a claimed queue job. + + Same spawn path as launch(), but threads job['id'] through so the monitor + can update the job's status (done / requeue / failed) and link jobs.run_id + to the agent_runs row. Returns the agent_run_id. + """ + return self._spawn( + job["agent"], + job["repo"], + job.get("task_content"), + job.get("task_id"), + job_id=job["id"], + ) + + def _spawn(self, agent: str, repo: str, task_content: str = None, + task_id: int = None, job_id: int = None) -> int: + """Shared spawn implementation for launch() and launch_job(). + + When job_id is set, the monitor/watchdog drive the jobs table status + (ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic + (B-1 / ORCH-2) are unchanged. + """ config = self.AGENT_CONFIGS.get(agent) if not config: raise ValueError(f"Unknown agent: {agent}") @@ -98,6 +126,14 @@ class AgentLauncher: run_id = cursor.lastrowid conn.commit() + # ORCH-1: link this job to the agent_runs row and stamp started_at. + if job_id is not None: + conn.execute( + "UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?", + (run_id, job_id), + ) + conn.commit() + # Prepare output log path output_path = f"/app/data/runs/{run_id}.log" os.makedirs(os.path.dirname(output_path), exist_ok=True) @@ -154,6 +190,7 @@ class AgentLauncher: t = threading.Thread( target=self._watchdog, args=(proc.pid, run_id), + kwargs={"job_id": job_id}, daemon=True, ) t.start() @@ -163,6 +200,7 @@ class AgentLauncher: m = threading.Thread( target=self._monitor_agent, args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh), + kwargs={"job_id": job_id}, daemon=True, ) m.start() @@ -171,8 +209,13 @@ class AgentLauncher: notify_agent_started(run_id, agent, task_id) return run_id - def _watchdog(self, pid: int, run_id: int, timeout: int = None): - """Kill agent if it exceeds timeout.""" + def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None): + """Kill agent if it exceeds timeout. + + ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit + code and drives the job retry/fail logic, so the watchdog itself only needs + to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry. + """ import time if timeout is None: timeout = self.AGENT_TIMEOUT @@ -190,7 +233,7 @@ class AgentLauncher: except ProcessLookupError: pass # Already finished - def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None): + def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None): """Wait for agent to finish, commit+push results, update DB. B-2 fix: stdout already goes straight to the log file via Popen, so we just @@ -318,6 +361,57 @@ class AgentLauncher: if exit_code == 0: self._try_advance_stage(run_id, agent, repo, branch) + # ORCH-1: drive the job-queue status for queue-launched jobs only. + # (Legacy direct launch() has job_id=None and is unaffected.) + if job_id is not None: + self._finalize_job(job_id, agent, run_id, exit_code) + + def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code): + """ORCH-1: update the jobs row after the agent process finished. + + exit_code == 0 -> done. + exit_code != 0 -> retry while attempts < max_attempts (back to 'queued'), + otherwise 'failed' + Telegram notification. + attempts were already incremented at claim time, so `attempts` reflects how + many times this job has been picked up. + """ + from ..db import get_job, mark_job + try: + job = get_job(job_id) + if not job: + return + if exit_code == 0: + mark_job(job_id, "done", run_id=run_id) + logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})") + return + + attempts = job.get("attempts", 0) + max_attempts = job.get("max_attempts", 2) + err = f"agent {agent} exit_code={exit_code} (run_id={run_id})" + if attempts < max_attempts: + mark_job(job_id, "queued", run_id=run_id, error=err) + logger.warning( + f"Job {job_id} ({agent}) failed (exit={exit_code}), " + f"requeued (attempt {attempts}/{max_attempts})" + ) + else: + mark_job(job_id, "failed", run_id=run_id, error=err) + logger.error( + f"Job {job_id} ({agent}) failed permanently after " + f"{attempts} attempts (exit={exit_code})" + ) + try: + from ..notifications import send_telegram + send_telegram( + f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) " + f"failed after {attempts} attempts (exit={exit_code}). " + f"Logs: /app/data/runs/{run_id}.log" + ) + except Exception: + pass + except Exception as e: + logger.error(f"Job {job_id}: _finalize_job error: {e}") + def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str): """After agent finishes successfully, check QG and advance stage if possible.""" try: @@ -416,8 +510,8 @@ class AgentLauncher: f"(attempt {retry_count+1}/3). Fix findings in " f"docs/work-items/{work_item_id}/12-review.md" ) - new_run = self.launch("developer", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})") + new_job = enqueue_job("developer", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})") else: from ..notifications import send_telegram send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.") @@ -446,8 +540,8 @@ class AgentLauncher: f"Stage: development\nNote: Tests FAILED. " f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md" ) - new_run = self.launch("developer", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})") + new_job = enqueue_job("developer", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})") else: from ..notifications import send_telegram from ..plane_sync import set_issue_blocked @@ -478,8 +572,8 @@ class AgentLauncher: f"Stage: analysis\nNote: Architect conflict. Revise TRZ. " f"See docs/work-items/{work_item_id}/10-conflict.md" ) - new_run = self.launch("analyst", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: architect conflict, relaunched analyst") + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})") return return @@ -496,8 +590,8 @@ class AgentLauncher: next_agent = get_agent_for_stage(next_stage) if next_agent: task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" - new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})") + new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})") except Exception as e: logger.error(f"Auto-advance failed for run_id={run_id}: {e}")