# Provenance: contents of src/stage_engine.py as added by patch
# 0befc49b1e3b90f5e2fe946620cf30407663a410 (Dev Agent, Wed, 3 Jun 2026 08:56:14 +0300),
# "refactor(stage): extract unified stage_engine.advance_stage (M-3)".
"""Unified stage engine (ORCH-4 / M-3).

Single source of truth for "an agent finished / a human approved -> run the
stage's quality gate and either advance the pipeline or roll it back".

Before ORCH-4 this logic was duplicated in two places that had silently
diverged:
  - src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
    analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
    rollback+retry, architect conflict rollback) — but it picked the next agent
    with get_agent_for_stage(next_stage), which is WRONG.
  - src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
    check_review_approved PR-by-branch dispatch and used the CORRECT
    get_agent_for_stage(current_stage)).

This module merges both into one sync `advance_stage(...)`. launcher calls it
directly; the plane webhook calls it through asyncio.to_thread so there is
exactly one implementation.

Agent-selection bug fix (ORCH-4):
  stages.py defines `agent` as "the agent to launch when advancing FROM this
  stage". So when advancing current -> next, the correct agent to launch is
  get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
  stage (e.g. analysis->architecture launched 'developer' instead of
  'architect'). plane and gitea already used current_stage; we unify on that.
"""

import logging
import os
from dataclasses import dataclass, field

from .db import get_db, update_task_stage, enqueue_job
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .qg.checks import QG_CHECKS
from .notifications import (
    notify_stage_change,
    notify_qg_failure,
    notify_approve_requested,
    send_telegram,
)
from .plane_sync import (
    notify_stage_change as plane_notify_stage,
    notify_qg_failure as plane_notify_qg,
    add_comment as plane_add_comment,
    set_issue_in_review,
    set_issue_needs_input,
    set_issue_in_progress,
    set_issue_blocked,
)
from .config import settings

logger = logging.getLogger("orchestrator.stage_engine")

# Upper bound on developer runs per task; compared against the number of
# 'developer' rows in agent_runs (see _developer_retry_count).
MAX_DEVELOPER_RETRIES = 3


@dataclass
class AdvanceResult:
    """Outcome of an advance_stage() call (mostly for tests/observability)."""

    advanced: bool = False  # True once the task was moved to to_stage
    from_stage: str | None = None  # stage the call started from
    to_stage: str | None = None  # get_next_stage(from_stage); set even if not advanced
    enqueued_agent: str | None = None  # agent name passed to enqueue_job, if any
    enqueued_job_id: int | None = None  # value returned by enqueue_job, if any
    qg_name: str | None = None  # quality gate configured for from_stage
    qg_passed: bool | None = None  # None when no gate was run
    qg_reason: str | None = None  # reason string returned by the gate
    rolled_back_to: str | None = None  # stage the task was rolled back to, if any
    alerted: bool = False  # True when a "manual intervention" alert was sent
    note: str | None = None  # short machine-readable outcome tag
    notes: list = field(default_factory=list)  # not written anywhere in this module


def _run_qg(
    qg_name: str, repo: str, work_item_id: str, branch: str
) -> tuple[bool, str]:
    """Dispatch a quality-gate check to the right signature and run it.

    Signatures (unified from launcher + plane):
      - check_ci_green / check_tests_local -> (repo, branch)
      - check_review_approved              -> (repo, pr_number) [PR found by branch]
      - everything else (artifact checks)  -> (repo, work_item_id, branch)

    Returns (passed: bool, reason: str). An unregistered gate name is reported
    as a failure rather than raised.
    """
    check_fn = QG_CHECKS.get(qg_name)
    if not check_fn:
        logger.error(f"QG function '{qg_name}' not found in registry")
        return False, f"Unknown QG: {qg_name}"

    if qg_name in ("check_ci_green", "check_tests_local"):
        # (repo, branch) — already worktree-aware.
        return check_fn(repo, branch)

    if qg_name == "check_review_approved":
        # Special case kept from plane: find the open PR for this branch via
        # Gitea, then check it; fall back to a file-based review marker.
        return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)

    # All other artifact checks: (repo, work_item_id, branch). Pass branch so the
    # check reads from the task worktree (ORCH-2 / S-4).
    return check_fn(repo, work_item_id or "", branch)


def _check_review_approved_by_branch(
    check_fn, repo: str, work_item_id: str, branch: str
) -> tuple[bool, str]:
    """check_review_approved dispatch preserved from plane._try_advance_stage.

    Finds the open PR whose head ref == branch via the Gitea API and runs
    check_review_approved(repo, pr_number). If no open PR exists, falls back to a
    file-based review marker (12-review.md / 09-review.md) like the original.

    Never raises: any failure (network, HTTP status, unexpected payload, or an
    exception inside check_fn) is returned as (False, "Error finding PR: ...").
    """
    # Imported lazily, as in the original, so importing this module does not
    # itself require httpx.
    import httpx as _httpx

    owner = settings.gitea_owner
    # Only the first 50 open PRs are requested; a matching PR beyond that page
    # would not be seen by this lookup.
    url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
    headers = {"Authorization": f"token {settings.gitea_token}"}
    try:
        resp = _httpx.get(url, headers=headers, timeout=10)
        # Fail loudly on 4xx/5xx instead of iterating an error payload as if it
        # were the PR list.
        resp.raise_for_status()
        prs = resp.json()
        if not isinstance(prs, list):
            return False, (
                "Error finding PR: unexpected Gitea response "
                f"({type(prs).__name__})"
            )

        pr_number = None
        for pr in prs:
            # `head` may be missing or JSON null; treat both as "no ref".
            if (pr.get("head") or {}).get("ref") == branch:
                pr_number = pr["number"]
                break
        if pr_number:
            return check_fn(repo, pr_number)

        # No open PR but a review file may exist — check file-based.
        wt = get_worktree_path(repo, branch)
        if not os.path.isdir(wt):
            # Worktree missing: look in the main checkout instead.
            wt = os.path.join(settings.repos_dir, repo)
        review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
        review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
        if os.path.isfile(review_path) or os.path.isfile(review_path2):
            return True, "Review file exists (file-based approval)"
        return False, "No open PR found and no review file"
    except Exception as e:
        return False, f"Error finding PR: {e}"


def _developer_retry_count(task_id: int) -> int:
    """How many developer runs have already happened for this task.

    Counts rows in agent_runs with agent='developer' for the given task.
    """
    conn = get_db()
    try:
        return conn.execute(
            "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
            (task_id,),
        ).fetchone()[0]
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()


def advance_stage(
    task_id: int,
    current_stage: str,
    repo: str,
    work_item_id: str,
    branch: str,
    finished_agent: str | None = None,
) -> AdvanceResult:
    """Run the current stage's quality gate and advance / roll back the pipeline.

    This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
    the async plane webhook calls it via asyncio.to_thread.

    Args:
        task_id: tasks.id
        current_stage: the stage the task is currently in
        repo: repository name
        work_item_id: Plane work item id (may be "" / None)
        branch: feature branch
        finished_agent: the agent that just finished (launcher path). Drives the
                        approved/REQUEST_CHANGES/tester/architect branches. In the
                        plane webhook path it is None, so those agent-specific
                        branches simply do not trigger (matches old plane behavior).

    Returns AdvanceResult describing what happened. Exceptions are not
    propagated: they are logged with a traceback and reported through
    ``result.note`` as ``"error: ..."``.
    """
    result = AdvanceResult(from_stage=current_stage)
    agent = finished_agent
    try:
        qg_name = get_qg_for_stage(current_stage)
        next_stage = get_next_stage(current_stage)
        result.qg_name = qg_name
        result.to_stage = next_stage

        if not next_stage:
            logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
            result.note = "terminal"
            return result

        # --- Quality gate ----------------------------------------------------
        if qg_name and qg_name in QG_CHECKS:
            # Human-approval gate: special analyst approved-flow (launcher only).
            if qg_name == "check_analysis_approved":
                _handle_analysis_approved_flow(
                    task_id, current_stage, repo, work_item_id, branch, agent, result
                )
                return result

            passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
            result.qg_passed = passed
            result.qg_reason = reason

            if not passed:
                logger.info(
                    f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
                )
                # Behaviour parity:
                #  - webhook path (finished_agent is None): emit the generic
                #    QG-failure notification, exactly like the old plane handler.
                #  - launcher path (finished_agent set): NO generic notification;
                #    the rollback branches below own their own messaging, exactly
                #    like the old launcher handler.
                if agent is None:
                    notify_qg_failure(task_id, current_stage, qg_name, reason)
                    plane_notify_qg(work_item_id, current_stage, qg_name, reason)

                _handle_qg_failure_rollbacks(
                    task_id, current_stage, repo, work_item_id, branch,
                    agent, qg_name, reason, result,
                )
                return result

        elif qg_name:
            # QG name set but not registered — do not advance (launcher behavior).
            result.note = f"qg '{qg_name}' not in registry"
            return result

        # --- Advance ---------------------------------------------------------
        update_task_stage(task_id, next_stage)
        notify_stage_change(task_id, current_stage, next_stage)
        plane_notify_stage(work_item_id, current_stage, next_stage)
        result.advanced = True
        logger.info(
            f"Task {task_id}: {current_stage} -> {next_stage} "
            f"(auto-advance after {agent})"
        )

        # --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
        next_agent = get_agent_for_stage(current_stage)
        if next_agent:
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\n"
                f"Branch: {branch}\nStage: {next_stage}"
            )
            new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
            result.enqueued_agent = next_agent
            result.enqueued_job_id = new_job_id
            logger.info(
                f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
            )

        return result

    except Exception as e:
        # Top-level boundary: callers get a result, never an exception.
        # logger.exception keeps the traceback that a plain error() would drop.
        logger.exception("advance_stage failed for task_id=%s: %s", task_id, e)
        result.note = f"error: {e}"
        return result


def _handle_analysis_approved_flow(
    task_id: int,
    current_stage: str,
    repo: str,
    work_item_id: str,
    branch: str,
    agent: str | None,
    result: AdvanceResult,
) -> None:
    """Analyst approved-flow (launcher only).

    Only triggers when the analyst just finished (agent == 'analyst') in the
    launcher path. Decides between: artifacts ready -> In Review + request
    :approved:; questions file -> Needs Input; otherwise a warning comment.
    This gate never advances on its own (human approval does that via the plane
    webhook), matching the original launcher behavior.

    The outcome is recorded in ``result.note``; nothing is returned.
    """
    result.qg_name = "check_analysis_approved"
    result.note = "analysis-approval-gate"
    if not (agent == "analyst" and work_item_id):
        return

    files_check = QG_CHECKS.get("check_analysis_complete")
    if not files_check:
        return

    files_ok, _ = files_check(repo, work_item_id, branch)
    if files_ok:
        # Full artifacts ready -> In Review, ask for :approved:.
        set_issue_in_review(work_item_id)
        plane_add_comment(
            work_item_id,
            "\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
            "\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
            "\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
        )
        notify_approve_requested(task_id)
        result.note = "analysis-in-review"
        logger.info(
            f"Task {task_id}: analyst finished, requested :approved: in Plane"
        )
        return

    questions_path = os.path.join(
        get_worktree_path(repo, branch),
        f"docs/work-items/{work_item_id}/01-questions.md",
    )
    if os.path.isfile(questions_path):
        set_issue_needs_input(work_item_id)
        # Decode explicitly as UTF-8 instead of the locale default; a stray
        # undecodable byte is replaced rather than aborting the flow after the
        # issue state has already been changed.
        with open(questions_path, "r", encoding="utf-8", errors="replace") as qf:
            questions_text = qf.read()
        plane_add_comment(
            work_item_id,
            f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
        )
        send_telegram(
            f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
        )
        result.note = "analysis-needs-input"
        return

    # No artifacts and no questions.
    plane_add_comment(
        work_item_id,
        "\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
    )
    result.note = "analysis-empty"


def _handle_qg_failure_rollbacks(
    task_id: int,
    current_stage: str,
    repo: str,
    work_item_id: str,
    branch: str,
    agent: str | None,
    qg_name: str,
    reason: str,
    result: AdvanceResult,
) -> None:
    """Rollback/retry branches carried over from the original launcher.

    Only fire on the launcher path (finished_agent is set). The webhook path
    passes finished_agent=None, so none of these agent-specific branches trigger
    — that matches the old plane behavior (it just reported the QG failure).

    Each branch is keyed on a distinct ``agent`` value, so at most one runs per
    call. Effects are recorded on ``result``; nothing is returned.
    """
    # Reviewer REQUEST_CHANGES -> rollback to development + retry (bounded).
    if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
        update_task_stage(task_id, "development")
        notify_stage_change(task_id, current_stage, "development")
        plane_notify_stage(work_item_id, current_stage, "development")
        result.rolled_back_to = "development"
        retry_count = _developer_retry_count(task_id)
        if retry_count < MAX_DEVELOPER_RETRIES:
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
                f"(attempt {retry_count+1}/{MAX_DEVELOPER_RETRIES}). Fix findings in "
                f"docs/work-items/{work_item_id}/12-review.md"
            )
            new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
            result.enqueued_agent = "developer"
            result.enqueued_job_id = new_job
            logger.info(
                f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
                f"(job_id={new_job})"
            )
        else:
            send_telegram(
                f"\u26a0\ufe0f {work_item_id}: Max developer retries "
                f"({MAX_DEVELOPER_RETRIES}) reached. "
                f"Manual intervention needed."
            )
            result.alerted = True
            logger.error(f"Task {task_id}: max retries reached")

    # Tester check_tests_passed FAIL -> rollback to development + retry (bounded).
    if agent == "tester" and qg_name == "check_tests_passed":
        update_task_stage(task_id, "development")
        notify_stage_change(task_id, current_stage, "development")
        plane_notify_stage(work_item_id, current_stage, "development")
        result.rolled_back_to = "development"
        set_issue_in_progress(work_item_id)
        # NOTE(review): this comment is posted before the retry budget is
        # checked, so it is also sent when no developer job ends up enqueued.
        plane_add_comment(
            work_item_id,
            f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
            f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
        )
        retry_count = _developer_retry_count(task_id)
        if retry_count < MAX_DEVELOPER_RETRIES:
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: development\nNote: Tests FAILED. "
                f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
            )
            new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
            result.enqueued_agent = "developer"
            result.enqueued_job_id = new_job
            logger.info(
                f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
            )
        else:
            set_issue_blocked(work_item_id)
            send_telegram(
                f"\U0001f6a8 {work_item_id}: Tests still failing after "
                f"{MAX_DEVELOPER_RETRIES} developer "
                f"retries. Manual intervention needed."
            )
            result.alerted = True

    # Architect conflict (10-conflict.md exists) -> rollback to analysis.
    if agent == "architect" and qg_name == "check_architecture_done":
        conflict_path = os.path.join(
            get_worktree_path(repo, branch),
            f"docs/work-items/{work_item_id}/10-conflict.md",
        )
        if os.path.isfile(conflict_path):
            update_task_stage(task_id, "analysis")
            notify_stage_change(task_id, current_stage, "analysis")
            plane_notify_stage(work_item_id, current_stage, "analysis")
            result.rolled_back_to = "analysis"
            set_issue_in_progress(work_item_id)
            # Explicit UTF-8 with replacement: the stage has already been
            # rolled back, so a decode error must not stop the analyst job
            # from being enqueued. Only the first 500 characters are quoted.
            with open(conflict_path, "r", encoding="utf-8", errors="replace") as cf:
                conflict_text = cf.read()[:500]
            plane_add_comment(
                work_item_id,
                f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
                f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
            )
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
                f"See docs/work-items/{work_item_id}/10-conflict.md"
            )
            new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
            result.enqueued_agent = "analyst"
            result.enqueued_job_id = new_job
            logger.info(
                f"Task {task_id}: architect conflict, enqueued analyst "
                f"(job_id={new_job})"
            )