Compare commits
12 Commits
feature/OR
...
feature/OR
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ac449ff63 | ||
|
|
e6a7c6de8d | ||
|
|
0b924208dc | ||
| 2f0fd24670 | |||
|
|
6abdc220d2 | ||
|
|
51401a3ba9 | ||
|
|
0befc49b1e | ||
| fd554c8a5a | |||
|
|
c167c6930d | ||
|
|
49ecb48eb0 | ||
|
|
237732bc64 | ||
| 4e52e192e4 |
@@ -1,8 +1,10 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
import os
|
import os
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import signal
|
import signal
|
||||||
|
import time
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||||
@@ -53,7 +55,10 @@ class AgentLauncher:
|
|||||||
}
|
}
|
||||||
|
|
||||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||||
AGENT_TIMEOUT = 1800 # 30 minutes
|
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
|
||||||
|
# backward-compatible alias for the default; the actual value (and per-agent
|
||||||
|
# overrides) live in settings and are resolved via _resolve_timeout().
|
||||||
|
AGENT_TIMEOUT = settings.agent_timeout_seconds
|
||||||
|
|
||||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||||
"""
|
"""
|
||||||
@@ -190,7 +195,7 @@ class AgentLauncher:
|
|||||||
t = threading.Thread(
|
t = threading.Thread(
|
||||||
target=self._watchdog,
|
target=self._watchdog,
|
||||||
args=(proc.pid, run_id),
|
args=(proc.pid, run_id),
|
||||||
kwargs={"job_id": job_id},
|
kwargs={"job_id": job_id, "agent": agent},
|
||||||
daemon=True,
|
daemon=True,
|
||||||
)
|
)
|
||||||
t.start()
|
t.start()
|
||||||
@@ -209,29 +214,100 @@ class AgentLauncher:
|
|||||||
notify_agent_started(run_id, agent, task_id)
|
notify_agent_started(run_id, agent, task_id)
|
||||||
return run_id
|
return run_id
|
||||||
|
|
||||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
|
@staticmethod
|
||||||
"""Kill agent if it exceeds timeout.
|
def _resolve_timeout(agent: str = None) -> int:
|
||||||
|
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
|
||||||
|
|
||||||
|
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
|
||||||
|
like {"reviewer": 3600}) wins; otherwise the global default
|
||||||
|
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
|
||||||
|
(falls back to the default) and only logged, so a bad env never bricks runs.
|
||||||
|
"""
|
||||||
|
default = settings.agent_timeout_seconds
|
||||||
|
raw = (settings.agent_timeout_overrides_json or "").strip()
|
||||||
|
if agent and raw:
|
||||||
|
try:
|
||||||
|
overrides = json.loads(raw)
|
||||||
|
if isinstance(overrides, dict) and agent in overrides:
|
||||||
|
return int(overrides[agent])
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
|
||||||
|
return default
|
||||||
|
|
||||||
|
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
|
||||||
|
job_id: int = None, agent: str = None):
|
||||||
|
"""Kill agent if it exceeds its timeout.
|
||||||
|
|
||||||
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
||||||
code and drives the job retry/fail logic, so the watchdog itself only needs
|
code and drives the job retry/fail logic, so the watchdog itself only needs
|
||||||
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
|
to terminate the process and record the agent_runs exit. job_id is accepted
|
||||||
|
for symmetry.
|
||||||
|
|
||||||
|
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
|
||||||
|
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
|
||||||
|
give the process up to settings.agent_kill_grace_seconds to flush and exit on
|
||||||
|
its own, and only SIGKILL if it is still alive after the grace window. If the
|
||||||
|
process exits during the grace window, SIGKILL is NOT sent.
|
||||||
|
ProcessLookupError is tolerated at every step (the process may already be
|
||||||
|
gone). The recorded exit_code stays -9 to match the existing retry/fail
|
||||||
|
contract regardless of which signal actually reaped it.
|
||||||
"""
|
"""
|
||||||
import time
|
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = self.AGENT_TIMEOUT
|
timeout = self._resolve_timeout(agent)
|
||||||
time.sleep(timeout)
|
time.sleep(timeout)
|
||||||
|
|
||||||
|
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
|
||||||
|
try:
|
||||||
|
os.kill(pid, signal.SIGTERM)
|
||||||
|
logger.warning(
|
||||||
|
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
|
||||||
|
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
|
||||||
|
)
|
||||||
|
except ProcessLookupError:
|
||||||
|
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
|
||||||
|
return # nothing to record: the monitor's proc.wait() owns the exit
|
||||||
|
|
||||||
|
# Phase 2: poll for graceful exit within the grace window.
|
||||||
|
grace = settings.agent_kill_grace_seconds
|
||||||
|
poll_interval = 0.5
|
||||||
|
waited = 0.0
|
||||||
|
while waited < grace:
|
||||||
|
time.sleep(poll_interval)
|
||||||
|
waited += poll_interval
|
||||||
|
try:
|
||||||
|
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
|
||||||
|
except ProcessLookupError:
|
||||||
|
logger.info(
|
||||||
|
f"Agent run_id={run_id} exited gracefully after SIGTERM "
|
||||||
|
f"({waited:.1f}s); no SIGKILL needed"
|
||||||
|
)
|
||||||
|
self._record_kill(run_id)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Phase 3: still alive -> hard SIGKILL.
|
||||||
try:
|
try:
|
||||||
os.kill(pid, signal.SIGKILL)
|
os.kill(pid, signal.SIGKILL)
|
||||||
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
|
logger.warning(
|
||||||
conn = get_db()
|
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
|
||||||
conn.execute(
|
|
||||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
|
||||||
(run_id,),
|
|
||||||
)
|
)
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
pass # Already finished
|
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
|
||||||
|
self._record_kill(run_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _record_kill(run_id: int):
|
||||||
|
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
|
||||||
|
|
||||||
|
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
|
||||||
|
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||||
|
(run_id,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||||
"""Wait for agent to finish, commit+push results, update DB.
|
"""Wait for agent to finish, commit+push results, update DB.
|
||||||
@@ -488,7 +564,15 @@ class AgentLauncher:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
"""After agent finishes successfully, advance the stage via the unified engine.
|
||||||
|
|
||||||
|
ORCH-4 / M-3: the 174-line body that used to live here moved into
|
||||||
|
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
|
||||||
|
the task by (repo, branch) and delegates. `agent` is forwarded as
|
||||||
|
finished_agent so the analyst/reviewer/tester/architect rollback branches
|
||||||
|
still trigger exactly as before. The agent-selection bug (it used to call
|
||||||
|
get_agent_for_stage(next_stage)) is fixed inside the engine.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
task_row = conn.execute(
|
task_row = conn.execute(
|
||||||
@@ -500,174 +584,15 @@ class AgentLauncher:
|
|||||||
return
|
return
|
||||||
|
|
||||||
task_id, current_stage, work_item_id = task_row
|
task_id, current_stage, work_item_id = task_row
|
||||||
qg_name = get_qg_for_stage(current_stage)
|
from ..stage_engine import advance_stage
|
||||||
next_stage = get_next_stage(current_stage)
|
advance_stage(
|
||||||
|
task_id=task_id,
|
||||||
if not next_stage:
|
current_stage=current_stage,
|
||||||
return
|
repo=repo,
|
||||||
|
work_item_id=work_item_id,
|
||||||
# Run QG check if defined
|
branch=branch,
|
||||||
if qg_name and qg_name in QG_CHECKS:
|
finished_agent=agent,
|
||||||
check_fn = QG_CHECKS[qg_name]
|
)
|
||||||
if qg_name in ("check_analysis_approved",):
|
|
||||||
# Requires human approval - post request comment if analyst just finished
|
|
||||||
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
|
|
||||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
|
||||||
if files_check:
|
|
||||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
|
||||||
if files_ok:
|
|
||||||
# Full artifacts ready -> In Review
|
|
||||||
from ..plane_sync import set_issue_in_review
|
|
||||||
set_issue_in_review(work_item_id)
|
|
||||||
plane_add_comment(
|
|
||||||
work_item_id,
|
|
||||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
|
||||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
|
|
||||||
)
|
|
||||||
notify_approve_requested(task_id)
|
|
||||||
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
|
|
||||||
else:
|
|
||||||
# Check if questions file exists (in the task worktree)
|
|
||||||
import os as _os
|
|
||||||
questions_path = _os.path.join(
|
|
||||||
get_worktree_path(repo, branch),
|
|
||||||
f"docs/work-items/{work_item_id}/01-questions.md"
|
|
||||||
)
|
|
||||||
if _os.path.isfile(questions_path):
|
|
||||||
# Analyst has questions -> Needs Input
|
|
||||||
from ..plane_sync import set_issue_needs_input
|
|
||||||
set_issue_needs_input(work_item_id)
|
|
||||||
with open(questions_path, "r") as qf:
|
|
||||||
questions_text = qf.read()
|
|
||||||
plane_add_comment(
|
|
||||||
work_item_id,
|
|
||||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
|
|
||||||
)
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(
|
|
||||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# No artifacts and no questions
|
|
||||||
plane_add_comment(
|
|
||||||
work_item_id,
|
|
||||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
|
||||||
# (repo, branch) signature — already worktree-aware.
|
|
||||||
passed, reason = check_fn(repo, branch)
|
|
||||||
elif qg_name == "check_tests_passed":
|
|
||||||
# Artifact check — pass branch so it reads from the worktree.
|
|
||||||
passed, reason = check_fn(repo, work_item_id or "", branch)
|
|
||||||
else:
|
|
||||||
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
|
|
||||||
passed, reason = check_fn(repo, work_item_id or "", branch)
|
|
||||||
|
|
||||||
if not passed:
|
|
||||||
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
|
|
||||||
# If reviewer says REQUEST_CHANGES, rollback to development
|
|
||||||
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
|
|
||||||
update_task_stage(task_id, "development")
|
|
||||||
notify_stage_change(task_id, current_stage, "development")
|
|
||||||
plane_notify_stage(work_item_id, current_stage, "development")
|
|
||||||
# Count retries
|
|
||||||
conn2 = get_db()
|
|
||||||
retry_count = conn2.execute(
|
|
||||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
|
||||||
(task_id,)
|
|
||||||
).fetchone()[0]
|
|
||||||
conn2.close()
|
|
||||||
if retry_count < 3:
|
|
||||||
task_desc = (
|
|
||||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
|
||||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
|
||||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
|
||||||
f"docs/work-items/{work_item_id}/12-review.md"
|
|
||||||
)
|
|
||||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
|
||||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
|
|
||||||
else:
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
|
||||||
logger.error(f"Task {task_id}: max retries reached")
|
|
||||||
|
|
||||||
# Task 6: Tester FAIL -> rollback to development
|
|
||||||
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
|
|
||||||
update_task_stage(task_id, "development")
|
|
||||||
notify_stage_change(task_id, current_stage, "development")
|
|
||||||
plane_notify_stage(work_item_id, current_stage, "development")
|
|
||||||
from ..plane_sync import set_issue_in_progress
|
|
||||||
set_issue_in_progress(work_item_id)
|
|
||||||
plane_add_comment(
|
|
||||||
work_item_id,
|
|
||||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
|
||||||
)
|
|
||||||
conn2 = get_db()
|
|
||||||
retry_count = conn2.execute(
|
|
||||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
|
||||||
(task_id,)
|
|
||||||
).fetchone()[0]
|
|
||||||
conn2.close()
|
|
||||||
if retry_count < 3:
|
|
||||||
task_desc = (
|
|
||||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
|
||||||
f"Stage: development\nNote: Tests FAILED. "
|
|
||||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
|
||||||
)
|
|
||||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
|
||||||
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
|
|
||||||
else:
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
from ..plane_sync import set_issue_blocked
|
|
||||||
set_issue_blocked(work_item_id)
|
|
||||||
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
|
|
||||||
|
|
||||||
# Task 8: Architect conflict -> rollback to analysis
|
|
||||||
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
|
|
||||||
import os as _os
|
|
||||||
conflict_path = _os.path.join(
|
|
||||||
get_worktree_path(repo, branch),
|
|
||||||
f"docs/work-items/{work_item_id}/10-conflict.md"
|
|
||||||
)
|
|
||||||
if _os.path.isfile(conflict_path):
|
|
||||||
update_task_stage(task_id, "analysis")
|
|
||||||
notify_stage_change(task_id, current_stage, "analysis")
|
|
||||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
|
||||||
from ..plane_sync import set_issue_in_progress
|
|
||||||
set_issue_in_progress(work_item_id)
|
|
||||||
with open(conflict_path, "r") as cf:
|
|
||||||
conflict_text = cf.read()[:500]
|
|
||||||
plane_add_comment(
|
|
||||||
work_item_id,
|
|
||||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
|
|
||||||
)
|
|
||||||
task_desc = (
|
|
||||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
|
||||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
|
||||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
|
||||||
)
|
|
||||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
|
||||||
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
|
|
||||||
return
|
|
||||||
|
|
||||||
return
|
|
||||||
elif qg_name:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Advance stage
|
|
||||||
update_task_stage(task_id, next_stage)
|
|
||||||
notify_stage_change(task_id, current_stage, next_stage)
|
|
||||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
|
||||||
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
|
|
||||||
|
|
||||||
# Launch next agent if defined
|
|
||||||
next_agent = get_agent_for_stage(next_stage)
|
|
||||||
if next_agent:
|
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
|
||||||
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
|
||||||
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||||
|
|
||||||
@@ -703,47 +628,6 @@ class AgentLauncher:
|
|||||||
logger.error(f"Failed to create PR for {branch}: {e}")
|
logger.error(f"Failed to create PR for {branch}: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
|
|
||||||
import httpx
|
|
||||||
owner = settings.gitea_owner
|
|
||||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
|
||||||
base_url = f"{settings.gitea_url}/api/v1"
|
|
||||||
try:
|
|
||||||
resp = httpx.get(
|
|
||||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
|
||||||
params={"state": "open", "head": branch},
|
|
||||||
headers=headers, timeout=10
|
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
prs = resp.json()
|
|
||||||
if not prs:
|
|
||||||
pr_number = self._ensure_pr(repo, branch, 0)
|
|
||||||
if not pr_number:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
pr_number = prs[0]["number"]
|
|
||||||
resp = httpx.post(
|
|
||||||
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
|
|
||||||
json={"Do": "merge"},
|
|
||||||
headers=headers, timeout=30
|
|
||||||
)
|
|
||||||
if resp.status_code in (200, 204):
|
|
||||||
logger.info(f"PR #{pr_number} merged for {branch}")
|
|
||||||
update_task_stage(task_id, "done")
|
|
||||||
notify_stage_change(task_id, "deploy", "done")
|
|
||||||
plane_notify_stage(work_item_id, "deploy", "done")
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Auto-merge failed for {branch}: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
|
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
|
||||||
"""Write task file directly into the task's worktree.
|
"""Write task file directly into the task's worktree.
|
||||||
|
|
||||||
|
|||||||
@@ -53,6 +53,19 @@ class Settings(BaseSettings):
|
|||||||
breaker_threshold: int = 3
|
breaker_threshold: int = 3
|
||||||
breaker_pause_seconds: int = 300
|
breaker_pause_seconds: int = 300
|
||||||
|
|
||||||
|
# ORCH-7 (M-2): agent timeout + graceful kill.
|
||||||
|
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
|
||||||
|
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
|
||||||
|
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
|
||||||
|
# flush artifacts before the hard kill
|
||||||
|
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
|
||||||
|
# agent_timeout_overrides_json -> optional per-agent override JSON object,
|
||||||
|
# e.g. {"reviewer": 3600, "architect": 2700}
|
||||||
|
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
|
||||||
|
agent_timeout_seconds: int = 1800
|
||||||
|
agent_kill_grace_seconds: int = 20
|
||||||
|
agent_timeout_overrides_json: str = ""
|
||||||
|
|
||||||
|
|
||||||
# Telegram notifications
|
# Telegram notifications
|
||||||
telegram_bot_token: str = ""
|
telegram_bot_token: str = ""
|
||||||
|
|||||||
38
src/db.py
38
src/db.py
@@ -67,6 +67,17 @@ def init_db():
|
|||||||
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||||||
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||||||
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||||||
|
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
|
||||||
|
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
|
||||||
|
# (which have NULL delivery_id) never collide with each other. Restart-safe:
|
||||||
|
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
|
||||||
|
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
|
||||||
|
_ensure_column(conn, "events", "delivery_id", "TEXT")
|
||||||
|
conn.execute(
|
||||||
|
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
|
||||||
|
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
@@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
|||||||
return f"{prefix}-{next_num:03d}"
|
return f"{prefix}-{next_num:03d}"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-5 (M-7): idempotent webhook event logging
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def insert_event_dedup(
|
||||||
|
source: str, event_type: str, payload: str, delivery_id: str
|
||||||
|
) -> bool:
|
||||||
|
"""Idempotently log a webhook event keyed by delivery_id.
|
||||||
|
|
||||||
|
Returns True if a NEW row was inserted (caller should dispatch the event) and
|
||||||
|
False if this delivery_id was already present (a duplicate delivery -> caller
|
||||||
|
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
|
||||||
|
index idx_events_delivery; rowcount==1 means the row was actually inserted.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
try:
|
||||||
|
cur = conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
|
||||||
|
"VALUES (?, ?, ?, ?)",
|
||||||
|
(source, event_type, payload, delivery_id),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
return cur.rowcount == 1
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# ORCH-1 (F-2b): job queue helpers
|
# ORCH-1 (F-2b): job queue helpers
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
425
src/stage_engine.py
Normal file
425
src/stage_engine.py
Normal file
@@ -0,0 +1,425 @@
|
|||||||
|
"""Unified stage engine (ORCH-4 / M-3).
|
||||||
|
|
||||||
|
Single source of truth for "an agent finished / a human approved -> run the
|
||||||
|
stage's quality gate and either advance the pipeline or roll it back".
|
||||||
|
|
||||||
|
Before ORCH-4 this logic was duplicated in two places that had silently
|
||||||
|
diverged:
|
||||||
|
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
|
||||||
|
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
|
||||||
|
rollback+retry, architect conflict rollback) — but it picked the next agent
|
||||||
|
with get_agent_for_stage(next_stage), which is WRONG.
|
||||||
|
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
|
||||||
|
check_review_approved PR-by-branch dispatch and used the CORRECT
|
||||||
|
get_agent_for_stage(current_stage)).
|
||||||
|
|
||||||
|
This module merges both into one sync `advance_stage(...)`. launcher calls it
|
||||||
|
directly; the plane webhook calls it through asyncio.to_thread so there is
|
||||||
|
exactly one implementation.
|
||||||
|
|
||||||
|
Agent-selection bug fix (ORCH-4):
|
||||||
|
stages.py defines `agent` as "the agent to launch when advancing FROM this
|
||||||
|
stage". So when advancing current -> next, the correct agent to launch is
|
||||||
|
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
|
||||||
|
stage (e.g. analysis->architecture launched 'developer' instead of
|
||||||
|
'architect'). plane and gitea already used current_stage; we unify on that.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
from .db import get_db, update_task_stage, enqueue_job
|
||||||
|
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||||
|
from .git_worktree import get_worktree_path
|
||||||
|
from .qg.checks import QG_CHECKS
|
||||||
|
from .notifications import (
|
||||||
|
notify_stage_change,
|
||||||
|
notify_qg_failure,
|
||||||
|
notify_approve_requested,
|
||||||
|
send_telegram,
|
||||||
|
)
|
||||||
|
from .plane_sync import (
|
||||||
|
notify_stage_change as plane_notify_stage,
|
||||||
|
notify_qg_failure as plane_notify_qg,
|
||||||
|
add_comment as plane_add_comment,
|
||||||
|
set_issue_in_review,
|
||||||
|
set_issue_needs_input,
|
||||||
|
set_issue_in_progress,
|
||||||
|
set_issue_blocked,
|
||||||
|
)
|
||||||
|
from .config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger("orchestrator.stage_engine")
|
||||||
|
|
||||||
|
MAX_DEVELOPER_RETRIES = 3
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AdvanceResult:
|
||||||
|
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
|
||||||
|
|
||||||
|
advanced: bool = False
|
||||||
|
from_stage: str | None = None
|
||||||
|
to_stage: str | None = None
|
||||||
|
enqueued_agent: str | None = None
|
||||||
|
enqueued_job_id: int | None = None
|
||||||
|
qg_name: str | None = None
|
||||||
|
qg_passed: bool | None = None
|
||||||
|
qg_reason: str | None = None
|
||||||
|
rolled_back_to: str | None = None
|
||||||
|
alerted: bool = False
|
||||||
|
note: str | None = None
|
||||||
|
notes: list = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
|
||||||
|
"""Dispatch a quality-gate check to the right signature and run it.
|
||||||
|
|
||||||
|
Signatures (unified from launcher + plane):
|
||||||
|
- check_ci_green / check_tests_local -> (repo, branch)
|
||||||
|
- check_review_approved -> (repo, pr_number) [PR found by branch]
|
||||||
|
- everything else (artifact checks) -> (repo, work_item_id, branch)
|
||||||
|
|
||||||
|
Returns (passed: bool, reason: str).
|
||||||
|
"""
|
||||||
|
check_fn = QG_CHECKS.get(qg_name)
|
||||||
|
if not check_fn:
|
||||||
|
logger.error(f"QG function '{qg_name}' not found in registry")
|
||||||
|
return False, f"Unknown QG: {qg_name}"
|
||||||
|
|
||||||
|
if qg_name in ("check_ci_green", "check_tests_local"):
|
||||||
|
# (repo, branch) — already worktree-aware.
|
||||||
|
return check_fn(repo, branch)
|
||||||
|
|
||||||
|
if qg_name == "check_review_approved":
|
||||||
|
# Special case kept from plane: find the open PR for this branch via
|
||||||
|
# Gitea, then check it; fall back to a file-based review marker.
|
||||||
|
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
|
||||||
|
|
||||||
|
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
|
||||||
|
# check reads from the task worktree (ORCH-2 / S-4).
|
||||||
|
return check_fn(repo, work_item_id or "", branch)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
|
||||||
|
"""check_review_approved dispatch preserved from plane._try_advance_stage.
|
||||||
|
|
||||||
|
Finds the open PR whose head ref == branch via the Gitea API and runs
|
||||||
|
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
|
||||||
|
file-based review marker (12-review.md / 09-review.md) like the original.
|
||||||
|
"""
|
||||||
|
import httpx as _httpx
|
||||||
|
|
||||||
|
owner = settings.gitea_owner
|
||||||
|
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
|
||||||
|
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||||
|
try:
|
||||||
|
resp = _httpx.get(url, headers=headers, timeout=10)
|
||||||
|
prs = resp.json()
|
||||||
|
pr_number = None
|
||||||
|
for pr in prs:
|
||||||
|
if pr.get("head", {}).get("ref") == branch:
|
||||||
|
pr_number = pr["number"]
|
||||||
|
break
|
||||||
|
if pr_number:
|
||||||
|
return check_fn(repo, pr_number)
|
||||||
|
# No open PR but a review file may exist — check file-based.
|
||||||
|
wt = get_worktree_path(repo, branch)
|
||||||
|
if not os.path.isdir(wt):
|
||||||
|
wt = os.path.join(settings.repos_dir, repo)
|
||||||
|
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
|
||||||
|
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
|
||||||
|
if os.path.isfile(review_path) or os.path.isfile(review_path2):
|
||||||
|
return True, "Review file exists (file-based approval)"
|
||||||
|
return False, "No open PR found and no review file"
|
||||||
|
except Exception as e:
|
||||||
|
return False, f"Error finding PR: {e}"
|
||||||
|
|
||||||
|
|
||||||
|
def _developer_retry_count(task_id: int) -> int:
|
||||||
|
"""How many developer runs have already happened for this task."""
|
||||||
|
conn = get_db()
|
||||||
|
n = conn.execute(
|
||||||
|
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||||
|
(task_id,),
|
||||||
|
).fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
return n
|
||||||
|
|
||||||
|
|
||||||
|
def advance_stage(
|
||||||
|
task_id: int,
|
||||||
|
current_stage: str,
|
||||||
|
repo: str,
|
||||||
|
work_item_id: str,
|
||||||
|
branch: str,
|
||||||
|
finished_agent: str | None = None,
|
||||||
|
) -> AdvanceResult:
|
||||||
|
"""Run the current stage's quality gate and advance / roll back the pipeline.
|
||||||
|
|
||||||
|
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
|
||||||
|
the async plane webhook calls it via asyncio.to_thread.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: tasks.id
|
||||||
|
current_stage: the stage the task is currently in
|
||||||
|
repo: repository name
|
||||||
|
work_item_id: Plane work item id (may be "" / None)
|
||||||
|
branch: feature branch
|
||||||
|
finished_agent: the agent that just finished (launcher path). Drives the
|
||||||
|
approved/REQUEST_CHANGES/tester/architect branches. In the
|
||||||
|
plane webhook path it is None, so those agent-specific
|
||||||
|
branches simply do not trigger (matches old plane behavior).
|
||||||
|
|
||||||
|
Returns AdvanceResult describing what happened.
|
||||||
|
"""
|
||||||
|
result = AdvanceResult(from_stage=current_stage)
|
||||||
|
agent = finished_agent
|
||||||
|
try:
|
||||||
|
qg_name = get_qg_for_stage(current_stage)
|
||||||
|
next_stage = get_next_stage(current_stage)
|
||||||
|
result.qg_name = qg_name
|
||||||
|
result.to_stage = next_stage
|
||||||
|
|
||||||
|
if not next_stage:
|
||||||
|
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
||||||
|
result.note = "terminal"
|
||||||
|
return result
|
||||||
|
|
||||||
|
# --- Quality gate ----------------------------------------------------
|
||||||
|
if qg_name and qg_name in QG_CHECKS:
|
||||||
|
# Human-approval gate: special analyst approved-flow (launcher only).
|
||||||
|
if qg_name == "check_analysis_approved":
|
||||||
|
_handle_analysis_approved_flow(
|
||||||
|
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||||
|
result.qg_passed = passed
|
||||||
|
result.qg_reason = reason
|
||||||
|
|
||||||
|
if not passed:
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||||
|
)
|
||||||
|
# Behaviour parity:
|
||||||
|
# - webhook path (finished_agent is None): emit the generic
|
||||||
|
# QG-failure notification, exactly like the old plane handler.
|
||||||
|
# - launcher path (finished_agent set): NO generic notification;
|
||||||
|
# the rollback branches below own their own messaging, exactly
|
||||||
|
# like the old launcher handler.
|
||||||
|
if agent is None:
|
||||||
|
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||||
|
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||||
|
|
||||||
|
_handle_qg_failure_rollbacks(
|
||||||
|
task_id, current_stage, repo, work_item_id, branch,
|
||||||
|
agent, qg_name, reason, result,
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
elif qg_name:
|
||||||
|
# QG name set but not registered — do not advance (launcher behavior).
|
||||||
|
result.note = f"qg '{qg_name}' not in registry"
|
||||||
|
return result
|
||||||
|
|
||||||
|
# --- Advance ---------------------------------------------------------
|
||||||
|
update_task_stage(task_id, next_stage)
|
||||||
|
notify_stage_change(task_id, current_stage, next_stage)
|
||||||
|
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||||
|
result.advanced = True
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: {current_stage} -> {next_stage} "
|
||||||
|
f"(auto-advance after {agent})"
|
||||||
|
)
|
||||||
|
|
||||||
|
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||||
|
next_agent = get_agent_for_stage(current_stage)
|
||||||
|
if next_agent:
|
||||||
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo}\n"
|
||||||
|
f"Branch: {branch}\nStage: {next_stage}"
|
||||||
|
)
|
||||||
|
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||||
|
result.enqueued_agent = next_agent
|
||||||
|
result.enqueued_job_id = new_job_id
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
|
||||||
|
result.note = f"error: {e}"
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_analysis_approved_flow(
|
||||||
|
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
||||||
|
):
|
||||||
|
"""Analyst approved-flow (launcher only).
|
||||||
|
|
||||||
|
Only triggers when the analyst just finished (agent == 'analyst') in the
|
||||||
|
launcher path. Decides between: artifacts ready -> In Review + request
|
||||||
|
:approved:; questions file -> Needs Input; otherwise a warning comment.
|
||||||
|
This gate never advances on its own (human approval does that via the plane
|
||||||
|
webhook), matching the original launcher behavior.
|
||||||
|
"""
|
||||||
|
result.qg_name = "check_analysis_approved"
|
||||||
|
result.note = "analysis-approval-gate"
|
||||||
|
if not (agent == "analyst" and work_item_id):
|
||||||
|
return
|
||||||
|
|
||||||
|
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||||
|
if not files_check:
|
||||||
|
return
|
||||||
|
|
||||||
|
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||||
|
if files_ok:
|
||||||
|
# Full artifacts ready -> In Review, ask for :approved:.
|
||||||
|
set_issue_in_review(work_item_id)
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||||
|
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
|
||||||
|
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
|
||||||
|
)
|
||||||
|
notify_approve_requested(task_id)
|
||||||
|
result.note = "analysis-in-review"
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: analyst finished, requested :approved: in Plane"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
questions_path = os.path.join(
|
||||||
|
get_worktree_path(repo, branch),
|
||||||
|
f"docs/work-items/{work_item_id}/01-questions.md",
|
||||||
|
)
|
||||||
|
if os.path.isfile(questions_path):
|
||||||
|
set_issue_needs_input(work_item_id)
|
||||||
|
with open(questions_path, "r") as qf:
|
||||||
|
questions_text = qf.read()
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
|
||||||
|
)
|
||||||
|
send_telegram(
|
||||||
|
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||||
|
)
|
||||||
|
result.note = "analysis-needs-input"
|
||||||
|
return
|
||||||
|
|
||||||
|
# No artifacts and no questions.
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
|
||||||
|
)
|
||||||
|
result.note = "analysis-empty"
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_qg_failure_rollbacks(
|
||||||
|
task_id, current_stage, repo, work_item_id, branch,
|
||||||
|
agent, qg_name, reason, result: AdvanceResult,
|
||||||
|
):
|
||||||
|
"""All rollback/retry branches from the original launcher, preserved verbatim.
|
||||||
|
|
||||||
|
Only fire on the launcher path (finished_agent is set). The webhook path
|
||||||
|
passes finished_agent=None, so none of these agent-specific branches trigger
|
||||||
|
— that matches the old plane behavior (it just reported the QG failure).
|
||||||
|
"""
|
||||||
|
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
|
||||||
|
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
|
||||||
|
update_task_stage(task_id, "development")
|
||||||
|
notify_stage_change(task_id, current_stage, "development")
|
||||||
|
plane_notify_stage(work_item_id, current_stage, "development")
|
||||||
|
result.rolled_back_to = "development"
|
||||||
|
retry_count = _developer_retry_count(task_id)
|
||||||
|
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||||
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||||
|
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||||
|
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||||
|
f"docs/work-items/{work_item_id}/12-review.md"
|
||||||
|
)
|
||||||
|
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||||
|
result.enqueued_agent = "developer"
|
||||||
|
result.enqueued_job_id = new_job
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
|
||||||
|
f"(job_id={new_job})"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
send_telegram(
|
||||||
|
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
|
||||||
|
f"Manual intervention needed."
|
||||||
|
)
|
||||||
|
result.alerted = True
|
||||||
|
logger.error(f"Task {task_id}: max retries reached")
|
||||||
|
|
||||||
|
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
|
||||||
|
if agent == "tester" and qg_name == "check_tests_passed":
|
||||||
|
update_task_stage(task_id, "development")
|
||||||
|
notify_stage_change(task_id, current_stage, "development")
|
||||||
|
plane_notify_stage(work_item_id, current_stage, "development")
|
||||||
|
result.rolled_back_to = "development"
|
||||||
|
set_issue_in_progress(work_item_id)
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
|
||||||
|
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||||
|
)
|
||||||
|
retry_count = _developer_retry_count(task_id)
|
||||||
|
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||||
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||||
|
f"Stage: development\nNote: Tests FAILED. "
|
||||||
|
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||||
|
)
|
||||||
|
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||||
|
result.enqueued_agent = "developer"
|
||||||
|
result.enqueued_job_id = new_job
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
set_issue_blocked(work_item_id)
|
||||||
|
send_telegram(
|
||||||
|
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
|
||||||
|
f"retries. Manual intervention needed."
|
||||||
|
)
|
||||||
|
result.alerted = True
|
||||||
|
|
||||||
|
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
|
||||||
|
if agent == "architect" and qg_name == "check_architecture_done":
|
||||||
|
conflict_path = os.path.join(
|
||||||
|
get_worktree_path(repo, branch),
|
||||||
|
f"docs/work-items/{work_item_id}/10-conflict.md",
|
||||||
|
)
|
||||||
|
if os.path.isfile(conflict_path):
|
||||||
|
update_task_stage(task_id, "analysis")
|
||||||
|
notify_stage_change(task_id, current_stage, "analysis")
|
||||||
|
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||||
|
result.rolled_back_to = "analysis"
|
||||||
|
set_issue_in_progress(work_item_id)
|
||||||
|
with open(conflict_path, "r") as cf:
|
||||||
|
conflict_text = cf.read()[:500]
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
|
||||||
|
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
|
||||||
|
)
|
||||||
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||||
|
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||||
|
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||||
|
)
|
||||||
|
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||||
|
result.enqueued_agent = "analyst"
|
||||||
|
result.enqueued_job_id = new_job
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||||
|
f"(job_id={new_job})"
|
||||||
|
)
|
||||||
52
src/webhooks/_dedup.py
Normal file
52
src/webhooks/_dedup.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
|
||||||
|
|
||||||
|
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
|
||||||
|
manual replay. Without idempotency a retried delivery re-enters the pipeline and
|
||||||
|
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
|
||||||
|
repo). This module computes a stable per-delivery id so the webhook handlers can
|
||||||
|
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
|
||||||
|
|
||||||
|
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
|
||||||
|
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
|
||||||
|
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
|
||||||
|
body (a retried identical body yields the same hash).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
|
||||||
|
def _sha256_hex(*parts: str) -> str:
|
||||||
|
h = hashlib.sha256()
|
||||||
|
for p in parts:
|
||||||
|
h.update(p.encode("utf-8", "replace"))
|
||||||
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
|
||||||
|
"""Compute the delivery_id for a Gitea webhook.
|
||||||
|
|
||||||
|
Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to
|
||||||
|
sha256(source + event_type + body) so a retried identical body still maps to
|
||||||
|
one id even if Gitea omitted the header.
|
||||||
|
"""
|
||||||
|
raw = (headers.get("X-Gitea-Delivery") or "").strip()
|
||||||
|
if not raw:
|
||||||
|
raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace"))
|
||||||
|
return f"gitea:{raw}"
|
||||||
|
|
||||||
|
|
||||||
|
def plane_delivery_id(headers, body: bytes) -> str:
|
||||||
|
"""Compute the delivery_id for a Plane webhook.
|
||||||
|
|
||||||
|
Plane does not reliably send a delivery header, so we try a couple of common
|
||||||
|
names and otherwise fall back to sha256("plane" + body): a retried identical
|
||||||
|
body yields the same id.
|
||||||
|
"""
|
||||||
|
raw = (
|
||||||
|
headers.get("X-Plane-Delivery")
|
||||||
|
or headers.get("X-Hook-Delivery")
|
||||||
|
or ""
|
||||||
|
).strip()
|
||||||
|
if not raw:
|
||||||
|
raw = _sha256_hex("plane", body.decode("utf-8", "replace"))
|
||||||
|
return f"plane:{raw}"
|
||||||
@@ -10,7 +10,14 @@ import httpx
|
|||||||
from fastapi import APIRouter, Request, HTTPException
|
from fastapi import APIRouter, Request, HTTPException
|
||||||
|
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
from ..db import (
|
||||||
|
get_db,
|
||||||
|
get_task_by_repo_branch,
|
||||||
|
update_task_stage,
|
||||||
|
enqueue_job,
|
||||||
|
insert_event_dedup,
|
||||||
|
)
|
||||||
|
from ._dedup import gitea_delivery_id
|
||||||
from ..stages import get_next_stage, get_agent_for_stage
|
from ..stages import get_next_stage, get_agent_for_stage
|
||||||
from ..qg.checks import check_ci_green, check_review_approved
|
from ..qg.checks import check_ci_green, check_review_approved
|
||||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||||
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
|
|||||||
|
|
||||||
payload = json.loads(body)
|
payload = json.loads(body)
|
||||||
|
|
||||||
# Log event
|
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
|
||||||
conn = get_db()
|
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
|
||||||
|
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
|
||||||
|
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
|
||||||
|
# Runs AFTER HMAC verification above.
|
||||||
event_type = request.headers.get("X-Gitea-Event", "unknown")
|
event_type = request.headers.get("X-Gitea-Event", "unknown")
|
||||||
conn.execute(
|
delivery_id = gitea_delivery_id(request.headers, event_type, body)
|
||||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
|
||||||
("gitea", event_type, body.decode()),
|
if not inserted:
|
||||||
)
|
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||||
conn.commit()
|
return {"status": "duplicate"}
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if event_type == "push":
|
if event_type == "push":
|
||||||
await handle_push(payload)
|
await handle_push(payload)
|
||||||
|
|||||||
@@ -15,7 +15,9 @@ from ..db import (
|
|||||||
get_next_work_item_id,
|
get_next_work_item_id,
|
||||||
update_task_stage,
|
update_task_stage,
|
||||||
enqueue_job,
|
enqueue_job,
|
||||||
|
insert_event_dedup,
|
||||||
)
|
)
|
||||||
|
from ._dedup import plane_delivery_id
|
||||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||||
from ..qg.checks import QG_CHECKS
|
from ..qg.checks import QG_CHECKS
|
||||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||||
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
|
|||||||
|
|
||||||
payload = json.loads(body)
|
payload = json.loads(body)
|
||||||
|
|
||||||
# Log event
|
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
|
||||||
conn = get_db()
|
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
|
||||||
conn.execute(
|
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
|
||||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
|
||||||
("plane", payload.get("event", "unknown"), body.decode()),
|
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
|
||||||
)
|
# project still falls through to the filter below and returns {"status":"ignored"}.
|
||||||
conn.commit()
|
event_type = payload.get("event", "unknown")
|
||||||
conn.close()
|
delivery_id = plane_delivery_id(request.headers, body)
|
||||||
|
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
|
||||||
|
if not inserted:
|
||||||
|
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||||
|
return {"status": "duplicate"}
|
||||||
|
|
||||||
event = payload.get("event")
|
event = payload.get("event")
|
||||||
action = payload.get("action", "")
|
action = payload.get("action", "")
|
||||||
@@ -318,81 +324,30 @@ async def handle_comment(data: dict, project_id: str = ""):
|
|||||||
async def _try_advance_stage(
|
async def _try_advance_stage(
|
||||||
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
|
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
|
||||||
):
|
):
|
||||||
"""Run QG check for current stage and advance if passed."""
|
"""Thin async wrapper over the unified stage engine (ORCH-4 / M-3).
|
||||||
qg_name = get_qg_for_stage(current_stage)
|
|
||||||
next_stage = get_next_stage(current_stage)
|
|
||||||
|
|
||||||
if not next_stage:
|
The QG dispatch (including the check_review_approved PR-by-branch logic) and
|
||||||
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
the advance/launch logic now live in src/stage_engine.advance_stage(), which
|
||||||
return
|
is synchronous. We run it off the event loop via asyncio.to_thread so there
|
||||||
|
is exactly one implementation shared with the launcher.
|
||||||
|
|
||||||
# Run QG check if one is required
|
finished_agent is None on this webhook path (a human :approved: comment, not
|
||||||
if qg_name:
|
a finished agent), so the agent-specific rollback branches inside the engine
|
||||||
qg_func = QG_CHECKS.get(qg_name)
|
intentionally do not trigger — identical to the old plane behavior, which
|
||||||
if not qg_func:
|
only ran the QG and either advanced or reported the failure.
|
||||||
logger.error(f"QG function '{qg_name}' not found in registry")
|
"""
|
||||||
return
|
import asyncio
|
||||||
|
from ..stage_engine import advance_stage
|
||||||
|
|
||||||
# Determine args based on QG function
|
await asyncio.to_thread(
|
||||||
if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"):
|
advance_stage,
|
||||||
# ORCH-2 / S-4: pass branch so artifacts are read from the task worktree.
|
task_id,
|
||||||
passed, reason = qg_func(repo, work_item_id, branch)
|
current_stage,
|
||||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
repo,
|
||||||
passed, reason = qg_func(repo, branch)
|
work_item_id,
|
||||||
elif qg_name == "check_review_approved":
|
branch,
|
||||||
# Find PR number by branch via Gitea API
|
None,
|
||||||
import httpx as _httpx
|
)
|
||||||
from ..config import settings as _s
|
|
||||||
_owner = _s.gitea_owner
|
|
||||||
_url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50"
|
|
||||||
_headers = {"Authorization": f"token {_s.gitea_token}"}
|
|
||||||
try:
|
|
||||||
_resp = _httpx.get(_url, headers=_headers, timeout=10)
|
|
||||||
_prs = _resp.json()
|
|
||||||
_pr_number = None
|
|
||||||
for _pr in _prs:
|
|
||||||
if _pr.get("head", {}).get("ref") == branch:
|
|
||||||
_pr_number = _pr["number"]
|
|
||||||
break
|
|
||||||
if _pr_number:
|
|
||||||
passed, reason = qg_func(repo, _pr_number)
|
|
||||||
else:
|
|
||||||
# No open PR but review file exists — check file-based
|
|
||||||
import os
|
|
||||||
from ..git_worktree import get_worktree_path as _gwp
|
|
||||||
_wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo)
|
|
||||||
_review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md")
|
|
||||||
_review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md")
|
|
||||||
if os.path.isfile(_review_path) or os.path.isfile(_review_path2):
|
|
||||||
passed, reason = True, "Review file exists (file-based approval)"
|
|
||||||
else:
|
|
||||||
passed, reason = False, "No open PR found and no review file"
|
|
||||||
except Exception as _e:
|
|
||||||
passed, reason = False, f"Error finding PR: {_e}"
|
|
||||||
else:
|
|
||||||
passed, reason = False, f"Unknown QG: {qg_name}"
|
|
||||||
|
|
||||||
if not passed:
|
|
||||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
|
||||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Advance stage
|
|
||||||
update_task_stage(task_id, next_stage)
|
|
||||||
notify_stage_change(task_id, current_stage, next_stage)
|
|
||||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
|
||||||
|
|
||||||
# Launch agent associated with the current stage's transition
|
|
||||||
agent = get_agent_for_stage(current_stage)
|
|
||||||
if agent:
|
|
||||||
try:
|
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
|
||||||
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
|
|
||||||
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
|
|
||||||
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
|
|
||||||
except Exception as e:
|
|
||||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
|
||||||
logger.error(f"Agent launch failed: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
async def _create_gitea_branch(repo: str, branch: str):
|
async def _create_gitea_branch(repo: str, branch: str):
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
|
|||||||
the YAML frontmatter only (no fragile substring matching).
|
the YAML frontmatter only (no fragile substring matching).
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
|||||||
|
|
||||||
from src.agents.launcher import AgentLauncher
|
from src.agents.launcher import AgentLauncher
|
||||||
from src.qg.checks import check_reviewer_verdict
|
from src.qg.checks import check_reviewer_verdict
|
||||||
|
from src.config import settings
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
|
|||||||
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
|
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
|
||||||
assert passed is False
|
assert passed is False
|
||||||
assert "not found" in reason.lower()
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-7 (M-4): dead code removed
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestDeadCodeRemoved:
|
||||||
|
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
|
||||||
|
removed. _ensure_pr (used by the auto-advance path) must stay."""
|
||||||
|
|
||||||
|
def test_auto_merge_pr_is_gone(self):
|
||||||
|
assert not hasattr(AgentLauncher, "_auto_merge_pr")
|
||||||
|
|
||||||
|
def test_ensure_pr_still_present(self):
|
||||||
|
assert hasattr(AgentLauncher, "_ensure_pr")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-7 (M-2): configurable timeout + per-agent override
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestResolveTimeout:
|
||||||
|
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
|
||||||
|
|
||||||
|
def test_default_when_no_override(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||||
|
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
|
||||||
|
assert AgentLauncher._resolve_timeout("developer") == 1800
|
||||||
|
assert AgentLauncher._resolve_timeout(None) == 1800
|
||||||
|
|
||||||
|
def test_override_for_specific_agent(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
|
||||||
|
)
|
||||||
|
assert AgentLauncher._resolve_timeout("reviewer") == 3600
|
||||||
|
assert AgentLauncher._resolve_timeout("architect") == 2700
|
||||||
|
# an agent not in the override map falls back to the default
|
||||||
|
assert AgentLauncher._resolve_timeout("developer") == 1800
|
||||||
|
|
||||||
|
def test_malformed_override_falls_back_to_default(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||||
|
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
|
||||||
|
# must not raise, must return the default
|
||||||
|
assert AgentLauncher._resolve_timeout("reviewer") == 1800
|
||||||
|
|
||||||
|
|
||||||
|
class TestWatchdogGracefulKill:
|
||||||
|
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
|
||||||
|
and ProcessLookupError tolerance. The OS process is fully faked: we record the
|
||||||
|
signals sent and decide liveness from a script, so no real process is touched."""
|
||||||
|
|
||||||
|
def _patch_db(self, monkeypatch):
|
||||||
|
"""Stub get_db so _record_kill does not need a real DB."""
|
||||||
|
class _Conn:
|
||||||
|
def execute(self, *a, **k):
|
||||||
|
return self
|
||||||
|
def commit(self):
|
||||||
|
pass
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
|
||||||
|
|
||||||
|
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
|
||||||
|
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
|
||||||
|
self._patch_db(monkeypatch)
|
||||||
|
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
|
||||||
|
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||||
|
|
||||||
|
sent = []
|
||||||
|
|
||||||
|
def fake_kill(pid, sig):
|
||||||
|
sent.append(sig)
|
||||||
|
# signal 0 (liveness probe) -> always alive; never raise
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||||
|
|
||||||
|
launcher = AgentLauncher()
|
||||||
|
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
|
||||||
|
|
||||||
|
assert signal.SIGTERM in sent
|
||||||
|
assert signal.SIGKILL in sent
|
||||||
|
# SIGTERM must come before SIGKILL
|
||||||
|
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
|
||||||
|
|
||||||
|
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
|
||||||
|
"""Process dies during the grace window -> SIGKILL is NOT sent."""
|
||||||
|
self._patch_db(monkeypatch)
|
||||||
|
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
|
||||||
|
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||||
|
|
||||||
|
sent = []
|
||||||
|
state = {"alive": True, "probes": 0}
|
||||||
|
|
||||||
|
def fake_kill(pid, sig):
|
||||||
|
if sig == 0:
|
||||||
|
state["probes"] += 1
|
||||||
|
# die on the 2nd liveness probe (within grace)
|
||||||
|
if state["probes"] >= 2:
|
||||||
|
raise ProcessLookupError
|
||||||
|
return None
|
||||||
|
sent.append(sig)
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||||
|
|
||||||
|
launcher = AgentLauncher()
|
||||||
|
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
|
||||||
|
|
||||||
|
assert signal.SIGTERM in sent
|
||||||
|
assert signal.SIGKILL not in sent
|
||||||
|
|
||||||
|
def test_already_dead_before_sigterm(self, monkeypatch):
|
||||||
|
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
|
||||||
|
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
|
||||||
|
self._patch_db(monkeypatch)
|
||||||
|
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||||
|
|
||||||
|
sent = []
|
||||||
|
|
||||||
|
def fake_kill(pid, sig):
|
||||||
|
if sig == signal.SIGTERM:
|
||||||
|
raise ProcessLookupError
|
||||||
|
sent.append(sig)
|
||||||
|
return None
|
||||||
|
|
||||||
|
recorded = {"called": False}
|
||||||
|
monkeypatch.setattr(
|
||||||
|
AgentLauncher, "_record_kill",
|
||||||
|
staticmethod(lambda rid: recorded.__setitem__("called", True)),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||||
|
|
||||||
|
launcher = AgentLauncher()
|
||||||
|
# must not raise
|
||||||
|
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
|
||||||
|
|
||||||
|
assert signal.SIGKILL not in sent
|
||||||
|
assert recorded["called"] is False
|
||||||
|
|||||||
395
tests/test_stage_engine.py
Normal file
395
tests/test_stage_engine.py
Normal file
@@ -0,0 +1,395 @@
|
|||||||
|
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
|
||||||
|
|
||||||
|
These verify the MERGED behavior of what used to be two diverged
|
||||||
|
_try_advance_stage implementations (launcher sync + plane async):
|
||||||
|
|
||||||
|
* happy-path advance for every stage launches the CORRECT agent
|
||||||
|
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
|
||||||
|
* a QG failure does not advance;
|
||||||
|
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
|
||||||
|
* developer retries > 3 -> telegram alert, no further enqueue;
|
||||||
|
* tester FAIL -> rollback to development + enqueue developer;
|
||||||
|
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
|
||||||
|
* launcher AND plane both delegate to the engine.
|
||||||
|
|
||||||
|
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
|
||||||
|
the engine runs against a real isolated sqlite DB.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Isolated test DB (same convention as the other suites).
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||||
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||||
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock, patch # noqa: E402
|
||||||
|
|
||||||
|
import src.db as _db # noqa: E402
|
||||||
|
from src.db import init_db, get_db # noqa: E402
|
||||||
|
from src import stage_engine # noqa: E402
|
||||||
|
from src.stage_engine import advance_stage # noqa: E402
|
||||||
|
from src.stages import get_agent_for_stage # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fixtures
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def fresh_db(monkeypatch):
|
||||||
|
"""Fresh isolated DB per test."""
|
||||||
|
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
init_db()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def silence_side_effects(monkeypatch):
|
||||||
|
"""Mock all Plane/Telegram/notification side effects in the engine.
|
||||||
|
|
||||||
|
Everything imported into src.stage_engine that touches the network or sends
|
||||||
|
a message becomes a no-op MagicMock so tests are deterministic and offline.
|
||||||
|
"""
|
||||||
|
for name in (
|
||||||
|
"notify_stage_change",
|
||||||
|
"notify_qg_failure",
|
||||||
|
"notify_approve_requested",
|
||||||
|
"send_telegram",
|
||||||
|
"plane_notify_stage",
|
||||||
|
"plane_notify_qg",
|
||||||
|
"plane_add_comment",
|
||||||
|
"set_issue_in_review",
|
||||||
|
"set_issue_needs_input",
|
||||||
|
"set_issue_in_progress",
|
||||||
|
"set_issue_blocked",
|
||||||
|
):
|
||||||
|
monkeypatch.setattr(stage_engine, name, MagicMock())
|
||||||
|
|
||||||
|
|
||||||
|
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
|
||||||
|
conn = get_db()
|
||||||
|
cur = conn.execute(
|
||||||
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?)",
|
||||||
|
(f"plane-{wi}", wi, repo, branch, stage),
|
||||||
|
)
|
||||||
|
task_id = cur.lastrowid
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
return task_id
|
||||||
|
|
||||||
|
|
||||||
|
def _stage(task_id):
|
||||||
|
conn = get_db()
|
||||||
|
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||||
|
conn.close()
|
||||||
|
return row[0]
|
||||||
|
|
||||||
|
|
||||||
|
def _jobs():
|
||||||
|
conn = get_db()
|
||||||
|
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
|
||||||
|
conn.close()
|
||||||
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
|
def _add_developer_runs(task_id, n):
|
||||||
|
conn = get_db()
|
||||||
|
for _ in range(n):
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
|
||||||
|
(task_id,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _pass(*a, **k):
|
||||||
|
return (True, "ok")
|
||||||
|
|
||||||
|
|
||||||
|
def _fail(reason):
|
||||||
|
def _f(*a, **k):
|
||||||
|
return (False, reason)
|
||||||
|
return _f
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestHappyPathAgentSelection:
|
||||||
|
"""The fixed agent-selection: when advancing FROM current_stage, the engine
|
||||||
|
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"current_stage,expected_next,expected_agent",
|
||||||
|
[
|
||||||
|
("architecture", "development", "developer"),
|
||||||
|
("development", "review", "reviewer"),
|
||||||
|
("review", "testing", "tester"),
|
||||||
|
("testing", "deploy", "deployer"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_advance_launches_current_stage_agent(
|
||||||
|
self, monkeypatch, current_stage, expected_next, expected_agent
|
||||||
|
):
|
||||||
|
# All QG checks pass for this happy-path suite.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||||
|
)
|
||||||
|
task_id = _make_task(current_stage)
|
||||||
|
|
||||||
|
res = advance_stage(
|
||||||
|
task_id, current_stage, "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert res.advanced is True
|
||||||
|
assert res.to_stage == expected_next
|
||||||
|
assert _stage(task_id) == expected_next
|
||||||
|
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
|
||||||
|
assert expected_agent == get_agent_for_stage(current_stage)
|
||||||
|
assert res.enqueued_agent == expected_agent
|
||||||
|
jobs = _jobs()
|
||||||
|
assert len(jobs) == 1
|
||||||
|
assert jobs[0]["agent"] == expected_agent
|
||||||
|
|
||||||
|
def test_deploy_to_done_no_agent(self, monkeypatch):
|
||||||
|
"""deploy -> done advances but launches no agent (terminal-ish)."""
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||||
|
)
|
||||||
|
task_id = _make_task("deploy")
|
||||||
|
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent=None)
|
||||||
|
assert res.advanced is True
|
||||||
|
assert _stage(task_id) == "done"
|
||||||
|
assert res.enqueued_agent is None
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
def test_done_is_terminal(self):
|
||||||
|
task_id = _make_task("done")
|
||||||
|
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent=None)
|
||||||
|
assert res.advanced is False
|
||||||
|
assert _stage(task_id) == "done"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# QG failure: do not advance
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestQgFailureDoesNotAdvance:
|
||||||
|
def test_qg_fail_keeps_stage(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("architecture")
|
||||||
|
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="architect")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert res.qg_passed is False
|
||||||
|
assert _stage(task_id) == "architecture"
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
||||||
|
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("development")
|
||||||
|
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent=None)
|
||||||
|
assert stage_engine.notify_qg_failure.called
|
||||||
|
assert stage_engine.plane_notify_qg.called
|
||||||
|
|
||||||
|
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
|
||||||
|
"""finished_agent set -> NO generic QG notification (launcher parity)."""
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("architecture")
|
||||||
|
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="architect")
|
||||||
|
assert not stage_engine.notify_qg_failure.called
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestReviewerRequestChanges:
|
||||||
|
def test_rollback_and_enqueue_developer(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS,
|
||||||
|
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("review")
|
||||||
|
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="reviewer")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert res.rolled_back_to == "development"
|
||||||
|
assert _stage(task_id) == "development"
|
||||||
|
jobs = _jobs()
|
||||||
|
assert len(jobs) == 1
|
||||||
|
assert jobs[0]["agent"] == "developer"
|
||||||
|
|
||||||
|
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS,
|
||||||
|
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("review")
|
||||||
|
_add_developer_runs(task_id, 3) # already at the max
|
||||||
|
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="reviewer")
|
||||||
|
assert res.rolled_back_to == "development"
|
||||||
|
assert res.alerted is True
|
||||||
|
assert stage_engine.send_telegram.called
|
||||||
|
# No new developer job enqueued past the retry cap.
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tester FAIL -> rollback to development + enqueue developer
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestTesterFail:
|
||||||
|
def test_rollback_and_enqueue_developer(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("testing")
|
||||||
|
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="tester")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert res.rolled_back_to == "development"
|
||||||
|
assert _stage(task_id) == "development"
|
||||||
|
jobs = _jobs()
|
||||||
|
assert len(jobs) == 1
|
||||||
|
assert jobs[0]["agent"] == "developer"
|
||||||
|
|
||||||
|
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("testing")
|
||||||
|
_add_developer_runs(task_id, 3)
|
||||||
|
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="tester")
|
||||||
|
assert res.rolled_back_to == "development"
|
||||||
|
assert res.alerted is True
|
||||||
|
assert stage_engine.set_issue_blocked.called
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Architect conflict -> rollback to analysis + enqueue analyst
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestArchitectConflict:
|
||||||
|
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
|
||||||
|
# 10-conflict.md must exist in the worktree path the engine inspects.
|
||||||
|
wt = tmp_path / "wt"
|
||||||
|
conflict_dir = wt / "docs" / "work-items" / "ET-001"
|
||||||
|
conflict_dir.mkdir(parents=True)
|
||||||
|
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
|
||||||
|
|
||||||
|
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("architecture")
|
||||||
|
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="architect")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert res.rolled_back_to == "analysis"
|
||||||
|
assert _stage(task_id) == "analysis"
|
||||||
|
jobs = _jobs()
|
||||||
|
assert len(jobs) == 1
|
||||||
|
assert jobs[0]["agent"] == "analyst"
|
||||||
|
|
||||||
|
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
|
||||||
|
wt = tmp_path / "wt"
|
||||||
|
(wt / "docs").mkdir(parents=True)
|
||||||
|
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("architecture")
|
||||||
|
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="architect")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert res.rolled_back_to is None
|
||||||
|
assert _stage(task_id) == "architecture"
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Analyst approved-flow (analysis gate): never auto-advances
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestAnalysisApprovedFlow:
|
||||||
|
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
|
||||||
|
)
|
||||||
|
task_id = _make_task("analysis")
|
||||||
|
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="analyst")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert _stage(task_id) == "analysis"
|
||||||
|
assert stage_engine.set_issue_in_review.called
|
||||||
|
assert stage_engine.notify_approve_requested.called
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# launcher + plane both delegate to the engine
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestDelegation:
|
||||||
|
def test_launcher_calls_engine(self):
|
||||||
|
from src.agents.launcher import AgentLauncher
|
||||||
|
task_id = _make_task("development", branch="feature/ET-777-deleg")
|
||||||
|
with patch("src.stage_engine.advance_stage") as m:
|
||||||
|
AgentLauncher()._try_advance_stage(
|
||||||
|
run_id=1, agent="developer", repo="enduro-trails",
|
||||||
|
branch="feature/ET-777-deleg",
|
||||||
|
)
|
||||||
|
m.assert_called_once()
|
||||||
|
kwargs = m.call_args.kwargs
|
||||||
|
assert kwargs["task_id"] == task_id
|
||||||
|
assert kwargs["current_stage"] == "development"
|
||||||
|
assert kwargs["finished_agent"] == "developer"
|
||||||
|
|
||||||
|
def test_plane_calls_engine(self):
|
||||||
|
import asyncio
|
||||||
|
from src.webhooks import plane as plane_mod
|
||||||
|
with patch("src.stage_engine.advance_stage") as m:
|
||||||
|
asyncio.run(
|
||||||
|
plane_mod._try_advance_stage(
|
||||||
|
task_id=5, current_stage="analysis", repo="enduro-trails",
|
||||||
|
work_item_id="ET-001", branch="feature/ET-001-x",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
m.assert_called_once()
|
||||||
|
# plane passes positional args; finished_agent (last positional) is None.
|
||||||
|
args = m.call_args.args
|
||||||
|
assert args[0] == 5
|
||||||
|
assert args[1] == "analysis"
|
||||||
|
assert args[-1] is None
|
||||||
277
tests/test_webhook_dedup.py
Normal file
277
tests/test_webhook_dedup.py
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
|
||||||
|
|
||||||
|
A retried/replayed webhook delivery must be processed exactly once. We mock
|
||||||
|
enqueue_job (imported into the gitea/plane module namespaces) and assert its
|
||||||
|
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
|
||||||
|
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
|
||||||
|
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
|
||||||
|
guarantee by re-enabling the secret.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from unittest.mock import patch, AsyncMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Override DB path + project registry BEFORE importing app (same pattern as
|
||||||
|
# tests/test_webhooks.py).
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||||
|
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||||
|
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||||
|
os.environ["ORCH_GITEA_OWNER"] = "admin"
|
||||||
|
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
|
||||||
|
os.environ["ORCH_PROJECTS_JSON"] = (
|
||||||
|
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
||||||
|
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
||||||
|
)
|
||||||
|
|
||||||
|
from fastapi.testclient import TestClient # noqa: E402
|
||||||
|
from src.main import app # noqa: E402
|
||||||
|
from src.db import init_db, get_db # noqa: E402
|
||||||
|
from src import db as db_module # noqa: E402
|
||||||
|
from src.webhooks import gitea as gitea_mod # noqa: E402
|
||||||
|
from src.webhooks import plane as plane_mod # noqa: E402
|
||||||
|
from src import projects as projects_mod # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup_db(monkeypatch):
|
||||||
|
# settings is a process-wide singleton; another test module may have fixed
|
||||||
|
# settings.db_path to its own file at import time. get_db() reads it live, so
|
||||||
|
# pin it to OUR db for the duration of each test here.
|
||||||
|
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
init_db()
|
||||||
|
yield
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def proj_registry():
|
||||||
|
"""Pin the shared project registry to proj-1/enduro-trails.
|
||||||
|
|
||||||
|
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
|
||||||
|
built at import; test_projects.py rebuilds it via reload_projects(), which can
|
||||||
|
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
|
||||||
|
ignore our fixtures. Force ours for each test, then rebuild after.
|
||||||
|
"""
|
||||||
|
os.environ["ORCH_PROJECTS_JSON"] = (
|
||||||
|
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
||||||
|
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
||||||
|
)
|
||||||
|
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
|
||||||
|
projects_mod.reload_projects()
|
||||||
|
yield
|
||||||
|
projects_mod.reload_projects()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def no_hmac(monkeypatch):
|
||||||
|
"""Bypass HMAC so dedup behavior (not signing) is under test.
|
||||||
|
|
||||||
|
settings is shared, so override the secret on the module-level settings that
|
||||||
|
each verify_* function reads.
|
||||||
|
"""
|
||||||
|
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
|
||||||
|
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
def _events_count():
|
||||||
|
conn = get_db()
|
||||||
|
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
return n
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Migration
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_migration_adds_delivery_id_and_index():
|
||||||
|
"""events has delivery_id + a partial unique index idx_events_delivery."""
|
||||||
|
conn = get_db()
|
||||||
|
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
||||||
|
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
|
||||||
|
conn.close()
|
||||||
|
assert "delivery_id" in cols
|
||||||
|
assert "idx_events_delivery" in idxs
|
||||||
|
|
||||||
|
|
||||||
|
def test_migration_on_old_db_without_column_does_not_crash():
|
||||||
|
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
import sqlite3
|
||||||
|
conn = sqlite3.connect(_test_db)
|
||||||
|
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
|
||||||
|
conn.executescript(
|
||||||
|
"""
|
||||||
|
CREATE TABLE events (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
timestamp TEXT DEFAULT (datetime('now')),
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
event_type TEXT NOT NULL,
|
||||||
|
payload TEXT NOT NULL,
|
||||||
|
processed INTEGER DEFAULT 0
|
||||||
|
);
|
||||||
|
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
|
||||||
|
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Should add the column + index without raising and keep the legacy rows.
|
||||||
|
init_db()
|
||||||
|
|
||||||
|
conn = get_db()
|
||||||
|
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
||||||
|
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
assert "delivery_id" in cols
|
||||||
|
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Gitea dedup
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch.object(gitea_mod, "enqueue_job")
|
||||||
|
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
|
||||||
|
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
|
||||||
|
# Task at architecture so the ADR push would enqueue.
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?)",
|
||||||
|
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
body = {
|
||||||
|
"ref": "refs/heads/feature/ET-100-x",
|
||||||
|
"repository": {"name": "enduro-trails"},
|
||||||
|
"commits": [
|
||||||
|
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
|
||||||
|
|
||||||
|
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
||||||
|
assert r1.status_code == 200
|
||||||
|
assert r1.json()["status"] == "accepted"
|
||||||
|
assert mock_enqueue.call_count == 1
|
||||||
|
assert _events_count() == 1
|
||||||
|
|
||||||
|
# Same delivery id again -> duplicate, no new enqueue, no new event row.
|
||||||
|
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
||||||
|
assert r2.status_code == 200
|
||||||
|
assert r2.json()["status"] == "duplicate"
|
||||||
|
assert mock_enqueue.call_count == 1
|
||||||
|
assert _events_count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
@patch.object(gitea_mod, "enqueue_job")
|
||||||
|
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
|
||||||
|
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
||||||
|
r1 = client.post("/webhook/gitea", json=body,
|
||||||
|
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
|
||||||
|
r2 = client.post("/webhook/gitea", json=body,
|
||||||
|
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
|
||||||
|
assert r1.json()["status"] == "accepted"
|
||||||
|
assert r2.json()["status"] == "accepted"
|
||||||
|
assert _events_count() == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_gitea_fallback_hash_when_no_delivery_header():
|
||||||
|
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
|
||||||
|
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
||||||
|
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
||||||
|
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
||||||
|
assert r1.json()["status"] == "accepted"
|
||||||
|
assert r2.json()["status"] == "duplicate"
|
||||||
|
assert _events_count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Plane dedup
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch.object(plane_mod, "enqueue_job")
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
|
||||||
|
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
|
||||||
|
body = {
|
||||||
|
"event": "work_item.created",
|
||||||
|
"data": {
|
||||||
|
"id": "pd-001",
|
||||||
|
"name": "Dedup plane task",
|
||||||
|
"description_stripped": "A sufficiently long description for QG-0 to pass.",
|
||||||
|
"project": "proj-1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r1 = client.post("/webhook/plane", json=body)
|
||||||
|
assert r1.status_code == 200
|
||||||
|
assert r1.json()["status"] == "accepted"
|
||||||
|
assert mock_enqueue.call_count == 1
|
||||||
|
assert _events_count() == 1
|
||||||
|
|
||||||
|
r2 = client.post("/webhook/plane", json=body)
|
||||||
|
assert r2.status_code == 200
|
||||||
|
assert r2.json()["status"] == "duplicate"
|
||||||
|
assert mock_enqueue.call_count == 1 # not re-enqueued
|
||||||
|
assert _events_count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
|
||||||
|
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
|
||||||
|
body = {
|
||||||
|
"event": "work_item.created",
|
||||||
|
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
|
||||||
|
}
|
||||||
|
r1 = client.post("/webhook/plane", json=body)
|
||||||
|
assert r1.status_code == 200
|
||||||
|
assert r1.json()["status"] == "ignored"
|
||||||
|
# Event WAS logged (dedup happens before the project filter), so a retry of the
|
||||||
|
# SAME body is a duplicate, not re-evaluated.
|
||||||
|
assert _events_count() == 1
|
||||||
|
r2 = client.post("/webhook/plane", json=body)
|
||||||
|
assert r2.json()["status"] == "duplicate"
|
||||||
|
assert _events_count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# HMAC still guarded (acceptance #4) — independent of the dedup path
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_gitea_invalid_signature_still_401(monkeypatch):
|
||||||
|
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
|
||||||
|
r = client.post(
|
||||||
|
"/webhook/gitea",
|
||||||
|
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
|
||||||
|
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_plane_invalid_signature_still_401(monkeypatch):
|
||||||
|
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
|
||||||
|
r = client.post(
|
||||||
|
"/webhook/plane",
|
||||||
|
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
|
||||||
|
headers={"X-Plane-Signature": "deadbeef"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 401
|
||||||
Reference in New Issue
Block a user