refactor(webhooks): enqueue_job instead of in-process launch (ORCH-1)
All 8 webhook launch points (plane x4, gitea x4) now enqueue a job and return immediately instead of synchronously spawning claude in the uvicorn process.
This commit is contained in:
@@ -10,7 +10,7 @@ import httpx
|
||||
from fastapi import APIRouter, Request, HTTPException
|
||||
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..stages import get_next_stage, get_agent_for_stage
|
||||
from ..qg.checks import check_ci_green, check_review_approved
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -123,8 +123,8 @@ async def handle_push(payload: dict):
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, launched '{agent}' (run_id={run_id})")
|
||||
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
|
||||
@@ -200,8 +200,8 @@ async def handle_ci_status(payload: dict):
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})")
|
||||
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
else:
|
||||
@@ -272,8 +272,8 @@ async def handle_pr(payload: dict):
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})")
|
||||
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
else:
|
||||
@@ -297,8 +297,8 @@ async def handle_pr(payload: dict):
|
||||
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
|
||||
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
||||
)
|
||||
run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})")
|
||||
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to relaunch developer: {e}")
|
||||
else:
|
||||
|
||||
@@ -14,6 +14,7 @@ from ..db import (
|
||||
get_task_by_plane_id,
|
||||
get_next_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
)
|
||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
@@ -186,8 +187,8 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
if task_row:
|
||||
task_id = task_row[0]
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
|
||||
run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: launched analyst (run_id={run_id})")
|
||||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||
# Post start comment to Plane
|
||||
from ..plane_sync import add_comment as _add_comment
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
|
||||
@@ -231,10 +232,10 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
|
||||
f"Reason: {reason}\nRevise and improve."
|
||||
)
|
||||
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
|
||||
else:
|
||||
# Rollback to previous stage
|
||||
prev_stage = get_previous_stage(current_stage)
|
||||
@@ -305,10 +306,10 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
f"Read the latest comment in Plane and revise your artifacts.\n"
|
||||
f"Answer: {comment_body[:500]}"
|
||||
)
|
||||
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _pc2
|
||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
|
||||
logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
|
||||
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check issue state: {e}")
|
||||
@@ -386,9 +387,9 @@ async def _try_advance_stage(
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo, task_desc, task_id=task_id)
|
||||
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
|
||||
logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}")
|
||||
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
logger.error(f"Agent launch failed: {e}")
|
||||
|
||||
Reference in New Issue
Block a user