feat: full pipeline fixes - CI status branch lookup, review webhook routing, auto-advance, plane sync
- handle_ci_status: fallback git branch -r --contains when branches[] empty - webhook router: handle pull_request_approved event type - handle_pr: map review.type to review.state for new Gitea format - launcher: auto-advance stage after agent completion (_try_advance_stage) - plane_sync: notify Plane on stage changes - stages.py: stage machine with QG definitions - notifications.py: stage change notifications - safe.directory fix for container git operations
This commit is contained in:
@@ -1,11 +1,20 @@
|
||||
import subprocess
|
||||
import os
|
||||
import logging
|
||||
import threading
|
||||
import signal
|
||||
from ..config import settings
|
||||
from ..db import get_db
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
from ..notifications import notify_stage_change, notify_qg_failure
|
||||
from ..plane_sync import notify_stage_change as plane_notify_stage
|
||||
|
||||
logger = logging.getLogger("orchestrator.launcher")
|
||||
|
||||
|
||||
class AgentLauncher:
|
||||
"""Launch Claude CLI agents for specific tasks."""
|
||||
"""Launch Claude CLI agents directly (binary mounted into container)."""
|
||||
|
||||
AGENT_CONFIGS = {
|
||||
"analyst": {
|
||||
@@ -35,7 +44,10 @@ class AgentLauncher:
|
||||
},
|
||||
}
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None) -> int:
|
||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||
AGENT_TIMEOUT = 1800 # 30 minutes
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||
"""
|
||||
Launch a Claude CLI agent.
|
||||
|
||||
@@ -43,6 +55,7 @@ class AgentLauncher:
|
||||
agent: Agent role (analyst, architect, developer, reviewer, tester)
|
||||
repo: Repository name
|
||||
task_content: Optional task content to write to task file
|
||||
task_id: Optional task ID to associate with this run
|
||||
|
||||
Returns:
|
||||
agent_run_id from DB
|
||||
@@ -51,44 +64,60 @@ class AgentLauncher:
|
||||
if not config:
|
||||
raise ValueError(f"Unknown agent: {agent}")
|
||||
|
||||
repo_path = os.path.join(settings.repos_dir, repo)
|
||||
if not os.path.isdir(repo_path):
|
||||
raise FileNotFoundError(f"Repo not found: {repo_path}")
|
||||
# Container-local path (repos mounted at /repos)
|
||||
local_repo_path = os.path.join(settings.repos_dir, repo)
|
||||
# Host path (for docker run write operations)
|
||||
host_repo_path = os.path.join(settings.host_repos_dir, repo)
|
||||
|
||||
if not os.path.isdir(local_repo_path):
|
||||
raise FileNotFoundError(f"Repo not found: {local_repo_path}")
|
||||
|
||||
# Write task file if content provided
|
||||
if task_content:
|
||||
task_path = os.path.join(repo_path, config["task_file"])
|
||||
with open(task_path, "w") as f:
|
||||
f.write(task_content)
|
||||
self._write_task_file(host_repo_path, config["task_file"], task_content)
|
||||
|
||||
# Record run in DB
|
||||
conn = get_db()
|
||||
cursor = conn.execute(
|
||||
"INSERT INTO agent_runs (task_id, agent) VALUES (NULL, ?)",
|
||||
(agent,),
|
||||
"INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)",
|
||||
(task_id, agent),
|
||||
)
|
||||
run_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
|
||||
# Prepare output log
|
||||
# Prepare output log path
|
||||
output_path = f"/app/data/runs/{run_id}.log"
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
# Build shell command
|
||||
# Build the claude command
|
||||
task_file = config["task_file"]
|
||||
system_prompt = config["system_prompt"]
|
||||
allowed_tools = config["allowed_tools"]
|
||||
|
||||
cmd = (
|
||||
f'cd {repo_path} && {settings.claude_bin} --print '
|
||||
f'"$(cat {config["task_file"]})" '
|
||||
f'--system-prompt "$(cat {config["system_prompt"]})" '
|
||||
f'--allowedTools {config["allowed_tools"]}'
|
||||
f'cd {local_repo_path} && '
|
||||
f'{self.CLAUDE_BIN} --print '
|
||||
f'"$(cat {task_file})" '
|
||||
f'--system-prompt "$(cat {system_prompt})" '
|
||||
f'--allowedTools {allowed_tools}'
|
||||
)
|
||||
|
||||
logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}")
|
||||
|
||||
# Launch as background process
|
||||
with open(output_path, "w") as log_file:
|
||||
subprocess.Popen(
|
||||
proc = subprocess.Popen(
|
||||
["bash", "-c", cmd],
|
||||
stdout=log_file,
|
||||
stderr=subprocess.STDOUT,
|
||||
cwd=repo_path,
|
||||
env={
|
||||
**os.environ,
|
||||
"HOME": "/home/slin",
|
||||
"GIT_AUTHOR_NAME": "claude-bot",
|
||||
"GIT_AUTHOR_EMAIL": "claude-bot@mva154.local",
|
||||
"GIT_COMMITTER_NAME": "claude-bot",
|
||||
"GIT_COMMITTER_EMAIL": "claude-bot@mva154.local",
|
||||
},
|
||||
)
|
||||
|
||||
# Update DB with output path
|
||||
@@ -99,7 +128,192 @@ class AgentLauncher:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Start timeout watchdog
|
||||
t = threading.Thread(
|
||||
target=self._watchdog,
|
||||
args=(proc.pid, run_id),
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
|
||||
# Start monitor thread (waits for completion, commits, pushes)
|
||||
task_row = get_db().execute("SELECT branch FROM tasks WHERE id=?", (task_id,)).fetchone() if task_id else None
|
||||
agent_branch = task_row[0] if task_row else "main"
|
||||
m = threading.Thread(
|
||||
target=self._monitor_agent,
|
||||
args=(proc, run_id, agent, repo, agent_branch),
|
||||
daemon=True,
|
||||
)
|
||||
m.start()
|
||||
|
||||
logger.info(f"Agent '{agent}' launched, pid={proc.pid}, run_id={run_id}")
|
||||
return run_id
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
|
||||
"""Kill agent if it exceeds timeout."""
|
||||
import time
|
||||
if timeout is None:
|
||||
timeout = self.AGENT_TIMEOUT
|
||||
time.sleep(timeout)
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch):
|
||||
"""Wait for agent to finish, commit+push results, update DB."""
|
||||
exit_code = proc.wait()
|
||||
logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}")
|
||||
|
||||
# Update DB
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=? WHERE id=?",
|
||||
(exit_code, run_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Commit and push any changes
|
||||
repo_path = os.path.join(settings.repos_dir, repo)
|
||||
try:
|
||||
git_env = {
|
||||
**os.environ,
|
||||
"HOME": "/home/slin",
|
||||
"GIT_AUTHOR_NAME": "claude-bot",
|
||||
"GIT_AUTHOR_EMAIL": "claude-bot@mva154.local",
|
||||
"GIT_COMMITTER_NAME": "claude-bot",
|
||||
"GIT_COMMITTER_EMAIL": "claude-bot@mva154.local",
|
||||
}
|
||||
result = subprocess.run(
|
||||
["git", "-C", repo_path, "status", "--porcelain"],
|
||||
capture_output=True, text=True, timeout=10, env=git_env
|
||||
)
|
||||
if result.stdout.strip():
|
||||
# Add docs/ always
|
||||
subprocess.run(
|
||||
["git", "-C", repo_path, "add", "docs/"],
|
||||
capture_output=True, text=True, timeout=10, env=git_env
|
||||
)
|
||||
# Add src/ and tests/ for developer
|
||||
if agent == "developer":
|
||||
subprocess.run(
|
||||
["git", "-C", repo_path, "add", "src/", "tests/"],
|
||||
capture_output=True, text=True, timeout=10, env=git_env
|
||||
)
|
||||
# Commit
|
||||
commit_result = subprocess.run(
|
||||
["git", "-C", repo_path, "commit", "-m",
|
||||
f"{agent}(ET): auto-commit from {agent} run_id={run_id}"],
|
||||
capture_output=True, text=True, timeout=30, env=git_env
|
||||
)
|
||||
if commit_result.returncode == 0:
|
||||
push_result = subprocess.run(
|
||||
["git", "-C", repo_path, "push", "origin", branch],
|
||||
capture_output=True, text=True, timeout=60, env=git_env
|
||||
)
|
||||
if push_result.returncode == 0:
|
||||
logger.info(f"Agent run_id={run_id}: committed and pushed to {branch}")
|
||||
else:
|
||||
logger.error(f"Agent run_id={run_id}: push failed: {push_result.stderr}")
|
||||
else:
|
||||
logger.warning(f"Agent run_id={run_id}: commit failed: {commit_result.stderr}")
|
||||
else:
|
||||
logger.info(f"Agent run_id={run_id}: no changes to commit")
|
||||
except Exception as e:
|
||||
logger.error(f"Agent run_id={run_id}: post-run git failed: {e}")
|
||||
|
||||
# Auto-advance stage if agent finished successfully and QG passes
|
||||
if exit_code == 0:
|
||||
self._try_advance_stage(run_id, agent, repo, branch)
|
||||
|
||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
||||
try:
|
||||
conn = get_db()
|
||||
task_row = conn.execute(
|
||||
"SELECT id, stage, work_item_id FROM tasks WHERE repo=? AND branch=?",
|
||||
(repo, branch),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if not task_row:
|
||||
return
|
||||
|
||||
task_id, current_stage, work_item_id = task_row
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
|
||||
if not next_stage:
|
||||
return
|
||||
|
||||
# Run QG check if defined
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
check_fn = QG_CHECKS[qg_name]
|
||||
if qg_name == "check_review_approved":
|
||||
# Skip — handled by PR webhook
|
||||
return
|
||||
elif qg_name == "check_ci_green":
|
||||
passed, reason = check_fn(repo, branch)
|
||||
elif qg_name == "check_tests_passed":
|
||||
passed, reason = check_fn(repo, work_item_id or "")
|
||||
else:
|
||||
passed, reason = check_fn(repo, work_item_id or "")
|
||||
|
||||
if not passed:
|
||||
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
|
||||
return
|
||||
elif qg_name:
|
||||
return
|
||||
|
||||
# Advance stage
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
|
||||
|
||||
# Launch next agent if defined
|
||||
next_agent = get_agent_for_stage(next_stage)
|
||||
if next_agent:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||
|
||||
def _write_task_file(self, host_repo_path: str, task_file: str, content: str):
|
||||
"""Write task file to host repo via docker run with stdin."""
|
||||
full_path = os.path.join(host_repo_path, task_file)
|
||||
# Use docker run with stdin to write content to the file
|
||||
cmd = [
|
||||
"docker", "run", "--rm", "-i",
|
||||
"-v", f"{host_repo_path}:{host_repo_path}",
|
||||
"-w", host_repo_path,
|
||||
"python:3.12-slim",
|
||||
"bash", "-c", f"cat > {full_path}",
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
input=content,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.error(f"Failed to write task file: {result.stderr}")
|
||||
raise RuntimeError(f"Failed to write task file: {result.stderr}")
|
||||
logger.info(f"Task file written: {full_path}")
|
||||
except subprocess.TimeoutExpired:
|
||||
raise RuntimeError("Timeout writing task file")
|
||||
|
||||
|
||||
launcher = AgentLauncher()
|
||||
|
||||
Reference in New Issue
Block a user