refactor(stage): extract unified stage_engine.advance_stage (M-3)
Merge the two diverged _try_advance_stage implementations (launcher sync + plane async) into one synchronous engine. Preserves all launcher business logic (analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL rollback+retry, architect conflict rollback) and the plane check_review_approved PR-by-branch dispatch. Unifies the QG signature dispatch. Fixes agent selection: advancing FROM current_stage launches get_agent_for_stage(current_stage), not next_stage.
This commit is contained in:
425
src/stage_engine.py
Normal file
425
src/stage_engine.py
Normal file
@@ -0,0 +1,425 @@
|
||||
"""Unified stage engine (ORCH-4 / M-3).
|
||||
|
||||
Single source of truth for "an agent finished / a human approved -> run the
|
||||
stage's quality gate and either advance the pipeline or roll it back".
|
||||
|
||||
Before ORCH-4 this logic was duplicated in two places that had silently
|
||||
diverged:
|
||||
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
|
||||
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
|
||||
rollback+retry, architect conflict rollback) — but it picked the next agent
|
||||
with get_agent_for_stage(next_stage), which is WRONG.
|
||||
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
|
||||
check_review_approved PR-by-branch dispatch and used the CORRECT
|
||||
get_agent_for_stage(current_stage)).
|
||||
|
||||
This module merges both into one sync `advance_stage(...)`. launcher calls it
|
||||
directly; the plane webhook calls it through asyncio.to_thread so there is
|
||||
exactly one implementation.
|
||||
|
||||
Agent-selection bug fix (ORCH-4):
|
||||
stages.py defines `agent` as "the agent to launch when advancing FROM this
|
||||
stage". So when advancing current -> next, the correct agent to launch is
|
||||
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
|
||||
stage (e.g. analysis->architecture launched 'developer' instead of
|
||||
'architect'). plane and gitea already used current_stage; we unify on that.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from .db import get_db, update_task_stage, enqueue_job
|
||||
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from .git_worktree import get_worktree_path
|
||||
from .qg.checks import QG_CHECKS
|
||||
from .notifications import (
|
||||
notify_stage_change,
|
||||
notify_qg_failure,
|
||||
notify_approve_requested,
|
||||
send_telegram,
|
||||
)
|
||||
from .plane_sync import (
|
||||
notify_stage_change as plane_notify_stage,
|
||||
notify_qg_failure as plane_notify_qg,
|
||||
add_comment as plane_add_comment,
|
||||
set_issue_in_review,
|
||||
set_issue_needs_input,
|
||||
set_issue_in_progress,
|
||||
set_issue_blocked,
|
||||
)
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.stage_engine")
|
||||
|
||||
MAX_DEVELOPER_RETRIES = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class AdvanceResult:
|
||||
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
|
||||
|
||||
advanced: bool = False
|
||||
from_stage: str | None = None
|
||||
to_stage: str | None = None
|
||||
enqueued_agent: str | None = None
|
||||
enqueued_job_id: int | None = None
|
||||
qg_name: str | None = None
|
||||
qg_passed: bool | None = None
|
||||
qg_reason: str | None = None
|
||||
rolled_back_to: str | None = None
|
||||
alerted: bool = False
|
||||
note: str | None = None
|
||||
notes: list = field(default_factory=list)
|
||||
|
||||
|
||||
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
|
||||
"""Dispatch a quality-gate check to the right signature and run it.
|
||||
|
||||
Signatures (unified from launcher + plane):
|
||||
- check_ci_green / check_tests_local -> (repo, branch)
|
||||
- check_review_approved -> (repo, pr_number) [PR found by branch]
|
||||
- everything else (artifact checks) -> (repo, work_item_id, branch)
|
||||
|
||||
Returns (passed: bool, reason: str).
|
||||
"""
|
||||
check_fn = QG_CHECKS.get(qg_name)
|
||||
if not check_fn:
|
||||
logger.error(f"QG function '{qg_name}' not found in registry")
|
||||
return False, f"Unknown QG: {qg_name}"
|
||||
|
||||
if qg_name in ("check_ci_green", "check_tests_local"):
|
||||
# (repo, branch) — already worktree-aware.
|
||||
return check_fn(repo, branch)
|
||||
|
||||
if qg_name == "check_review_approved":
|
||||
# Special case kept from plane: find the open PR for this branch via
|
||||
# Gitea, then check it; fall back to a file-based review marker.
|
||||
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
|
||||
|
||||
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
|
||||
# check reads from the task worktree (ORCH-2 / S-4).
|
||||
return check_fn(repo, work_item_id or "", branch)
|
||||
|
||||
|
||||
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
|
||||
"""check_review_approved dispatch preserved from plane._try_advance_stage.
|
||||
|
||||
Finds the open PR whose head ref == branch via the Gitea API and runs
|
||||
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
|
||||
file-based review marker (12-review.md / 09-review.md) like the original.
|
||||
"""
|
||||
import httpx as _httpx
|
||||
|
||||
owner = settings.gitea_owner
|
||||
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
try:
|
||||
resp = _httpx.get(url, headers=headers, timeout=10)
|
||||
prs = resp.json()
|
||||
pr_number = None
|
||||
for pr in prs:
|
||||
if pr.get("head", {}).get("ref") == branch:
|
||||
pr_number = pr["number"]
|
||||
break
|
||||
if pr_number:
|
||||
return check_fn(repo, pr_number)
|
||||
# No open PR but a review file may exist — check file-based.
|
||||
wt = get_worktree_path(repo, branch)
|
||||
if not os.path.isdir(wt):
|
||||
wt = os.path.join(settings.repos_dir, repo)
|
||||
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
|
||||
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
|
||||
if os.path.isfile(review_path) or os.path.isfile(review_path2):
|
||||
return True, "Review file exists (file-based approval)"
|
||||
return False, "No open PR found and no review file"
|
||||
except Exception as e:
|
||||
return False, f"Error finding PR: {e}"
|
||||
|
||||
|
||||
def _developer_retry_count(task_id: int) -> int:
|
||||
"""How many developer runs have already happened for this task."""
|
||||
conn = get_db()
|
||||
n = conn.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,),
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
def advance_stage(
|
||||
task_id: int,
|
||||
current_stage: str,
|
||||
repo: str,
|
||||
work_item_id: str,
|
||||
branch: str,
|
||||
finished_agent: str | None = None,
|
||||
) -> AdvanceResult:
|
||||
"""Run the current stage's quality gate and advance / roll back the pipeline.
|
||||
|
||||
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
|
||||
the async plane webhook calls it via asyncio.to_thread.
|
||||
|
||||
Args:
|
||||
task_id: tasks.id
|
||||
current_stage: the stage the task is currently in
|
||||
repo: repository name
|
||||
work_item_id: Plane work item id (may be "" / None)
|
||||
branch: feature branch
|
||||
finished_agent: the agent that just finished (launcher path). Drives the
|
||||
approved/REQUEST_CHANGES/tester/architect branches. In the
|
||||
plane webhook path it is None, so those agent-specific
|
||||
branches simply do not trigger (matches old plane behavior).
|
||||
|
||||
Returns AdvanceResult describing what happened.
|
||||
"""
|
||||
result = AdvanceResult(from_stage=current_stage)
|
||||
agent = finished_agent
|
||||
try:
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
result.qg_name = qg_name
|
||||
result.to_stage = next_stage
|
||||
|
||||
if not next_stage:
|
||||
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
||||
result.note = "terminal"
|
||||
return result
|
||||
|
||||
# --- Quality gate ----------------------------------------------------
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
# Human-approval gate: special analyst approved-flow (launcher only).
|
||||
if qg_name == "check_analysis_approved":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
|
||||
elif qg_name:
|
||||
# QG name set but not registered — do not advance (launcher behavior).
|
||||
result.note = f"qg '{qg_name}' not in registry"
|
||||
return result
|
||||
|
||||
# --- Advance ---------------------------------------------------------
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
result.advanced = True
|
||||
logger.info(
|
||||
f"Task {task_id}: {current_stage} -> {next_stage} "
|
||||
f"(auto-advance after {agent})"
|
||||
)
|
||||
|
||||
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||
next_agent = get_agent_for_stage(current_stage)
|
||||
if next_agent:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\n"
|
||||
f"Branch: {branch}\nStage: {next_stage}"
|
||||
)
|
||||
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = next_agent
|
||||
result.enqueued_job_id = new_job_id
|
||||
logger.info(
|
||||
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
|
||||
result.note = f"error: {e}"
|
||||
return result
|
||||
|
||||
|
||||
def _handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
||||
):
|
||||
"""Analyst approved-flow (launcher only).
|
||||
|
||||
Only triggers when the analyst just finished (agent == 'analyst') in the
|
||||
launcher path. Decides between: artifacts ready -> In Review + request
|
||||
:approved:; questions file -> Needs Input; otherwise a warning comment.
|
||||
This gate never advances on its own (human approval does that via the plane
|
||||
webhook), matching the original launcher behavior.
|
||||
"""
|
||||
result.qg_name = "check_analysis_approved"
|
||||
result.note = "analysis-approval-gate"
|
||||
if not (agent == "analyst" and work_item_id):
|
||||
return
|
||||
|
||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||
if not files_check:
|
||||
return
|
||||
|
||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review, ask for :approved:.
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
|
||||
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
result.note = "analysis-in-review"
|
||||
logger.info(
|
||||
f"Task {task_id}: analyst finished, requested :approved: in Plane"
|
||||
)
|
||||
return
|
||||
|
||||
questions_path = os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/01-questions.md",
|
||||
)
|
||||
if os.path.isfile(questions_path):
|
||||
set_issue_needs_input(work_item_id)
|
||||
with open(questions_path, "r") as qf:
|
||||
questions_text = qf.read()
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
|
||||
)
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
)
|
||||
result.note = "analysis-needs-input"
|
||||
return
|
||||
|
||||
# No artifacts and no questions.
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
|
||||
)
|
||||
result.note = "analysis-empty"
|
||||
|
||||
|
||||
def _handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result: AdvanceResult,
|
||||
):
|
||||
"""All rollback/retry branches from the original launcher, preserved verbatim.
|
||||
|
||||
Only fire on the launcher path (finished_agent is set). The webhook path
|
||||
passes finished_agent=None, so none of these agent-specific branches trigger
|
||||
— that matches the old plane behavior (it just reported the QG failure).
|
||||
"""
|
||||
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
|
||||
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "developer"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
send_telegram(
|
||||
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
|
||||
f"Manual intervention needed."
|
||||
)
|
||||
result.alerted = True
|
||||
logger.error(f"Task {task_id}: max retries reached")
|
||||
|
||||
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
|
||||
if agent == "tester" and qg_name == "check_tests_passed":
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
set_issue_in_progress(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
|
||||
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
)
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "developer"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(
|
||||
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
|
||||
f"retries. Manual intervention needed."
|
||||
)
|
||||
result.alerted = True
|
||||
|
||||
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
|
||||
if agent == "architect" and qg_name == "check_architecture_done":
|
||||
conflict_path = os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/10-conflict.md",
|
||||
)
|
||||
if os.path.isfile(conflict_path):
|
||||
update_task_stage(task_id, "analysis")
|
||||
notify_stage_change(task_id, current_stage, "analysis")
|
||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||
result.rolled_back_to = "analysis"
|
||||
set_issue_in_progress(work_item_id)
|
||||
with open(conflict_path, "r") as cf:
|
||||
conflict_text = cf.read()[:500]
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
|
||||
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "analyst"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
Reference in New Issue
Block a user