feat(launcher): launch_job + job-status finalize with retries (ORCH-1)
Refactor launch() into a shared _spawn(); add launch_job(job), which threads job_id through the monitor and watchdog. _finalize_job marks the job done, requeues it (while attempts < max_attempts), or marks it failed and sends a notification. The internal stage-advance chain now calls enqueue_job instead of self.launch. The B-1/B-2/M-1/ORCH-2 spawn logic is unchanged.
This commit is contained in:
@@ -4,7 +4,7 @@ import logging
|
||||
import threading
|
||||
import signal
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from ..git_worktree import ensure_worktree, get_worktree_path
|
||||
from ..qg.checks import QG_CHECKS
|
||||
@@ -57,7 +57,10 @@ class AgentLauncher:
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||
"""
|
||||
Launch a Claude CLI agent.
|
||||
Launch a Claude CLI agent directly (legacy synchronous path).
|
||||
|
||||
Kept for backward compatibility (direct callers / existing tests). The
|
||||
ORCH-1 job queue uses launch_job() instead, but both share _spawn().
|
||||
|
||||
Args:
|
||||
agent: Agent role (analyst, architect, developer, reviewer, tester)
|
||||
@@ -68,6 +71,31 @@ class AgentLauncher:
|
||||
Returns:
|
||||
agent_run_id from DB
|
||||
"""
|
||||
return self._spawn(agent, repo, task_content, task_id, job_id=None)
|
||||
|
||||
def launch_job(self, job: dict) -> int:
    """ORCH-1: start an agent for a job that was claimed from the queue.

    Delegates to _spawn() exactly like launch() does, but additionally passes
    the job's id so the spawned run can be tied back to its jobs row.

    Args:
        job: Mapping describing the claimed job. The keys "agent", "repo" and
            "id" are required; "task_content" and "task_id" are optional and
            default to None when absent.

    Returns:
        Whatever _spawn() returns (the agent_run_id).

    Raises:
        KeyError: if "agent", "repo" or "id" is missing from the job.
    """
    spawn = self._spawn

    # Required fields are read with [] so a malformed job fails loudly;
    # optional ones fall back to None via .get().
    agent_name = job["agent"]
    repo_name = job["repo"]
    content = job.get("task_content")
    linked_task_id = job.get("task_id")
    queue_job_id = job["id"]

    return spawn(agent_name, repo_name, content, linked_task_id, job_id=queue_job_id)
|
||||
|
||||
def _spawn(self, agent: str, repo: str, task_content: str = None,
|
||||
task_id: int = None, job_id: int = None) -> int:
|
||||
"""Shared spawn implementation for launch() and launch_job().
|
||||
|
||||
When job_id is set, the monitor/watchdog drive the jobs table status
|
||||
(ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic
|
||||
(B-1 / ORCH-2) are unchanged.
|
||||
"""
|
||||
config = self.AGENT_CONFIGS.get(agent)
|
||||
if not config:
|
||||
raise ValueError(f"Unknown agent: {agent}")
|
||||
@@ -98,6 +126,14 @@ class AgentLauncher:
|
||||
run_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
|
||||
# ORCH-1: link this job to the agent_runs row and stamp started_at.
|
||||
if job_id is not None:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?",
|
||||
(run_id, job_id),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Prepare output log path
|
||||
output_path = f"/app/data/runs/{run_id}.log"
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
@@ -154,6 +190,7 @@ class AgentLauncher:
|
||||
t = threading.Thread(
|
||||
target=self._watchdog,
|
||||
args=(proc.pid, run_id),
|
||||
kwargs={"job_id": job_id},
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
@@ -163,6 +200,7 @@ class AgentLauncher:
|
||||
m = threading.Thread(
|
||||
target=self._monitor_agent,
|
||||
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
|
||||
kwargs={"job_id": job_id},
|
||||
daemon=True,
|
||||
)
|
||||
m.start()
|
||||
@@ -171,8 +209,13 @@ class AgentLauncher:
|
||||
notify_agent_started(run_id, agent, task_id)
|
||||
return run_id
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
|
||||
"""Kill agent if it exceeds timeout."""
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
|
||||
"""Kill agent if it exceeds timeout.
|
||||
|
||||
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
||||
code and drives the job retry/fail logic, so the watchdog itself only needs
|
||||
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
|
||||
"""
|
||||
import time
|
||||
if timeout is None:
|
||||
timeout = self.AGENT_TIMEOUT
|
||||
@@ -190,7 +233,7 @@ class AgentLauncher:
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None):
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB.
|
||||
|
||||
B-2 fix: stdout already goes straight to the log file via Popen, so we just
|
||||
@@ -318,6 +361,57 @@ class AgentLauncher:
|
||||
if exit_code == 0:
|
||||
self._try_advance_stage(run_id, agent, repo, branch)
|
||||
|
||||
# ORCH-1: drive the job-queue status for queue-launched jobs only.
|
||||
# (Legacy direct launch() has job_id=None and is unaffected.)
|
||||
if job_id is not None:
|
||||
self._finalize_job(job_id, agent, run_id, exit_code)
|
||||
|
||||
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code):
    """ORCH-1: update the jobs row after the agent process finished.

    Outcome mapping:
        exit_code == 0 -> job is marked 'done'.
        exit_code != 0 -> job goes back to 'queued' while
                          attempts < max_attempts, otherwise it is marked
                          'failed' and a Telegram notification is sent.

    Attempts were already incremented at claim time, so `attempts` reflects
    how many times this job has been picked up.

    Args:
        job_id: Id of the jobs row to update.
        agent: Agent role name; used only in log / notification text.
        run_id: Id of the agent run, stored on the job and used to build the
            log path quoted in the notification.
        exit_code: Exit status of the agent process.

    Errors raised while reading or updating the job are logged (with
    traceback) and not propagated. A failure to send the Telegram message is
    logged as a warning and does not undo the 'failed' status.
    """
    from ..db import get_job, mark_job

    try:
        job = get_job(job_id)
        if not job:
            # Nothing to update; leave a trace so a vanished job is diagnosable.
            logger.warning(f"Job {job_id}: not found, cannot finalize (run_id={run_id})")
            return

        if exit_code == 0:
            mark_job(job_id, "done", run_id=run_id)
            logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
            return

        # NOTE(review): assumes get_job returns a dict-like object with .get();
        # confirm against the db helper.
        attempts = job.get("attempts", 0)
        max_attempts = job.get("max_attempts", 2)
        err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"

        if attempts < max_attempts:
            mark_job(job_id, "queued", run_id=run_id, error=err)
            logger.warning(
                f"Job {job_id} ({agent}) failed (exit={exit_code}), "
                f"requeued (attempt {attempts}/{max_attempts})"
            )
            return

        mark_job(job_id, "failed", run_id=run_id, error=err)
        logger.error(
            f"Job {job_id} ({agent}) failed permanently after "
            f"{attempts} attempts (exit={exit_code})"
        )

        # Notification is best-effort: the job is already marked failed, so a
        # delivery problem must not raise, but it should not vanish either.
        try:
            from ..notifications import send_telegram

            send_telegram(
                f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
                f"failed after {attempts} attempts (exit={exit_code}). "
                f"Logs: /app/data/runs/{run_id}.log"
            )
        except Exception as notify_err:
            logger.warning(
                f"Job {job_id}: could not send failure notification: {notify_err}"
            )
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() drops.
        logger.exception(f"Job {job_id}: _finalize_job error: {e}")
|
||||
|
||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
||||
try:
|
||||
@@ -416,8 +510,8 @@ class AgentLauncher:
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})")
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
||||
@@ -446,8 +540,8 @@ class AgentLauncher:
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
from ..plane_sync import set_issue_blocked
|
||||
@@ -478,8 +572,8 @@ class AgentLauncher:
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
|
||||
return
|
||||
@@ -496,8 +590,8 @@ class AgentLauncher:
|
||||
next_agent = get_agent_for_stage(next_stage)
|
||||
if next_agent:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})")
|
||||
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user