Compare commits

...

8 Commits

Author SHA1 Message Date
Dev Agent
4ac449ff63 test(webhook): cover delivery dedup + migration safety (M-7) 2026-06-03 09:18:02 +03:00
Dev Agent
e6a7c6de8d feat(webhook): dedup deliveries by delivery_id (M-7) 2026-06-03 09:18:02 +03:00
Dev Agent
0b924208dc feat(db): add events.delivery_id + partial unique index (M-7) 2026-06-03 09:18:02 +03:00
2f0fd24670 Merge pull request 'ORCH-4: unified stage-engine (M-3)' (#5) from feature/ORCH-4-stage-engine into main 2026-06-03 08:59:51 +03:00
Dev Agent
6abdc220d2 test(stage): cover unified stage_engine + launcher/plane delegation
18 tests: happy-path advance per stage with correct agent (ORCH-4 fix),
QG-fail no-advance, reviewer REQUEST_CHANGES rollback+retry/alert, tester FAIL
rollback+retry/block, architect conflict rollback to analysis, analyst
approved-flow no-advance, and launcher+plane both delegating to the engine.
2026-06-03 08:56:25 +03:00
Dev Agent
51401a3ba9 refactor(launcher,plane): delegate stage advance to stage_engine
launcher._try_advance_stage and plane._try_advance_stage are now thin
wrappers over stage_engine.advance_stage. The plane webhook calls the sync
engine via asyncio.to_thread so there is exactly one implementation. The
launcher forwards finished_agent so the agent-specific rollback branches still
fire; the webhook passes None (human :approved:), matching prior behavior.

Also fixes the agent-selection bug in the launcher path: it used to enqueue
get_agent_for_stage(next_stage) (skipping a stage, e.g. analysis->architecture
launched developer instead of architect). The unified engine uses
get_agent_for_stage(current_stage), consistent with plane and gitea.
2026-06-03 08:56:25 +03:00
Dev Agent
0befc49b1e refactor(stage): extract unified stage_engine.advance_stage (M-3)
Merge the two diverged _try_advance_stage implementations (launcher sync +
plane async) into one synchronous engine. Preserves all launcher business
logic (analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester
FAIL rollback+retry, architect conflict rollback) and the plane
check_review_approved PR-by-branch dispatch. Unifies the QG signature
dispatch. Fixes agent selection: advancing FROM current_stage launches
get_agent_for_stage(current_stage), not next_stage.
2026-06-03 08:56:14 +03:00
fd554c8a5a Merge pull request 'ORCH-7: cleanup + hardening (M-4 dead code + M-2 graceful timeout)' (#4) from feature/ORCH-7-hardening into main 2026-06-03 08:31:26 +03:00
8 changed files with 1258 additions and 258 deletions

View File

@@ -564,7 +564,15 @@ class AgentLauncher:
pass
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
"""After agent finishes successfully, check QG and advance stage if possible."""
"""After agent finishes successfully, advance the stage via the unified engine.
ORCH-4 / M-3: the 174-line body that used to live here moved into
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
the task by (repo, branch) and delegates. `agent` is forwarded as
finished_agent so the analyst/reviewer/tester/architect rollback branches
still trigger exactly as before. The agent-selection bug (it used to call
get_agent_for_stage(next_stage)) is fixed inside the engine.
"""
try:
conn = get_db()
task_row = conn.execute(
@@ -576,174 +584,15 @@ class AgentLauncher:
return
task_id, current_stage, work_item_id = task_row
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
if not next_stage:
return
# Run QG check if defined
if qg_name and qg_name in QG_CHECKS:
check_fn = QG_CHECKS[qg_name]
if qg_name in ("check_analysis_approved",):
# Requires human approval - post request comment if analyst just finished
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
files_check = QG_CHECKS.get("check_analysis_complete")
if files_check:
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review
from ..plane_sync import set_issue_in_review
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
)
notify_approve_requested(task_id)
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
else:
# Check if questions file exists (in the task worktree)
import os as _os
questions_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md"
)
if _os.path.isfile(questions_path):
# Analyst has questions -> Needs Input
from ..plane_sync import set_issue_needs_input
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
)
from ..notifications import send_telegram
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
else:
# No artifacts and no questions
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
)
return
elif qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) signature — already worktree-aware.
passed, reason = check_fn(repo, branch)
elif qg_name == "check_tests_passed":
# Artifact check — pass branch so it reads from the worktree.
passed, reason = check_fn(repo, work_item_id or "", branch)
else:
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
passed, reason = check_fn(repo, work_item_id or "", branch)
if not passed:
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
# If reviewer says REQUEST_CHANGES, rollback to development
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
# Count retries
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
logger.error(f"Task {task_id}: max retries reached")
# Task 6: Tester FAIL -> rollback to development
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
)
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
from ..plane_sync import set_issue_blocked
set_issue_blocked(work_item_id)
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
# Task 8: Architect conflict -> rollback to analysis
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
import os as _os
conflict_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md"
)
if _os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
return
return
elif qg_name:
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
# Launch next agent if defined
next_agent = get_agent_for_stage(next_stage)
if next_agent:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
from ..stage_engine import advance_stage
advance_stage(
task_id=task_id,
current_stage=current_stage,
repo=repo,
work_item_id=work_item_id,
branch=branch,
finished_agent=agent,
)
except Exception as e:
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")

View File

@@ -67,6 +67,17 @@ def init_db():
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
# (which have NULL delivery_id) never collide with each other. Restart-safe:
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
_ensure_column(conn, "events", "delivery_id", "TEXT")
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
)
conn.commit()
conn.close()
@@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------
def insert_event_dedup(
    source: str, event_type: str, payload: str, delivery_id: str
) -> bool:
    """Record a webhook event once per delivery_id.

    The row is written with INSERT OR IGNORE, so a delivery_id that is already
    stored (and therefore conflicts with a unique constraint on the events
    table) leaves the table untouched.

    Returns:
        True when exactly one row was inserted (first time this delivery is
        seen; the caller should dispatch the event), False when the insert
        was ignored (duplicate delivery; the caller should skip dispatch).
    """
    statement = (
        "INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
        "VALUES (?, ?, ?, ?)"
    )
    row = (source, event_type, payload, delivery_id)
    db = get_db()
    try:
        cursor = db.execute(statement, row)
        db.commit()
        # rowcount is 1 only if the INSERT actually wrote a row.
        was_inserted = cursor.rowcount == 1
    finally:
        # Always release the connection, including on a failed execute/commit.
        db.close()
    return was_inserted
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------

425
src/stage_engine.py Normal file
View File

@@ -0,0 +1,425 @@
"""Unified stage engine (ORCH-4 / M-3).
Single source of truth for "an agent finished / a human approved -> run the
stage's quality gate and either advance the pipeline or roll it back".
Before ORCH-4 this logic was duplicated in two places that had silently
diverged:
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
rollback+retry, architect conflict rollback) — but it picked the next agent
with get_agent_for_stage(next_stage), which is WRONG.
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
check_review_approved PR-by-branch dispatch and used the CORRECT
get_agent_for_stage(current_stage)).
This module merges both into one sync `advance_stage(...)`. launcher calls it
directly; the plane webhook calls it through asyncio.to_thread so there is
exactly one implementation.
Agent-selection bug fix (ORCH-4):
stages.py defines `agent` as "the agent to launch when advancing FROM this
stage". So when advancing current -> next, the correct agent to launch is
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
stage (e.g. analysis->architecture launched 'developer' instead of
'architect'). plane and gitea already used current_stage; we unify on that.
"""
import logging
import os
from dataclasses import dataclass, field
from .db import get_db, update_task_stage, enqueue_job
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .qg.checks import QG_CHECKS
from .notifications import (
notify_stage_change,
notify_qg_failure,
notify_approve_requested,
send_telegram,
)
from .plane_sync import (
notify_stage_change as plane_notify_stage,
notify_qg_failure as plane_notify_qg,
add_comment as plane_add_comment,
set_issue_in_review,
set_issue_needs_input,
set_issue_in_progress,
set_issue_blocked,
)
from .config import settings
logger = logging.getLogger("orchestrator.stage_engine")
MAX_DEVELOPER_RETRIES = 3
@dataclass
class AdvanceResult:
    """Outcome of an advance_stage() call (mostly for tests/observability).

    advance_stage() creates one instance per call, fills it in as it goes and
    returns it; the rollback/approval helpers receive the same instance and
    mutate it in place.
    """
    # True once the task was moved forward to the next stage.
    advanced: bool = False
    # Stage the task was in when advance_stage() was called.
    from_stage: str | None = None
    # Next stage as resolved by get_next_stage(); filled in even when the
    # quality gate later fails, so check `advanced` to know if it was applied.
    to_stage: str | None = None
    # Agent and job id of the job enqueued by this call (advance or rollback retry).
    enqueued_agent: str | None = None
    enqueued_job_id: int | None = None
    # Quality gate that guarded the stage, and its verdict/reason when it was run.
    qg_name: str | None = None
    qg_passed: bool | None = None
    qg_reason: str | None = None
    # Stage the task was rolled back to by a failure branch ("development"/"analysis").
    rolled_back_to: str | None = None
    # True when a "manual intervention needed" Telegram alert was sent.
    alerted: bool = False
    # Short machine-readable outcome tag, e.g. "terminal" or "error: ...".
    note: str | None = None
    # Free-form list; not written by any code visible in this module chunk.
    notes: list = field(default_factory=list)
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
    """Look up a quality gate by name and call it with the arguments it expects.

    Call shapes:
      - check_ci_green / check_tests_local -> check(repo, branch)
      - check_review_approved              -> resolved through
        _check_review_approved_by_branch (PR looked up by branch)
      - any other gate                     -> check(repo, work_item_id, branch),
        with a missing work_item_id passed as ""

    Returns:
        (passed, reason). An unregistered gate name yields
        (False, "Unknown QG: <name>") and an error log entry.
    """
    gate = QG_CHECKS.get(qg_name)
    if not gate:
        logger.error(f"QG function '{qg_name}' not found in registry")
        return False, f"Unknown QG: {qg_name}"

    if qg_name == "check_review_approved":
        # Needs a PR number, which has to be discovered from the branch first.
        return _check_review_approved_by_branch(gate, repo, work_item_id, branch)

    takes_branch_only = qg_name in ("check_ci_green", "check_tests_local")
    if takes_branch_only:
        gate_args = (repo, branch)
    else:
        # Artifact checks also receive the branch so they can locate the
        # task's worktree.
        gate_args = (repo, work_item_id or "", branch)
    return gate(*gate_args)
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
    """Run the review-approved gate for the open PR that belongs to `branch`.

    Lists the repository's open pull requests through the Gitea API (first 50)
    and picks the one whose head ref equals `branch`. If one is found the
    verdict is check_fn(repo, pr_number). If none is found, the presence of a
    review file (12-review.md or 09-review.md) in the task worktree -- or in
    the main checkout when the worktree directory does not exist -- counts as
    approval.

    Args:
        check_fn: the registered check_review_approved callable.
        repo: repository name.
        work_item_id: work item id used to locate the review files.
        branch: feature branch whose PR is being looked up.

    Returns:
        (passed, reason). Any failure while talking to Gitea or evaluating the
        gate (network error, non-2xx status, malformed payload) is reported as
        (False, "Error finding PR: ...") instead of raising.
    """
    import httpx as _httpx

    owner = settings.gitea_owner
    url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
    headers = {"Authorization": f"token {settings.gitea_token}"}
    try:
        resp = _httpx.get(url, headers=headers, timeout=10)
        # Surface 4xx/5xx explicitly; otherwise an error body (a JSON object)
        # would be iterated below as if it were the PR list.
        resp.raise_for_status()
        prs = resp.json()
        if not isinstance(prs, list):
            return False, (
                f"Error finding PR: unexpected response type {type(prs).__name__}"
            )

        pr_number = None
        for pr in prs:
            # `head` can be null in the payload; treat that as "no ref".
            if (pr.get("head") or {}).get("ref") == branch:
                pr_number = pr["number"]
                break
        if pr_number:
            return check_fn(repo, pr_number)

        # No open PR but a review file may exist — check file-based.
        wt = get_worktree_path(repo, branch)
        if not os.path.isdir(wt):
            wt = os.path.join(settings.repos_dir, repo)
        review_dir = os.path.join(wt, f"docs/work-items/{work_item_id}")
        if any(
            os.path.isfile(os.path.join(review_dir, name))
            for name in ("12-review.md", "09-review.md")
        ):
            return True, "Review file exists (file-based approval)"
        return False, "No open PR found and no review file"
    except Exception as e:
        # Gate evaluation is best-effort: report the failure as "not passed".
        return False, f"Error finding PR: {e}"
def _developer_retry_count(task_id: int) -> int:
    """Return the number of agent_runs rows with agent='developer' for the task.

    Args:
        task_id: tasks.id to count developer runs for.

    Returns:
        The COUNT(*) of matching rows (0 when there are none).
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
            (task_id,),
        ).fetchone()
        return row[0]
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()
def advance_stage(
    task_id: int,
    current_stage: str,
    repo: str,
    work_item_id: str,
    branch: str,
    finished_agent: str | None = None,
) -> AdvanceResult:
    """Run the current stage's quality gate and advance / roll back the pipeline.

    This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
    the async plane webhook calls it via asyncio.to_thread.

    Flow:
      1. Resolve the stage's quality gate and the next stage; stop if there is
         no next stage.
      2. If a registered gate exists, run it. The analysis-approval gate is
         handed to _handle_analysis_approved_flow and never advances here. A
         failed gate is handed to _handle_qg_failure_rollbacks and does not
         advance. A gate name that is not registered also blocks the advance.
      3. Otherwise move the task to the next stage, notify, and enqueue the
         agent returned by get_agent_for_stage(current_stage), if any.

    Args:
        task_id: tasks.id
        current_stage: the stage the task is currently in
        repo: repository name
        work_item_id: Plane work item id (may be "" / None)
        branch: feature branch
        finished_agent: the agent that just finished (launcher path). Drives the
            approved/REQUEST_CHANGES/tester/architect branches. In the
            plane webhook path it is None, so those agent-specific
            branches simply do not trigger (matches old plane behavior).

    Returns:
        AdvanceResult describing what happened. Exceptions are not propagated:
        they are logged and reported as result.note == "error: <message>".
    """
    result = AdvanceResult(from_stage=current_stage)
    agent = finished_agent
    try:
        qg_name = get_qg_for_stage(current_stage)
        next_stage = get_next_stage(current_stage)
        # Recorded up front, so to_stage is populated even if we do not advance.
        result.qg_name = qg_name
        result.to_stage = next_stage
        if not next_stage:
            logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
            result.note = "terminal"
            return result
        # --- Quality gate ----------------------------------------------------
        if qg_name and qg_name in QG_CHECKS:
            # Human-approval gate: special analyst approved-flow (launcher only).
            # NOTE(review): this branch returns without advancing for every
            # caller, including finished_agent=None — confirm the human
            # :approved: path advances the analysis stage some other way.
            if qg_name == "check_analysis_approved":
                _handle_analysis_approved_flow(
                    task_id, current_stage, repo, work_item_id, branch, agent, result
                )
                return result
            passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
            result.qg_passed = passed
            result.qg_reason = reason
            if not passed:
                logger.info(
                    f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
                )
                # Behaviour parity:
                # - webhook path (finished_agent is None): emit the generic
                #   QG-failure notification, exactly like the old plane handler.
                # - launcher path (finished_agent set): NO generic notification;
                #   the rollback branches below own their own messaging, exactly
                #   like the old launcher handler.
                if agent is None:
                    notify_qg_failure(task_id, current_stage, qg_name, reason)
                    plane_notify_qg(work_item_id, current_stage, qg_name, reason)
                # Called on both paths; its branches are keyed on `agent`, so
                # with agent=None none of them match.
                _handle_qg_failure_rollbacks(
                    task_id, current_stage, repo, work_item_id, branch,
                    agent, qg_name, reason, result,
                )
                return result
        elif qg_name:
            # QG name set but not registered — do not advance (launcher behavior).
            result.note = f"qg '{qg_name}' not in registry"
            return result
        # --- Advance ---------------------------------------------------------
        # Reached when the stage has no gate, or its gate passed.
        update_task_stage(task_id, next_stage)
        notify_stage_change(task_id, current_stage, next_stage)
        plane_notify_stage(work_item_id, current_stage, next_stage)
        result.advanced = True
        logger.info(
            f"Task {task_id}: {current_stage} -> {next_stage} "
            f"(auto-advance after {agent})"
        )
        # --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
        next_agent = get_agent_for_stage(current_stage)
        if next_agent:
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\n"
                f"Branch: {branch}\nStage: {next_stage}"
            )
            new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
            result.enqueued_agent = next_agent
            result.enqueued_job_id = new_job_id
            logger.info(
                f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
            )
        return result
    except Exception as e:
        # Best-effort boundary: never raise into the launcher/webhook caller;
        # the partially filled result is returned with the error in `note`.
        logger.error(f"advance_stage failed for task_id={task_id}: {e}")
        result.note = f"error: {e}"
        return result
def _handle_analysis_approved_flow(
    task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
):
    """Handle the human-approval gate of the analysis stage.

    Never advances the stage. It only acts when the analyst has just finished
    (agent == 'analyst') and a work item id is known; in every other case it
    tags `result` and returns. For a finished analyst it picks one of:

      - analysis artifacts complete (check_analysis_complete passes):
        mark the issue In Review, post a comment asking for :approved:, and
        send the approve-requested notification -> note "analysis-in-review"
      - 01-questions.md exists in the task worktree: mark the issue Needs
        Input, post the questions as a comment, send a Telegram message
        -> note "analysis-needs-input"
      - neither: post a warning comment -> note "analysis-empty"

    Args:
        task_id: tasks.id (used for notifications/logging).
        current_stage: current stage name (unused here; kept for a uniform
            helper signature).
        repo: repository name.
        work_item_id: Plane work item id; falsy values make this a no-op.
        branch: feature branch, used to locate the worktree.
        agent: agent that just finished, or None on the webhook path.
        result: AdvanceResult mutated in place (qg_name, note).
    """
    result.qg_name = "check_analysis_approved"
    result.note = "analysis-approval-gate"
    if not (agent == "analyst" and work_item_id):
        return
    files_check = QG_CHECKS.get("check_analysis_complete")
    if not files_check:
        return
    files_ok, _ = files_check(repo, work_item_id, branch)
    if files_ok:
        # Full artifacts ready -> In Review, ask for :approved:.
        set_issue_in_review(work_item_id)
        plane_add_comment(
            work_item_id,
            "\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
            "\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
            "\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
        )
        notify_approve_requested(task_id)
        result.note = "analysis-in-review"
        logger.info(
            f"Task {task_id}: analyst finished, requested :approved: in Plane"
        )
        return
    questions_path = os.path.join(
        get_worktree_path(repo, branch),
        f"docs/work-items/{work_item_id}/01-questions.md",
    )
    if os.path.isfile(questions_path):
        set_issue_needs_input(work_item_id)
        # Read as UTF-8 explicitly: the default text encoding is
        # locale-dependent. Undecodable bytes are replaced rather than
        # aborting, so the questions still reach the human.
        with open(questions_path, "r", encoding="utf-8", errors="replace") as qf:
            questions_text = qf.read()
        plane_add_comment(
            work_item_id,
            f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
        )
        send_telegram(
            f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
        )
        result.note = "analysis-needs-input"
        return
    # No artifacts and no questions.
    plane_add_comment(
        work_item_id,
        "\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
    )
    result.note = "analysis-empty"
def _handle_qg_failure_rollbacks(
    task_id, current_stage, repo, work_item_id, branch,
    agent, qg_name, reason, result: AdvanceResult,
):
    """Apply the agent-specific rollback/retry reaction to a failed quality gate.

    Every branch is keyed on `agent`, so a call with agent=None (webhook path)
    does nothing. Branches:

      - reviewer + "REQUEST_CHANGES" in the reason: roll back to development,
        then re-enqueue the developer while fewer than MAX_DEVELOPER_RETRIES
        developer runs exist, otherwise send a Telegram alert.
      - tester + check_tests_passed: roll back to development, set the issue
        In Progress and comment; then re-enqueue the developer, or (limit
        reached) mark the issue blocked and send a Telegram alert.
      - architect + check_architecture_done, only if 10-conflict.md exists in
        the worktree: roll back to analysis, comment with the first 500
        characters of the conflict file, and enqueue the analyst.

    Args:
        task_id: tasks.id.
        current_stage: stage the task was in when the gate failed.
        repo: repository name.
        work_item_id: Plane work item id.
        branch: feature branch (used for the worktree path and job text).
        agent: agent that just finished, or None.
        qg_name: name of the gate that failed.
        reason: failure reason returned by the gate (may be None/empty).
        result: AdvanceResult mutated in place (rolled_back_to,
            enqueued_agent, enqueued_job_id, alerted).
    """
    # Reviewer REQUEST_CHANGES -> rollback to development + retry (bounded).
    if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
        update_task_stage(task_id, "development")
        notify_stage_change(task_id, current_stage, "development")
        plane_notify_stage(work_item_id, current_stage, "development")
        result.rolled_back_to = "development"
        retry_count = _developer_retry_count(task_id)
        if retry_count < MAX_DEVELOPER_RETRIES:
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
                f"(attempt {retry_count+1}/{MAX_DEVELOPER_RETRIES}). Fix findings in "
                f"docs/work-items/{work_item_id}/12-review.md"
            )
            new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
            result.enqueued_agent = "developer"
            result.enqueued_job_id = new_job
            logger.info(
                f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
                f"(job_id={new_job})"
            )
        else:
            send_telegram(
                f"\u26a0\ufe0f {work_item_id}: Max developer retries "
                f"({MAX_DEVELOPER_RETRIES}) reached. "
                f"Manual intervention needed."
            )
            result.alerted = True
            logger.error(f"Task {task_id}: max retries reached")
    # Tester check_tests_passed FAIL -> rollback to development + retry (bounded).
    if agent == "tester" and qg_name == "check_tests_passed":
        update_task_stage(task_id, "development")
        notify_stage_change(task_id, current_stage, "development")
        plane_notify_stage(work_item_id, current_stage, "development")
        result.rolled_back_to = "development"
        set_issue_in_progress(work_item_id)
        # Posted before the retry limit is checked, i.e. also when the
        # developer is not re-enqueued below.
        plane_add_comment(
            work_item_id,
            f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
            f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
        )
        retry_count = _developer_retry_count(task_id)
        if retry_count < MAX_DEVELOPER_RETRIES:
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: development\nNote: Tests FAILED. "
                f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
            )
            new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
            result.enqueued_agent = "developer"
            result.enqueued_job_id = new_job
            logger.info(
                f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
            )
        else:
            set_issue_blocked(work_item_id)
            send_telegram(
                f"\U0001f6a8 {work_item_id}: Tests still failing after "
                f"{MAX_DEVELOPER_RETRIES} developer "
                f"retries. Manual intervention needed."
            )
            result.alerted = True
    # Architect conflict (10-conflict.md exists) -> rollback to analysis.
    if agent == "architect" and qg_name == "check_architecture_done":
        conflict_path = os.path.join(
            get_worktree_path(repo, branch),
            f"docs/work-items/{work_item_id}/10-conflict.md",
        )
        if os.path.isfile(conflict_path):
            update_task_stage(task_id, "analysis")
            notify_stage_change(task_id, current_stage, "analysis")
            plane_notify_stage(work_item_id, current_stage, "analysis")
            result.rolled_back_to = "analysis"
            set_issue_in_progress(work_item_id)
            # Explicit UTF-8 (the default is locale-dependent); only the first
            # 500 characters are needed, so do not read the whole file.
            with open(conflict_path, "r", encoding="utf-8", errors="replace") as cf:
                conflict_text = cf.read(500)
            plane_add_comment(
                work_item_id,
                f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
                f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
            )
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
                f"See docs/work-items/{work_item_id}/10-conflict.md"
            )
            new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
            result.enqueued_agent = "analyst"
            result.enqueued_job_id = new_job
            logger.info(
                f"Task {task_id}: architect conflict, enqueued analyst "
                f"(job_id={new_job})"
            )

52
src/webhooks/_dedup.py Normal file
View File

@@ -0,0 +1,52 @@
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
manual replay. Without idempotency a retried delivery re-enters the pipeline and
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
repo). This module computes a stable per-delivery id so the webhook handlers can
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
body (a retried identical body yields the same hash).
"""
import hashlib
def _sha256_hex(*parts: str) -> str:
    """Return the hex SHA-256 digest of the given strings, hashed back to back.

    Each part is UTF-8 encoded (unencodable characters are replaced) and the
    encoded parts are concatenated in order with no separator.
    """
    encoded = b"".join(piece.encode("utf-8", "replace") for piece in parts)
    return hashlib.sha256(encoded).hexdigest()
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
    """Build the de-duplication id for a Gitea webhook delivery.

    The trimmed ``X-Gitea-Delivery`` header value is used when it is non-empty.
    Otherwise the id is derived from a SHA-256 over "gitea", the event type
    (empty string if missing) and the body decoded as UTF-8 with replacement,
    so an identical retried body maps to the same id.

    Returns:
        The id prefixed with ``gitea:``.
    """
    header_value = headers.get("X-Gitea-Delivery")
    token = header_value.strip() if header_value else ""
    if token == "":
        # Header absent or blank: fall back to a content hash.
        body_text = body.decode("utf-8", "replace")
        token = _sha256_hex("gitea", event_type or "", body_text)
    return "gitea:" + token
def plane_delivery_id(headers, body: bytes) -> str:
    """Build the de-duplication id for a Plane webhook delivery.

    Takes the first non-empty value among the ``X-Plane-Delivery`` and
    ``X-Hook-Delivery`` headers and trims it. If that leaves nothing, the id
    is derived from a SHA-256 over "plane" and the body decoded as UTF-8 with
    replacement, so an identical retried body maps to the same id.

    Returns:
        The id prefixed with ``plane:``.
    """
    candidate = ""
    for header_name in ("X-Plane-Delivery", "X-Hook-Delivery"):
        value = headers.get(header_name)
        if value:
            candidate = value
            break
    token = candidate.strip()
    if token == "":
        # No usable delivery header: fall back to a content hash.
        token = _sha256_hex("plane", body.decode("utf-8", "replace"))
    return "plane:" + token

View File

@@ -10,7 +10,14 @@ import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import gitea_delivery_id
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
# Runs AFTER HMAC verification above.
event_type = request.headers.get("X-Gitea-Event", "unknown")
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("gitea", event_type, body.decode()),
)
conn.commit()
conn.close()
delivery_id = gitea_delivery_id(request.headers, event_type, body)
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
if event_type == "push":
await handle_push(payload)

View File

@@ -15,7 +15,9 @@ from ..db import (
get_next_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
from ..qg.checks import QG_CHECKS
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("plane", payload.get("event", "unknown"), body.decode()),
)
conn.commit()
conn.close()
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
# project still falls through to the filter below and returns {"status":"ignored"}.
event_type = payload.get("event", "unknown")
delivery_id = plane_delivery_id(request.headers, body)
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
event = payload.get("event")
action = payload.get("action", "")
@@ -318,81 +324,30 @@ async def handle_comment(data: dict, project_id: str = ""):
async def _try_advance_stage(
    task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
):
    """Thin async wrapper over the unified stage engine (ORCH-4 / M-3).

    The QG dispatch (including the check_review_approved PR-by-branch logic) and
    the advance/launch logic now live in src/stage_engine.advance_stage(), which
    is synchronous. We run it off the event loop via asyncio.to_thread so there
    is exactly one implementation shared with the launcher.

    finished_agent is None on this webhook path (a human :approved: comment, not
    a finished agent), so the agent-specific rollback branches inside the engine
    intentionally do not trigger — identical to the old plane behavior, which
    only ran the QG and either advanced or reported the failure.

    Args:
        task_id: Row id of the task being advanced.
        current_stage: Stage the task is in right now.
        repo: Repository name the task belongs to.
        work_item_id: Work-item identifier (e.g. "ET-001").
        branch: Feature branch of the task.
    """
    import asyncio

    # Resolved at call time (not at module import) so that a patched
    # src.stage_engine.advance_stage is the object that actually gets called.
    from ..stage_engine import advance_stage

    # Arguments are passed positionally; the trailing None is finished_agent.
    await asyncio.to_thread(
        advance_stage,
        task_id,
        current_stage,
        repo,
        work_item_id,
        branch,
        None,
    )
async def _create_gitea_branch(repo: str, branch: str):

395
tests/test_stage_engine.py Normal file
View File

@@ -0,0 +1,395 @@
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
These verify the MERGED behavior of what used to be two diverged
_try_advance_stage implementations (launcher sync + plane async):
* happy-path advance for every stage launches the CORRECT agent
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
* a QG failure does not advance;
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
* developer retries > 3 -> telegram alert, no further enqueue;
* tester FAIL -> rollback to development + enqueue developer;
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
* launcher AND plane both delegate to the engine.
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
the engine runs against a real isolated sqlite DB.
"""
import os
import tempfile
import pytest
# Isolated test DB (same convention as the other suites).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock, patch # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
from src.stages import get_agent_for_stage # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
    """Give each test a freshly initialised, isolated SQLite database.

    ``settings.db_path`` is pinned to this module's temp file, any file left
    over from a previous run is deleted, and ``init_db()`` recreates the
    schema. After the test the file is removed again so nothing is left behind
    in the shared temp directory.
    """
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    yield
    # Teardown: do not leak the database file into the temp directory.
    if os.path.exists(_test_db):
        os.unlink(_test_db)
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
    """Stub out every notifier / Plane helper referenced by the engine.

    Each listed attribute of ``src.stage_engine`` is swapped for its own
    ``MagicMock`` so the tests never send a message and can later inspect
    ``.called`` on the replacement.
    """
    outbound_helpers = [
        "notify_stage_change",
        "notify_qg_failure",
        "notify_approve_requested",
        "send_telegram",
        "plane_notify_stage",
        "plane_notify_qg",
        "plane_add_comment",
        "set_issue_in_review",
        "set_issue_needs_input",
        "set_issue_in_progress",
        "set_issue_blocked",
    ]
    for attr in outbound_helpers:
        # A separate mock per helper keeps their call records independent.
        monkeypatch.setattr(stage_engine, attr, MagicMock())
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
    """Insert a single task row at ``stage`` and return its new row id."""
    insert_sql = (
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    values = (f"plane-{wi}", wi, repo, branch, stage)
    db = get_db()
    new_id = db.execute(insert_sql, values).lastrowid
    db.commit()
    db.close()
    return new_id
def _stage(task_id):
    """Return the ``stage`` column currently stored for ``task_id``."""
    db = get_db()
    cursor = db.execute("SELECT stage FROM tasks WHERE id=?", (task_id,))
    record = cursor.fetchone()
    db.close()
    return record[0]
def _jobs():
    """Return all queued jobs, ordered by id, as a list of plain dicts."""
    db = get_db()
    cursor = db.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id")
    fetched = cursor.fetchall()
    db.close()
    return list(map(dict, fetched))
def _add_developer_runs(task_id, n):
    """Record ``n`` developer rows in ``agent_runs`` for ``task_id``."""
    db = get_db()
    # One parameter tuple per run to insert; an empty list inserts nothing.
    db.executemany(
        "INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
        [(task_id,)] * n,
    )
    db.commit()
    db.close()
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
# ---------------------------------------------------------------------------
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
# ---------------------------------------------------------------------------
class TestHappyPathAgentSelection:
    """Agent selection on a successful advance.

    When advancing FROM current_stage the engine must enqueue
    get_agent_for_stage(current_stage), NOT the agent of next_stage.
    """

    @staticmethod
    def _all_gates_pass(monkeypatch):
        """Swap every registered QG check for the always-passing stub."""
        passing = dict.fromkeys(stage_engine.QG_CHECKS, _pass)
        monkeypatch.setattr(stage_engine, "QG_CHECKS", passing)

    @pytest.mark.parametrize(
        "current_stage,expected_next,expected_agent",
        [
            ("architecture", "development", "developer"),
            ("development", "review", "reviewer"),
            ("review", "testing", "tester"),
            ("testing", "deploy", "deployer"),
        ],
    )
    def test_advance_launches_current_stage_agent(
        self, monkeypatch, current_stage, expected_next, expected_agent
    ):
        self._all_gates_pass(monkeypatch)
        tid = _make_task(current_stage)

        outcome = advance_stage(
            tid,
            current_stage,
            "enduro-trails",
            "ET-001",
            "feature/ET-001-x",
            finished_agent=None,
        )

        # The task moved forward, both in the result and in the database.
        assert outcome.advanced is True
        assert outcome.to_stage == expected_next
        assert _stage(tid) == expected_next
        # The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
        assert expected_agent == get_agent_for_stage(current_stage)
        assert outcome.enqueued_agent == expected_agent
        queued = _jobs()
        assert len(queued) == 1
        assert queued[0]["agent"] == expected_agent

    def test_deploy_to_done_no_agent(self, monkeypatch):
        """deploy -> done advances but launches no agent (terminal-ish)."""
        self._all_gates_pass(monkeypatch)
        tid = _make_task("deploy")

        outcome = advance_stage(
            tid, "deploy", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )

        assert outcome.advanced is True
        assert _stage(tid) == "done"
        assert outcome.enqueued_agent is None
        assert _jobs() == []

    def test_done_is_terminal(self):
        """A task already at 'done' is not advanced and stays at 'done'."""
        tid = _make_task("done")

        outcome = advance_stage(
            tid, "done", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )

        assert outcome.advanced is False
        assert _stage(tid) == "done"
# ---------------------------------------------------------------------------
# QG failure: do not advance
# ---------------------------------------------------------------------------
class TestQgFailureDoesNotAdvance:
    """A failing quality gate leaves the task where it is."""

    @staticmethod
    def _break_gate(monkeypatch, gate_name, reason):
        """Override one QG check with a failing stub, keeping the others."""
        registry = dict(stage_engine.QG_CHECKS)
        registry[gate_name] = _fail(reason)
        monkeypatch.setattr(stage_engine, "QG_CHECKS", registry)

    def test_qg_fail_keeps_stage(self, monkeypatch):
        self._break_gate(monkeypatch, "check_architecture_done", "not done")
        tid = _make_task("architecture")

        outcome = advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert outcome.advanced is False
        assert outcome.qg_passed is False
        assert _stage(tid) == "architecture"
        assert _jobs() == []

    def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
        """finished_agent=None -> generic QG-failure notification fires (plane parity)."""
        self._break_gate(monkeypatch, "check_tests_local", "ci red")
        tid = _make_task("development")

        advance_stage(
            tid, "development", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent=None,
        )

        assert stage_engine.notify_qg_failure.called
        assert stage_engine.plane_notify_qg.called

    def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
        """finished_agent set -> NO generic QG notification (launcher parity)."""
        self._break_gate(monkeypatch, "check_architecture_done", "not done")
        tid = _make_task("architecture")

        advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert not stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestReviewerRequestChanges:
    """Reviewer verdict REQUEST_CHANGES sends the task back to development."""

    @staticmethod
    def _reviewer_requests_changes(monkeypatch):
        """Make check_reviewer_verdict fail with a REQUEST_CHANGES reason."""
        registry = dict(stage_engine.QG_CHECKS)
        registry["check_reviewer_verdict"] = _fail("verdict: REQUEST_CHANGES")
        monkeypatch.setattr(stage_engine, "QG_CHECKS", registry)

    def test_rollback_and_enqueue_developer(self, monkeypatch):
        self._reviewer_requests_changes(monkeypatch)
        tid = _make_task("review")

        outcome = advance_stage(
            tid, "review", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="reviewer",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to == "development"
        assert _stage(tid) == "development"
        queued = _jobs()
        assert len(queued) == 1
        assert queued[0]["agent"] == "developer"

    def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
        self._reviewer_requests_changes(monkeypatch)
        tid = _make_task("review")
        _add_developer_runs(tid, 3)  # already at the max

        outcome = advance_stage(
            tid, "review", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="reviewer",
        )

        assert outcome.rolled_back_to == "development"
        assert outcome.alerted is True
        assert stage_engine.send_telegram.called
        # No new developer job enqueued past the retry cap.
        assert _jobs() == []
# ---------------------------------------------------------------------------
# Tester FAIL -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestTesterFail:
    """A failing tester gate sends the task back to development."""

    @staticmethod
    def _tests_fail(monkeypatch, reason):
        """Make check_tests_passed fail with ``reason``."""
        registry = dict(stage_engine.QG_CHECKS)
        registry["check_tests_passed"] = _fail(reason)
        monkeypatch.setattr(stage_engine, "QG_CHECKS", registry)

    def test_rollback_and_enqueue_developer(self, monkeypatch):
        self._tests_fail(monkeypatch, "2 tests failed")
        tid = _make_task("testing")

        outcome = advance_stage(
            tid, "testing", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="tester",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to == "development"
        assert _stage(tid) == "development"
        queued = _jobs()
        assert len(queued) == 1
        assert queued[0]["agent"] == "developer"

    def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
        self._tests_fail(monkeypatch, "still failing")
        tid = _make_task("testing")
        _add_developer_runs(tid, 3)

        outcome = advance_stage(
            tid, "testing", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="tester",
        )

        assert outcome.rolled_back_to == "development"
        assert outcome.alerted is True
        assert stage_engine.set_issue_blocked.called
        assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
class TestArchitectConflict:
    """Architect gate failure, with and without a 10-conflict.md file."""

    @staticmethod
    def _prepare(monkeypatch, worktree, reason):
        """Point the engine at ``worktree`` and fail check_architecture_done."""
        monkeypatch.setattr(
            stage_engine, "get_worktree_path", lambda repo, branch: str(worktree)
        )
        registry = dict(stage_engine.QG_CHECKS)
        registry["check_architecture_done"] = _fail(reason)
        monkeypatch.setattr(stage_engine, "QG_CHECKS", registry)

    def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
        # 10-conflict.md must exist in the worktree path the engine inspects.
        worktree = tmp_path / "wt"
        item_dir = worktree / "docs" / "work-items" / "ET-001"
        item_dir.mkdir(parents=True)
        (item_dir / "10-conflict.md").write_text("conflict with TRZ")
        self._prepare(monkeypatch, worktree, "conflict")
        tid = _make_task("architecture")

        outcome = advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to == "analysis"
        assert _stage(tid) == "analysis"
        queued = _jobs()
        assert len(queued) == 1
        assert queued[0]["agent"] == "analyst"

    def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
        # Worktree exists but holds no conflict file.
        worktree = tmp_path / "wt"
        (worktree / "docs").mkdir(parents=True)
        self._prepare(monkeypatch, worktree, "incomplete")
        tid = _make_task("architecture")

        outcome = advance_stage(
            tid, "architecture", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="architect",
        )

        assert outcome.advanced is False
        assert outcome.rolled_back_to is None
        assert _stage(tid) == "architecture"
        assert _jobs() == []
# ---------------------------------------------------------------------------
# Analyst approved-flow (analysis gate): never auto-advances
# ---------------------------------------------------------------------------
class TestAnalysisApprovedFlow:
    """Analysis gate: a finished analyst requests approval, never advances."""

    def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
        registry = dict(stage_engine.QG_CHECKS)
        registry["check_analysis_complete"] = _pass
        monkeypatch.setattr(stage_engine, "QG_CHECKS", registry)
        tid = _make_task("analysis")

        outcome = advance_stage(
            tid, "analysis", "enduro-trails", "ET-001",
            "feature/ET-001-x", finished_agent="analyst",
        )

        # Stage is unchanged; approval was requested instead of enqueuing work.
        assert outcome.advanced is False
        assert _stage(tid) == "analysis"
        assert stage_engine.set_issue_in_review.called
        assert stage_engine.notify_approve_requested.called
        assert _jobs() == []
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------
class TestDelegation:
    """Both entry points hand the work to stage_engine.advance_stage."""

    def test_launcher_calls_engine(self):
        from src.agents.launcher import AgentLauncher

        deleg_branch = "feature/ET-777-deleg"
        tid = _make_task("development", branch=deleg_branch)

        with patch("src.stage_engine.advance_stage") as engine:
            launcher = AgentLauncher()
            launcher._try_advance_stage(
                run_id=1,
                agent="developer",
                repo="enduro-trails",
                branch=deleg_branch,
            )

        engine.assert_called_once()
        passed = engine.call_args.kwargs
        assert passed["task_id"] == tid
        assert passed["current_stage"] == "development"
        assert passed["finished_agent"] == "developer"

    def test_plane_calls_engine(self):
        import asyncio
        from src.webhooks import plane as plane_mod

        with patch("src.stage_engine.advance_stage") as engine:
            wrapper_call = plane_mod._try_advance_stage(
                task_id=5,
                current_stage="analysis",
                repo="enduro-trails",
                work_item_id="ET-001",
                branch="feature/ET-001-x",
            )
            asyncio.run(wrapper_call)

        engine.assert_called_once()
        # plane passes positional args; finished_agent (last positional) is None.
        positional = engine.call_args.args
        assert positional[0] == 5
        assert positional[1] == "analysis"
        assert positional[-1] is None

277
tests/test_webhook_dedup.py Normal file
View File

@@ -0,0 +1,277 @@
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
A retried/replayed webhook delivery must be processed exactly once. We mock
enqueue_job (imported into the gitea/plane module namespaces) and assert its
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
guarantee by re-enabling the secret.
"""
import os
import tempfile
from unittest.mock import patch, AsyncMock
import pytest
# Override DB path + project registry BEFORE importing app (same pattern as
# tests/test_webhooks.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
os.environ["ORCH_GITEA_OWNER"] = "admin"
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import db as db_module # noqa: E402
from src.webhooks import gitea as gitea_mod # noqa: E402
from src.webhooks import plane as plane_mod # noqa: E402
from src import projects as projects_mod # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Run each test against a fresh database file that is deleted afterwards.

    settings is a process-wide singleton and another test module may have set
    settings.db_path to its own file at import time. get_db() reads it live,
    so it is pinned to this module's file for the duration of each test.
    """

    def _drop_db_file():
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _drop_db_file()
    init_db()
    yield
    _drop_db_file()
@pytest.fixture(autouse=True)
def proj_registry():
    """Pin the shared project registry to proj-1/enduro-trails for one test.

    The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
    built at import; test_projects.py rebuilds it via reload_projects(), which can
    leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
    ignore our fixtures. Force ours for each test.

    On teardown the previous ORCH_PROJECTS_JSON environment value and the
    previous settings.projects_json are put back BEFORE the registry is
    rebuilt, so this module's override does not leak into other test modules.
    """
    ours = (
        '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
        '"work_item_prefix": "ET", "name": "enduro-trails"}]'
    )
    # Remember what was configured so teardown can restore it.
    saved_env = os.environ.get("ORCH_PROJECTS_JSON")
    saved_setting = projects_mod.settings.projects_json

    os.environ["ORCH_PROJECTS_JSON"] = ours
    projects_mod.settings.projects_json = ours
    projects_mod.reload_projects()
    yield
    if saved_env is None:
        os.environ.pop("ORCH_PROJECTS_JSON", None)
    else:
        os.environ["ORCH_PROJECTS_JSON"] = saved_env
    projects_mod.settings.projects_json = saved_setting
    # Rebuild from the restored configuration, not from our override.
    projects_mod.reload_projects()
@pytest.fixture(autouse=True)
def no_hmac(monkeypatch):
    """Blank both webhook secrets so dedup (not signing) is under test.

    settings is shared, so the secret is overridden on the module-level
    settings object of each webhook module.
    """
    secrets_to_clear = (
        (gitea_mod.settings, "gitea_webhook_secret"),
        (plane_mod.settings, "plane_webhook_secret"),
    )
    for settings_obj, field in secrets_to_clear:
        monkeypatch.setattr(settings_obj, field, "", raising=False)
    yield
client = TestClient(app)
def _events_count():
    """Return how many rows the ``events`` table currently holds."""
    db = get_db()
    (total,) = db.execute("SELECT COUNT(*) FROM events").fetchone()
    db.close()
    return total
# ---------------------------------------------------------------------------
# Migration
# ---------------------------------------------------------------------------
def test_migration_adds_delivery_id_and_index():
    """events has delivery_id + a partial unique index idx_events_delivery."""
    db = get_db()
    # Column 1 of both PRAGMA result sets is the name.
    column_names = [row[1] for row in db.execute("PRAGMA table_info(events)").fetchall()]
    index_names = [row[1] for row in db.execute("PRAGMA index_list(events)").fetchall()]
    db.close()

    assert "delivery_id" in column_names
    assert "idx_events_delivery" in index_names
def test_migration_on_old_db_without_column_does_not_crash():
    """init_db() over a pre-existing events table WITHOUT delivery_id is safe.

    Builds an old-shape database by hand, runs the migration and checks that
    the column and the idx_events_delivery index were added while the two
    legacy rows survived.
    """
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    import sqlite3

    conn = sqlite3.connect(_test_db)
    # Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
    conn.executescript(
        """
        CREATE TABLE events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT DEFAULT (datetime('now')),
            source TEXT NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            processed INTEGER DEFAULT 0
        );
        INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
        INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
        """
    )
    conn.commit()
    conn.close()

    # Should add the column + index without raising and keep the legacy rows.
    init_db()

    conn = get_db()
    cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
    idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
    n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    conn.close()

    assert "delivery_id" in cols
    # The index half of the migration must be verified on the upgrade path too.
    assert "idx_events_delivery" in idxs
    assert n == 2  # legacy NULL-delivery rows preserved, partial index lets them coexist
# ---------------------------------------------------------------------------
# Gitea dedup
# ---------------------------------------------------------------------------
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
    """Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
    # Task at architecture so the ADR push would enqueue.
    db = get_db()
    db.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
        "VALUES (?, ?, ?, ?, ?)",
        ("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
    )
    db.commit()
    db.close()

    adr_commit = {"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
    push_payload = {
        "ref": "refs/heads/feature/ET-100-x",
        "repository": {"name": "enduro-trails"},
        "commits": [adr_commit],
    }
    delivery_headers = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}

    def _deliver():
        return client.post("/webhook/gitea", json=push_payload, headers=delivery_headers)

    first = _deliver()
    assert first.status_code == 200
    assert first.json()["status"] == "accepted"
    assert mock_enqueue.call_count == 1
    assert _events_count() == 1

    # Same delivery id again -> duplicate, no new enqueue, no new event row.
    second = _deliver()
    assert second.status_code == 200
    assert second.json()["status"] == "duplicate"
    assert mock_enqueue.call_count == 1
    assert _events_count() == 1
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
    """The same body under two different delivery ids is accepted twice."""
    push_payload = {
        "ref": "refs/heads/feature/none",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    responses = [
        client.post(
            "/webhook/gitea",
            json=push_payload,
            headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": guid},
        )
        for guid in ("guid-1", "guid-2")
    ]

    first, second = responses
    assert first.json()["status"] == "accepted"
    assert second.json()["status"] == "accepted"
    assert _events_count() == 2
def test_gitea_fallback_hash_when_no_delivery_header():
    """No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
    push_payload = {
        "ref": "refs/heads/feature/none",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    only_event_header = {"X-Gitea-Event": "push"}

    first = client.post("/webhook/gitea", json=push_payload, headers=only_event_header)
    second = client.post("/webhook/gitea", json=push_payload, headers=only_event_header)

    assert first.json()["status"] == "accepted"
    assert second.json()["status"] == "duplicate"
    assert _events_count() == 1
# ---------------------------------------------------------------------------
# Plane dedup
# ---------------------------------------------------------------------------
@patch.object(plane_mod, "enqueue_job")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
    """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
    work_item = {
        "id": "pd-001",
        "name": "Dedup plane task",
        "description_stripped": "A sufficiently long description for QG-0 to pass.",
        "project": "proj-1",
    }
    plane_payload = {"event": "work_item.created", "data": work_item}

    first = client.post("/webhook/plane", json=plane_payload)
    assert first.status_code == 200
    assert first.json()["status"] == "accepted"
    assert mock_enqueue.call_count == 1
    assert _events_count() == 1

    second = client.post("/webhook/plane", json=plane_payload)
    assert second.status_code == 200
    assert second.json()["status"] == "duplicate"
    assert mock_enqueue.call_count == 1  # not re-enqueued
    assert _events_count() == 1
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
    """ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
    plane_payload = {
        "event": "work_item.created",
        "data": {
            "id": "unk-001",
            "name": "Unknown project task",
            "project": "proj-UNKNOWN",
        },
    }

    first = client.post("/webhook/plane", json=plane_payload)
    assert first.status_code == 200
    assert first.json()["status"] == "ignored"
    # Event WAS logged (dedup happens before the project filter), so a retry of the
    # SAME body is a duplicate, not re-evaluated.
    assert _events_count() == 1

    second = client.post("/webhook/plane", json=plane_payload)
    assert second.json()["status"] == "duplicate"
    assert _events_count() == 1
# ---------------------------------------------------------------------------
# HMAC still guarded (acceptance #4) — independent of the dedup path
# ---------------------------------------------------------------------------
def test_gitea_invalid_signature_still_401(monkeypatch):
    """With a secret configured, a bad X-Gitea-Signature is answered with 401."""
    monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
    push_payload = {
        "ref": "refs/heads/feature/x",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    bad_headers = {"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"}

    response = client.post("/webhook/gitea", json=push_payload, headers=bad_headers)

    assert response.status_code == 401
def test_plane_invalid_signature_still_401(monkeypatch):
    """With a secret configured, a bad X-Plane-Signature is answered with 401."""
    monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
    plane_payload = {
        "event": "work_item.created",
        "data": {"id": "z", "project": "proj-1"},
    }
    bad_headers = {"X-Plane-Signature": "deadbeef"}

    response = client.post("/webhook/plane", json=plane_payload, headers=bad_headers)

    assert response.status_code == 401