From 20d6556e22603253272547516608da26494cf2d9 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH] refactor(webhooks): enqueue_job instead of in-process launch (ORCH-1) All 8 webhook launch points (plane x4, gitea x4) now enqueue a job and return immediately instead of synchronously spawning claude in the uvicorn process. --- src/webhooks/gitea.py | 18 +++++++++--------- src/webhooks/plane.py | 17 +++++++++-------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/webhooks/gitea.py b/src/webhooks/gitea.py index 33c318c..f6cd58a 100644 --- a/src/webhooks/gitea.py +++ b/src/webhooks/gitea.py @@ -10,7 +10,7 @@ import httpx from fastapi import APIRouter, Request, HTTPException from ..config import settings -from ..db import get_db, get_task_by_repo_branch, update_task_stage +from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job from ..stages import get_next_stage, get_agent_for_stage from ..qg.checks import check_ci_green, check_review_approved from ..notifications import notify_stage_change, notify_qg_failure, notify_error @@ -123,8 +123,8 @@ async def handle_push(payload: dict): if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, launched '{agent}' (run_id={run_id})") + job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, enqueued '{agent}' (job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") @@ -200,8 +200,8 @@ async def handle_ci_status(payload: dict): if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})") + job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") else: @@ -272,8 +272,8 @@ async def handle_pr(payload: dict): if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})") + job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") else: @@ -297,8 +297,8 @@ async def handle_pr(payload: dict): f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n" f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})" ) - run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})") + job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to relaunch developer: {e}") else: diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 177ce6e..0dd23af 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -14,6 +14,7 @@ from ..db import ( get_task_by_plane_id, get_next_work_item_id, update_task_stage, + enqueue_job, ) from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage from ..qg.checks import QG_CHECKS @@ -186,8 +187,8 @@ async def handle_work_item_created(data: dict, project_id: str = ""): if task_row: task_id = task_row[0] task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}" - run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: launched analyst (run_id={run_id})") + job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})") # Post start comment to Plane from ..plane_sync import add_comment as _add_comment _add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).") @@ -231,10 +232,10 @@ async def handle_comment(data: dict, project_id: str = ""): f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. " f"Reason: {reason}\nRevise and improve." ) - new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id) + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _plane_comment _plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}") - logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst") + logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})") else: # Rollback to previous stage prev_stage = get_previous_stage(current_stage) @@ -305,10 +306,10 @@ async def handle_comment(data: dict, project_id: str = ""): f"Read the latest comment in Plane and revise your artifacts.\n" f"Answer: {comment_body[:500]}" ) - new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id) + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _pc2 _pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.") - logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})") + logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})") return except Exception as e: logger.error(f"Failed to check issue state: {e}") @@ -386,9 +387,9 @@ async def _try_advance_stage( if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo, task_desc, task_id=task_id) + job_id = enqueue_job(agent, repo, task_desc, task_id=task_id) plane_notify_stage(work_item_id, current_stage, next_stage, agent) - logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}") + logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") logger.error(f"Agent launch failed: {e}")