From 51401a3ba9119a40b8ae287b121069a83b16ac9a Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 08:56:25 +0300 Subject: [PATCH] refactor(launcher,plane): delegate stage advance to stage_engine launcher._try_advance_stage and plane._try_advance_stage are now thin wrappers over stage_engine.advance_stage. The plane webhook calls the sync engine via asyncio.to_thread so there is exactly one implementation. The launcher forwards finished_agent so the agent-specific rollback branches still fire; the webhook passes None (human :approved:), matching prior behavior. Also fixes the agent-selection bug in the launcher path: it used to enqueue get_agent_for_stage(next_stage) (skipping a stage, e.g. analysis->architecture launched developer instead of architect). The unified engine uses get_agent_for_stage(current_stage), consistent with plane and gitea. --- src/agents/launcher.py | 187 ++++------------------------------------- src/webhooks/plane.py | 93 +++++--------------- 2 files changed, 39 insertions(+), 241 deletions(-) diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 232cf4e..52b6ff3 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -564,7 +564,15 @@ class AgentLauncher: pass def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str): - """After agent finishes successfully, check QG and advance stage if possible.""" + """After agent finishes successfully, advance the stage via the unified engine. + + ORCH-4 / M-3: the 174-line body that used to live here moved into + src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up + the task by (repo, branch) and delegates. `agent` is forwarded as + finished_agent so the analyst/reviewer/tester/architect rollback branches + still trigger exactly as before. The agent-selection bug (it used to call + get_agent_for_stage(next_stage)) is fixed inside the engine. + """ try: conn = get_db() task_row = conn.execute( @@ -576,174 +584,15 @@ class AgentLauncher: return task_id, current_stage, work_item_id = task_row - qg_name = get_qg_for_stage(current_stage) - next_stage = get_next_stage(current_stage) - - if not next_stage: - return - - # Run QG check if defined - if qg_name and qg_name in QG_CHECKS: - check_fn = QG_CHECKS[qg_name] - if qg_name in ("check_analysis_approved",): - # Requires human approval - post request comment if analyst just finished - if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id: - files_check = QG_CHECKS.get("check_analysis_complete") - if files_check: - files_ok, _ = files_check(repo, work_item_id, branch) - if files_ok: - # Full artifacts ready -> In Review - from ..plane_sync import set_issue_in_review - set_issue_in_review(work_item_id) - plane_add_comment( - work_item_id, - "\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. " - "\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture." - ) - notify_approve_requested(task_id) - logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane") - else: - # Check if questions file exists (in the task worktree) - import os as _os - questions_path = _os.path.join( - get_worktree_path(repo, branch), - f"docs/work-items/{work_item_id}/01-questions.md" - ) - if _os.path.isfile(questions_path): - # Analyst has questions -> Needs Input - from ..plane_sync import set_issue_needs_input - set_issue_needs_input(work_item_id) - with open(questions_path, "r") as qf: - questions_text = qf.read() - plane_add_comment( - work_item_id, - f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}" - ) - from ..notifications import send_telegram - send_telegram( - f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane." - ) - else: - # No artifacts and no questions - plane_add_comment( - work_item_id, - "\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433." - ) - return - elif qg_name in ("check_ci_green", "check_tests_local"): - # (repo, branch) signature — already worktree-aware. - passed, reason = check_fn(repo, branch) - elif qg_name == "check_tests_passed": - # Artifact check — pass branch so it reads from the worktree. - passed, reason = check_fn(repo, work_item_id or "", branch) - else: - # Other artifact checks (check_architecture_done, etc.) — worktree-aware. - passed, reason = check_fn(repo, work_item_id or "", branch) - - if not passed: - logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}") - # If reviewer says REQUEST_CHANGES, rollback to development - if agent == "reviewer" and "REQUEST_CHANGES" in reason: - update_task_stage(task_id, "development") - notify_stage_change(task_id, current_stage, "development") - plane_notify_stage(work_item_id, current_stage, "development") - # Count retries - conn2 = get_db() - retry_count = conn2.execute( - "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'", - (task_id,) - ).fetchone()[0] - conn2.close() - if retry_count < 3: - task_desc = ( - f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" - f"Stage: development\nNote: REQUEST_CHANGES from reviewer " - f"(attempt {retry_count+1}/3). Fix findings in " - f"docs/work-items/{work_item_id}/12-review.md" - ) - new_job = enqueue_job("developer", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})") - else: - from ..notifications import send_telegram - send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.") - logger.error(f"Task {task_id}: max retries reached") - - # Task 6: Tester FAIL -> rollback to development - if agent == "tester" and qg_name == "check_tests_passed" and not passed: - update_task_stage(task_id, "development") - notify_stage_change(task_id, current_stage, "development") - plane_notify_stage(work_item_id, current_stage, "development") - from ..plane_sync import set_issue_in_progress - set_issue_in_progress(work_item_id) - plane_add_comment( - work_item_id, - f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430." - ) - conn2 = get_db() - retry_count = conn2.execute( - "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'", - (task_id,) - ).fetchone()[0] - conn2.close() - if retry_count < 3: - task_desc = ( - f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" - f"Stage: development\nNote: Tests FAILED. " - f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md" - ) - new_job = enqueue_job("developer", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})") - else: - from ..notifications import send_telegram - from ..plane_sync import set_issue_blocked - set_issue_blocked(work_item_id) - send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.") - - # Task 8: Architect conflict -> rollback to analysis - if agent == "architect" and qg_name == "check_architecture_done" and not passed: - import os as _os - conflict_path = _os.path.join( - get_worktree_path(repo, branch), - f"docs/work-items/{work_item_id}/10-conflict.md" - ) - if _os.path.isfile(conflict_path): - update_task_stage(task_id, "analysis") - notify_stage_change(task_id, current_stage, "analysis") - plane_notify_stage(work_item_id, current_stage, "analysis") - from ..plane_sync import set_issue_in_progress - set_issue_in_progress(work_item_id) - with open(conflict_path, "r") as cf: - conflict_text = cf.read()[:500] - plane_add_comment( - work_item_id, - f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}" - ) - task_desc = ( - f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" - f"Stage: analysis\nNote: Architect conflict. Revise TRZ. " - f"See docs/work-items/{work_item_id}/10-conflict.md" - ) - new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})") - return - - return - elif qg_name: - return - - # Advance stage - update_task_stage(task_id, next_stage) - notify_stage_change(task_id, current_stage, next_stage) - plane_notify_stage(work_item_id, current_stage, next_stage) - logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})") - - # Launch next agent if defined - next_agent = get_agent_for_stage(next_stage) - if next_agent: - task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" - new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})") - + from ..stage_engine import advance_stage + advance_stage( + task_id=task_id, + current_stage=current_stage, + repo=repo, + work_item_id=work_item_id, + branch=branch, + finished_agent=agent, + ) except Exception as e: logger.error(f"Auto-advance failed for run_id={run_id}: {e}") diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 0dd23af..e7ba716 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -318,81 +318,30 @@ async def handle_comment(data: dict, project_id: str = ""): async def _try_advance_stage( task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str ): - """Run QG check for current stage and advance if passed.""" - qg_name = get_qg_for_stage(current_stage) - next_stage = get_next_stage(current_stage) + """Thin async wrapper over the unified stage engine (ORCH-4 / M-3). - if not next_stage: - logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'") - return + The QG dispatch (including the check_review_approved PR-by-branch logic) and + the advance/launch logic now live in src/stage_engine.advance_stage(), which + is synchronous. We run it off the event loop via asyncio.to_thread so there + is exactly one implementation shared with the launcher. - # Run QG check if one is required - if qg_name: - qg_func = QG_CHECKS.get(qg_name) - if not qg_func: - logger.error(f"QG function '{qg_name}' not found in registry") - return + finished_agent is None on this webhook path (a human :approved: comment, not + a finished agent), so the agent-specific rollback branches inside the engine + intentionally do not trigger — identical to the old plane behavior, which + only ran the QG and either advanced or reported the failure. + """ + import asyncio + from ..stage_engine import advance_stage - # Determine args based on QG function - if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"): - # ORCH-2 / S-4: pass branch so artifacts are read from the task worktree. - passed, reason = qg_func(repo, work_item_id, branch) - elif qg_name in ("check_ci_green", "check_tests_local"): - passed, reason = qg_func(repo, branch) - elif qg_name == "check_review_approved": - # Find PR number by branch via Gitea API - import httpx as _httpx - from ..config import settings as _s - _owner = _s.gitea_owner - _url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50" - _headers = {"Authorization": f"token {_s.gitea_token}"} - try: - _resp = _httpx.get(_url, headers=_headers, timeout=10) - _prs = _resp.json() - _pr_number = None - for _pr in _prs: - if _pr.get("head", {}).get("ref") == branch: - _pr_number = _pr["number"] - break - if _pr_number: - passed, reason = qg_func(repo, _pr_number) - else: - # No open PR but review file exists — check file-based - import os - from ..git_worktree import get_worktree_path as _gwp - _wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo) - _review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md") - _review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md") - if os.path.isfile(_review_path) or os.path.isfile(_review_path2): - passed, reason = True, "Review file exists (file-based approval)" - else: - passed, reason = False, "No open PR found and no review file" - except Exception as _e: - passed, reason = False, f"Error finding PR: {_e}" - else: - passed, reason = False, f"Unknown QG: {qg_name}" - - if not passed: - notify_qg_failure(task_id, current_stage, qg_name, reason) - plane_notify_qg(work_item_id, current_stage, qg_name, reason) - return - - # Advance stage - update_task_stage(task_id, next_stage) - notify_stage_change(task_id, current_stage, next_stage) - plane_notify_stage(work_item_id, current_stage, next_stage) - - # Launch agent associated with the current stage's transition - agent = get_agent_for_stage(current_stage) - if agent: - try: - task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" - job_id = enqueue_job(agent, repo, task_desc, task_id=task_id) - plane_notify_stage(work_item_id, current_stage, next_stage, agent) - logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}") - except Exception as e: - notify_error(task_id, f"Failed to launch agent '{agent}': {e}") - logger.error(f"Agent launch failed: {e}") + await asyncio.to_thread( + advance_stage, + task_id, + current_stage, + repo, + work_item_id, + branch, + None, + ) async def _create_gitea_branch(repo: str, branch: str):